根本参数:
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-adminxxl.job.accessToken=default_tokenxxl.job.executor.appname=xxl-job-executor-samplexxl.job.executor.address=xxl.job.executor.ip=xxl.job.executor.port=9999xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandlerxxl.job.executor.logretentiondays=30
拓展参数,非必填
xxl.job.i18n=zh_CNxxl.job.triggerpool.fast.max=200xxl.job.triggerpool.slow.max=100
3 代码创建实行函数任务开拓:在Spring Bean实例中,开拓Job方法;表明配置:为Job方法添加表明 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",表明value值对应的是调度中央新建任务的JobHandler属性的值。实行日志:须要通过 "XxlJobHelper.log" 打印实行日志;任务结果:默认任务结果为 "成功" 状态,不须要主动设置;如有诉求,比如设置任务结果为失落败,可以通过:"XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
@XxlJob(“demoJobHandler”)public void demoJobHandler() throws Exception { XxlJobHelper.log(“XXL-JOB, Hello World.”); for (int i = 0; i < 5; i++) { XxlJobHelper.log(“beat at:” + i); TimeUnit.SECONDS.sleep(2); } // default success}
4 客户端配置参数解释:
二、做事端配置流程1 实行器管理
AppName:实行组Name,Name相同的实行器视为同一个实行组名称:实行组中文名。注册办法:自动注册:IP地址由实行器上报,常日这样利用;手动注册:手动输入实行器地址IP,不建议利用。2 任务管理
根本配置:
实行器:任务的绑定的实行器,任务触发调度时将会自动创造注册成功的实行器, 实现任务自动创造功能; 另一方面也可以方便的进行任务分组。每个任务必须绑定一个实行器, 可在 "实行器管理" 进行设置;任务描述:任务的描述信息,便于任务管理;卖力人:任务的卖力人;报警邮件:任务调度失落败时邮件关照的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔。
触发配置:
CRON:触发任务实行的Cron表达式;固定速率:固件速率的韶光间隔,单位为秒;固定延迟:固件延迟的韶光间隔,单位为秒。
任务配置:
运行模式:
(1)BEAN模式:任务以JobHandler办法掩护在实行器端;须要结合 "JobHandler" 属性匹配实行器中任务;(2)GLUE模式(Java):任务以源码办法掩护在调度中央;该模式的任务实际上是一段继续自IJobHandler的Java类代码并 "groovy" 源码办法掩护,它在实行器项目中运行,可利用@Resource/@Autowire注入实行器里中的其他做事;(3)GLUE模式(Shell):任务以源码办法掩护在调度中央;该模式的任务实际上是一段 "shell" 脚本;(4)GLUE模式(Python):任务以源码办法掩护在调度中央;该模式的任务实际上是一段 "python" 脚本;(5)GLUE模式(PHP):任务以源码办法掩护在调度中央;该模式的任务实际上是一段 "php" 脚本;(6)GLUE模式(NodeJS):任务以源码办法掩护在调度中央;该模式的任务实际上是一段 "nodejs" 脚本;(7)GLUE模式(PowerShell):任务以源码办法掩护在调度中央;该模式的任务实际上是一段 "PowerShell" 脚本;
高等配置
路由策略:当实行器集群支配时,供应丰富的路由策略,包括;
(1)FIRST(第一个):固定选择第一个机器;(2)LAST(末了一个):固定选择末了一个机器;(3)ROUND(轮询):;(4)RANDOM(随机):随机选择在线的机器;(5)CONSISTENT\_HASH(同等性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。(6)LEAST\_FREQUENTLY\_USED(最不常常利用):利用频率最低的机器优先当选举;(7)LEAST\_RECENTLY\_USED(最近最久未利用):最久未利用的机器优先当选举;(8)FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标实行器并发起调度;(9)BUSYOVER(劳碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标实行器并发起调度;(10)SHARDING\_BROADCAST(分片广播):广播触发对应集群中所有机器实行一次任务,同时系统自动通报分片参数;可根据分片参数开拓分片任务;
(1)忽略:调度过期后,忽略过期的任务,从当前韶光开始重新打算下次触发韶光;(2)立即实行一次:调度过期后,立即实行一次,并从当前韶光开始重新打算下次触发韶光;
(1)单机串行(默认):调度要求进入单机实行器后,调度要求进入FIFO行列步队并以串行办法运行;(2)丢弃后续调度:调度要求进入单机实行器后,创造实行器存在运行的调度任务,本次要求将会被丢弃并标记为失落败;(3)覆盖之前调度:调度要求进入单机实行器后,创造实行器存在运行的调度任务,将会终止运行中的调度任务并清空行列步队,然后运行本地调度任务;
上面我们讲解了客户端接入流程、做事端配置流程和路由策略参数详解,接下来我们讲一下《xxj-job做事端架构流程》
四、xxl-job的调度流程任务调度器和实行器利用http协议通信,各自有轮询线程处理不同业务。
五、xxl-job的调度中央详解1 XXL-JOB的启动和销毁逻辑:如代码可见,xxl-job调度中央的启动和销毁,核心是处理几个线程池的创建和销毁。对每一个业务线程池,后续有详细讲解。
public class XxlJobScheduler { private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class); public void init() throws Exception { // init i18n initI18n(); // admin trigger pool start JobTriggerPoolHelper.toStart(); // admin registry monitor run JobRegistryHelper.getInstance().start(); // admin fail-monitor run JobFailMonitorHelper.getInstance().start(); // admin lose-monitor run ( depend on JobTriggerPoolHelper ) JobCompleteHelper.getInstance().start(); // admin log report start JobLogReportHelper.getInstance().start(); // start-schedule ( depend on JobTriggerPoolHelper ) JobScheduleHelper.getInstance().start(); logger.info(">>>>>>>>> init xxl-job admin success."); } public void destroy() throws Exception { // stop-schedule JobScheduleHelper.getInstance().toStop(); // admin log report stop JobLogReportHelper.getInstance().toStop(); // admin lose-monitor stop JobCompleteHelper.getInstance().toStop(); // admin fail-monitor stop JobFailMonitorHelper.getInstance().toStop(); // admin registry stop JobRegistryHelper.getInstance().toStop(); // admin trigger pool stop JobTriggerPoolHelper.toStop(); }}
2 任务触发线程池任务触发线程池:JobTriggerPoolHelper.toStart();
启动两个实行任务的线程池,常日任务在fastTriggerPool,统计一分钟内超时10次的任务,对超时任务再实行放进slowTriggerPool。
// job-timeout 10 times in 1 min
public class JobTriggerPoolHelper { private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class); // ---------------------- trigger pool --------------------- // fast/slow thread pool private ThreadPoolExecutor fastTriggerPool = null; private ThreadPoolExecutor slowTriggerPool = null; public void start(){ fastTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()); } }); slowTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()); } }); }}
3 实行器管理线程实行器管理线程:
JobRegistryHelper.getInstance().start();
担保任务实行的时候拿到的实行器列表都是运行状态的
启动一个守护线程;每隔三十秒,查询一次数据库 注册表 中自动注册的实行器;删除超过90秒未再次注册(心跳)的实行器;将实行器注册信息加载到内存Map中;更新注册上了的实行器地址信息到 任务实行表 中。
public void start(){ // for monitor registryMonitorThread = new Thread(new Runnable() { @Override public void run() { while (!toStop) { // auto registry group List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0); if (groupList!=null && !groupList.isEmpty()) { // remove dead address (admin/executor) List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date()); if (ids!=null && ids.size()>0) { XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids); } // fresh online address (admin/executor) HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>(); List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date()); if (list != null) { for (XxlJobRegistry item: list) { if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) { String appname = item.getRegistryKey(); List<String> registryList = appAddressMap.get(appname); if (!registryList.contains(item.getRegistryValue())) { registryList.add(item.getRegistryValue()); } appAddressMap.put(appname, registryList); } } } // fresh group address for (XxlJobGroup group: groupList) { XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); } } try { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); } } } logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop"); } }); registryMonitorThread.setDaemon(true); registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread"); registryMonitorThread.start(); }}
4 失落败任务管理线程失落败任务管理线程:JobFailMonitorHelper.getInstance().start();
管理实行失落败的任务,重试或者发送告警
每隔10s查询实行失落败的任务;如果设置重试次数,就进行重试操作;如未设置重试次数,或已经重试超过重试次数,就发送告警信息(邮件或短信等)。5 完成任务管理线程
完成任务管理线程:JobCompleteHelper.getInstance().start();
管理超时任务,或者实行器宕机的任务,做轮询补偿。
每隔1min查询查询状态未结束的任务;如果距任务开始韶光10min 并且 注册实行器不在线,那么就标记任务实行结束。6 日志管理线程
日志管理线程:JobLogReportHelper.getInstance().start();
统计任务实行成功率,删除过期日志。
每隔 1min 实行一次;按天统计总任务数,成功和失落败的个数,可通过 xxl.job.logretentiondays 配置天数 默认30天。7 任务实行调度线程
任务实行调度线程:
JobScheduleHelper.getInstance().start();
scheduleThread:任务查询并打算实行韶光线程
每一秒 查询数据库中实行韶光在 当前韶光 至 (当前韶光 + 5s)区间的任务;根据CronHelp类打算出下次实行韶光;将任务的下次实行韶光写入数据库;加载这次实行任务Id到缓存中。
利用ConcurrentHashMap缓存,Key是分钟内的秒数(0-59),Value是任务Id组成的数组
{ "1":[ 251, 172 ], "2":[ 643, 172 ], "39":[ 273 ], "59":[ 188, 175 ]}
ringThread: 任务实行线程
每一秒轮询一次,查找当前秒的任务Id ;根据任务Id,查出任务详情,并投递到发送线程池;发送线程池查询到实行器地址列表,根据配置的发送策略,通过http要求发送到实行器。
发送策略:(对应页面的路由策略)
六、附录
同等性哈希算法详解
private static int VIRTUAL_NODE_NUM = 100;public String hashJob(int jobId, List<String> addressList) { // ------A1------A2-------A3------ // -----------J1------------------ // Address的hashCode为Key,address本身为Value; TreeMap<Long, String> addressRing = new TreeMap<Long, String>(); for (String address: addressList) { // 对Address进行 100 次取模,每次对Key+1, for (int i = 0; i < VIRTUAL_NODE_NUM; i++) { long addressHash = hash("SHARD-" + address + "-NODE-" + i); addressRing.put(addressHash, address); } } // 对任务Id取模,以Hash树最近的Address作为选定的 long jobHash = hash(String.valueOf(jobId)); SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash); return addressRing.firstEntry().getValue();}
程序员的核心竞争力实在还是技能,因此对技能还是要不断的学习,关注 “IT顶峰技能” "大众号 ,该"大众年夜众号内容定位:中高等开拓、架构师、中层管理职员等中高端岗位做事的,除了技能互换外还有很多架构思想和实战案例。
作者是 《 中间件 RocketMQ 技能底细》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构卖力人,紧张卖力开拓框架的搭建、中间件干系技能的二次开拓和运维管理、稠浊云及根本做事平台的培植。