本文共 21175 字,大约阅读时间需要 70 分钟。
目的
利用zookeeper的特性,来控制quartz实现分布式调度,保证quartz的单点运行,同时解除quartz自身分布式部署对数据库的依赖,保证同一时刻只有一个quartz应用在执行任务。
实现方式
利用zk的分布式独占锁,控制quartz应用执行节点,让拿到独占锁的quartz应用执行调度,没有拿到独占锁的quartz处理等待状态。
类图
核心代码
public class TriggerBean { /** * 标识 */ private String key; /** * 所属组 */ private String group; /** * 描述 */ private String description; /** * 启动时间 */ private String startTime; /** * 结束时间 */ private String endTime; /** * 优先级 */ private Integer priority; /** * 日历名称 */ private String calendarName; /** * 失火指令(参数0,1,2) * MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1 * MISFIRE_INSTRUCTION_SMART_POLICY = 0 (默认) * MISFIRE_INSTRUCTION_FIRE_ONCE_NOW = 1 * MISFIRE_INSTRUCTION_DO_NOTHING = 2 */ private Integer misfireInstruction; /** * 任务代理类 */ private JobDetailProxyBean jobDetail; public String getKey() { return key; } public void setKey(String key) { this.key = key; } public String getGroup() { return group; } public void setGroup(String group) { this.group = group; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getStartTime() { return startTime; } public void setStartTime(String startTime) { this.startTime = startTime; } public String getEndTime() { return endTime; } public void setEndTime(String endTime) { this.endTime = endTime; } public Integer getPriority() { return priority; } public void setPriority(Integer priority) { this.priority = priority; } public String getCalendarName() { return calendarName; } public void setCalendarName(String calendarName) { this.calendarName = calendarName; } public Integer getMisfireInstruction() { return misfireInstruction; } public void setMisfireInstruction(Integer misfireInstruction) { this.misfireInstruction = misfireInstruction; } public JobDetailProxyBean getJobDetail() { return jobDetail; } public void setJobDetail(JobDetailProxyBean jobDetail) { this.jobDetail = jobDetail; }}
public class CronTriggerBean extends TriggerBean { /** * CRON表达式 */ private String cronExpression; public String getCronExpression() { return cronExpression; } public void setCronExpression(String cronExpression) { this.cronExpression = cronExpression; }}
public class SimpleTriggerBean extends TriggerBean { /** * 时间间隔(秒) */ private Integer interval; /** * 重复次数(默认:-1为无限循环) */ private Integer repeatCount; public Integer getInterval() { return interval; } public void setInterval(Integer interval) { this.interval = interval; } public Integer getRepeatCount() { return repeatCount; } public void setRepeatCount(Integer repeatCount) { this.repeatCount = repeatCount; }}
public class SchedulerFactoryBean implements InitializingBean { protected static Logger logger = Logger.getLogger(SchedulerFactoryBean.class); /** * 触发器列表 */ private List
package com.ab.scheduling.quartz;import com.ab.scheduling.quartz.constant.Constant;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.state.ConnectionState;import org.apache.curator.framework.state.ConnectionStateListener;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.log4j.Logger;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.ZooDefs;import org.springframework.beans.factory.InitializingBean;import java.util.Collections;import java.util.List;/** * Zookeeper 工厂类 * Date: 14-4-2 * Time: 下午4:03 */public class ZookeeperFactory implements InitializingBean{ public static Logger logger = Logger.getLogger(ZookeeperFactory.class); /** * zookeeper服务地址 */ private String hosts; /** * 回话的超时时间(毫秒) */ private Integer sessionTimeOut; /** * 连接的超时时间(毫秒) */ private Integer connectionTimeOut; /** * 命名空间 **/ private String nameSpace; /** * zookeeper管理对象 */ private CuratorFramework zkTools; /** * 独享队列节点 */ private String monopolyQueueNode; /** * 连接状态 */ private String connectionState; /** * 会话ID */ private long sessionId; /** * Spring初始化方法 */ public void afterPropertiesSet(){ this.connection(); this.addListener(); } /** * 连接 */ public void connection(){ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, Integer.MAX_VALUE); zkTools = CuratorFrameworkFactory .builder() .connectString(hosts) .namespace(nameSpace) .retryPolicy(retryPolicy) .connectionTimeoutMs(connectionTimeOut == null ? 30000 : connectionTimeOut) .sessionTimeoutMs(sessionTimeOut == null ? 30000 : sessionTimeOut) .build(); zkTools.start(); } /** * 连接状态监听 */ public void addListener(){ zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState.equals(ConnectionState.CONNECTED)) { logger.info("连接"); connectionState = "CONNECTED"; try { sessionId = zkTools.getZookeeperClient().getZooKeeper().getSessionId(); registerMonopolyQueue(); } catch (Exception e) { logger.error("注册独占队列失败"); } } if (newState.equals(ConnectionState.RECONNECTED)) { logger.info("重新连接"); connectionState = "CONNECTED"; try { if(sessionId != zkTools.getZookeeperClient().getZooKeeper().getSessionId()) { registerMonopolyQueue(); } } catch (Exception e) { logger.error("注册独占队列失败"); } } if (newState.equals(ConnectionState.LOST)) { logger.info("丢失"); connectionState = "LOST"; } if (newState.equals(ConnectionState.SUSPENDED)) { logger.info("暂停"); connectionState = "SUSPENDED"; } if (newState.equals(ConnectionState.READ_ONLY)) { logger.info("只读"); connectionState = "READ_ONLY"; } } }); } /** * 注册独占队列 */ private void registerMonopolyQueue() throws Exception { if(zkTools.checkExists().watched().forPath(Constant.MONOPOLY) == null){ zkTools.create() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(Constant.MONOPOLY); logger.info("创建独享锁队列节点成功!"); } if(monopolyQueueNode == null || (monopolyQueueNode != null && zkTools.checkExists().forPath(monopolyQueueNode)==null)) { monopolyQueueNode = zkTools.create() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(Constant.MONOPOLY + Constant.SEPARATOR + Constant.QUEUE_NODE); logger.info("成功加入独享锁队列"); } } /** * 获得独占锁的执行权限 * @return 执行权限标识 * @throws KeeperException * @throws InterruptedException */ public boolean getMonopolyLock() throws Exception { boolean flag = false; if(connectionState != null && (connectionState.equals("CONNECTED") || connectionState.equals("RECONNECTED"))){ Listnodes = zkTools.getChildren().watched().forPath(Constant.MONOPOLY); if(nodes.size() > 0){ Collections.sort(nodes); //判断当前应用是否在队列的第一位 if((Constant.SEPARATOR + Constant.MONOPOLY + Constant.SEPARATOR + nodes.get(0)).equals(monopolyQueueNode)){ flag = true; } } } return flag; } /** * 关闭连接 */ public void close(){ if(zkTools != null){ zkTools.close(); zkTools = null; } } public String getHosts() { return hosts; } public void setHosts(String hosts) { this.hosts = hosts; } public Integer getSessionTimeOut() { return sessionTimeOut; } public void setSessionTimeOut(Integer sessionTimeOut) { this.sessionTimeOut = sessionTimeOut; } public Integer getConnectionTimeOut() { return connectionTimeOut; } public void setConnectionTimeOut(Integer connectionTimeOut) { this.connectionTimeOut = connectionTimeOut; } public String getNameSpace() { return nameSpace; } public void setNameSpace(String nameSpace) { this.nameSpace = nameSpace; }}
package com.ab.scheduling.quartz.common;import com.ab.scheduling.quartz.JobDetailProxyBean;import com.ab.scheduling.quartz.CronTriggerBean;import com.ab.scheduling.quartz.SimpleTriggerBean;import org.apache.commons.lang.StringUtils;import org.apache.log4j.Logger;import org.quartz.*;import org.quartz.impl.StdSchedulerFactory;import org.quartz.simpl.SimpleThreadPool;import java.util.Properties;/** * Quartz调度工具类 * Date: 14-5-15 * Time: 下午6:10 */public class SchedulerUtils { protected static Logger logger = Logger.getLogger(SchedulerUtils.class); /** * 初始化StdSchedulerFactory * @return StdSchedulerFactory */ public static StdSchedulerFactory initStdSchedulerFactory() { StdSchedulerFactory schedulerFactory = null; try{ schedulerFactory = (StdSchedulerFactory) Class.forName(StdSchedulerFactory.class.getName()).newInstance(); Properties mergedProps = new Properties(); // 设置Quartz线程池设置 mergedProps.setProperty(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName()); mergedProps.setProperty("org.quartz.threadPool.threadCount", Integer.toString(10)); schedulerFactory.initialize(mergedProps); } catch (Exception e){ logger.error("初始化StdSchedulerFactory失败"); logger.error(e); } return schedulerFactory; } /** * 装配任务 * @param jobDetail 任务代理类 * @return JobDetail */ public static JobDetail assemblyJobDetail(JobDetailProxyBean jobDetail){ JobBuilder jobBuilder = JobBuilder.newJob(jobDetail.getClass()); //设置JobDetail身份标识与所属组 String key = jobDetail.getKey(); if(StringUtils.isNotBlank(key)){ jobBuilder = jobBuilder.withIdentity(key, jobDetail.getGroup()); }else{ jobBuilder = jobBuilder.withIdentity(IdentityUtils.generatorUUID("JOB"), jobDetail.getGroup()); } //设置任务描述 if(StringUtils.isNotBlank(jobDetail.getDescription())){ jobBuilder = jobBuilder.withDescription(jobDetail.getDescription()); } //设置JobDetail数据参数 JobDataMap jobDataMap = new JobDataMap(); jobDataMap.put("targetObject",jobDetail.getTargetObject()); //目标对象 jobDataMap.put("targetMethod",jobDetail.getTargetMethod()); //目标方法 jobDataMap.put("mode", jobDetail.getMode()); //运行模式 jobBuilder = jobBuilder.usingJobData(jobDataMap); return jobBuilder.build(); } /** * 装配表达式触发器 * @param cronTriggerBean 表达式触发器 * @return 表达式触发器 */ public static CronTrigger assemblyCronTrigger(CronTriggerBean cronTriggerBean){ TriggerBuilder triggerBuilder = TriggerBuilder.newTrigger(); //设置触发器身份标识与所属组 String key = cronTriggerBean.getKey(); if(StringUtils.isNotBlank(key)){ triggerBuilder = triggerBuilder.withIdentity(key, cronTriggerBean.getGroup()); }else{ triggerBuilder = triggerBuilder.withIdentity(IdentityUtils.generatorUUID("CronTrigger"), cronTriggerBean.getGroup()); } //设置描述 if(StringUtils.isNotBlank(cronTriggerBean.getDescription())){ triggerBuilder = triggerBuilder.withDescription(cronTriggerBean.getDescription()); } //设置启动时间 if(StringUtils.isNotBlank(cronTriggerBean.getStartTime())){ triggerBuilder = triggerBuilder.startAt(DateUtils.StringToDate(cronTriggerBean.getStartTime(), "yyyy-MM-dd HH:mm:ss")); }else{ triggerBuilder = triggerBuilder.startNow(); //当启动时间为空默认立即启动调度器 } //设置结束时间 if(StringUtils.isNotBlank(cronTriggerBean.getEndTime())){ triggerBuilder = triggerBuilder.endAt(DateUtils.StringToDate(cronTriggerBean.getEndTime(), "yyyy-MM-dd HH:mm:ss")); } //设置优先级 if(cronTriggerBean.getPriority() != null){ triggerBuilder = triggerBuilder.withPriority(cronTriggerBean.getPriority()); } //设置Cron表达式(不允许为空)与集火指令 CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronTriggerBean.getCronExpression()); if(cronTriggerBean.getMisfireInstruction() != null){ if(cronTriggerBean.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) { cronScheduleBuilder = cronScheduleBuilder.withMisfireHandlingInstructionIgnoreMisfires(); } if(cronTriggerBean.getMisfireInstruction() == CronTrigger.MISFIRE_INSTRUCTION_FIRE_ONCE_NOW) { cronScheduleBuilder = cronScheduleBuilder.withMisfireHandlingInstructionFireAndProceed(); } if(cronTriggerBean.getMisfireInstruction() == CronTrigger.MISFIRE_INSTRUCTION_DO_NOTHING) { cronScheduleBuilder = cronScheduleBuilder.withMisfireHandlingInstructionDoNothing(); } } triggerBuilder = triggerBuilder.withSchedule(cronScheduleBuilder); return (CronTrigger)triggerBuilder.build(); } /** * 装配简单触发器 * @param simpleTriggerBean 简单触发器 * @return 简单触发器 */ public static SimpleTrigger assemblySimpleTrigger(SimpleTriggerBean simpleTriggerBean){ TriggerBuilder triggerBuilder = TriggerBuilder.newTrigger(); //设置触发器身份标识与所属组 String key = simpleTriggerBean.getKey(); if(StringUtils.isNotBlank(key)){ triggerBuilder = triggerBuilder.withIdentity(key, simpleTriggerBean.getGroup()); }else{ triggerBuilder = triggerBuilder.withIdentity(IdentityUtils.generatorUUID("SimpleTrigger"), simpleTriggerBean.getGroup()); } //设置描述 if(StringUtils.isNotBlank(simpleTriggerBean.getDescription())){ triggerBuilder = triggerBuilder.withDescription(simpleTriggerBean.getDescription()); } //设置启动时间 if(StringUtils.isNotBlank(simpleTriggerBean.getStartTime())){ triggerBuilder = triggerBuilder.startAt(DateUtils.StringToDate(simpleTriggerBean.getStartTime(), "yyyy-MM-dd HH:mm:ss")); }else{ triggerBuilder = triggerBuilder.startNow(); //当启动时间为空默认立即启动调度器 } //设置结束时间 if(StringUtils.isNotBlank(simpleTriggerBean.getEndTime())){ triggerBuilder = triggerBuilder.endAt(DateUtils.StringToDate(simpleTriggerBean.getEndTime(), "yyyy-MM-dd HH:mm:ss")); } //设置优先级 if(simpleTriggerBean.getPriority() != null){ triggerBuilder = triggerBuilder.withPriority(simpleTriggerBean.getPriority()); } //设置简单触发器 时间间隔(不允许为空)、执行次数(默认为-1)与集火指令 SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(20).withRepeatCount(-1); simpleScheduleBuilder = simpleScheduleBuilder.withIntervalInSeconds(simpleTriggerBean.getInterval()); if(simpleTriggerBean.getRepeatCount() != null){ simpleScheduleBuilder = simpleScheduleBuilder.withRepeatCount(simpleTriggerBean.getRepeatCount()); }else{ simpleScheduleBuilder = simpleScheduleBuilder.withRepeatCount(-1); } if(simpleTriggerBean.getMisfireInstruction() != null){ if(simpleTriggerBean.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) { simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionIgnoreMisfires(); } if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW) { simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionFireNow(); } if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT) { simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionNextWithExistingCount(); } if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT) { simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionNextWithRemainingCount(); } if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT) { simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionNowWithExistingCount(); } if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT) { simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionNowWithRemainingCount(); } } triggerBuilder = triggerBuilder.withSchedule(simpleScheduleBuilder); return (SimpleTrigger)triggerBuilder.build(); }}
spring配置
往期精彩回顾
对CSDNJava架构技术,对架构技术感兴趣的同学,欢迎加QQ群,一起学习,相互讨论。
群内已经有小伙伴将知识体系整理好(源码,笔记,PPT,学习视频),欢迎?加群免费领取 分享给喜欢Java,喜欢编程,有梦想成为架构师的程序员们,希望能够帮助到你们。
转载地址:http://vvaai.baihongyu.com/