博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于zookeeper和quartz实现分布式定时调度
阅读量:4178 次
发布时间:2019-05-26

本文共 21175 字,大约阅读时间需要 70 分钟。

目的

利用zookeeper的特性,来控制quartz实现分布式调度,保证quartz的单点运行,同时解除quartz自身分布式部署对数据库的依赖,保证同一时刻只有一个quartz应用在执行任务。

 

实现方式

利用zk的分布式独占锁,控制quartz应用执行节点,让拿到独占锁的quartz应用执行调度,没有拿到独占锁的quartz处于等待状态。

 

类图

640?wx_fmt=png

 

核心代码

 

public class TriggerBean {    /**     * 标识     */    private String key;    /**     * 所属组     */    private String group;    /**     * 描述     */    private String description;    /**     * 启动时间     */    private String startTime;    /**     * 结束时间     */    private String endTime;    /**     * 优先级     */    private Integer priority;    /**     * 日历名称     */    private String calendarName;    /**     * 失火指令(参数0,1,2)     * MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1     * MISFIRE_INSTRUCTION_SMART_POLICY = 0 (默认)     * MISFIRE_INSTRUCTION_FIRE_ONCE_NOW = 1     * MISFIRE_INSTRUCTION_DO_NOTHING = 2     */    private Integer misfireInstruction;    /**     * 任务代理类     */    private JobDetailProxyBean jobDetail;    public String getKey() {        return key;    }    public void setKey(String key) {        this.key = key;    }    public String getGroup() {        return group;    }    public void setGroup(String group) {        this.group = group;    }    public String getDescription() {        return description;    }    public void setDescription(String description) {        this.description = description;    }    public String getStartTime() {        return startTime;    }    public void setStartTime(String startTime) {        this.startTime = startTime;    }    public String getEndTime() {        return endTime;    }    public void setEndTime(String endTime) {        this.endTime = endTime;    }    public Integer getPriority() {        return priority;    }    public void setPriority(Integer priority) {        this.priority = priority;    }    public String getCalendarName() {        return calendarName;    }    public void setCalendarName(String calendarName) {        this.calendarName = calendarName;    }    public Integer getMisfireInstruction() {        return misfireInstruction;    }    public void setMisfireInstruction(Integer misfireInstruction) {        this.misfireInstruction = misfireInstruction;    }    public JobDetailProxyBean getJobDetail() {        
return jobDetail;    }    public void setJobDetail(JobDetailProxyBean jobDetail) {        this.jobDetail = jobDetail;    }}

 

public class CronTriggerBean extends TriggerBean {    /**     * CRON表达式     */    private String cronExpression;    public String getCronExpression() {        return cronExpression;    }    public void setCronExpression(String cronExpression) {        this.cronExpression = cronExpression;    }}

 

public class SimpleTriggerBean extends TriggerBean {
    /**     * 时间间隔(秒)     */    private Integer interval;    /**     * 重复次数(默认:-1为无限循环)     */    private Integer repeatCount;    public Integer getInterval() {        return interval;    }    public void setInterval(Integer interval) {        this.interval = interval;    }    public Integer getRepeatCount() {        return repeatCount;    }    public void setRepeatCount(Integer repeatCount) {        this.repeatCount = repeatCount;    }}

 

public class SchedulerFactoryBean implements InitializingBean {    protected static Logger logger = Logger.getLogger(SchedulerFactoryBean.class);    /**     * 触发器列表     */    private List triggers;    /**     * zooKeeper工厂     */    private ZookeeperFactory zooKeeperFactory;    /**     * Spring初始化方法     * @throws SchedulerException     */    public void afterPropertiesSet() throws SchedulerException {        this.initSchedulerFactory();    }    /**     * 初始化调度器工厂     * @throws SchedulerException     */    public void initSchedulerFactory() throws SchedulerException {        //初始化StdSchedulerFactory        StdSchedulerFactory schedulerFactory = SchedulerUtils.initStdSchedulerFactory();        //获取调度器        Scheduler scheduler = schedulerFactory.getScheduler();        //装载调度器        for(Object triggerObject : this.getTriggers()){            if(triggerObject instanceof CronTriggerBean){                CronTriggerBean cronTriggerBean = (CronTriggerBean)triggerObject;                //获取任务代理类对象                JobDetailProxyBean jobDetailProxyBean = cronTriggerBean.getJobDetail();                //装配任务                JobDetail jobDetail = SchedulerUtils.assemblyJobDetail(jobDetailProxyBean);                //设置zooKeeper连接工厂                jobDetail.getJobDataMap().put("zooKeeperFactory",this.getZooKeeperFactory());                //装配触发器                CronTrigger cronTrigger =  SchedulerUtils.assemblyCronTrigger(cronTriggerBean);                scheduler.scheduleJob(jobDetail, cronTrigger);//                System.out.println("CronTriggerBean");            }else{                SimpleTriggerBean simpleTriggerBean = (SimpleTriggerBean)triggerObject;                //获取任务代理类对象                JobDetailProxyBean jobDetailProxyBean = simpleTriggerBean.getJobDetail();                //装配任务                JobDetail jobDetail = SchedulerUtils.assemblyJobDetail(jobDetailProxyBean);                //设置zooKeeper连接工厂                
jobDetail.getJobDataMap().put("zooKeeperFactory",this.getZooKeeperFactory());                //装配触发器                SimpleTrigger simpleTrigger =  SchedulerUtils.assemblySimpleTrigger(simpleTriggerBean);                scheduler.scheduleJob(jobDetail, simpleTrigger);//                System.out.println("SimpleTriggerBean");            }        }        scheduler.start();        logger.info("调度器已启动");    }    public List getTriggers() {        return triggers;    }    public void setTriggers(List triggers) {        this.triggers = triggers;    }    public ZookeeperFactory getZooKeeperFactory() {        return zooKeeperFactory;    }    public void setZooKeeperFactory(ZookeeperFactory zooKeeperFactory) {        this.zooKeeperFactory = zooKeeperFactory;    }}
package com.ab.scheduling.quartz;import com.ab.scheduling.quartz.constant.Constant;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.state.ConnectionState;import org.apache.curator.framework.state.ConnectionStateListener;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.log4j.Logger;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.ZooDefs;import org.springframework.beans.factory.InitializingBean;import java.util.Collections;import java.util.List;/** * Zookeeper 工厂类 * Date: 14-4-2 * Time: 下午4:03 */public class ZookeeperFactory implements InitializingBean{    public static Logger logger = Logger.getLogger(ZookeeperFactory.class);    /**     * zookeeper服务地址     */    private String hosts;    /**     * 回话的超时时间(毫秒)     */    private Integer sessionTimeOut;    /**     * 连接的超时时间(毫秒)     */    private Integer connectionTimeOut;    /**     * 命名空间     **/    private String nameSpace;    /**     * zookeeper管理对象     */    private CuratorFramework zkTools;    /**     * 独享队列节点     */    private String monopolyQueueNode;    /**     * 连接状态     */    private String connectionState;    /**     * 会话ID     */    private long sessionId;    /**     * Spring初始化方法     */    public void afterPropertiesSet(){        this.connection();        this.addListener();    }    /**     * 连接     */    public void connection(){        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, Integer.MAX_VALUE);        zkTools = CuratorFrameworkFactory                .builder()                .connectString(hosts)                .namespace(nameSpace)                .retryPolicy(retryPolicy)                .connectionTimeoutMs(connectionTimeOut == null ? 30000 : connectionTimeOut)                .sessionTimeoutMs(sessionTimeOut == null ? 
30000 : sessionTimeOut)                .build();        zkTools.start();    }    /**     * 连接状态监听     */    public void addListener(){        zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() {            public void stateChanged(CuratorFramework client, ConnectionState newState) {                if (newState.equals(ConnectionState.CONNECTED)) {                    logger.info("连接");                    connectionState = "CONNECTED";                    try {                        sessionId = zkTools.getZookeeperClient().getZooKeeper().getSessionId();                        registerMonopolyQueue();                    } catch (Exception e) {                        logger.error("注册独占队列失败");                    }                }                if (newState.equals(ConnectionState.RECONNECTED)) {                    logger.info("重新连接");                    connectionState = "CONNECTED";                    try {                        if(sessionId != zkTools.getZookeeperClient().getZooKeeper().getSessionId()) {                            registerMonopolyQueue();                        }                    } catch (Exception e) {                        logger.error("注册独占队列失败");                    }                }                if (newState.equals(ConnectionState.LOST)) {                    logger.info("丢失");                    connectionState = "LOST";                }                if (newState.equals(ConnectionState.SUSPENDED)) {                    logger.info("暂停");                    connectionState = "SUSPENDED";                }                if (newState.equals(ConnectionState.READ_ONLY)) {                    logger.info("只读");                    connectionState = "READ_ONLY";                }            }        });    }    /**     * 注册独占队列     */    private void registerMonopolyQueue() throws Exception {        if(zkTools.checkExists().watched().forPath(Constant.MONOPOLY) == null){            zkTools.create()                    
.withMode(CreateMode.PERSISTENT)                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)                    .forPath(Constant.MONOPOLY);            logger.info("创建独享锁队列节点成功!");        }        if(monopolyQueueNode == null || (monopolyQueueNode != null && zkTools.checkExists().forPath(monopolyQueueNode)==null)) {            monopolyQueueNode = zkTools.create()                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)                    .forPath(Constant.MONOPOLY + Constant.SEPARATOR + Constant.QUEUE_NODE);            logger.info("成功加入独享锁队列");        }    }    /**     * 获得独占锁的执行权限     * @return 执行权限标识     * @throws KeeperException     * @throws InterruptedException     */    public boolean getMonopolyLock() throws Exception {        boolean flag = false;        if(connectionState != null && (connectionState.equals("CONNECTED") || connectionState.equals("RECONNECTED"))){            List
 nodes = zkTools.getChildren().watched().forPath(Constant.MONOPOLY);            if(nodes.size() > 0){                Collections.sort(nodes);                //判断当前应用是否在队列的第一位                if((Constant.SEPARATOR + Constant.MONOPOLY + Constant.SEPARATOR + nodes.get(0)).equals(monopolyQueueNode)){                    flag = true;                }            }        }        return flag;    }    /**     * 关闭连接     */    public void close(){        if(zkTools != null){            zkTools.close();            zkTools = null;        }    }    public String getHosts() {        return hosts;    }    public void setHosts(String hosts) {        this.hosts = hosts;    }    public Integer getSessionTimeOut() {        return sessionTimeOut;    }    public void setSessionTimeOut(Integer sessionTimeOut) {        this.sessionTimeOut = sessionTimeOut;    }    public Integer getConnectionTimeOut() {        return connectionTimeOut;    }    public void setConnectionTimeOut(Integer connectionTimeOut) {        this.connectionTimeOut = connectionTimeOut;    }    public String getNameSpace() {        return nameSpace;    }    public void setNameSpace(String nameSpace) {        this.nameSpace = nameSpace;    }}

 

package com.ab.scheduling.quartz.common;import com.ab.scheduling.quartz.JobDetailProxyBean;import com.ab.scheduling.quartz.CronTriggerBean;import com.ab.scheduling.quartz.SimpleTriggerBean;import org.apache.commons.lang.StringUtils;import org.apache.log4j.Logger;import org.quartz.*;import org.quartz.impl.StdSchedulerFactory;import org.quartz.simpl.SimpleThreadPool;import java.util.Properties;/** * Quartz调度工具类 * Date: 14-5-15 * Time: 下午6:10 */public class SchedulerUtils {    protected static Logger logger = Logger.getLogger(SchedulerUtils.class);    /**     * 初始化StdSchedulerFactory     * @return StdSchedulerFactory     */    public static StdSchedulerFactory initStdSchedulerFactory() {        StdSchedulerFactory schedulerFactory = null;        try{            schedulerFactory = (StdSchedulerFactory) Class.forName(StdSchedulerFactory.class.getName()).newInstance();            Properties mergedProps = new Properties();            // 设置Quartz线程池设置            mergedProps.setProperty(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());            mergedProps.setProperty("org.quartz.threadPool.threadCount", Integer.toString(10));            schedulerFactory.initialize(mergedProps);        } catch (Exception e){            logger.error("初始化StdSchedulerFactory失败");            logger.error(e);        }        return schedulerFactory;    }    /**     * 装配任务     * @param jobDetail 任务代理类     * @return JobDetail     */    public static JobDetail assemblyJobDetail(JobDetailProxyBean jobDetail){        JobBuilder jobBuilder = JobBuilder.newJob(jobDetail.getClass());        //设置JobDetail身份标识与所属组        String key = jobDetail.getKey();        if(StringUtils.isNotBlank(key)){            jobBuilder = jobBuilder.withIdentity(key, jobDetail.getGroup());        }else{            jobBuilder = jobBuilder.withIdentity(IdentityUtils.generatorUUID("JOB"), jobDetail.getGroup());        }        //设置任务描述        if(StringUtils.isNotBlank(jobDetail.getDescription())){    
        jobBuilder = jobBuilder.withDescription(jobDetail.getDescription());        }        //设置JobDetail数据参数        JobDataMap jobDataMap = new JobDataMap();        jobDataMap.put("targetObject",jobDetail.getTargetObject());                   //目标对象        jobDataMap.put("targetMethod",jobDetail.getTargetMethod());                   //目标方法        jobDataMap.put("mode", jobDetail.getMode());                                  //运行模式        jobBuilder = jobBuilder.usingJobData(jobDataMap);        return jobBuilder.build();    }    /**     * 装配表达式触发器     * @param cronTriggerBean 表达式触发器     * @return 表达式触发器     */    public static CronTrigger assemblyCronTrigger(CronTriggerBean cronTriggerBean){        TriggerBuilder triggerBuilder = TriggerBuilder.newTrigger();        //设置触发器身份标识与所属组        String key = cronTriggerBean.getKey();        if(StringUtils.isNotBlank(key)){            triggerBuilder = triggerBuilder.withIdentity(key, cronTriggerBean.getGroup());        }else{            triggerBuilder = triggerBuilder.withIdentity(IdentityUtils.generatorUUID("CronTrigger"), cronTriggerBean.getGroup());        }        //设置描述        if(StringUtils.isNotBlank(cronTriggerBean.getDescription())){            triggerBuilder = triggerBuilder.withDescription(cronTriggerBean.getDescription());        }        //设置启动时间        if(StringUtils.isNotBlank(cronTriggerBean.getStartTime())){            triggerBuilder = triggerBuilder.startAt(DateUtils.StringToDate(cronTriggerBean.getStartTime(), "yyyy-MM-dd HH:mm:ss"));        }else{            triggerBuilder = triggerBuilder.startNow();      //当启动时间为空默认立即启动调度器        }        //设置结束时间        if(StringUtils.isNotBlank(cronTriggerBean.getEndTime())){            triggerBuilder = triggerBuilder.endAt(DateUtils.StringToDate(cronTriggerBean.getEndTime(), "yyyy-MM-dd HH:mm:ss"));        }        //设置优先级        if(cronTriggerBean.getPriority() != null){            triggerBuilder = triggerBuilder.withPriority(cronTriggerBean.getPriority());        
}        //设置Cron表达式(不允许为空)与集火指令        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronTriggerBean.getCronExpression());        if(cronTriggerBean.getMisfireInstruction() != null){            if(cronTriggerBean.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) {                cronScheduleBuilder = cronScheduleBuilder.withMisfireHandlingInstructionIgnoreMisfires();            }            if(cronTriggerBean.getMisfireInstruction() == CronTrigger.MISFIRE_INSTRUCTION_FIRE_ONCE_NOW) {                cronScheduleBuilder = cronScheduleBuilder.withMisfireHandlingInstructionFireAndProceed();            }            if(cronTriggerBean.getMisfireInstruction() == CronTrigger.MISFIRE_INSTRUCTION_DO_NOTHING) {                cronScheduleBuilder = cronScheduleBuilder.withMisfireHandlingInstructionDoNothing();            }        }        triggerBuilder = triggerBuilder.withSchedule(cronScheduleBuilder);        return (CronTrigger)triggerBuilder.build();    }    /**     * 装配简单触发器     * @param simpleTriggerBean 简单触发器     * @return 简单触发器     */    public static SimpleTrigger assemblySimpleTrigger(SimpleTriggerBean simpleTriggerBean){        TriggerBuilder triggerBuilder = TriggerBuilder.newTrigger();        //设置触发器身份标识与所属组        String key = simpleTriggerBean.getKey();        if(StringUtils.isNotBlank(key)){            triggerBuilder = triggerBuilder.withIdentity(key, simpleTriggerBean.getGroup());        }else{            triggerBuilder = triggerBuilder.withIdentity(IdentityUtils.generatorUUID("SimpleTrigger"), simpleTriggerBean.getGroup());        }        //设置描述        if(StringUtils.isNotBlank(simpleTriggerBean.getDescription())){            triggerBuilder = triggerBuilder.withDescription(simpleTriggerBean.getDescription());        }        //设置启动时间        if(StringUtils.isNotBlank(simpleTriggerBean.getStartTime())){            triggerBuilder = 
triggerBuilder.startAt(DateUtils.StringToDate(simpleTriggerBean.getStartTime(), "yyyy-MM-dd HH:mm:ss"));        }else{            triggerBuilder = triggerBuilder.startNow();      //当启动时间为空默认立即启动调度器        }        //设置结束时间        if(StringUtils.isNotBlank(simpleTriggerBean.getEndTime())){            triggerBuilder = triggerBuilder.endAt(DateUtils.StringToDate(simpleTriggerBean.getEndTime(), "yyyy-MM-dd HH:mm:ss"));        }        //设置优先级        if(simpleTriggerBean.getPriority() != null){            triggerBuilder = triggerBuilder.withPriority(simpleTriggerBean.getPriority());        }        //设置简单触发器 时间间隔(不允许为空)、执行次数(默认为-1)与集火指令        SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(20).withRepeatCount(-1);        simpleScheduleBuilder = simpleScheduleBuilder.withIntervalInSeconds(simpleTriggerBean.getInterval());        if(simpleTriggerBean.getRepeatCount() != null){            simpleScheduleBuilder = simpleScheduleBuilder.withRepeatCount(simpleTriggerBean.getRepeatCount());        }else{            simpleScheduleBuilder = simpleScheduleBuilder.withRepeatCount(-1);        }        if(simpleTriggerBean.getMisfireInstruction() != null){            if(simpleTriggerBean.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) {                simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionIgnoreMisfires();            }            if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW) {                simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionFireNow();            }            if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT) {                simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionNextWithExistingCount();            }            if(simpleTriggerBean.getMisfireInstruction() == 
SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT) {                simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionNextWithRemainingCount();            }            if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT) {                simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionNowWithExistingCount();            }            if(simpleTriggerBean.getMisfireInstruction() == SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT) {                simpleScheduleBuilder = simpleScheduleBuilder.withMisfireHandlingInstructionNowWithRemainingCount();            }        }        triggerBuilder = triggerBuilder.withSchedule(simpleScheduleBuilder);        return (SimpleTrigger)triggerBuilder.build();    }}

spring配置

    
    
    
    
        
        
        
    
        
        
        
    
        
        
        
        
    
        
            
                
            
                
    

往期精彩回顾

对CSDNJava架构技术,对架构技术感兴趣的同学,欢迎加QQ群,一起学习,相互讨论。

群内已经有小伙伴将知识体系整理好(源码,笔记,PPT,学习视频),欢迎加群免费领取
分享给喜欢Java,喜欢编程,有梦想成为架构师的程序员们,希望能够帮助到你们。
 

 

转载地址:http://vvaai.baihongyu.com/

你可能感兴趣的文章
Hibernate的悲观锁并发控制机制及LockMode
查看>>
Hibernate中的数据的获取策略(fetching)
查看>>
Hibernate中通过HQL/JPQL查询的方式实现动态数据获取
查看>>
Hibernate中通过FetchProfile的方式实现动态数据获取
查看>>
Hibernate应用中通过JPA配置Entity缓存
查看>>
Hibernate中配置二级缓存的并发策略
查看>>
Hibernate的Entity cache(实体缓存)
查看>>
Hibernate中的Query cache(查询缓存)
查看>>
Hibernate的interceptors与events
查看>>
Android常用代码
查看>>
Cardboard虚拟现实开发初步(二)
查看>>
60个优秀的免费3D模型下载网站
查看>>
Cardboard虚拟现实开发初步(三)
查看>>
Android native和h5混合开发几种常见的hybrid通信方式
查看>>
Vista/Win7 UAC兼容程序开发指南
查看>>
IOS程序开发框架
查看>>
安装jdk的步骤
查看>>
简述JAVA运算符
查看>>
简易ATM源代码及运行结果
查看>>
简述Java中的简单循环
查看>>