设为首页 加入收藏

TOP

xxl-job源码分析(二)
2019-09-20 11:45:59 】 浏览:119
Tags:xxl-job 源码 分析
egistryParam, registryResult}); break; } else { logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); } } catch (Exception e) { logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e); } }

调度中心

// RPC 注册服务
AdminBizImpl.java->registry();

数据库

调度中心调用执行器

/* 调度中心执行步骤 */
// 1. 调用执行器
XxlJobTrigger.java->runExecutor();

// 2. 获取执行器
XxlJobScheduler.java->getExecutorBiz();

// 3. 调用
ExecutorBizImpl.java->run();

/* 执行器执行步骤 */
// 1. 执行器接口
ExecutorBiz.java->run();

// 2. 执行器实现
ExecutorBizImpl.java->run();

// 3. 把jobInfo 从 jobThreadRepository (ConcurrentMap) 中获取一个新线程,并开启新线程
XxlJobExecutor.java->registJobThread();

// 4. 保存到当前线程队列
JobThread.java->pushTriggerQueue();

// 5. 执行
JobThread.java->handler.execute(triggerParam.getExecutorParams());

调度中心(Admin)

实现 org.springframework.beans.factory.InitializingBean类,重写 afterPropertiesSet 方法,在初始化bean的时候都会执行该方法

DisposableBean spring停止时执行

结束加载项

  1. 停止定时任务调度器(中断scheduleThread,中断ringThread)
  2. 停止触发线程池(JobTriggerPoolHelper)
  3. 停止注册监控器(registryThread)
  4. 停止失败日志监控器(monitorThread)
  5. 停止RPC服务(stopRpcProvider)

手动执行方式

JobInfoController.java

@RequestMapping("/trigger")
@ResponseBody
//@PermissionLimit(limit = false)
public ReturnT<String> triggerJob(int id, String executorParam) {
    // force cover job param
    if (executorParam == null) {
        executorParam = "";
    }

    JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam);
    return ReturnT.SUCCESS;
}

定时调度策略

调度策略执行图

调度策略源码

JobScheduleHelper.java->start();

路由策略

第一个

固定选择第一个机器

ExecutorRouteFirst.java->route();
最后一个

固定选择最后一个机器

ExecutorRouteLast.java->route();
轮询

随机选择在线的机器

ExecutorRouteRound.java->route();

private static int count(int jobId) {
    // cache clear
    if (System.currentTimeMillis() > CACHE_VALID_TIME) {
        routeCountEachJob.clear();
        CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
    }

    // count++
    Integer count = routeCountEachJob.get(jobId);
    count = (count==null || count>1000000)?(new Random().nextInt(100)):++count;  // 初始化时主动Random一次,缓解首次压力
    routeCountEachJob.put(jobId, count);
    return count;
}
随机

随机获取地址列表中的一个

ExecutorRouteRandom.java->route();
一致性HASH

一个job通过hash算法固定使用一台机器,且所有任务均匀散列在不同机器

ExecutorRouteConsistentHash.java->route();

public String hashJob(int jobId, List<String> addressList) {

    // ------A1------A2-------A3------
    // -----------J1------------------
    TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
    for (String address: addressList) {
        for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
            long addressHash = hash("SHARD-" + address + "-NODE-" + i);
            addressRing.put(addressHash, address);
        }
    }

    long jobHash = hash(String.valueOf(jobId));
    // 取出键值 >= jobHash
    SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
    if (!lastRing.isEmpty()) {
        return lastRing.get(lastRing.firstKey());
    }
    return addressRing.firstEntry().getValue();
}
最不经常使用

使用频率最低的机器优先被选举
把地址列表加入到内存中,等下次执行时剔除无效的地址,判断地址列表中执行次数

首页 上一页 1 2 3 4 5 下一页 尾页 2/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇结对编程:中小学自动试卷生成程.. 下一篇设计模式-抽象工厂

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目