egistryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
调度中心
// RPC 注册服务
AdminBizImpl.java->registry();
数据库
调度中心调用执行器
/* 调度中心执行步骤 */
// 1. 调用执行器
XxlJobTrigger.java->runExecutor();
// 2. 获取执行器
XxlJobScheduler.java->getExecutorBiz();
// 3. 调用
ExecutorBizImpl.java->run();
/* 执行器执行步骤 */
// 1. 执行器接口
ExecutorBiz.java->run();
// 2. 执行器实现
ExecutorBizImpl.java->run();
// 3. 把jobInfo 从 jobThreadRepository (ConcurrentMap) 中获取一个新线程,并开启新线程
XxlJobExecutor.java->registJobThread();
// 4. 保存到当前线程队列
JobThread.java->pushTriggerQueue();
// 5. 执行
JobThread.java->handler.execute(triggerParam.getExecutorParams());
调度中心(Admin)
实现 org.springframework.beans.factory.InitializingBean类,重写 afterPropertiesSet 方法,在初始化bean的时候都会执行该方法
DisposableBean spring停止时执行
结束加载项
- 停止定时任务调度器(中断scheduleThread,中断ringThread)
- 停止触发线程池(JobTriggerPoolHelper)
- 停止注册监控器(registryThread)
- 停止失败日志监控器(monitorThread)
- 停止RPC服务(stopRpcProvider)
手动执行方式
JobInfoController.java
@RequestMapping("/trigger")
@ResponseBody
//@PermissionLimit(limit = false)
public ReturnT<String> triggerJob(int id, String executorParam) {
// force cover job param
if (executorParam == null) {
executorParam = "";
}
JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam);
return ReturnT.SUCCESS;
}
定时调度策略
调度策略执行图
调度策略源码
JobScheduleHelper.java->start();
路由策略
第一个
固定选择第一个机器
ExecutorRouteFirst.java->route();
最后一个
固定选择最后一个机器
ExecutorRouteLast.java->route();
轮询
随机选择在线的机器
ExecutorRouteRound.java->route();
private static int count(int jobId) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
routeCountEachJob.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// count++
Integer count = routeCountEachJob.get(jobId);
count = (count==null || count>1000000)?(new Random().nextInt(100)):++count; // 初始化时主动Random一次,缓解首次压力
routeCountEachJob.put(jobId, count);
return count;
}
随机
随机获取地址列表中的一个
ExecutorRouteRandom.java->route();
一致性HASH
一个job通过hash算法固定使用一台机器,且所有任务均匀散列在不同机器
ExecutorRouteConsistentHash.java->route();
public String hashJob(int jobId, List<String> addressList) {
// ------A1------A2-------A3------
// -----------J1------------------
TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
for (String address: addressList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHARD-" + address + "-NODE-" + i);
addressRing.put(addressHash, address);
}
}
long jobHash = hash(String.valueOf(jobId));
// 取出键值 >= jobHash
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return addressRing.firstEntry().getValue();
}
最不经常使用
使用频率最低的机器优先被选举
把地址列表加入到内存中,等下次执行时剔除无效的地址,判断地址列表中执行次数