要想获取使用指定注解的类信息,可借助工具:
org.reflections.Reflections
此工具将Java反射进行了高级封装,Reflections 通过扫描 classpath,索引元数据,允许在运行时查询这些元数据,也可以保存收集项目中多个模块的元数据信息。
使用 Reflections 可以查询以下元数据信息:
1)获得某个类型的所有子类型2)获得标记了某个注解的所有类型/成员变量,支持注解参数匹配。3)使用正则表达式获得所有匹配的资源文件4)获得所有特定签名(包括参数,参数注解,返回值)的方法
Reflections 依赖 Google 的 库和 库。
Maven引入方式:
org.reflections reflections 0.9.11
sbt引入方式:
"org.reflections" % "reflections" % "0.9.11"
首先自定义注解:
package com.today.service.financetask.job
import java.lang.annotation.*;/** * 类功能描述:job 信息注解 * * @author WangXueXing create at 19-5-4 上午9:14 * @version 1.0.0 */@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Inheritedpublic @interface JobInfo { /** * job id * @return */ String jobId(); /** * job name * @return */ String jobName(); /** * default cron * @return */ String defaultCron();}
将某些类添加注解:
package com.today.service.financetask.job
import java.util.Calendarimport com.today.api.checkaccount.scala.CheckAccountServiceClientimport com.today.api.checkaccount.scala.enums.FlatFormTypeEnumimport com.today.api.checkaccount.scala.request.ReconciliationRequestimport com.today.service.financetask.job.define.AbstractJobimport com.today.service.financetask.utils.JobInfoimport org.quartz.JobExecutionContext/** * 自动对账 */@JobInfo(jobId="CHECK_ACCOUNT_PROCESS", jobName="对账系统自动对账定时任务", defaultCron="0 0 13 * * ?")class CheckAccountJob extends AbstractJob{ /** * start up the scheduled task * * @param context JobExecutionContext */ override def run(context: JobExecutionContext): Unit = { val cal = Calendar.getInstance cal.add(Calendar.DATE, -1) new CheckAccountServiceClient().appReconciliation(new ReconciliationRequest(FlatFormTypeEnum.TODAY_APP,None)) }}
import com.today.service.financetask.action.DailyStatementActionimport com.today.service.financetask.job.define.AbstractJobimport com.today.service.financetask.utils.JobInfoimport org.quartz.JobExecutionContextimport org.springframework.stereotype.Service/** * 日次处理定时任务处理 * * @author zhangc create at 2018/5/11 14:08 * @version 0.0.1 */@Service@JobInfo(jobId="DAY_TIME_PROCESS", jobName="日次处理定时任务", defaultCron="0 30 2 * * ?")class DayTimeProcessJob extends AbstractJob{ /** * start up the scheduled task * * @param context JobExecutionContext */ override def run(context: JobExecutionContext): Unit = { new DailyStatementAction().execute }}
通过Java反射及Reflections工具类实现被JobInfo注解的所有类信息:
import java.utilimport com.today.service.financetask.utils.JobInfoimport org.quartz.Jobimport org.reflections.Reflectionsimport scala.collection.JavaConverters._import scala.collection.mutable/** * 通过注解获取所有通用Job信息 * * @author BarryWang create at 2018/5/12 10:45 * @version 0.0.1 */object JobEnum { /** * 获取添加JobInfo注解的类信息 */ val jobWithAnnotation: util.Set[Class[_]] = new Reflections("com.today.service.financetask.job").getTypesAnnotatedWith(classOf[JobInfo]) /** * 获取所有job枚举值 * @return */ def values : mutable.Set[JobInfo] = jobWithAnnotation.asScala.map(getJobInfo(_)) /** * 根据job class 获取job 信息 * @param jobClass * @return */ def getJobInfo(jobClass : Class[_]): JobInfo = jobClass.getAnnotation(classOf[JobInfo]) /** * jobId与jobName映射关系 * @return */ def jobIdNameMap : Map[String, String]={ jobWithAnnotation.asScala.map{sub => val jobInfo = getJobInfo(sub) Map(jobInfo.jobId() -> jobInfo.jobName()) }.fold(Map())((i,j) => i++j) } /** * JobObject与JobEnum映射关系 * @return */ def jobClassInfoMap: Map[String, JobInfo] = { jobWithAnnotation.asScala.map{sub => Map(sub.getName -> getJobInfo(sub)) }.fold(Map())((i,j) => i++j) } /** * jobId与JobEnum映射关系 * @return */ def jobIdInfoMap: Map[String, JobInfo] = { jobWithAnnotation.asScala.map{sub => val jobInfo = getJobInfo(sub) Map(jobInfo.jobId() -> jobInfo) }.fold(Map())((i,j) => i++j) } /** * jobId与JobObject映射关系 * @return */ def jobIdClassMap: Map[String, Class[_ <: Job]] = { jobWithAnnotation.asScala.map{sub => Map(getJobInfo(sub).jobId() -> sub.asInstanceOf[Class[_ <: Job]]) }.fold(Map[String, Class[_ <: Job]]())((i,j) => i++j) } def main(args: Array[String]): Unit = { println(jobIdClassMap) }}
至此,我们就可以获取所有被特定注解引用的类信息及注解信息,我们就可以全局管理特定类信息。
实现JobEnum后就可以统一对定时任务管理实现:
1.新添加定时任务完全可以制定一个Job子类,其他操作自动维护进去;
2.每个Job子类里面都需要实现 override def getJobAndApiInfo(context: JobExecutionContext): (String, String, JobEnum) 方法,这个也可以省掉,直接在父类统一实现;
3.统一修改定时任务开关及时间接口;
4.统一对定时任务启动时重启,就可以统一重启,不需要单独添加代码启动;
1上面代码片段“将某些类添加注解”
2父类代码如下:
import java.io.{PrintWriter, StringWriter}import com.github.dapeng.core.helper.MasterHelperimport com.today.api.financetask.scala.enums.TScheduledTaskLogEnumimport com.today.service.financetask.action.sql.ScheduledTaskLogSqlimport com.today.service.financetask.util.{AppContextHolder, Debug}import com.today.service.financetask.utils.JobInfoimport org.quartz.{Job, JobExecutionContext}import org.slf4j.LoggerFactoryimport org.springframework.transaction.TransactionStatusimport org.springframework.transaction.support.TransactionTemplateimport scala.util.{Failure, Success, Try}/** * the abstract class for job */trait AbstractJob extends Job{ /** 日志 */ val logger = LoggerFactory.getLogger(getClass) /** * execute the job * @param context */ override def execute(context: JobExecutionContext): Unit = { getJobAndApiInfo(context) match { case Some(x) => execute(context, x.jobId, x.jobName) case None => logger.error("没有定义JobEnum及对应JobObject,请检查") } } /** * execute the job * @param context * @param jobId * @param jobName */ def execute(context: JobExecutionContext, jobId : String, jobName: String): Unit = { //判断是否是选中的master节点,不是master节点不执行定时任务 if (!MasterHelper.isMaster("com.today.api.financetask.service.FinanceScheduledService", "1.0.0")) { logger.info(s"Can't select master to run the job ${jobId}: ${jobName}") return } //记录开始日志 val logId = ScheduledTaskLogSql.insertScheduledTaskLog(jobId) context.put("logId", logId) logger.info(s"Starting the job ${jobId}: ${jobName} ...") //事物处理 val transactionTemplate: TransactionTemplate = AppContextHolder.getBean("transactionTemplate") transactionTemplate.execute((status: TransactionStatus) =>{ Debug.reset() //执行任务 Try(Debug.trace(s"${jobId}:${jobName}")(run(context))) match { case Success(x) => { logger.info(s"Successfully execute the job ${jobId}: ${jobName}") successLog(logId) } case Failure(e) => { logger.error(s"Failure execute the job ${jobId}: ${jobName}", e) failureLog(logId, status, e) } } Debug.info() }) } /** * get the api information * @return (interface name, interface version, JobEnum) */ def getJobAndApiInfo(context: JobExecutionContext) : Option[JobInfo] ={ JobEnum.jobClassInfoMap.get(this.getClass.getName) } /** * start up the scheduled task * @param context JobExecutionContext */ def run(context: JobExecutionContext) /** * 成功日志记录 * @param logId */ def successLog(logId: Long): Unit ={ ScheduledTaskLogSql.updateExportReportRecord(logId, TScheduledTaskLogEnum.SUCCESS, "Success") } /** * 失败日志记录 * @param logId */ def failureLog(logId: Long, status: TransactionStatus, e: Throwable): Unit ={ status.setRollbackOnly() ScheduledTaskLogSql.updateExportReportRecord(logId, TScheduledTaskLogEnum.FAILURE, getExceptionStack(e)) } /** * * 功能说明:在日志文件中 ,打印异常堆栈 * @param e : Throwable * @return : String */ def getExceptionStack(e: Throwable): String = { val errorsWriter = new StringWriter e.printStackTrace(new PrintWriter(errorsWriter)) errorsWriter.toString }}
3 统一修改定时任务开关及时间代码如下:
import com.today.api.financetask.scala.enums.{TScheduledTaskHasDeletedEnum, TScheduledTaskIsStartEnum}import com.today.api.financetask.scala.request.StartOrStopByNameRequestimport com.today.api.financetask.scala.response.ServiceResponseimport com.today.service.commons.Actionimport com.today.service.commons.Assert.assertimport com.today.service.commons.exception.CommonException.illegalArgumentExceptionimport com.today.service.financetask.action.sql.ScheduledTaskActionSqlimport com.today.service.financetask.dto.TScheduledTaskimport com.today.service.financetask.job.define.JobEnumimport com.today.service.financetask.query.sql.ScheduledTaskQuerySqlimport com.today.service.financetask.util.{CronConverter, QuartzManager}/** * @description: 启停定时任务 * @author zhangc * @date 2018\8\1 0001 15:02 * @version 1.0.0 */class StartOrStopByNameAction (request: StartOrStopByNameRequest) extends Action[ServiceResponse] { override def preCheck: Unit = { assert(!TScheduledTaskIsStartEnum.isUndefined(request.flag.id), illegalArgumentException("错误的启停标志!")) } /** * 根据传入的定时任务名称和启停标志,来判断启动或者停止定时任务 * 如果是1则更新数据库,删除定时任务,重新添加定时任务 * 如果是0则更新数据库,删除定时任务 * @return */ override def action: ServiceResponse = { val scheduledTask = ScheduledTaskQuerySql.queryByJobId(request.processName) val cron = CronConverter.convertHourMinuteToCron(request.processTime) val jobInfo = JobEnum.jobIdInfoMap(request.processName) //修改定时任务时间, 保存入库 if(scheduledTask.isEmpty){ ScheduledTaskActionSql.insertTScheduledTask( TScheduledTask(jobInfo.jobName, request.processName, cron, None, request.flag.id, None, null, null, TScheduledTaskHasDeletedEnum.NO.id)) } else { ScheduledTaskActionSql.updateTaskIsStart(request.flag.id,cron, request.processName) } request.flag match { case TScheduledTaskIsStartEnum.YES => QuartzManager.modifyJobTime(request.processName, cron, JobEnum.jobIdClassMap(jobInfo.jobId())) case _ => QuartzManager.removeJob(request.processName) } ServiceResponse("200","success") }}
4下面就是统一对定时任务启动时重启,就可以统一重启,不需要单独添加代码启动:
import com.today.api.financetask.scala.enums.TScheduledTaskIsStartEnumimport com.today.api.financetask.scala.request.QueryAutoConfigRequestimport com.today.service.financetask.job._import com.today.service.financetask.job.define.JobEnumimport com.today.service.financetask.query.sql.{AutoConfigQuerySql, ScheduledTaskQuerySql}import com.today.service.financetask.util.QuartzManagerimport com.today.service.financetask.utils.JobInfoimport org.slf4j.LoggerFactoryimport org.springframework.context.ApplicationListenerimport org.springframework.context.event.ContextRefreshedEventimport org.springframework.stereotype.Service/** * 类功能描述: 定时器监听器, 服务启动时启动定时器 * * @author BarryWang create at 2018/5/11 12:04 * @version 0.0.1 */@Serviceclass ScheduleStartListener extends ApplicationListener[ContextRefreshedEvent] { /** 日志 */ val logger = LoggerFactory.getLogger(getClass) /** * 启动加载执行定时任务 */ override def onApplicationEvent(event: ContextRefreshedEvent): Unit = { logger.info("=======服务器重启定时任务启动start=======") //启动服务时恢复常规定时任务 JobEnum.values.foreach(recoveryCommonJob(_)) logger.info("=======服务器重启定时任务启动end=======") } /** * 恢复通用定时任务 * @param jobInfo 定时任务枚举 */ private def recoveryCommonJob(jobInfo: JobInfo)={ try { val jobClass = JobEnum.jobIdClassMap(jobInfo.jobId) ScheduledTaskQuerySql.queryByJobId(jobInfo.jobId) match { case Some(x) => { x.isStart match { case TScheduledTaskIsStartEnum.YES.id => { QuartzManager.addJobByCron(jobInfo.jobId, x.jobCron, jobClass) logger.info(s"定时任务:'${jobInfo.jobName}'启动成功!") } case _ => logger.info(s"定时任务:'${jobInfo.jobName}'is_start标志为0,不启动") } } case None => QuartzManager.addJobByCron(jobInfo.jobId, jobInfo.defaultCron(), jobClass) } } catch { case e : Exception => logger.error(s"定时任务:'${jobInfo.jobName}'启动失败, 失败原因:", e) } }}
本部分也是对Quartz实现可配置的分布式定时任务的优化重构,可详见: