PowerJob(原OhMyScheduler)是全新一代分布式任务调度与计算框架,其主要功能特性如下:
备注:固定延迟和固定频率任务统称秒级任务,这两种任务无法被停止,只有任务被关闭或删除时才能真正停止任务。
git clone https://Github.com/PowerJob/PowerJob.git
导入 IDE,源码结构如下,我们需要启动调度服务器(powerjob-server),同时在 samples 工程中编写自己的处理器代码
powerjob-server 日常环境配置文件:application-daily.properties
oms.env=DAILY
logging.cnotallow=classpath:logback-dev.xml
####### 外部数据库配置(需要用户更改为自己的数据库配置) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimeznotallow=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置,非核心依赖,通过配置 oms.mongodb.enable=false 来关闭 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily
####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) #######
spring.mail.host=smtp.163.com
spring.mail.username=zqq@163.com
spring.mail.password=GOFZPNARMVKCGONV
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
####### 资源清理配置 #######
oms.instanceinfo.retentinotallow=1
oms.container.retention.local=1
oms.container.retention.remote=-1
####### 缓存配置 #######
oms.instance.metadata.cache.size=1024
图片
本地需要安装docker和docker-compose
git clone --depth=1 https://github.com/PowerJob/PowerJob.git
cd PowerJob
docker-compose up
docker-compose up -d
刚开始启动时,powerjob-worker-samples会启动失败,等powerjob-server启动成功后,powerjob-worker-samples才会启动成功。这大概需要几分钟。
运行成功后,浏览器访问 http://127.0.0.1:7700/
应用名称:powerjob-worker-samples
密码:powerjob123
docker-compose down
Stopping powerjob-worker-samples ... done
Stopping powerjob-server ... done
Stopping powerjob-mysql ... done
Removing powerjob-worker-samples ... done
Removing powerjob-server ... done
Removing powerjob-mysql ... done
cd PowerJob
rm -rf powerjob-data
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>${latest.powerjob.version}</version>
</dependency>
powerjob:
worker:
# akka 工作端口,可选,默认 27777
akka-port: 27777
# 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
app-name: ${spring.application.name}
# 调度服务器地址,IP:Port 或 域名,多值逗号分隔
server-address: 81.70.117.188:7700
# 持久化方式,可选,默认 disk
store-strategy: disk
# 任务返回结果信息的最大长度,超过这个长度的信息会被截断,默认值 8192
max-result-length: 8192
# 单个任务追加的工作流上下文最大长度,超过这个长度的会被直接丢弃,默认值 8192
max-appended-wf-context-length: 8192
处理器(Processor)开发
PowerJob 支持 Python、Shell、HTTP、SQL 等众多通用任务的处理,开发者只需要引入依赖,在控制台配置好相关参数即可,关于这部分详见 官方处理器 ,此处不再赘述。本章将重点阐述 Java 处理器开发方法与使用技巧。
单机处理器(BasicProcessor)对应了单机任务,即某个任务的某次运行只会有某一台机器的某一个线程参与运算。
广播处理器(BroadcastProcessor)对应了广播任务,即某个任务的某次运行会调动集群内所有机器参与运算。
Map处理器(MapProcessor)对应了Map任务,即某个任务在运行过程中,允许产生子任务并分发到其他机器进行运算。
MapReduce 处理器(MapReduceProcessor)对应了 MapReduce 任务,在 Map 任务的基础上,增加了所有任务结束后的汇总统计。
TaskContext 包含了本次任务的上下文信息,具体信息如下
属性列表(红色标注的为常用属性) |
|
属性名称 |
意义/用法 |
jobId |
任务 ID,开发者一般无需关心此参数 |
instanceId |
任务实例 ID,全局唯一,开发者一般无需关心此参数 |
subInstanceId |
子任务实例 ID,秒级任务使用,开发者一般无需关心此参数 |
taskId |
采用链式命名法的 ID,在某个任务实例内唯一,开发者一般无需关心此参数 |
taskName |
task 名称,Map/MapReduce 任务的子任务的值为开发者指定,否则为系统默认值,开发者一般无需关心此参数 |
jobParams |
任务参数 对于非工作流中的任务其值等同于控制台录入的任务参数;如果该任务为工作流中的任务且有配置节点参数信息,那么接收到的是节点配置的参数信息 |
instanceParams |
任务实例参数 对于非工作流中的任务 其值 等同于 OpenAPI 传递的实例参数,非 OpenAPI 触发的任务则一定为空。如果该任务为工作流中的任务那么这里实际接收到的是工作流上下文信息,建议使用 getWorkflowContext 方法获取上下文信息
|
maxRetryTimes |
Task 的最大重试次数 |
currentRetryTimes |
Task 的当前重试次数,和 maxRetryTimes 联合起来可以判断当前是否为该 Task 的最后一次运行机会 |
subTask |
子 Task,Map/MapReduce 处理器专属,开发者调用map方法时传递的子任务列表中的某一个 |
omsLogger |
在线日志,用法同 Slf4J,记录的日志可以直接通过控制台查看,非常便捷和强大!不过使用过程中需要注意频率,滥用在线日志会对 Server 造成巨大的压力 |
userContext |
用户在 PowerJobWorkerConfig 中设置的自定义上下文 |
workflowContext |
工作流上下文,更多信息见下方说明 |
该属性是 v4.0.0 版本的重大变更之一,移除了原来的参数传递机制,提供了 API 让开发者可以更加灵活便捷地在工作流中实现信息的传递。
属性列表 |
|
属性名称 |
意义/用法 |
wfInstanceId |
工作流实例 ID |
data |
工作流上下文数据,键值对 |
appendedContextData |
当前任务向工作流上下文中追加的数据。在任务执行完成后 ProcessorTracker 会将其上报给 TaskTracker,TaskTracker 在当前任务执行完成后会将这个信息上报给 server ,追加到当前的工作流上下文中,供下游任务消费 |
上游任务通过 WorkflowContext#appendData2WfContext(String key,Object value) 方法向工作流上下文中追加数据,下游任务便可以通过 WorkflowContext#fetchWorkflowContext() 方法获取到相应的数据进行消费。注意,当追加的上下文信息的 key 已经存在于当前的上下文中时,新的 value 会覆盖之前的值。另外,每次任务实例追加的上下文数据大小也会受到 worker 的配置项 powerjob.worker.max-appended-wf-context-length 的限制,超过这个长度的会被直接丢弃。
方法的返回值为 ProcessResult,代表了本次 Task 执行的结果,包含 success 和 msg 两个属性,分别用于传递 Task 是否执行成功和 Task 需要返回的信息。
单机执行的策略下,server 会在所有可用 worker 中选取健康度最佳的机器进行执行。单机执行任务需要实现接口 BasicProcessor,代码示例如下:
/**
* @Author iron.guo
* @Date 2023/1/7
* @Description
*/
@Component
@Slf4j
public class StandaloneProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("处理器启动成功,context 是 {}.", context);
log.info("单机处理器正在处理");
log.info(context.getJobParams());
omsLogger.info("处理器执行结束");
boolean success = true;
return new ProcessResult(success, context + ": " + success);
}
}
广播执行的策略下,所有机器都会被调度执行该任务。为了便于资源的准备和释放,广播处理器在BasicProcessor 的基础上额外增加了 preProcess 和 postProcess 方法,分别在整个集群开始之前/结束之后选一台机器执行相关方法。代码示例如下:
@Slf4j
@Component
public class BroadcastProcessorDemo implements BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("预执行,会在所有 worker 执行 process 方法前调用");
log.info("预执行,会在所有 worker 执行 process 方法前调用");
// 预执行,会在所有 worker 执行 process 方法前调用
return new ProcessResult(true, "init success");
}
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
// 撰写整个worker集群都会执行的代码逻辑
omsLogger.info("撰写整个worker集群都会执行的代码逻辑");
log.info("撰写整个worker集群都会执行的代码逻辑");
return new ProcessResult(true, "release resource success");
}
@Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception {
// taskResults 存储了所有worker执行的结果(包括preProcess)
// 收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果");
log.info("收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果");
return new ProcessResult(true, "process success");
}
}
MapReduce 是最复杂也是最强大的一种执行器,它允许开发者完成任务的拆分,将子任务派发到集群中其他Worker 执行,是执行大批量处理任务的不二之选!实现 MapReduce 处理器需要继承 MapReduceProcessor类,具体用法如下示例代码所示:
@Slf4j
@Component
public class MapReduceProcessorDemo implements MapReduceProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
final OmsLogger omsLogger = context.getOmsLogger();
// 判断是否为根任务
if (isRootTask()) {
// 构造子任务
List<SubTask> subTaskList = Lists.newLinkedList();
SubTask subTask=new SubTask();
subTask.setSiteId(1L);
subTask.setName("iron.guo");
subTaskList.add(subTask);
/*
* 子任务的构造由开发者自己定义
* eg. 现在需要从文件中读取100W个ID,并处理数据库中这些ID对应的数据,那么步骤如下:
* 1. 根任务(RootTask)读取文件,流式拉取100W个ID,并按1000个一批的大小组装成子任务进行派发
* 2. 非根任务获取子任务,完成业务逻辑的处理
*/
// 调用 map 方法,派发子任务(map 可能会失败并抛出异常,做好业务操作)
map(subTaskList, "DATA_PROCESS_TASK");
omsLogger.info("执行根任务-派发子任务");
return new ProcessResult(true, "ROOT_PROCESS_SUCCESS");
}
// 非子任务,可根据 subTask 的类型 或 TaskName 来判断分支
if (context.getSubTask() instanceof SubTask) {
omsLogger.info("执行子任务开始");
omsLogger.info("Get from SubTask : name is {} and id is {}",((SubTask) context.getSubTask()).getName(),((SubTask) context.getSubTask()).getSiteId());
// 执行子任务,注:子任务人可以 map 产生新的子任务,可以构建任意级的 MapReduce 处理器
return new ProcessResult(true, "PROCESS_SUB_TASK_SUCCESS");
}
return new ProcessResult(false, "UNKNOWN_BUG");
}
@Override
public ProcessResult reduce(TaskContext taskContext, List<TaskResult> taskResults) {
// 所有 Task 执行结束后,reduce 将会被执行
// taskResults 保存了所有子任务的执行结果
// 用法举例,统计执行结果
AtomicLong successCnt = new AtomicLong(0);
taskResults.forEach(tr -> {
if (tr.isSuccess()) {
successCnt.incrementAndGet();
}
});
// 该结果将作为任务最终的执行结果
return new ProcessResult(true, "success task num:" + successCnt.get());
}
// 自定义的子任务
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
private static class SubTask {
private Long siteId;
private String name;
}
}
注:Map 处理器相当于 MapReduce 处理器的阉割版本(阉割了 reduce 方法),此处不再单独举例。
点击右上角按钮 新建工作流,即可录入新的工作流,具体界面和说明如下所示。
CRON -> 填写 CRON 表达式(在线生成网站)
API -> 不需要填写任何参数,表明该任务由 OpenAPI 触发
v4.0.0 以后支持节点的自由拖拉拽,不用再点点点了,哈哈哈 ~
任务为之前创建的任务,可用工作流形式串联起来执行。
点击需要编辑的节点,在右侧会弹出一个编辑框,如下图所示
判断节点 不允许失败跳过以及禁用,节点参数中存储的是 Groovy 代码(执行 Groovy 代码时会将当前工作流上下文作为 context 变量注入到代码执行的上下文中),其执行结果仅能返回 "true" 或者 "false",同时判断节点仅有且必须有两条“输出”路径。会根据该代码的执行结果决定下游需要执行的节点。这里处理的原则是, 仅 cancel 那些只能通过被 disable 掉的边可达的节点
举个两个栗子,灰色代表相应的边 或者 节点被 disable 或 cancel,菱形代表判断节点,假定执行结果为 true
case 3 以及 case 4 中的节点 3 都会被 cancel ,因为它只能通过节点 1 -> 节点 3 的边可达(该边的属性为 false),但对于节点 5 而言,在 case 4 中因为判断节点 2 的执行结果为 true ,那么其可以通过节点 2 -> 节点 5 的边可达,所以不会被 disable 。
备注:如果需要根据上游节点的执行结果决定下游节点,可以将上游节点的执行结果注入上下文中,再在判断节点中做相应的判断。
该节点代表对某个工作流的引用,节点的 jobId 属性存储的是工作流 id,其他属性和普通的任务节点一致。不允许出现循环引用以及多级嵌套的情况,即嵌套节点中指向的工作流一定是一个不含嵌套节点的工作流。
执行到该节点时,如果该节点处于启用状态,那么将启动该节点所引用工作流的一个新实例,待该实例执行完成后再同步更新该节点的状态。
注意,创建子工作流时,会透传当前的上下文作为工作流的实例参数,在子工作流执行完成时会合并子工作流的上下文至父工作流的上下文中。
重试子工作流不会联动重试父工作流,但失败的子工作流会随着父工作流的重试而原地重试(不会生成新的实例)