基于模板设计模式对数据迁移封装

2023/08/23

1、数据迁移任务封装

请求控制层DataTransferController

通过Reqeust请求,启动一次数据迁移任务

@RequiredArgsConstructor(onConstructor_={@Autowired})
@RestController
@RequestMapping("/v1/dataTransfer")
public class DataTransferController {

    private final DataTransferJobManager dataTransferJobManager;

    //用户名
    @Value("${business.data-transfer.user:aa64865487710aba549516e5c81fa2a1}")
    private String startUser;
    //用户密码
    @Value("${business.data-transfer.pwd:518be6ab336bccd6cc7e93c227fd1ac6}")
    private String startPwd;

    /**
     * 启动作业
     * @param jobName 作业名称
     * @param user 用户
     * @param pwd 密码
     * @return R
     */
    @ApiOperation(value = "启动作业", notes = "启动作业")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "user",value="帐号",required = true),
            @ApiImplicitParam(name = "pwd", value = "密码", required = true),
    })
    @GetMapping("/start")
    public R<String> start(@RequestParam("jobName") String jobName,@RequestParam("user")String user,@RequestParam("pwd")String pwd){
        if(!startUser.equals(user) || !startPwd.equals(pwd)){
            throw new SystemRuntimeException("启动作业认证用户或密码不正确");
        }

        LoginAccountVO loginUser = HttpRequestHandler.getRequestUser(true);
        dataTransferJobManager.start(jobName,loginUser.getLoginName());
        return R.buildSuccess("启动作业成功");
    }

    /**
     * 重启作业
     * @param jobId 作业Id
     * @param user 用户
     * @param pwd 密码
     * @return R
     */
    @ApiOperation(value = "重启作业", notes = "重启作业")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "user",value="帐号",required = true),
            @ApiImplicitParam(name = "pwd", value = "密码", required = true),
    })
    @GetMapping("/restart")
    public R<String> restart(@RequestParam("jobId") String jobId,@RequestParam("user")String user,@RequestParam("pwd")String pwd){
        if(!startUser.equals(user) || !startPwd.equals(pwd)){
            throw new SystemRuntimeException("重启作业认证用户或密码不正确");
        }

        LoginAccountVO loginUser = HttpRequestHandler.getRequestUser(true);
        dataTransferJobManager.restart(jobId,loginUser.getLoginName());
        return R.buildSuccess("重启作业成功");
    }
}

Job管理接口DataTransferJobManager

根据作业名称jobName启动任务,看数据迁移作业管理接口 DataTransferJobManager

/**
 * <p>
 * 数据迁移作业 管理器接口
 * </p>
 *
 * @author liJY
 * @Date 2023-06-20
 */
public interface DataTransferJobManager {
    /**
     * 添加作业
     * @param job 作业
     */
    void add(DataTransferJob job);

    /**
     * 初始化作业
     */
    void initJobs();

    /**
     * 启动作业
     * @param jobName 作业名称
     * @param userAccount 用户帐号
     */
    void start(String jobName,String userAccount);

    /**
     * 重动作业
     * @param jobId 作业Id
     * @param userAccount 用户帐号
     */
    void restart(String jobId,String userAccount);

    /**
     * 获取执行器
     * @param jobName 作业名称
     * @param executorKey 执行器键名
     * @return 执行器
     */
    DataTransferExecutor getExecutor(String jobName,String executorKey);
}

接口实现类DataTransferJobManagerImpl

@RequiredArgsConstructor(onConstructor_={@Autowired})
@Slf4j
@RefreshScope
public class DataTransferJobManagerImpl implements DataTransferJobManager {
    //作业映射.结构:Map<作业名称,DataTransferJob>
    private Map<String,DataTransferJob> jobMap=new HashMap<>();

    //MQ批量消息发送大小
    @Value(value="${rocketmq.batch-message-size.data-transfer:3500}")
    private int mqBatchMessageSize;
    //消息主题前缀
    @Value(value="${rocketmq.topic-prefix.data-transfer:TOPIC_DATA_TRANSFER}")
    private String mqTopicPrefix;
    //消费者组前缀
    @Value(value = "${rocketmq.consumer.group.prefix.data-transfer:CONSUMER_DATA_TRANSFER}")
    private String mqConsumerGroupPrefix;
    //NameServer
    @Value(value="${rocketmq.name-server}")
    private String mqNameServer;

    private DataTransferTaskService dataTransferTaskService;
    private IdGenerator idGenerator;
    private RocketMQTemplate rocketMQTemplate;
    private DataTransferItemService dataTransferItemService;
    private ApplicationContext applicationContext;

    @Autowired
    public void setDataTransferTaskService(DataTransferTaskService dataTransferTaskService) {
        this.dataTransferTaskService = dataTransferTaskService;
    }
    @Autowired
    public void setIdGenerator(IdGenerator idGenerator) {
        this.idGenerator = idGenerator;
    }
    @Autowired
    public void setRocketMQTemplate(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }
    @Autowired
    public void setDataTransferItemService(DataTransferItemService dataTransferItemService) {
        this.dataTransferItemService = dataTransferItemService;
    }
    @Autowired
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * 添加作业
     * @param job 作业
     */
    @Override
    public void add(DataTransferJob job) {
        if(jobMap.containsKey(job.getName())){
            log.warn("作业已存在,jobName:{}",job.getName());
            throw new SystemRuntimeException("当前作业已存在");
        }
        jobMap.put(job.getName(),job);
    }

    /**
     * 初始化作业
     */
    @Override
    @PostConstruct
    public void initJobs(){
        //获取所有作业
        if(jobMap.isEmpty()){
            log.warn("缺少作业信息,退出处理");
            return;
        }
        jobMap.forEach((jobName,job)->{
            try{
                log.info("开始初始化Job:{}",jobName);
                //根据作业的执行器,初始化MQ消费者.执行器结构:Map<执行器键名标识,DataTransferExecutor>
                Map<String,DataTransferExecutor> executorMap=job.getExecutorMap();
                if(executorMap.isEmpty()){
                    log.warn("作业缺少执行器信息,退出处理jobName:{}",job.getName());
                    return;
                }
                executorMap.forEach((executorKey,executor)->{
                    try{
                        log.info("开始初始化执行器:{}",executor.getKey());
                        initMqConsumer(job.getName(),executor);
                        log.info("完成初始化执行器:{}",executor.getKey());
                    }catch (Exception e){
                        log.error(MessageFormat.format("失败初始化执行器:{0}",executor.getKey()),e);
                    }
                });
                log.info("完成初始化Job:{}",jobName);
            }catch (Exception e){
                log.error(MessageFormat.format("失败初始化Job:{0}",jobName),e);
            }
        });
    }

    /**
     * 启动作业
     * @param jobName 作业名称
     * @param userAccount 用户帐号
     */
    @Override
    public void start(String jobName,String userAccount) {
        //1.获取作业任务执行器映射
        DataTransferJob job=jobMap.get(jobName);
        if(job==null){
            log.warn("缺少作业信息,jobName:{}",jobName);
            throw new SystemRuntimeException("缺少作业信息");
        }
        //结构:Map<执行器键名标识,DataTransferExecutor>
        Map<String,DataTransferExecutor> executorMap=job.getExecutorMap();

        //2.每一次运行,则属不同的作业实例,生成作业id
        String jobId=idGenerator.nextId("");

        //3.根据执行器创建任务实例
        executorMap.forEach((executorKey,executor)->{
            //通过处理器,获取数据总量
            if(executor.getHandler()==null){
                log.warn("执行器缺少处理器,executorKey:{}",executor.getKey());
                throw new SystemRuntimeException("执行器缺少处理器");
            }
            int total=executor.getHandler().readTotal();

            //创建启动模式任务实例.新启动任务时,来源任务Id为'空'
            DataTransferTaskInstance taskInstance=this.createTask(jobName,jobId,"",executor,DataTransferStartModeEnum.START,total,userAccount);
            if(taskInstance.getTotal()>0){
                //启动任务
                this.startTask(taskInstance);
            }
        });
    }
    
      /**
     * 重动作业
     * @param jobId 作业Id
     * @param userAccount 用户帐号
     */
    @Override
    public void restart(String jobId,String userAccount){
        //1.获取作业下可重启的任务
        List<DataTransferTask> tasks=this.dataTransferTaskService.getRestartTasks(jobId);
        if(CollectionUtils.isEmpty(tasks)){
            log.warn("没有需要重启的任务,jobId:{}",jobId);
            throw new SystemRuntimeException("没有需要重启的任务");
        }

        //2.获取作业任务执行器映射
        //==获取作业名称(同一作业id,作业名称一致)
        String jobName=tasks.get(0).getJobName();
        DataTransferJob job=jobMap.get(jobName);
        if(job==null){
            log.warn("缺少作业信息,jobName:{}",jobName);
            throw new SystemRuntimeException("缺少作业信息");
        }
        //结构:Map<执行器键名标识,DataTransferExecutor>
        Map<String,DataTransferExecutor> executorMap=job.getExecutorMap();

        //3.每一次运行,则属不同的作业实例,生成作业id
        String newJobId=idGenerator.nextId("");

        //4.根据重启的任务列表,创建重启任务实例
        tasks.stream().forEach(task->{
            //获取执行器
            DataTransferExecutor executor=executorMap.get(task.getExecutorKey());

            //获取处理器,并获取任务下迁移失败事项的总数据量
            if(executor.getHandler()==null){
                log.warn("执行器缺少处理器,executorKey:{}",executor.getKey());
                throw new SystemRuntimeException("执行器缺少处理器");
            }
            int total=this.dataTransferItemService.getRestartItemCount(task.getId());

            //创建重启模式的任务实例.来源任务Id为待重启的任务Id
            DataTransferTaskInstance taskInstance=this.createTask(jobName,newJobId,task.getId(),executor,DataTransferStartModeEnum.RESTART,total,userAccount);
            //启动任务
            this.startTask(taskInstance);
        });
    }

     /**
     * 获取执行器
     * @param jobName 作业名称
     * @param executorKey 执行器键名
     * @return 执行器
     */
    @Override
    public DataTransferExecutor getExecutor(String jobName,String executorKey){
        //获取作业
        DataTransferJob job=jobMap.get(jobName);
        if(job==null){
            throw new SystemRuntimeException("缺少作业信息");
        }

        //获取执行器
        DataTransferExecutor executor=job.getExecutorMap().get(executorKey);
        if(executor==null){
            throw new SystemRuntimeException("缺少执行器信息");
        }
        return executor;
    }
    
    /**
     * 初始化MQ消费者
     * @param jobName 作业名称
     * @param executor 执行器
     */
    private void initMqConsumer(String jobName, DataTransferExecutor executor){
        String consumerGroup=getConsumerGroup(mqConsumerGroupPrefix,jobName,executor.getKey());
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setConsumeThreadMin(executor.getThreadSize());
        consumer.setConsumeThreadMax(executor.getThreadSize());
        //每次只消费1条消息,批量消费的最大消息数量,缓存的消息数量达到参数设置的值,Push消费者SDK会将缓存的消息统一提交给消费线程,实现批量消费
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.setConsumeTimeout(executor.getTimeout());
        consumer.setNamesrvAddr(mqNameServer);
        //不作重试处理,若想重试任务,需使用任务的restart功能
        consumer.setMaxReconsumeTimes(0);
        //获取topic
        String topic=this.getMsgTopic(mqTopicPrefix,jobName);
        //获取tag
        String tag=this.getMsgTag(executor.getKey());
        try {
            /*
            说明:
            1.一个作业,使用一个topic
            2.一个执行器,使用一个consumer,但使用不同tag
            */
            consumer.subscribe(topic, tag);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    //每次只会消费1条消息
                    MessageExt msgExt=msgs.get(0);
                    //输出消息日志
                    Map<String,Object> msgFields=new HashMap<>();
                    msgFields.put("msgId",msgExt.getMsgId());
                    msgFields.put("reconsumeTimes",msgExt.getReconsumeTimes());
                    msgFields.put("keys",msgExt.getKeys());
                    msgFields.put("topic",msgExt.getTopic());
                    msgFields.put("tags",msgExt.getTags());
                    String body=new String(msgExt.getBody(), StandardCharsets.UTF_8);
                    msgFields.put("body",body);
                    log.info("接收到数据迁移消息,msgFields:{}", msgFields);

                    DataTransferBatch batch=null;
                    try{
                        //转换为批次信息
                        batch=JsonUtils.toObject(body,DataTransferBatch.class);
                    }catch (Exception e){
                       log.error(MessageFormat.format("数据迁移批次信息转换异常,msgFields:{}",msgFields),e);

                        //无论如何,均返回消费成功
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }

                    //执行单批次数据迁移
                    try {
                        log.info("执行单批次数据迁移开始.jobName:{},taskId:{},batchNum:{}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum());
                        DataTransferBatchHandler batchHandle = (DataTransferBatchHandler) applicationContext.getBean(batch.getModel().getBeanName());
                        batchHandle.execute(batch);
                        log.info("执行单批次数据迁移完成.jobName:{},taskId:{},batchNum:{}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum());
                    }catch (Exception e){
                        log.error(MessageFormat.format("执行单批次数据迁移失败.jobName:{0},taskId:{1},batchNum:{2}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum()),e);
                    }

                    //无论如何,均返回消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }catch (Exception e){
            log.error(MessageFormat.format("数据迁移MQ消费者启动失败,jobName:{0},executorKey:{1}",jobName,executor.getKey()),e);
            throw new SystemRuntimeException("数据迁移MQ消费者启动失败");
        }
    }

    /**
     * 创建任务
     * @param jobName 作业名称
     * @param jobId 作业Id
     * @param sourceId 来源任务Id
     * @param executor 执行器
     * @param startMode 启动模式
     * @param total 总数据量
     * @param userAccount 用户帐号
     * @return 任务实例
     */
    private DataTransferTaskInstance createTask(String jobName, String jobId, String sourceId,DataTransferExecutor executor, DataTransferStartModeEnum startMode,int total,String userAccount){
        //获取处理的总批次量
        int batchCount=this.getBatchCount(total,executor.getBatchSize());

        //保存任务
        //==新启动任务,来源任务Id为空
        DataTransferTask task=new DataTransferTask();
        task.setJobName(jobName);
        task.setJobId(jobId);
        task.setExecutorKey(executor.getKey());
        task.setBatchSize(executor.getBatchSize());
        task.setBatchCount(batchCount);
        task.setModuleType(executor.getModuleType());
        task.setBusinessType(executor.getBusinessType());
        task.setTotal(total);
        task.setSourceId(sourceId);
        String taskId=dataTransferTaskService.add(task,userAccount);

        //创建任务实例
        DataTransferTaskInstance taskInstance=new DataTransferTaskInstance();
        taskInstance.setJobName(jobName);
        taskInstance.setJobId(jobId);
        taskInstance.setExecutorKey(executor.getKey());
        taskInstance.setTaskId(taskId);
        taskInstance.setSourceId(sourceId);
        taskInstance.setBatchSize(executor.getBatchSize());
        taskInstance.setBatchCount(batchCount);
        taskInstance.setTotal(total);
        taskInstance.setModuleType(executor.getModuleType());
        taskInstance.setBusinessType(executor.getBusinessType());
        taskInstance.setThreadSize(executor.getThreadSize());
        taskInstance.setTimeout(executor.getTimeout());
        taskInstance.setModel(startMode);
        return taskInstance;
    }


    /**
     * 获取数据批次处理总量
     * @param total 总数量
     * @param batchSize 批次大小
     * @return 批次总量
     */
    private int getBatchCount(Integer total,Integer batchSize){
        if(total==0){
            return 0;
        }
        int count=total/batchSize;
        if(total % batchSize !=0){
            count=count+1;
        }
        return count;
    }

    /**
     * 运行任务实例
     * @param taskInstance 任务实例
     */
    private void startTask(DataTransferTaskInstance taskInstance){
        //根据处理的批次总量,按批次发送MQ消息
        List<Message<DataTransferBatch>> messages = new ArrayList<>();

        for(int batchNum=1;batchNum<=taskInstance.getBatchCount();batchNum++){
            //创建批次信息
            DataTransferBatch batch=new DataTransferBatch();
            batch.setJobName(taskInstance.getJobName());
            batch.setExecutorKey(taskInstance.getExecutorKey());
            batch.setTaskId(taskInstance.getTaskId());
            batch.setSourceId(taskInstance.getSourceId());
            batch.setBatchSize(taskInstance.getBatchSize());
            batch.setBatchCount(taskInstance.getBatchCount());
            batch.setBatchNum(batchNum);
            batch.setTotal(taskInstance.getTotal());
            batch.setModel(taskInstance.getModel());
            //获取消息Key
            String msgKey=this.getMsgKey(batch.getExecutorKey(),batchNum);
            messages.add(MessageBuilder.withPayload(batch).setHeader("KEYS",msgKey).build());

            //判断是否需执行消息发送
            boolean isSend=this.isSendMessage(messages.size(),mqBatchMessageSize,batchNum,taskInstance.getBatchCount());
            if(isSend){
                //获取消息目的地
                String msgDest=MessageFormat.format("{0}:{1}",this.getMsgTopic(mqTopicPrefix,batch.getJobName()),this.getMsgTag(batch.getExecutorKey()));
                //发送消息
                SendResult sendRet=rocketMQTemplate.syncSend(msgDest,messages,10*1000);
                if(!SendStatus.SEND_OK.equals(sendRet.getSendStatus())){
                    log.warn("数据迁移消息发送失败,batch:{},sendRet:{}",batch,sendRet);
                    throw new SystemRuntimeException("消息发送失败");
                }
                //消息发送完毕,则清除
                messages.clear();
            }
        }
    }

    /**
     * 是否执行消息发送
     * @param currentMsgSize 当前消息大小
     * @param sendSize 消息发送大小
     * @param currentMsgCount 当前消息总量
     * @param msgTotal 消息总量
     * @return 是、否
     */
    private Boolean isSendMessage(Integer currentMsgSize,Integer sendSize,Integer currentMsgCount,Integer msgTotal){
        if(currentMsgSize % sendSize==0 || currentMsgCount>=msgTotal){
            //当前消息大小到达发送大小,或当前消息已是最后一条消息,则执行发送
            return true;
        }
        return false;
    }

    /**
     * 获取消息Topic
     * @param topicPrefix 消息主题前缀
     * @param jobName 作业名称
     */
    private String getMsgTopic(String topicPrefix,String jobName){
        //作业名称:作为topic
        return MessageFormat.format("{0}_{1}",topicPrefix,jobName.toUpperCase());
    }

    /**
     * 获取消息tag
     * @param executorKey 执行器键名标识
     * @return 消息tag
     */
    private String getMsgTag(String executorKey){
        //执行器键名标识:作为tag
        return MessageFormat.format("TAG_{0}",executorKey.toUpperCase());
    }

    /**
     * 获取消息Key
     * @param executorKey 执行器键名标识
     * @param batchNum 批次序号
     * @return 消息Key
     */
    private String getMsgKey(String executorKey,int batchNum){
        //Key=执行器键名标识_批次序号
        String result=MessageFormat.format("{0}_{1}",executorKey,batchNum);
        return result;
    }

    /**
     * 获取消费者组
     * @param groupPrefix 消费者组前缀
     * @param jobName 作业名称
     * @param executorKey 执行器名称
     * @return 消费者组全名称
     */
    private String getConsumerGroup(String groupPrefix,String jobName,String executorKey){
        String result=MessageFormat.format("{0}_{1}_{2}",groupPrefix,jobName.toUpperCase(),executorKey.toUpperCase());
        return result;
    }
}

数据迁移作业类DataTransferJob

/**
 * <p>
 * 数据迁移作业 对象
 * </p>
 *
 * @author liJY
 * @Date 2023-06-20
 */
@Slf4j
@Getter
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value="DataTransferJob对象", description="数据迁移作业 对象")
public class DataTransferJob{

    /**
     * 名称
     */
    private String name;

    /**
     * 任务执行器映射.结构:Map(执行器键名标识,DataTransferExecutor)
     */
    private Map<String,DataTransferExecutor> executorMap=new HashMap<>();

    /**
     * 构造函数
     * @param name 名称
     */
    public DataTransferJob(String name){
        this.name=name;
    }

    /**
     * 添加执行器
     * @param executor 执行器
     */
    public void add(DataTransferExecutor executor){
        if(StringUtils.isEmpty(executor.getKey())){
            log.warn("执行器缺少键名标识");
            throw new SystemRuntimeException("执行器缺少键名标识");
        }
        if(executorMap.containsKey(executor.getKey())){
            log.warn("执行器已存在,jobName:{},key:{}",this.name,executor.getKey());
            throw new SystemRuntimeException("执行器已存在");
        }
        executorMap.put(executor.getKey(),executor);
    }
}

数据迁移执行器DataTransferExecutor

一个作业可配多个执行器

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class DataTransferExecutor<SOURCE,TARGET>{
    /**
     * 键名标识
     */
    private String key;

    /**
     * 数据处理批次大小
     */
    private int batchSize;

    /**
     * 处理线程大小(消费者线程数)
     */
    private int threadSize;

    /**
     * 处理超时时间.单位:分钟
     */
    private int timeout=15;

    /**
     * 模块类型
     */
    private DataTransferModuleTypeEnum moduleType;

    /**
     * 业务类型
     */
    private DataTransferBusinessTypeEnum businessType;

    /**
     * 处理器
     */
    private DataTransferHandler<SOURCE,TARGET> handler;
}

数据迁移处理接口DataTransferHandler

/**
 * 数据迁移处理器
 * @author liJY
 * @Date 2023-06-20
 * @param <SOURCE> 数据来源对象类型
 * @param <TARGET> 数据目标对象类型
 */
public interface DataTransferHandler<SOURCE,TARGET> {
    /**
     * 读取来源处理事项总数(用于首次启动任务场景)
     * @return 总数量
     */
    int readTotal();

    /**
     * 读取来源数据(用于首次启动任务场景)
     * @param batchSize 批次大小
     * @param batchNum 批次序号
     * @return 来源数据列表.返回数量比批次数量少的,则为跳过处理,空:没有待处理数据
     */
    List<DataTransferSource<SOURCE>> read(int batchSize, int batchNum);

    /**
     * 读取来源数据(用于重启任务场景)
     * @param businessIds 业务Id列表
     * @return 来源数据列表.返回数量比入参数量少的,则为跳过处理,空:没有待处理数据
     */
    List<DataTransferSource<SOURCE>> read(List<String> businessIds);

    /**
     * 数据处理
     * @param sources 来源数据列表
     * @return 目标数据列表.返回数量比入参数量少的,则为跳过处理.空:没有待处理数据
     */
    List<DataTransferResult<TARGET>> process(List<DataTransferSource<SOURCE>> sources);

    /**
     * 写入目标数据
     * @param targets 目标数据列表.返回数量比入参数量少的,则为跳过处理.空:没有待处理数据
     */
    List<DataTransferResult> write(List<TARGET> targets);
}

数据迁移配置类DataTransferConfig

@Configuration
public class DataTransferConfig {
    private final DataTransferBusinessProperties dataTransferBusinessProperties;
    private final ApplicationContext applicationContext;

    public DataTransferConfig(DataTransferBusinessProperties dataTransferBusinessProperties,ApplicationContext applicationContext){
        this.dataTransferBusinessProperties=dataTransferBusinessProperties;
        this.applicationContext=applicationContext;
    }
    
    /**
     * 实例对象注入Spring IOC容器
     * @return
     */
    @Bean
    public DataTransferJobManager jobManager(){
        DataTransferJobManager manager=new DataTransferJobManagerImpl();
        //合同交付模块Job
        manager.add(this.buildDeliverRegisterJob());
        //停复工模块Job
        manager.add(this.buildWorkResumeJob());
        //停复工模块-复工确认Job,必须跑完停复工申请的Job才能跑复工确认的Job,因为业务数据存在前后时间关联
        manager.add(this.buildResumeConfirmDetailJob());
        return manager;
    }

    /**
     * 构建合同交付数据迁移作业
     * @return 数据迁移作业
     */
    private DataTransferJob buildDeliverRegisterJob(){
        //定义合同交付申请任务执行器
        DataTransferBusinessProperties.TaskConfig applyTaskConfig=dataTransferBusinessProperties.getDeliverRegisterApply();
        DataTransferExecutor applyExecutor=new DataTransferExecutor();
        applyExecutor.setKey("apply");
        applyExecutor.setBatchSize(applyTaskConfig.getBatchSize());
        applyExecutor.setThreadSize(applyTaskConfig.getThreadSize());
        applyExecutor.setTimeout(applyTaskConfig.getTimeout());
        applyExecutor.setModuleType(DataTransferModuleTypeEnum.DELIVER_REGISTER);
        applyExecutor.setBusinessType(DataTransferBusinessTypeEnum.DELIVER_REGISTER_APPLY);
        applyExecutor.setHandler(this.applicationContext.getBean(DeliverRegisterDtApplyHandler.class));

        //定义合同交付申请明细任务执行器
        DataTransferBusinessProperties.TaskConfig detailTaskConfig=dataTransferBusinessProperties.getDeliverRegisterDetail();
        DataTransferExecutor detailExecutor=new DataTransferExecutor();
        detailExecutor.setKey("detail");
        detailExecutor.setBatchSize(detailTaskConfig.getBatchSize());
        detailExecutor.setThreadSize(detailTaskConfig.getThreadSize());
        detailExecutor.setTimeout(detailTaskConfig.getTimeout());
        detailExecutor.setModuleType(DataTransferModuleTypeEnum.DELIVER_REGISTER);
        detailExecutor.setBusinessType(DataTransferBusinessTypeEnum.DELIVER_REGISTER_APPLY_DETAIL);
        detailExecutor.setHandler(this.applicationContext.getBean(DeliverRegisterDtDetailHandler.class));

        DataTransferJob job=new DataTransferJob("deliverRegister");
        job.add(applyExecutor);
        job.add(detailExecutor);
        return job;
    }

    /**
     * 构建停复工数据迁移作业
     * @return 数据迁移作业
     */
    private DataTransferJob buildWorkResumeJob(){
        //定义停工计划申请任务执行器
        DataTransferBusinessProperties.TaskConfig stopPlanApplyTaskConfig=dataTransferBusinessProperties.getWorkResumeStopPlanApply();
        DataTransferExecutor stopPlanApplyExecutor=new DataTransferExecutor();
        stopPlanApplyExecutor.setKey("stopPlanApply");
        stopPlanApplyExecutor.setBatchSize(stopPlanApplyTaskConfig.getBatchSize());
        stopPlanApplyExecutor.setThreadSize(stopPlanApplyTaskConfig.getThreadSize());
        stopPlanApplyExecutor.setTimeout(stopPlanApplyTaskConfig.getTimeout());
        stopPlanApplyExecutor.setModuleType(DataTransferModuleTypeEnum.WORK_RESUME);
        stopPlanApplyExecutor.setBusinessType(DataTransferBusinessTypeEnum.WORK_RESUME_STOP_PLAN_APPLY);
        stopPlanApplyExecutor.setHandler(this.applicationContext.getBean(WorkResumeDtStopPlanApplyHandler.class));
        
        //定义停工计划明细任务执行器
        DataTransferBusinessProperties.TaskConfig resumeDetailTaskConfig=dataTransferBusinessProperties.getWorkResumeStopPlanDetail();
        DataTransferExecutor stopPlanDetailExecutor = new DataTransferExecutor();
        stopPlanDetailExecutor.setKey("stopPlanDetail");
        stopPlanDetailExecutor.setBatchSize(resumeDetailTaskConfig.getBatchSize());
        stopPlanDetailExecutor.setThreadSize(resumeDetailTaskConfig.getThreadSize());
        stopPlanDetailExecutor.setTimeout(resumeDetailTaskConfig.getTimeout());
        stopPlanDetailExecutor.setModuleType(DataTransferModuleTypeEnum.WORK_RESUME);
        stopPlanDetailExecutor.setBusinessType(DataTransferBusinessTypeEnum.WORK_RESUME_STOP_PLAN_DETAIL);
        stopPlanDetailExecutor.setHandler(this.applicationContext.getBean(WorkResumeDtStopPlanDetailHandler.class));
        
        //定义复工确认申请任务执行器
        DataTransferBusinessProperties.TaskConfig confirmApplyTaskConfig=dataTransferBusinessProperties.getWorkResumeConfirmApply();
        DataTransferExecutor confirmApplyExecutor=new DataTransferExecutor();
        confirmApplyExecutor.setKey("confirmApply");
        confirmApplyExecutor.setBatchSize(confirmApplyTaskConfig.getBatchSize());
        confirmApplyExecutor.setThreadSize(confirmApplyTaskConfig.getThreadSize());
        confirmApplyExecutor.setTimeout(confirmApplyTaskConfig.getTimeout());
        confirmApplyExecutor.setModuleType(DataTransferModuleTypeEnum.WORK_RESUME);
        confirmApplyExecutor.setBusinessType(DataTransferBusinessTypeEnum.WORK_RESUME_CONFIRM_APPLY);
        confirmApplyExecutor.setHandler(this.applicationContext.getBean(WorkResumeDtConfirmApplyHandler.class));
        
        DataTransferJob job=new DataTransferJob("workResume");
        job.add(stopPlanApplyExecutor);
        job.add(confirmApplyExecutor);
        job.add(stopPlanDetailExecutor);
        return job;
    }
    
    /**
     * 构建复工确认明细数据迁移作业
     */
    private DataTransferJob buildResumeConfirmDetailJob(){
        //定义停复工确认明细执行器
        DataTransferBusinessProperties.TaskConfig confirmDetailTaskConfig=dataTransferBusinessProperties.getWorkResumeConfirmDetail();
        DataTransferExecutor confirmDetailExecutor = new DataTransferExecutor();
        confirmDetailExecutor.setKey("resumeConfirmDetail");
        confirmDetailExecutor.setBatchSize(confirmDetailTaskConfig.getBatchSize());
        confirmDetailExecutor.setThreadSize(confirmDetailTaskConfig.getThreadSize());
        confirmDetailExecutor.setTimeout(confirmDetailTaskConfig.getTimeout());
        confirmDetailExecutor.setModuleType(DataTransferModuleTypeEnum.WORK_RESUME);
        confirmDetailExecutor.setBusinessType(DataTransferBusinessTypeEnum.WORK_RESUME_CONFIRM_DETAIL);
        confirmDetailExecutor.setHandler(this.applicationContext.getBean(WorkResumeDtConfirmDetailHandler.class));
        DataTransferJob job=new DataTransferJob("resumeConfirmDetail");
        job.add(confirmDetailExecutor);
        return job;
    }
}

数据迁移业务模块属性类DataTransferBusinessProperties

@Component
@Data
@RefreshScope
@ConfigurationProperties(prefix = "data-transfer.business")
public class DataTransferBusinessProperties {
    /**
     * 合同交付申请迁移任务配置
     */
    TaskConfig deliverRegisterApply;

    /**
     * 合同交付申请明迁移任务配置
     */
    TaskConfig deliverRegisterDetail;
    
    /**
     * 停复工_计划停工申请任务配置
     */
    TaskConfig workResumeStopPlanApply;
    
    TaskConfig workResumeStopPlanDetail;

    /**
     * 停复工_复工确认申请任务配置
     */
    TaskConfig workResumeConfirmApply;
    
    TaskConfig workResumeConfirmDetail;

    /**
     * 任务配置
     */
    @Data
    public static class TaskConfig{
        /**
         * 数据处理批次大小
         */
        private Integer batchSize;

        /**
         * 处理线程大小
         */
        private Integer threadSize;

        /**
         * 处理超时时间.单位:分钟
         */
        private Integer timeout;

        /**
         * 来源数据迁移条件
         */
        private DataTransferSourceQuery query;
    }
}

注意增加了注解@RefreshScope,当配置变更时可以在不重启应用的前提下刷新bean中相关的属性值,配置文件bootstrap.properties增加属性配置

#数据迁移配置
##合同交付申请
data-transfer.business.deliver-register-apply.batch-size=500
data-transfer.business.deliver-register-apply.thread-size=10
data-transfer.business.deliver-register-apply.timeout=15
#data-transfer.business.deliver-register-apply.query.begin-time=2023-08-07 10:01:01
#data-transfer.business.deliver-register-apply.query.end-time=2023-08-07 10:01:01
data-transfer.business.deliver-register-apply.query.ids[0]=2271D0F8-3D2D-4561-9DE6-ACCA790A5BF7
##合同交付申请明细
data-transfer.business.deliver-register-detail.batch-size=200
data-transfer.business.deliver-register-detail.thread-size=10
data-transfer.business.deliver-register-detail.timeout=20
#data-transfer.business.deliver-register-detail.query.begin-time=2023-08-07 10:01:01
#data-transfer.business.deliver-register-detail.query.end-time=2023-08-07 10:01:01
data-transfer.business.deliver-register-detail.query.ids[0]=2271D0F8-3D2D-4561-9DE6-ACCA790A5BF7
##停复工_计划停工申请
data-transfer.business.work-resume-stop-plan-apply.batch-size=500
data-transfer.business.work-resume-stop-plan-apply.thread-size=10
data-transfer.business.work-resume-stop-plan-apply.timeout=15
#data-transfer.business.work-resume-stop-plan-apply.query.begin-time=2023-08-07 10:01:01
#data-transfer.business.work-resume-stop-plan-apply.query.end-time=2023-08-07 10:01:01
data-transfer.business.work-resume-stop-plan-apply.query.ids[0]=4DFE0593-2B50-4DDF-A511-B57554D141B4

##停复工_计划停工申请明细
data-transfer.business.work-resume-stop-plan-detail.batch-size=1000
data-transfer.business.work-resume-stop-plan-detail.thread-size=1
data-transfer.business.work-resume-stop-plan-detail.timeout=30
#data-transfer.business.work-resume-stop-plan-detail.query.begin-time=2023-08-07 10:01:01
#data-transfer.business.work-resume-stop-plan-detail.query.end-time=2023-08-07 10:01:01
data-transfer.business.work-resume-stop-plan-detail.query.ids[0]=4DFE0593-2B50-4DDF-A511-B57554D141B4

##停复工_复工确认申请
data-transfer.business.work-resume-confirm-apply.batch-size=500
data-transfer.business.work-resume-confirm-apply.thread-size=10
data-transfer.business.work-resume-confirm-apply.timeout=15
#data-transfer.business.work-resume-confirm-apply.query.begin-time=2023-08-07 10:01:01
#data-transfer.business.work-resume-confirm-apply.query.end-time=2023-08-07 10:01:01
#data-transfer.business.work-resume-confirm-apply.query.ids[0]=83877BBA-573D-4A20-B736-7E244859B627
##停复工_复工确认申请明细
data-transfer.business.work-resume-confirm-detail.batch-size=1000
data-transfer.business.work-resume-confirm-detail.thread-size=20
data-transfer.business.work-resume-confirm-detail.timeout=30
#data-transfer.business.work-resume-confirm-detail.query.begin-time=2023-08-07 10:01:01
#data-transfer.business.work-resume-confirm-detail.query.end-time=2023-08-07 10:01:01
#data-transfer.business.work-resume-confirm-detail.query.ids[0]=5B8E8553-8997-4457-AC7A-94A4172FAF17

迁移条件类DataTransferSourceQuery

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value = "数据迁移_来源数据查询条件对象", description = "数据迁移_来源数据查询条件对象")
public class DataTransferSourceQuery {
    /**
     * 开始时间
     */
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime beginTime;
    /**
     * 结束时间
     */
    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime endTime;
    /**
     * Id列表
     */
    private List<String> ids;
}

数据迁移任务表DataTransferTask

/**
 * <p>
 * 数据迁移任务表
 * </p>
 *
 * @author liJY
 * @Date 2023-06-20
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("t_data_transfer_task")
public class DataTransferTask extends Model<DataTransferTask> {

    private static final long serialVersionUID = 1L;

    /**
     * 主键名称
     */
    @TableId(value = "id", type = IdType.ASSIGN_ID)
    private String id;

    /**
     * 创建人bip
     */
    @TableField("create_user")
    private String createUser;

    /**
     * 创建时间
     */
    @TableField("create_time")
    private LocalDateTime createTime;

    /**
     * 更新人bip
     */
    @TableField("update_user")
    private String updateUser;

    /**
     * 更新时间
     */
    @TableField("update_time")
    private LocalDateTime updateTime;

    /**
     * 租户编码
     */
    @TableField("tenant_id")
    private String tenantId;

    /**
     * 版本号
     */
    @TableField("version_number")
    private String versionNumber;

    /**
     * 备注
     */
    @TableField("remark")
    private String remark;

    /**
     * 作业名称
     */
    @TableField("job_name")
    private String jobName;

    /**
     * 作业Id
     */
    @TableField("job_id")
    private String jobId;

    /**
     * 执行器键名标识
     */
    @TableField("executor_key")
    private String executorKey;

    /**
     * 来源任务Id
     */
    @TableField("source_id")
    private String sourceId;

    /**
     * 批次大小
     */
    @TableField("batch_size")
    private Integer batchSize;

    /**
     * 批次总量
     */
    @TableField("batch_count")
    private Integer batchCount;

    /**
     * 开始时间
     */
    @TableField("start_time")
    private LocalDateTime startTime;

    /**
     * 结束时间
     */
    @TableField("end_time")
    private LocalDateTime endTime;

    /**
     * 模块类型
     */
    @TableField("module_type")
    private DataTransferModuleTypeEnum moduleType;

    /**
     * 业务类型
     */
    @TableField("business_type")
    private DataTransferBusinessTypeEnum businessType;

    /**
     * 状态
     */
    @TableField("status")
    private DataTransferTaskStatusEnum status;

    /**
     * 总数据量
     */
    @TableField("total")
    private Integer total;

    /**
     * 成功事项数量
     */
    @TableField("success")
    private Integer success;

    /**
     * 失败事项数量
     */
    @TableField("fail")
    private Integer fail;

    /**
     * 跳过事项数量
     */
    @TableField("skip")
    private Integer skip;

    @Override
    protected Serializable pkVal() {
        return this.id;
    }

}
  • 枚举类

    /**
     * 数据迁移模块类型枚举
     *
     * @author liJY
     */
      
    public enum DataTransferModuleTypeEnum implements BaseEnum<Integer> {
        /**
         * 合同交付
         */
        DELIVER_REGISTER(1,"合同交付"),
        /**
         * 停复工
         */
        WORK_RESUME(2, "停复工"),
        ;
      
      
        private final Integer value;
      
        private final String displayName;
      
        DataTransferModuleTypeEnum(Integer value, String displayName) {
            this.value = value;
            this.displayName = displayName;
        }
      
        /**
         * 枚举value
         *
         * @return
         */
        @Override
        public Integer getValue() {
            return value;
        }
      
        /**
         * 枚举的显示名字
         *
         * @return
         */
        @Override
        public String getDisplayName() {
            return displayName;
        }
    }
      
    /**
     * 数据迁移事项业务类型枚举
     *
     * @author liJY
     */
      
    public enum DataTransferBusinessTypeEnum implements BaseEnum<Integer> {
        /**
         * 合同交付_申请
         */
        DELIVER_REGISTER_APPLY(1,"合同交付_申请"),
        /**
         * 合同交付_申请明细
         */
        DELIVER_REGISTER_APPLY_DETAIL(2, "合同交付_申请明细"),
        /**
         * 停复工_停工计划申请
         */
        WORK_RESUME_STOP_PLAN_APPLY(3,"停复工_停工计划申请"),
        /**
         * 停复工_复工确认申请
         */
        WORK_RESUME_CONFIRM_APPLY(4,"停复工_复工确认申请"),
        /**
         * 停复工_停工计划明细
         */
        WORK_RESUME_STOP_PLAN_DETAIL(5,"停复工_停工计划明细"),
        /**
         * 停复工_复工确认明细
         */
        WORK_RESUME_CONFIRM_DETAIL(6,"停复工_复工确认明细"),
        ;
      
        private final Integer value;
      
        private final String displayName;
      
        DataTransferBusinessTypeEnum(Integer value, String displayName) {
            this.value = value;
            this.displayName = displayName;
        }
      
        /**
         * 枚举value
         *
         * @return
         */
        @Override
        public Integer getValue() {
            return value;
        }
      
        /**
         * 枚举的显示名字
         *
         * @return
         */
        @Override
        public String getDisplayName() {
            return displayName;
        }
    }
      
    /**
     * 数据迁移任务状态枚举
     *
     * @author liJY
     */
      
    public enum DataTransferTaskStatusEnum implements BaseEnum<Integer> {
        /**
         * 进行中
         */
        PROGRESS(1,"进行中"),
        /**
         * 完成
         */
        FINISH(2, "完成");
      
        private final Integer value;
      
        private final String displayName;
      
        DataTransferTaskStatusEnum(Integer value, String displayName) {
            this.value = value;
            this.displayName = displayName;
        }
      
        /**
         * 枚举value
         *
         * @return
         */
        @Override
        public Integer getValue() {
            return value;
        }
      
        /**
         * 枚举的显示名字
         *
         * @return
         */
        @Override
        public String getDisplayName() {
            return displayName;
        }
    }
    
  • Service接口DataTransferTaskService

    ORM框架使用MybatisPlus

    public interface DataTransferTaskService extends IService<DataTransferTask> {
        /**
         * 添加新任务
         * @param task 任务信息
         * @param userAccount 用户帐号
         * @return 任务Id
         */
        String add(DataTransferTask task, String userAccount);
      
        /**
         * 递增事项状态数量
         * @param taskId 任务Id
         * @param success 递增成功数量.0:不作改变
         * @param fail 递增失败数量.0:不作改变
         * @param skip 递增跳过数量.0:不作改变
         * @return 是、否已完成任务
         */
        boolean incrItemCount(String taskId,int success,int fail,int skip);
      
        /**
         * 获取作业下可重启的任务列表
         * @param jobId 作业Id
         * @return 任务列表
         */
        List<DataTransferTask> getRestartTasks(String jobId);
    }
    

    服务接口实现类DataTransferTaskServiceImpl

    @RequiredArgsConstructor(onConstructor_={@Autowired})
    @Slf4j
    @Service
    public class DataTransferTaskServiceImpl extends ServiceImpl<DataTransferTaskMapper, DataTransferTask> implements DataTransferTaskService {
        /**
         * 添加新任务
         * @param task 任务信息
         * @param userAccount 用户帐号
         * @return 任务Id
         */
        @Override
        public String add(DataTransferTask task, String userAccount){
            //保存前填充信息
            LocalDateTime now=LocalDateTime.now();
            task.setCreateUser(userAccount);
            task.setCreateTime(now);
            task.setStartTime(now);
            if(task.getTotal()==0){
                //总数据量为0,表示完成
                task.setStatus(DataTransferTaskStatusEnum.FINISH);
                task.setEndTime(now);
            }else{
                task.setStatus(DataTransferTaskStatusEnum.PROGRESS);
            }
            task.setSuccess(0);
            task.setFail(0);
            task.setSkip(0);
            baseMapper.insert(task);
      
            return task.getId();
        }
      
        /**
         * 递增事项状态数量
         * @param taskId 任务Id
         * @param success 递增成功数量.0:不作改变
         * @param fail 递增失败数量.0:不作改变
         * @param skip 递增跳过数量.0:不作改变
         * @return 是、否已完成任务
         */
        @Override
        public boolean incrItemCount(String taskId,int success,int fail,int skip){
            //递增成功数量
            this.incrSuccess(taskId,success);
            //递增失败数量
            this.incrFail(taskId,fail);
            //递增跳过数量
            this.incrSkip(taskId,skip);
      
            //检查任务是否已完成
            int count=success+fail+skip;
            if(count<=0){
                //数量没发生变化,并未改变完成状态,因此返回'未完成任务'
                return false;
            }
            return this.checkFinish(taskId);
        }
      
         /**
         * 获取作业下可重启的任务列表
         * @param jobId 作业Id
         * @return 任务列表
         */
        @Override
        public List<DataTransferTask> getRestartTasks(String jobId){
            //只对存在:事项主体类型为'数据',且事项状态为'异常'的任务,用任务状态为'完成'的任务执行重启
            return baseMapper.getRestartTasks(jobId, DataTransferTaskStatusEnum.FINISH,DeletedEnum.NOT_DELETE, DataTransferSubjectTypeEnum.DATA
                    , Arrays.asList(DataTransferItemStatusEnum.BUSINESS_ERROR,DataTransferItemStatusEnum.SYSTEM_ERROR));
        }
          
        /**
         * 递增成功数量
         * @param taskId 任务Id
         * @param value 递增值
         */
        private void incrSuccess(String taskId,int value){
            if(value<=0){
                return;
            }
            int row= baseMapper.incrSuccess(taskId,value);
            if(row==0){
                log.warn("更新成功数量失败,taskId:{}",taskId);
            }
        }
      
        /**
         * 递增失败数量
         * @param taskId 任务Id
         * @param value 递增值
         */
        private void incrFail(String taskId,int value){
            if(value<=0){
                return;
            }
            int row= baseMapper.incrFail(taskId,value);
            if(row==0){
                log.warn("更新失败数量失败,taskId:{}",taskId);
            }
        }
      
        /**
         * 递增跳过数量
         * @param taskId 任务Id
         * @param value 递增值
         */
        private void incrSkip(String taskId,int value){
            if(value<=0){
                return;
            }
            int row= baseMapper.incrSkip(taskId,value);
            if(row==0){
                log.warn("更新跳过数量失败,taskId:{}",taskId);
            }
        }
      
        /**
         * 检查完成状态
         * @param taskId 任务Id
         * @return  是、否任务已完成
         */
        private boolean checkFinish(String taskId){
            DataTransferTask task=super.getById(taskId);
            if(task==null){
                log.warn("获取任务信息失败,taskId:{}",taskId);
                return false;
            }
      
            int currentTotal=task.getSuccess()+task.getFail()+task.getSkip();
            //成功+失败+跳过数量>=总数量,则表示处理完成
            if(currentTotal>=task.getTotal()){
                //只允许更新状态为'进行中'的任务为'完成'
                LambdaUpdateWrapper<DataTransferTask> updateWrapper=new LambdaUpdateWrapper<>();
                updateWrapper.eq(DataTransferTask::getId,taskId);
                updateWrapper.eq(DataTransferTask::getStatus,DataTransferTaskStatusEnum.PROGRESS);
      
                DataTransferTask updateTask=new DataTransferTask();
                updateTask.setStatus(DataTransferTaskStatusEnum.FINISH);
                updateTask.setEndTime(LocalDateTime.now());
                int row=baseMapper.update(updateTask,updateWrapper);
                log.info("任务状态更新影响行数,taskId:{},row:{}",taskId,row);
                if(row>0){
                    return true;
                }
            }
            return false;
        }
    }
    
  • DAO接口DataTransferTaskMapper

    public interface DataTransferTaskMapper extends BaseMapper<DataTransferTask> {
        /**
         * 递增成功数量
         * @param taskId 任务Id
         * @param value 递增值
         * @return 影响行数
         */
        int incrSuccess(@Param("taskId") String taskId, @Param("value")int value);
      
        /**
         * 递增失败数量
         * @param taskId 任务Id
         * @param value 递增值
         * @return 影响行数
         */
        int incrFail(@Param("taskId")String taskId,@Param("value")int value);
      
        /**
         * 递增跳过数量
         * @param taskId 任务Id
         * @param value 递增值
         * @return 影响行数
         */
        int incrSkip(@Param("taskId")String taskId,@Param("value")int value);
      
        /**
         * 获取可重启的任务列表
         * @param jobId 作业Id
         * @param taskStatus 任务状态
         * @param deleteTag 删除标识
         * @param subjectType 主体类型
         * @param status 事项状态列表
         * @return 任务列表
         */
        List<DataTransferTask> getRestartTasks(@Param("jobId") String jobId, @Param("taskStatus") DataTransferTaskStatusEnum taskStatus, @Param("deleteTag") DeletedEnum deleteTag, @Param("subjectType")DataTransferSubjectTypeEnum subjectType, @Param("status")List<DataTransferItemStatusEnum> status);
    }
    

    DataTransferTaskMapper.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
    <mapper namespace="com.lightning.channel.ww.ods.mapper.self.DataTransferTaskMapper">
        <!-- 通用查询映射结果 -->
        <resultMap id="BaseResultMap" type="com.lightning.channel.ww.ods.entity.DataTransferTask">
            <id column="id" property="id" />
            <result column="create_user" property="createUser" />
            <result column="create_time" property="createTime" />
            <result column="update_user" property="updateUser" />
            <result column="update_time" property="updateTime" />
            <result column="tenant_id" property="tenantId" />
            <result column="version_number" property="versionNumber" />
            <result column="remark" property="remark" />
            <result column="job_name" property="jobName" />
            <result column="job_id" property="jobId" />
            <result column="executor_key" property="executorKey" />
            <result column="source_id" property="sourceId" />
            <result column="batch_size" property="batchSize" />
            <result column="batch_count" property="batchCount" />
            <result column="start_time" property="startTime" />
            <result column="end_time" property="endTime" />
            <result column="module_type" property="moduleType" />
            <result column="business_type" property="businessType" />
            <result column="status" property="status" />
            <result column="total" property="total" />
            <result column="success" property="success" />
            <result column="fail" property="fail" />
            <result column="skip" property="skip" />
        </resultMap>
      
        <update id="incrSuccess">
            update t_data_transfer_task set success=success+#{value,jdbcType=INTEGER} where id=#{taskId,jdbcType=VARCHAR}
        </update>
      
        <update id="incrFail">
            update t_data_transfer_task set fail=fail+#{value,jdbcType=INTEGER} where id=#{taskId,jdbcType=VARCHAR}
        </update>
      
        <update id="incrSkip">
            update t_data_transfer_task set skip=skip+#{value,jdbcType=INTEGER} where id=#{taskId,jdbcType=VARCHAR}
        </update>
      
        <select id="getRestartTasks" resultMap="BaseResultMap">
            select t.*
            from t_data_transfer_task t
            where t.job_id=#{jobId,jdbcType=VARCHAR}
            and t.status=#{taskStatus,jdbcType=VARCHAR}
            and exists(
                select 1 from t_data_transfer_item i where t.id=i.task_id
                and i.delete_tag=#{deleteTag,jdbcType=VARCHAR}
                and i.subject_type=#{subjectType,jdbcType=VARCHAR}
                and i.status in
                <foreach item="statusItem" index="index" collection="status" open="(" separator="," close=")">
                    #{statusItem,jdbcType=VARCHAR}
                </foreach>
            )
        </select>
    </mapper>
    

数据迁移事项表DataTransferItem

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("t_data_transfer_item")
public class DataTransferItem extends Model<DataTransferItem> {

    private static final long serialVersionUID = 1L;

    /**
     * 主键名称
     */
    @TableId(value = "id", type = IdType.ASSIGN_ID)
    private String id;

    /**
     * 删除标识
     */
    @TableField("delete_tag")
    private DeletedEnum deleteTag;

    /**
     * 创建人bip
     */
    @TableField("create_user")
    private String createUser;

    /**
     * 创建时间
     */
    @TableField("create_time")
    private LocalDateTime createTime;

    /**
     * 更新人bip
     */
    @TableField("update_user")
    private String updateUser;

    /**
     * 更新时间
     */
    @TableField("update_time")
    private LocalDateTime updateTime;

    /**
     * 租户编码
     */
    @TableField("tenant_id")
    private String tenantId;

    /**
     * 版本号
     */
    @TableField("version_number")
    private String versionNumber;

    /**
     * 备注
     */
    @TableField("remark")
    private String remark;

    /**
     * 任务Id
     */
    @TableField("task_id")
    private String taskId;

    /**
     * 主体Id.数据:业务Id,批次:批次序号
     */
    @TableField("subject_id")
    private String subjectId;

    /**
     * 主体类型
     */
    @TableField("subject_type")
    private DataTransferSubjectTypeEnum subjectType;

    /**
     * 状态
     */
    @TableField("status")
    private DataTransferItemStatusEnum status;

    /**
     * 数量
     */
    @TableField("number")
    private Integer number;

    /**
     * 阶段
     */
    @TableField("stage")
    private DataTransferItemStageEnum stage;

    /**
     * 追踪Id
     */
    @TableField("trace_id")
    private String traceId;

    /**
     * 错误信息
     */
    @TableField("error")
    private String error;

    @Override
    protected Serializable pkVal() {
        return this.id;
    }
}
  • Service接口 DataTransferItemService

    public interface DataTransferItemService extends IService<DataTransferItem> {
        /**
         * 批量新增(用于新增任务后,保存事项)
         * @param taskId 任务Id
         * @param subjectType 主体类型
         * @param stage 阶段
         * @param items 事项列表
         * @param userAccount 用户帐号
         */
        void batchAdd(String taskId, DataTransferSubjectTypeEnum subjectType, DataTransferItemStageEnum stage, List<DataTransferItem> items, String userAccount);
      
        /**
         * 获取任务下的重启事项数量
         * @param taskId 任务Id
         * @return 事项数量
         */
        int getRestartItemCount(String taskId);
      
        /**
         * 获取重启迁移事项列表
         * @param taskId 任务Id
         * @param batchSize 批次大小
         * @param batchNum 批次序号
         * @return 迁移事项列表
         */
        List<DataTransferItem> listRestartItems(String taskId,int batchSize,int batchNum);
    }
    

    服务接口实现类 DataTransferItemServiceImpl

    @RequiredArgsConstructor(onConstructor_={@Autowired})
    @Slf4j
    @Service
    public class DataTransferItemServiceImpl extends ServiceImpl<DataTransferItemMapper, DataTransferItem> implements DataTransferItemService {
        /**
         * 批量新增(用于新增任务后,保存事项)
         * @param taskId 任务Id
         * @param subjectType 主体类型
         * @param stage 阶段
         * @param items 事项列表
         * @param userAccount 用户帐号
         */
        @Override
        public void batchAdd(String taskId, DataTransferSubjectTypeEnum subjectType, DataTransferItemStageEnum stage,List<DataTransferItem> items, String userAccount){
            if(CollectionUtils.isEmpty(items)){
                log.warn("保存的事项列表为空,退出处时.taskId:{},stage:{}",taskId,stage);
                return;
            }
      
            //保存前填充信息
            LocalDateTime now=LocalDateTime.now();
            items.stream().forEach(item->{
                item.setDeleteTag(DeletedEnum.NOT_DELETE);
                item.setCreateUser(userAccount);
                item.setCreateTime(now);
                item.setTaskId(taskId);
                item.setSubjectType(subjectType);
                item.setStage(stage);
            });
      
            super.saveBatch(items);
        }
      
        /**
         * 获取任务下的重启事项数量
         * @param taskId 任务Id
         * @return 事项数量
         */
        @Override
        public int getRestartItemCount(String taskId){
            LambdaQueryWrapper<DataTransferItem> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(DataTransferItem::getTaskId,taskId);
            queryWrapper.eq(DataTransferItem::getDeleteTag,DeletedEnum.NOT_DELETE);
            //主体类型为'数据'
            queryWrapper.eq(DataTransferItem::getSubjectType,DataTransferSubjectTypeEnum.DATA);
            //状态为'异常'
            queryWrapper.in(DataTransferItem::getStatus,Arrays.asList(DataTransferItemStatusEnum.BUSINESS_ERROR,DataTransferItemStatusEnum.SYSTEM_ERROR));
            return baseMapper.selectCount(queryWrapper);
        }
      
        /**
         * 获取重启迁移事项列表
         * @param taskId 任务Id
         * @param batchSize 批次大小
         * @param batchNum 批次序号
         * @return 迁移事项列表
         */
        @Override
        public List<DataTransferItem> listRestartItems(String taskId,int batchSize,int batchNum){
            LambdaQueryWrapper<DataTransferItem> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(DataTransferItem::getTaskId,taskId);
            queryWrapper.eq(DataTransferItem::getDeleteTag,DeletedEnum.NOT_DELETE);
            //只查询主体类型为'数据'
            queryWrapper.eq(DataTransferItem::getSubjectType,DataTransferSubjectTypeEnum.DATA);
            //只处理状态为'异常'的事项
            queryWrapper.in(DataTransferItem::getStatus,Arrays.asList(DataTransferItemStatusEnum.BUSINESS_ERROR,DataTransferItemStatusEnum.SYSTEM_ERROR));
      
            queryWrapper.orderByAsc(DataTransferItem::getCreateTime).orderByAsc(DataTransferItem::getSubjectId);
      
            IPage<DataTransferItem> page = new Page<>(batchNum, batchSize, false);
            IPage<DataTransferItem> pageResult= baseMapper.selectPage(page,queryWrapper);
            return pageResult.getRecords();
        }
    }
    
  • DAO接口

    public interface DataTransferItemMapper extends BaseMapper<DataTransferItem> {
      
    }
    

start源码分析

上面 DataTransferConfig配置类,应用启动时就创建了DataTransferJobManager对象注入Sping IOC容器归Spring管理,并添加了合同交付、停复工两个业务模块的数据迁移作业对象,来到控制层DataTransferController的start方法启动作业

dataTransferJobManager.start(jobName,loginUser.getLoginName());

方法start

    @Override
    public void start(String jobName,String userAccount) {
        //1.获取作业任务执行器映射
        DataTransferJob job=jobMap.get(jobName);
        if(job==null){
            log.warn("缺少作业信息,jobName:{}",jobName);
            throw new SystemRuntimeException("缺少作业信息");
        }
        //结构:Map<执行器键名标识,DataTransferExecutor>
        Map<String,DataTransferExecutor> executorMap=job.getExecutorMap();

        //2.每一次运行,则属不同的作业实例,生成作业id
        String jobId=idGenerator.nextId("");

        //3.根据执行器创建任务实例
        executorMap.forEach((executorKey,executor)->{
            //通过处理器,获取数据总量
            if(executor.getHandler()==null){
                log.warn("执行器缺少处理器,executorKey:{}",executor.getKey());
                throw new SystemRuntimeException("执行器缺少处理器");
            }
            // 查询任务迁移总数据量
            int total=executor.getHandler().readTotal();

            //创建启动模式任务实例.新启动任务时,来源任务Id为'空'
            DataTransferTaskInstance taskInstance=this.createTask(jobName,jobId,"",executor,DataTransferStartModeEnum.START,total,userAccount);
            if(taskInstance.getTotal()>0){
                //启动任务
                this.startTask(taskInstance);
            }
        });
    }

指定作业的每个执行器分别创建一个任务实例,写入到表 t_data_transfer_task

任务实例类DataTransferTaskInstance

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value="DataTransferTaskInstance对象", description="数据迁移任务实例 对象")
public class DataTransferTaskInstance {
    /**
     * 作业名称
     */
    private String jobName;

    /**
     * 作业Id
     */
    private String jobId;

    /**
     * 执行器键名标识
     */
    private String executorKey;

    /**
     * 任务Id
     */
    private String taskId;

    /**
     * 来源任务Id
     */
    private String sourceId;

    /**
     * 数据处理批次大小
     */
    private int batchSize;

    /**
     * 数据处理批次总量
     */
    private int batchCount;

    /**
     * 总数据量
     */
    private int total;

    /**
     * 处理线程大小
     */
    private int threadSize;

    /**
     * 处理超时时间.单位:分钟
     */
    private int timeout;

    /**
     * 模块类型
     */
    private DataTransferModuleTypeEnum moduleType;

    /**
     * 业务类型
     */
    private DataTransferBusinessTypeEnum businessType;

    /**
     * 启动模式
     */
    private DataTransferStartModeEnum model;

}

任务数据量大于0,启动迁移任务

if(taskInstance.getTotal()>0){
    //启动任务
    this.startTask(taskInstance);
}

方法this.startTask在上面的类DataTransferJobManagerImpl

/**
     * 运行任务实例
     * @param taskInstance 任务实例
     */
    private void startTask(DataTransferTaskInstance taskInstance){
        //根据处理的批次总量,按批次发送MQ消息
        List<Message<DataTransferBatch>> messages = new ArrayList<>();

        for(int batchNum=1;batchNum<=taskInstance.getBatchCount();batchNum++){
            //创建批次信息
            DataTransferBatch batch=new DataTransferBatch();
            batch.setJobName(taskInstance.getJobName());
            batch.setExecutorKey(taskInstance.getExecutorKey());
            batch.setTaskId(taskInstance.getTaskId());
            batch.setSourceId(taskInstance.getSourceId());
            batch.setBatchSize(taskInstance.getBatchSize());
            batch.setBatchCount(taskInstance.getBatchCount());
            batch.setBatchNum(batchNum);
            batch.setTotal(taskInstance.getTotal());
            batch.setModel(taskInstance.getModel());
            //获取消息Key
            String msgKey=this.getMsgKey(batch.getExecutorKey(),batchNum);
            messages.add(MessageBuilder.withPayload(batch).setHeader("KEYS",msgKey).build());

            //判断是否需执行消息发送
            boolean isSend=this.isSendMessage(messages.size(),mqBatchMessageSize,batchNum,taskInstance.getBatchCount());
            if(isSend){
                //获取消息目的地
                String msgDest=MessageFormat.format("{0}:{1}",this.getMsgTopic(mqTopicPrefix,batch.getJobName()),this.getMsgTag(batch.getExecutorKey()));
                //发送消息
                SendResult sendRet=rocketMQTemplate.syncSend(msgDest,messages,10*1000);
                if(!SendStatus.SEND_OK.equals(sendRet.getSendStatus())){
                    log.warn("数据迁移消息发送失败,batch:{},sendRet:{}",batch,sendRet);
                    throw new SystemRuntimeException("消息发送失败");
                }
                //消息发送完毕,则清除
                messages.clear();
            }
        }
    }

数据迁移批次消息类DataTransferBatch

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value="DataTransferBatch对象", description="数据迁移任务批次 对象")
public class DataTransferBatch implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 作业名称
     */
    private String jobName;

    /**
     * 执行器键名标识
     */
    private String executorKey;

    /**
     * 任务Id
     */
    private String taskId;

    /**
     * 来源任务Id
     */
    private String sourceId;

    /**
     * 数据处理批次大小
     */
    private int batchSize;

    /**
     * 数据处理批次总量
     */
    private int batchCount;

    /**
     * 数据处理批次序号
     */
    private int batchNum;

    /**
     * 总数据量
     */
    private int total;

    /**
     * 启动模式
     */
    private DataTransferStartModeEnum model;
}

枚举类 DataTransferStartModeEnum

public enum DataTransferStartModeEnum implements BaseEnum<Integer> {
    /**
     * 启动
     */
    START(1,"启动","dataTransferBatchStartHandler"),
    /**
     * 重启
     */
    RESTART(2, "重启","dataTransferBatchRestartHandler");
    private final Integer value;

    private final String displayName;

    private final String beanName;

    DataTransferStartModeEnum(Integer value, String displayName,String beanName) {
        this.value = value;
        this.displayName = displayName;
        this.beanName=beanName;
    }

    /**
     * 枚举value
     *
     * @return
     */
    @Override
    public Integer getValue() {
        return value;
    }

    /**
     * 枚举的显示名字
     *
     * @return
     */
    @Override
    public String getDisplayName() {
        return displayName;
    }

    /**
     * Bean名称
     *
     * @return
     */
    public String getBeanName() {
        return beanName;
    }
}

一个数据迁移批次生成一个MQ消息,然后MQ消息被批量发送到RocketMQ Broker,迁移任务启动成功,下面看迁移任务消息的消费

2、消费数据迁移批次消息

消费者在应用启动的时候就会注册到RocketMQ,看 DataTransferJobManagerImpl 的 方法initJobs有@PostConstruct注解,初始化自定义Job会给Job里的每个业务执行器都注册一个消费者到RocketMQ,看 DataTransferJobManagerImpl 的 方法initMqConsumer,执行单批次消息数据迁移的代码片段

//执行单批次数据迁移
try {
    log.info("执行单批次数据迁移开始.jobName:{},taskId:{},batchNum:{}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum());
    DataTransferBatchHandler batchHandle = (DataTransferBatchHandler) applicationContext.getBean(batch.getModel().getBeanName());
    batchHandle.execute(batch);
    log.info("执行单批次数据迁移完成.jobName:{},taskId:{},batchNum:{}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum());
}catch (Exception e){
    log.error(MessageFormat.format("执行单批次数据迁移失败.jobName:{0},taskId:{1},batchNum:{2}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum()),e);
}

单批次消息处理接口DataTransferBatchHander

public interface DataTransferBatchHandler {
    /**
     * 执行单批次数据迁移
     * @param batch 批次信息
     */
    void execute(DataTransferBatch batch);
}

接口实现类根据场景实现有两个

  • 启动 DataTransferBatchStartHandlerImpl

    @RequiredArgsConstructor(onConstructor_={@Autowired})
    @Slf4j
    @Component(value="dataTransferBatchStartHandler")
    public class DataTransferBatchStartHandlerImpl extends DataTransferBatchHandlerSuper implements DataTransferBatchHandler {
        @Autowired
        public void setJobManager(DataTransferJobManager jobManager) {
            this.jobManager = jobManager;
        }
      
        @Autowired
        public void setDataTransferTaskService(DataTransferTaskService dataTransferTaskService) {
            this.dataTransferTaskService = dataTransferTaskService;
        }
      
        @Autowired
        public void setDataTransferItemService(DataTransferItemService dataTransferItemService) {
            this.dataTransferItemService = dataTransferItemService;
        }
      
        @Autowired
        public void setIdGenerator(IdGenerator idGenerator) {
            this.idGenerator = idGenerator;
        }
      
        @Autowired
        public void setApplicationContext(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }
      
        /**
         * 执行单批次数据迁移
         * @param batch 批次信息
         */
        @Override
        public void execute(DataTransferBatch batch){
            //处理器
            DataTransferHandler handler=null;
            //来源数据
            List<DataTransferSource> sources=null;
            //处理成功列表
            List<DataTransferResult> processSuccess;
      
            //1.获取处理器
            try{
                //获取执行器
                DataTransferExecutor executor=this.jobManager.getExecutor(batch.getJobName(),batch.getExecutorKey());
                handler=executor.getHandler();
            }catch (Exception e){
                //整批次处理失败
                this.applicationContext.getBean(this.getClass()).batchHandleFail(batch,"获取执行器", DataTransferItemStageEnum.BATCH_BEFORE,e);
                return;
            }
      
            //2.读取来源数据
            try{
                sources=handler.read(batch.getBatchSize(),batch.getBatchNum());
                //对来源数据没返回的数据,生成单批次的跳过迁移事项
                this.applicationContext.getBean(this.getClass()).batchHandleSkip(batch,sources.size(),DataTransferItemStageEnum.READ);
      
                if(CollectionUtils.isEmpty(sources)){
                    //来源数据为空,同不再执行后续的迁移操作
                    log.info("来源数据为空,不再执行后续阶段的迁移操作.taskId:{},batchNum:{}",batch.getTaskId(),batch.getBatchNum());
                    return;
                }
            }catch (Exception e){
                //整批次处理失败,则生成单批次的失败迁移事项
                this.applicationContext.getBean(this.getClass()).batchHandleFail(batch,"读取来源数据",DataTransferItemStageEnum.READ,e);
                return;
            }
      
            //3.数据处理
            processSuccess=this.executeBatchProcess(batch,handler,sources);
            if(CollectionUtils.isEmpty(processSuccess)){
                //没有成功处理结果,则不需执行后续的迁移操作,退出处理
                return;
            }
      
            //4.写入目标数据
            this.executeBatchWrite(batch,handler,processSuccess);
        }
    }
    
  • 重启 DataTransferBatchRestartHandlerImpl

    @RequiredArgsConstructor(onConstructor_={@Autowired})
    @Slf4j
    @Component(value="dataTransferBatchRestartHandler")
    public class DataTransferBatchRestartHandlerImpl extends DataTransferBatchHandlerSuper implements DataTransferBatchHandler {
        @Autowired
        public void setJobManager(DataTransferJobManager jobManager) {
            this.jobManager = jobManager;
        }
      
        @Autowired
        public void setDataTransferTaskService(DataTransferTaskService dataTransferTaskService) {
            this.dataTransferTaskService = dataTransferTaskService;
        }
      
        @Autowired
        public void setDataTransferItemService(DataTransferItemService dataTransferItemService) {
            this.dataTransferItemService = dataTransferItemService;
        }
      
        @Autowired
        public void setIdGenerator(IdGenerator idGenerator) {
            this.idGenerator = idGenerator;
        }
      
        @Autowired
        public void setApplicationContext(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }
      
        /**
         * 执行单批次数据迁移
         * @param batch 批次信息
         */
        @Override
        public void execute(DataTransferBatch batch){
            //当前批次处理的事项列表
            List<DataTransferItem> batchItems=null;
            //处理器
            DataTransferHandler handler=null;
            //来源数据
            List<DataTransferSource> sources=null;
            //处理成功列表
            List<DataTransferResult> processSuccess;
      
            //1.通过被重启的任务Id,获取当前批次的处理事项(重启任务时,被sourceId为被重启的任务Id)
            batchItems=this.dataTransferItemService.listRestartItems(batch.getSourceId(),batch.getBatchSize(),batch.getBatchNum());
            //对获取的处理事项列表,生成单个批次主体的跳过处理事项
            this.applicationContext.getBean(this.getClass()).batchHandleSkip(batch,batchItems.size(),DataTransferItemStageEnum.BATCH_BEFORE);
            if(CollectionUtils.isEmpty(batchItems)){
                //批次处理事项为空,则不再执行后续迁移操作
                log.info("处理事项为空,不再执行后续阶段的迁移操作.taskId:{},batchNum:{}",batch.getTaskId(),batch.getBatchNum());
                return;
            }
      
            //2.获取处理器
            try{
                //获取执行器
                DataTransferExecutor executor=this.jobManager.getExecutor(batch.getJobName(),batch.getExecutorKey());
                handler=executor.getHandler();
            }catch (Exception e){
                //整批次处理失败(业务Id为:当前批次的事项主体Id)
                List<String> businessIds=batchItems.stream().map(DataTransferItem::getSubjectId).collect(Collectors.toList());
                this.applicationContext.getBean(this.getClass()).batchDataHandleFail(batch,"获取执行器",DataTransferItemStageEnum.BATCH_BEFORE,businessIds,e);
                return;
            }
      
            //3.读取来源数据
            try{
                //根据迁移事项主体Id,作为业务Id,读取来源数据
                List<String> businessIds=batchItems.stream().map(DataTransferItem::getSubjectId).collect(Collectors.toList());
                sources=handler.read(businessIds);
                //对来源数据不返回的数据,生成跳过处理迁移事项
                this.applicationContext.getBean(this.getClass()).sourceDataHandleSkip(batch,batchItems,sources);
      
                if(CollectionUtils.isEmpty(sources)){
                    //来源数据为空,则不再执行后续的操作
                    log.info("来源数据为空,不再执行后续阶段的迁移操作.taskId:{},batchNum:{}",batch.getTaskId(),batch.getBatchNum());
                    return;
                }
            }catch (Exception e){
                //整批次处理失败(业务Id为:当前批次的事项主体Id)
                List<String> businessIds=batchItems.stream().map(DataTransferItem::getSubjectId).collect(Collectors.toList());
                this.applicationContext.getBean(this.getClass()).batchDataHandleFail(batch,"读取来源数据",DataTransferItemStageEnum.READ,businessIds,e);
                return;
            }
      
            //3.数据处理
            processSuccess=this.applicationContext.getBean(this.getClass()).executeBatchProcess(batch,handler,sources);
            if(CollectionUtils.isEmpty(processSuccess)){
                //没有成功处理结果,则不需执行后续的迁移操作,退出处理
                return;
            }
      
            //4.写入目标数据
            this.applicationContext.getBean(this.getClass()).executeBatchWrite(batch,handler,processSuccess);
        }
      
        /**
         * 对来源数据不返回的数据,生成跳过处理迁移事项
         * @param batch 批次信息
         * @param batchItems 当前批次迁移事项列表
         * @param sources 来源数据列表
         */
        @Transactional(rollbackFor = RuntimeException.class,transactionManager = "selfTransactionManager")
        public void sourceDataHandleSkip(DataTransferBatch batch,List<DataTransferItem> batchItems,List<DataTransferSource> sources){
            //获取迁移事项主体Id,作为源业务Id
            List<String> batchItemSubjectIds=batchItems.stream().map(DataTransferItem::getSubjectId).collect(Collectors.toList());
            //获取数据来源的业务Id,作为处理结果业务Id
            List<String> sourceBusinessIds=sources.stream().map(DataTransferSource::getBusinessId).collect(Collectors.toList());
      
            //筛选出来源数据中缺少的业务Id,作为跳过处理的事项
            List<DataTransferItem> skipItems=this.getSkipItems(batchItemSubjectIds,sourceBusinessIds);
      
            //保存跳过处理事项.主体类型为'数据'
            if(CollectionUtils.isEmpty(skipItems)){
                //没有跳过处理的事项,则退出处理
                log.info("不存在跳过处理事项,退出处理.jobName:{},taskId:{},batchNum:{}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum());
                return;
            }
            this.dataTransferItemService.batchAdd(batch.getTaskId(),DataTransferSubjectTypeEnum.DATA,DataTransferItemStageEnum.READ,skipItems,"");
      
            //更新任务跳过处理数量,并检测任务完成状态
            boolean isFinish=this.dataTransferTaskService.incrItemCount(batch.getTaskId(),0,0,skipItems.size());
            if(isFinish){
                log.info("数据迁移任务已完成,jobName:{},taskId:{},batchNum:{}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum());
            }
        }
    }
    

单批次消息处理父类DataTransferBatchHandlerSuper

@Slf4j
public class DataTransferBatchHandlerSuper {
    protected DataTransferJobManager jobManager;
    protected DataTransferTaskService dataTransferTaskService;
    protected DataTransferItemService dataTransferItemService;
    protected IdGenerator idGenerator;
    protected ApplicationContext applicationContext;

    /**
     * 执行单批次数据处理
     * @param batch 批次信息
     * @param handler 处理器
     * @param sources 来源数据列表
     * @return 处理成功结果列表.null:没有成功处理结果或失败,不需执行后续迁移操作
     */
    public List<DataTransferResult> executeBatchProcess(DataTransferBatch batch,DataTransferHandler handler,List<DataTransferSource> sources){
        //处理成功结果
        List<DataTransferResult> processSuccess=null;
        try {
            List<DataTransferResult> processResult = handler.process(sources);
            //对处理结果生成失败事项,并返回处理成功结果.对没返回的数据,生成跳过处理事项
            //==源业务Id列表为:来源数据业务Id
            List<String> businessIds=sources.stream().map(DataTransferSource::getBusinessId).collect(Collectors.toList());
            processSuccess=this.applicationContext.getBean(this.getClass()).resultDataHandle(batch,DataTransferItemStageEnum.PROGRESS,processResult,businessIds);
            if(CollectionUtils.isEmpty(processSuccess)){
                //成功处理结果为空,则表示不需执行后续的'写入'操作
                log.info("数据处理成功的结果为空,不再执行后续阶段的迁移操作.taskId:{},batchNum:{}",batch.getTaskId(),batch.getBatchNum());
                return null;
            }
        }catch (Exception e){
            //整批次的数据处理失败,根据业务Id列表生成多个数据主体的失败事项(业务Id为:来源数据业务Id)
            List<String> businessIds=sources.stream().map(DataTransferSource::getBusinessId).collect(Collectors.toList());
            this.applicationContext.getBean(this.getClass()).batchDataHandleFail(batch,"数据处理",DataTransferItemStageEnum.PROGRESS,businessIds,e);
            return null;
        }
        return processSuccess;
    }

    /**
     * 执行单版本目标数据写入
     * @param batch 批次信息
     * @param handler 处理器
     * @param processSuccess 处理成功结果列表
     */
    public void executeBatchWrite(DataTransferBatch batch,DataTransferHandler handler,List<DataTransferResult> processSuccess){
        try {
            List<Object> targets = processSuccess.stream().map(DataTransferResult::getData).collect(Collectors.toList());
            List<DataTransferResult> writeResult = handler.write(targets);
            //对处理结果生成失败事项.对没返回的数据,生成跳过处理事项
            //==源业务Id列表为:处理成功结果业务Id
            List<String> businessIds=processSuccess.stream().map(DataTransferResult::getBusinessId).collect(Collectors.toList());
            this.applicationContext.getBean(this.getClass()).resultDataHandle(batch,DataTransferItemStageEnum.WRITE,writeResult,businessIds);
        }catch (Exception e){
            //整批次的数据处理失败,根据业务Id列表生成多个数据主体的失败事项(业务Id为:成功处理结果业务Id)
            List<String> businessIds=processSuccess.stream().map(DataTransferResult::getBusinessId).collect(Collectors.toList());
            // 获取bean的方式执行方法,避免了@Transactional等注解的失效问题
            this.applicationContext.getBean(this.getClass()).batchDataHandleFail(batch,"写入目标数据",DataTransferItemStageEnum.WRITE,businessIds,e);
        }
    }

    /**
     * 输出异常信息,并返回迁移失败状态
     * @param exception 异常信息
     * @param traceId 追踪Id
     * @param action 动作描述
     * @param jobName 作业名称
     * @param executorKey 执行器键名
     * @param batchNum 批次序号
     * @return 迁移失败状态
     */
    protected DataTransferItemStatusEnum handleException(Exception exception,String traceId,String action,String jobName,String executorKey,Integer batchNum){
        //日志提示
        String logTips=null;
        DataTransferItemStatusEnum status=null;
        if(exception instanceof SystemRuntimeException){
            //业务异常
            status=DataTransferItemStatusEnum.BUSINESS_ERROR;
            logTips= MessageFormat.format("数据迁移处理结束,{0}时出现业务异常.traceId:{1},jobName:{2},executorKey:{3},batchNum:{4}"
                    ,action,traceId,jobName,executorKey,batchNum);
        }else{
            //系统异常
            status=DataTransferItemStatusEnum.SYSTEM_ERROR;
            logTips=MessageFormat.format("数据迁移处理结束,{0}时出现系统异常.traceId:{1},jobName:{2},executorKey:{3},batchNum:{4}"
                    ,action,traceId,jobName,executorKey,batchNum);
        }
        log.error(logTips,exception);
        return status;
    }

    /**
     * 生成单个批次主体的跳过处理事项
     * @param batch 批次信息
     * @param realTotal 实际返回数量
     * @param stage 阶段
     */
    @Transactional(rollbackFor = RuntimeException.class,transactionManager = "selfTransactionManager")
    public void batchHandleSkip(DataTransferBatch batch, int realTotal,DataTransferItemStageEnum stage){
        //1.计算跳过处理数量
        //获取当前批次应处理数量
        int batchItemCount=this.getBatchItemCount(batch.getBatchSize(),batch.getBatchNum(),batch.getBatchCount(),batch.getTotal());
        //跳过数量=当前批次应处理数量-实际返回数量
        int skipCount=batchItemCount-realTotal;

        //2.生成迁移跳过处理事项
        if(skipCount<=0){
            //没有跳过处理的事项,则退出处理
            log.info("不存在跳过处理事项,退出处理.jobName:{},taskId:{},batchNum:{}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum());
            return;
        }

        //3.保存跳过处理事项
        DataTransferItem skipItem=new DataTransferItem();
        //主体Id为'批次序号'
        skipItem.setSubjectId(String.valueOf(batch.getBatchNum()));
        skipItem.setStatus(DataTransferItemStatusEnum.SKIP);
        //跳过处理数量
        skipItem.setNumber(skipCount);
        skipItem.setTraceId("");
        skipItem.setError("");
        this.dataTransferItemService.batchAdd(batch.getTaskId(),DataTransferSubjectTypeEnum.BATCH,stage,Arrays.asList(skipItem),"");

        //4.递增跳过处理事项状态数量,并检测任务完成状态
        boolean isFinish=this.dataTransferTaskService.incrItemCount(batch.getTaskId(),0,0,skipCount);
        if(isFinish){
            log.info("数据迁移任务已完成,jobName:{},taskId:{},batchNum:{}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum());
        }
    }

    /**
     * 获取批次实际事项数量
     * @param batchSize 批次大小
     * @param batchNum 批次序号.从1开始
     * @param batchCount 批次数量
     * @param total 总数据量
     * @return 事项数量
     */
    protected int getBatchItemCount(Integer batchSize,Integer batchNum,Integer batchCount,Integer total){
        if(batchNum<batchCount){
            //不是最后一批,事项数量为批次大小
            return batchSize;
        }
        //最后一批,事项数量=总数据量-批次大小*(批次序号-1)
        return total-batchSize*(batchNum-1);
    }

    /**
     * 整批次处理失败,生成单个批次主体的失败事项
     * @param batch 批次信息
     * @param action 动作描述
     * @param stage 阶段
     * @param exception 异常
     */
    @Transactional(rollbackFor = RuntimeException.class,transactionManager = "selfTransactionManager")
    public void batchHandleFail(DataTransferBatch batch, String action, DataTransferItemStageEnum stage, Exception exception){
        //生成追踪Id
        String traceId=idGenerator.nextId("");

        //1.处理异常信息,并返回迁移失败状态
        DataTransferItemStatusEnum status=this.handleException(exception,traceId,action,batch.getJobName(),batch.getExecutorKey(),batch.getBatchNum());

        //2.创建失败迁移事项
        //获取批次实际失败数量
        int batchFailCount=this.getBatchItemCount(batch.getBatchSize(),batch.getBatchNum(),batch.getBatchCount(),batch.getTotal());

        DataTransferItem item=new DataTransferItem();
        item.setSubjectId(String.valueOf(batch.getBatchNum()));
        item.setStatus(status);
        item.setNumber(batchFailCount);
        item.setTraceId(traceId);
        item.setError(exception.getMessage());
        this.dataTransferItemService.batchAdd(batch.getTaskId(),DataTransferSubjectTypeEnum.BATCH,stage, Arrays.asList(item),"");

        //3.递增失败数量并检测任务完成状态
        boolean isFinish=this.dataTransferTaskService.incrItemCount(batch.getTaskId(),0,batchFailCount,0);
        if(isFinish){
            log.info("数据迁移任务已完成,jobName:{},taskId:{},batchNum:{}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum());
        }
    }

    /**
     * 整批次的数据处理失败,根据业务Id列表生成多个数据主体的失败事项
     * @param batch 批次信息
     * @param action 动作描述
     * @param stage 阶段
     * @param businessIds 业务Id列表
     * @param exception 异常
     */
    @Transactional(rollbackFor = RuntimeException.class,transactionManager = "selfTransactionManager")
    public void batchDataHandleFail(DataTransferBatch batch, String action, DataTransferItemStageEnum stage, List<String> businessIds, Exception exception){
        //生成追踪Id
        String traceId=idGenerator.nextId("");

        //处理异常信息,并返回迁移失败状态
        DataTransferItemStatusEnum status=this.handleException(exception,traceId,action,batch.getJobName(),batch.getExecutorKey(),batch.getBatchNum());

        //1.根据业务Id,生成整批次的迁移事项
        List<DataTransferItem> items=businessIds.stream().map(businessId->{

            DataTransferItem item=new DataTransferItem();
            //处理结果失败时,主体Id为业务Id
            item.setSubjectId(businessId);
            item.setStatus(status);
            //一个业务Id,数量值为:1
            item.setNumber(1);
            item.setTraceId(traceId);
            item.setError(exception.getMessage());
            return item;
        }).collect(Collectors.toList());
        //保存迁移事项.主体类型为'数据'
        this.dataTransferItemService.batchAdd(batch.getTaskId(),DataTransferSubjectTypeEnum.DATA,stage,items,"");

        //2.递增任务失败数量,并检测完成状态
        //检测任务完成状态
        boolean isFinish=this.dataTransferTaskService.incrItemCount(batch.getTaskId(),0,items.size(),0);
        if(isFinish){
            log.info("数据迁移任务已完成,jobName:{},taskId:{},batchNum:{}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum());
        }
    }

    /**
     * 根据处理结果与源业务Id对比,生成跳过事项、失败事项,并返回成功结果列表
     * @param batch 批次信息
     * @param stage 阶段
     * @param results 处理结果列表
     * @param businessIds 源业务Id列表
     * @return 处理成功结果列表
     */
    @Transactional(rollbackFor = RuntimeException.class,transactionManager = "selfTransactionManager")
    public List<DataTransferResult> resultDataHandle(DataTransferBatch batch,DataTransferItemStageEnum stage,List<DataTransferResult> results,List<String> businessIds){
        //处埋成功结果
        List<DataTransferResult> successResults=new ArrayList<>();
        //失败事项
        List<DataTransferItem> failItems= new ArrayList<>();

        //1.获取处理成功结果、失败事项
        for(int i=0;i<results.size();i++){
            DataTransferResult result=results.get(i);
            switch (result.getStatus()){
                case SUCCESS:
                    //处理成功结果
                    successResults.add(result);
                    continue;
                case SKIP:
                    //跳过
                    throw new SystemRuntimeException(MessageFormat.format("发现无效状态处理结果.状态:{}",result.getStatus()));
            }

            //生成处理失败事项
            DataTransferItem item=new DataTransferItem();
            //==主体Id为业务Id
            item.setSubjectId(result.getBusinessId());
            item.setStatus(result.getStatus());
            item.setTraceId(result.getTraceId());
            item.setError(result.getError());
            //==一个结果对应数量为:1
            item.setNumber(1);
            failItems.add(item);
        }

        //2.生成跳过处理事项
        //获取处理结果的业务Id列表
        List<String> resultBusinessIds=results.stream().map(DataTransferResult::getBusinessId).collect(Collectors.toList());
        List<DataTransferItem> skipItems=this.getSkipItems(businessIds,resultBusinessIds);

        //3.保存迁移失败事项、跳过事项.主体类型为'数据'
        List<DataTransferItem> addItems=new ArrayList<>();
        addItems.addAll(failItems);
        addItems.addAll(skipItems);

        //5.更新迁移事项
        if(CollectionUtils.isNotEmpty(addItems)){
            this.dataTransferItemService.batchAdd(batch.getTaskId(),DataTransferSubjectTypeEnum.DATA,stage,addItems,"");
        }

        //6.更新任务失败、跳过处理数量,以及完成状态
        //获取迁移失败事项数量
        int failCount=failItems.size();
        //获取跳过处理数量
        int skipCount=skipItems.size();
        //获取迁移功事项数量
        int successCount=0;
        if(DataTransferItemStageEnum.WRITE.equals(stage)){
            //只有在写入目标数据阶段为成功,才递增成功数量
            successCount=successResults.size();
        }
        //递增事项状态数量,并检测任务完成状态
        boolean isFinish=this.dataTransferTaskService.incrItemCount(batch.getTaskId(),successCount,failCount,skipCount);
        if(isFinish){
            log.info("数据迁移任务已完成,jobName:{},taskId:{},batchNum:{}",batch.getJobName(),batch.getTaskId(),batch.getBatchNum());
        }
        return successResults;
    }

    /**
     * 根据源业务Id与返回结果业务Id对比,获得跳过的处理事项
     * @param fromBusinessIds 源业务Id
     * @param resultBusinessIds 结果业务Id
     * @return 跳过事项列表
     */
    protected List<DataTransferItem> getSkipItems(List<String> fromBusinessIds,List<String> resultBusinessIds){
        //获取跳过处理的业务Id列表
        List<String> skipBusinessIds=new ArrayList<>();
        skipBusinessIds.addAll(fromBusinessIds);
        skipBusinessIds.removeAll(resultBusinessIds);
        //生成跳过事项
        List<DataTransferItem> skipItems=skipBusinessIds.stream().map(skipBusinessId->{
            DataTransferItem item=new DataTransferItem();
            item.setSubjectId(skipBusinessId);
            item.setStatus(DataTransferItemStatusEnum.SKIP);
            item.setTraceId("");
            item.setError("");
            //一个业务Id对应数量为:1
            item.setNumber(1);
            return item;
        }).collect(Collectors.toList());
        return skipItems;
    }
}

停复工模块迁移处理器

实现DataTransferHandler接口

停复工申请处理器 WorkResumeDtStopPlanApplyHandler

/**
 * <p>
 * 停复工_停工计划申请数据迁移处理器 实现类
 * </p>
 *
 * @author liJY
 * @Date 2023-06-20
 */
@RequiredArgsConstructor(onConstructor_={@Autowired})
@Slf4j
@Component
public class WorkResumeDtStopPlanApplyHandler extends DataTransferHandlerSuper implements DataTransferHandler<StopOrResumeWork, WorkResumeApplyResult> {
    private DataTransferBusinessProperties dataTransferBusinessProperties;
    private WorkflowInstanceService workflowInstanceService;
    private SysOrgService sysOrgService;
    private FlowBillService flowBillService;
    private OldProjectService projectService;
    private StopOrResumeWorkService stopOrResumeWorkService;
    private WorkResumeApplyService workResumeApplyService;
    private BasicDataTreeService basicDataTreeService;
    @Value("${data-transfer.business.work-resume-stop-plan-apply.overview-length:5000}")
    private Integer lengthLimit;

    @Autowired
    public void setIdGenerator(IdGenerator idGenerator) {
        this.idGenerator = idGenerator;
    }

    @Autowired
    public void setDataTransferBusinessProperties(DataTransferBusinessProperties dataTransferBusinessProperties) {
        this.dataTransferBusinessProperties = dataTransferBusinessProperties;
    }

    @Autowired
    public void setWorkflowInstanceService(WorkflowInstanceService workflowInstanceService) {
        this.workflowInstanceService = workflowInstanceService;
    }

    @Autowired
    public void setSysOrgService(SysOrgService sysOrgService) {
        this.sysOrgService = sysOrgService;
    }

    @Autowired
    public void setFlowBillService(FlowBillService flowBillService) {
        this.flowBillService = flowBillService;
    }

    @Autowired
    public void setProjectService(OldProjectService projectService) {
        this.projectService = projectService;
    }

    @Autowired
    public void setStopOrResumeWorkService(StopOrResumeWorkService stopOrResumeWorkService) {
        this.stopOrResumeWorkService = stopOrResumeWorkService;
    }

    @Autowired
    public void setWorkResumeApplyService(WorkResumeApplyService workResumeApplyService) {
        this.workResumeApplyService = workResumeApplyService;
    }

    @Autowired
    public void setBasicDataTreeService(BasicDataTreeService basicDataTreeService) {
        this.basicDataTreeService = basicDataTreeService;
    }

    /**
     * 读取来源处理事项总数(用于首次启动任务场景)
     * @return 总数量
     */
    @Override
    public int readTotal(){
        //使用配置的条件查询
        return this.stopOrResumeWorkService.getTotal(dataTransferBusinessProperties.getWorkResumeStopPlanApply().getQuery());
    }

    /**
     * 读取来源数据(用于首次启动任务场景)
     * @param batchSize 批次大小
     * @param batchNum 批次序号
     * @return 来源数据列表.返回数量比批次数量少的,则为跳过处理,空:没有待处理数据
     */
    @Override
    public List<DataTransferSource<StopOrResumeWork>> read(int batchSize, int batchNum){
        //使用配置的条件查询
        List<StopOrResumeWork> list= this.stopOrResumeWorkService.pageQuery(batchSize,batchNum, dataTransferBusinessProperties.getWorkResumeStopPlanApply().getQuery());
        if(CollectionUtils.isEmpty(list)){
            log.info("停复工_停工计划申请来源数据为空,退出处理");
            return new ArrayList<>();
        }

        //定义返回结果
        List<DataTransferSource<StopOrResumeWork>> result=list.stream().map(apply -> {
            DataTransferSource<StopOrResumeWork> source=new DataTransferSource<>();
            //定义业务Id
            source.setBusinessId(apply.getWorkID());
            source.setData(apply);
            return source;
        }).collect(Collectors.toList());
        return result;
    }

    /**
     * 读取来源数据(用于重启任务场景)
     * @param businessIds 业务Id列表
     * @return 来源数据列表.返回数量比入参数量少的,则为跳过处理,空:没有待处理数据
     */
    @Override
    public List<DataTransferSource<StopOrResumeWork>> read(List<String> businessIds){
        if(CollectionUtils.isEmpty(businessIds)){
            log.info("停复工_停工计划申请业务id为空,退出处理");
            return new ArrayList<>();
        }

        //固定使用id查询
        List<StopOrResumeWork> list=this.stopOrResumeWorkService.findByIds(businessIds);
        if(CollectionUtils.isEmpty(list)){
            log.info("停复工_停工计划申请来源数据为空,退出处理");
            return new ArrayList<>();
        }
        //定义返回结果
        List<DataTransferSource<StopOrResumeWork>> result=list.stream().map(apply -> {
            DataTransferSource<StopOrResumeWork> source=new DataTransferSource<>();
            //定义业务Id
            source.setBusinessId(apply.getWorkID());
            source.setData(apply);
            return source;
        }).collect(Collectors.toList());
        return result;
    }

    /**
     * 数据处理
     * @param sources 来源数据列表
     * @return 目标数据列表.返回数量比入参数量少的,则为跳过处理.空:没有待处理数据
     */
    @Override
    public List<DataTransferResult<WorkResumeApplyResult>> process(List<DataTransferSource<StopOrResumeWork>> sources){
        if(CollectionUtils.isEmpty(sources)){
            log.info("停复工_停工计划申请来源数据为空,退出处埋");
            return new ArrayList<>();
        }

        //1.对已处理的数据进行过滤,防止重复迁移
        //获取来源数据的申请Id
        List<String> sourceApplyIds=sources.stream().map(DataTransferSource::getBusinessId).collect(Collectors.toList());
        //获取目标数据已存在的申请.结构:Map<申请Id,WorkResumeApply>
        Map<String,WorkResumeApply> existsApplys=this.workResumeApplyService.getByIds(sourceApplyIds);
        //过滤得到未迁移的来源数据
        List<DataTransferSource<StopOrResumeWork>> useSources=sources.stream().filter(source->{
            if(existsApplys.containsKey(source.getBusinessId())){
                return false;
            }
            return true;
        }).collect(Collectors.toList());
        if(CollectionUtils.isEmpty(useSources)){
            log.info("停复工_停工计划申请,待迁移的来源数据为空,退出处理");
            return new ArrayList<>();
        }

        //2.获取流程信息(流程信息会与申请信息一同写入,不需判断重复迁移).结构:Map<业务Id,WorkflowInstance>
        List<String> useApplyIds=useSources.stream().map(DataTransferSource::getBusinessId).collect(Collectors.toList());
        Map<String, WorkflowInstance> workflowMap=this.workflowInstanceService.getByBusinessIds(useApplyIds);

        //3.获取项目信息.结构:Map<项目Id,Project>
        List<String> projectIds=useSources.stream().map(source->{
            return source.getData().getProjectID();
        }).collect(Collectors.toList());
        Map<String, OldProject> projectMap=this.projectService.getByIds(projectIds);

        //4.获取旧平台项目所属区域信息(目的是获取区域编码、区域名称).结构:Map<区域Id,SysOrg>
        List<String> oldRegionIds=new ArrayList<>();
        projectMap.forEach((projectId,project)->{
            oldRegionIds.add(project.getRegionID());
        });
        Map<String, SysOrg> oldRegionMap=this.sysOrgService.getByOrgIds(oldRegionIds);

        //5.获取所属新平台区域信息(目的是获取新平台区域Id).结构:Map<区域编码,BasicDataTree>
        List<String> regionCodes=new ArrayList<>();
        oldRegionMap.forEach((regionId,SysOrg)->{
            regionCodes.add(SysOrg.getOrgCode());
        });
        Map<String, BasicDataTree> newRegionMap=this.basicDataTreeService.findByMdgId(regionCodes);

        //6.构建处理结果
        List<DataTransferResult<WorkResumeApplyResult>> result=useSources.stream().map(source->{
            //获取申请信息
            StopOrResumeWork apply = source.getData();
            //获取流程信息
            WorkflowInstance workflowInstance = workflowMap.get(apply.getWorkID());
            //获取项目信息
            OldProject project=projectMap.get(apply.getProjectID());
            //获取旧平台区域信息
            SysOrg oldRegion = null;
            if(project!=null){
                oldRegion=oldRegionMap.get(project.getRegionID());
            }
            //获取新平台区域信息
            BasicDataTree newRegion=null;
            if(oldRegion!=null){
                newRegion=newRegionMap.get(oldRegion.getOrgCode());
            }

            //构建处理结果
            DataTransferResult<WorkResumeApplyResult> processResult=buildProcessResult(apply,workflowInstance,project,oldRegion,newRegion);
            return  processResult;
        }).collect(Collectors.toList());
        return result;
    }

    /**
     * 构建处理结果
     * @param apply 申请信息
     * @param workflowInstance 流程实例信息
     * @param project 项目信息
     * @param oldRegion 旧平台区域信息
     * @param newRegion 新平台区域信息
     * @return 处理结果
     */
    private DataTransferResult<WorkResumeApplyResult> buildProcessResult(StopOrResumeWork apply, WorkflowInstance workflowInstance
            , OldProject project, SysOrg oldRegion,BasicDataTree newRegion){
        DataTransferResult<WorkResumeApplyResult> processResult=new DataTransferResult<>();
        //申请Id
        String applyId=apply.getWorkID();
        try{
            if (workflowInstance == null) {
                throw new SystemRuntimeException("缺少流程信息");
            }
            if(project==null){
                throw new SystemRuntimeException("缺少项目信息");
            }
            if (oldRegion == null) {
                throw new SystemRuntimeException("缺少旧平台区域信息");
            }
            if(newRegion==null){
                throw new SystemRuntimeException("缺少新平台区域信息,"+oldRegion.getOrgCode()+","+oldRegion.getOrgName());
            }
            if(StringUtils.length(apply.getOverview()) >= lengthLimit){
                throw new SystemRuntimeException("overview字段字符串长度超过"+lengthLimit);
            }

            //构建申请处理结果
            WorkResumeApplyResult applyResult = new WorkResumeApplyResult();
            applyResult.setApply(WorkResumeUtils.buildStopPlanApplyByOld(apply, workflowInstance, project,oldRegion,newRegion));
            applyResult.setFlow(WorkResumeUtils.buildFlowBillByOld(project, workflowInstance));

            //定义处理成功结果
            processResult.setData(applyResult);
            processResult.setBusinessId(applyId);
            processResult.setError("");
            processResult.setTraceId("");
            processResult.setStatus(DataTransferItemStatusEnum.SUCCESS);
        }catch (Exception e){
            //生成追踪Id,并对异常日志输出
            String traceId=idGenerator.nextId("");
            //输出异常日志,并返回处理状态
            DataTransferItemStatusEnum status=super.handleException(e,traceId,"停复工_停工计划申请","数据处理");

            //定义处理失败结果
            processResult.setData(null);
            processResult.setBusinessId(applyId);
            processResult.setError(e.getMessage());
            processResult.setTraceId(traceId);
            processResult.setStatus(status);
        }
        return processResult;
    }

    /**
     * 写入目标数据
     * @param targets 目标数据列表.返回数量比入参数量少的,则为跳过处理.空:没有待处理数据
     */
    @Override
    @Transactional(rollbackFor = RuntimeException.class,transactionManager = "plan-rebuildTransactionManager")
    public List<DataTransferResult> write(List<WorkResumeApplyResult> targets){
        //使用事务写入,使申请与流程信息同时整批写入成功或失败

        //1.保存申请信息
        //获取需保存的申请信息
        List<WorkResumeApply> applys=targets.stream().map(WorkResumeApplyResult::getApply).collect(Collectors.toList());
        this.workResumeApplyService.batchSave(applys);

        //2.保存流程信息
        List<FlowBill> flowBills=targets.stream().map(WorkResumeApplyResult::getFlow).collect(Collectors.toList());
        this.flowBillService.batchSave(flowBills);

        //3.对整批次的目标写入数据,构建整批成功写入结果
        //获取写入目标数据业务Id
        List<String> businessIds=targets.stream().map(applyResult->{
            return applyResult.getApply().getId();
        }).collect(Collectors.toList());
        List<DataTransferResult> result=buildBatchWriteResultSuccess(businessIds);
        return result;
    }
}

来源数据类 StopOrResumeWork

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("StopOrResumeWork")
public class StopOrResumeWork extends Model<StopOrResumeWork> {

    private static final long serialVersionUID = 1L;
    /**
     * 申请ID
     */
    @TableField("WorkID")
    private String workID;
    /**
     * 运营项目Id
     */
    @TableField("ProjectID")
    private String projectID;
    /**
     * 创建时间
     */
    @TableField("InsertOn")
    private LocalDateTime insertOn;
    /**
     * 更新时间(审批状态更新时间)
     */
    @TableField("UpdateOn")
    private LocalDateTime updateOn;
    /**
     * 更新用户
     */
    @TableField("UpdateBy")
    private String updateBy;
    /**
     * 情况说明
     */
    @TableField("Overview")
    private String overview;

    /**
     * 创建人姓名(流程创建人姓名)
     */
    @TableField("CreatedUserName")
    private String createdUserName;
}

目标数据类

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@ApiModel(value="WorkResumeApplyResult对象", description="停复工申请信息数据迁移处理结果 对象")
public class WorkResumeApplyResult {
    /**
     * 申请信息
     */
    private WorkResumeApply apply;
    /**
     * 流程信息
     */
    private FlowBill flow;
}

复工确认申请处理器 WorkResumeDtConfirmApplyHandler

/**
 * <p>
 * 停复工_复工确认申请数据迁移处理器 实现类
 * </p>
 *
 * @author liJY
 * @Date 2023-06-20
 */
@RequiredArgsConstructor(onConstructor_={@Autowired})
@Slf4j
@Component
public class WorkResumeDtConfirmApplyHandler extends DataTransferHandlerSuper implements DataTransferHandler<ResumeWorkConfirmInfo, WorkResumeApplyResult> {
    private DataTransferBusinessProperties dataTransferBusinessProperties;
    private WorkflowInstanceService workflowInstanceService;
    private SysOrgService sysOrgService;
    private FlowBillService flowBillService;
    private OldProjectService projectService;
    private ResumeWorkConfirmInfoService resumeWorkConfirmInfoService;
    private WorkResumeApplyService workResumeApplyService;
    private BasicDataTreeService basicDataTreeService;
    @Value("${data-transfer.business.work-resume-stop-plan-apply.overview-length:5000}")
    private Integer lengthLimit;

    @Autowired
    public void setIdGenerator(IdGenerator idGenerator) {
        this.idGenerator = idGenerator;
    }

    @Autowired
    public void setDataTransferBusinessProperties(DataTransferBusinessProperties dataTransferBusinessProperties) {
        this.dataTransferBusinessProperties = dataTransferBusinessProperties;
    }

    @Autowired
    public void setWorkflowInstanceService(WorkflowInstanceService workflowInstanceService) {
        this.workflowInstanceService = workflowInstanceService;
    }

    @Autowired
    public void setSysOrgService(SysOrgService sysOrgService) {
        this.sysOrgService = sysOrgService;
    }

    @Autowired
    public void setFlowBillService(FlowBillService flowBillService) {
        this.flowBillService = flowBillService;
    }
    
    @Autowired
    public void setProjectService(OldProjectService projectService) {
        this.projectService = projectService;
    }

    @Autowired
    public void setResumeWorkConfirmInfoService(ResumeWorkConfirmInfoService resumeWorkConfirmInfoService) {
        this.resumeWorkConfirmInfoService = resumeWorkConfirmInfoService;
    }

    @Autowired
    public void setWorkResumeApplyService(WorkResumeApplyService workResumeApplyService) {
        this.workResumeApplyService = workResumeApplyService;
    }

    @Autowired
    public void setBasicDataTreeService(BasicDataTreeService basicDataTreeService) {
        this.basicDataTreeService = basicDataTreeService;
    }

    /**
     * 读取来源处理事项总数(用于首次启动任务场景)
     * @return 总数量
     */
    @Override
    public int readTotal(){
        //使用配置的条件查询
        return this.resumeWorkConfirmInfoService.getTotal(dataTransferBusinessProperties.getWorkResumeConfirmApply().getQuery());
    }

    /**
     * 读取来源数据(用于首次启动任务场景)
     * @param batchSize 批次大小
     * @param batchNum 批次序号
     * @return 来源数据列表.返回数量比批次数量少的,则为跳过处理,空:没有待处理数据
     */
    @Override
    public List<DataTransferSource<ResumeWorkConfirmInfo>> read(int batchSize, int batchNum){
        //使用配置的条件查询
        List<ResumeWorkConfirmInfo> list= this.resumeWorkConfirmInfoService.pageQuery(batchSize,batchNum, dataTransferBusinessProperties.getWorkResumeConfirmApply().getQuery());
        if(CollectionUtils.isEmpty(list)){
            log.info("停复工_复工确认申请来源数据为空,退出处理");
            return new ArrayList<>();
        }

        //定义返回结果
        List<DataTransferSource<ResumeWorkConfirmInfo>> result=list.stream().map(apply -> {
            DataTransferSource<ResumeWorkConfirmInfo> source=new DataTransferSource<>();
            //定义业务Id
            source.setBusinessId(apply.getId());
            source.setData(apply);
            return source;
        }).collect(Collectors.toList());
        return result;
    }

    /**
     * 读取来源数据(用于重启任务场景)
     * @param businessIds 业务Id列表
     * @return 来源数据列表.返回数量比入参数量少的,则为跳过处理,空:没有待处理数据
     */
    @Override
    public List<DataTransferSource<ResumeWorkConfirmInfo>> read(List<String> businessIds){
        if(CollectionUtils.isEmpty(businessIds)){
            log.info("停复工_复工确认申请业务id为空,退出处理");
            return new ArrayList<>();
        }

        //固定使用id查询
        List<ResumeWorkConfirmInfo> list=this.resumeWorkConfirmInfoService.findByIds(businessIds);
        if(CollectionUtils.isEmpty(list)){
            log.info("停复工_复工确认申请来源数据为空,退出处理");
            return new ArrayList<>();
        }
        //定义返回结果
        List<DataTransferSource<ResumeWorkConfirmInfo>> result=list.stream().map(apply -> {
            DataTransferSource<ResumeWorkConfirmInfo> source=new DataTransferSource<>();
            //定义业务Id
            source.setBusinessId(apply.getId());
            source.setData(apply);
            return source;
        }).collect(Collectors.toList());
        return result;
    }

    /**
     * 数据处理
     * @param sources 来源数据列表
     * @return 目标数据列表.返回数量比入参数量少的,则为跳过处理.空:没有待处理数据
     */
    @Override
    public List<DataTransferResult<WorkResumeApplyResult>> process(List<DataTransferSource<ResumeWorkConfirmInfo>> sources){
        if(CollectionUtils.isEmpty(sources)){
            log.info("停复工_复工确认申请来源数据为空,退出处埋");
            return new ArrayList<>();
        }

        //1.对已处理的数据进行过滤,防止重复迁移
        //获取来源数据的申请Id
        List<String> sourceApplyIds=sources.stream().map(DataTransferSource::getBusinessId).collect(Collectors.toList());
        //获取目标数据已存在的申请.结构:Map<申请Id,WorkResumeApply>
        Map<String, WorkResumeApply> existsApplys=this.workResumeApplyService.getByIds(sourceApplyIds);
        //过滤得到未迁移的来源数据
        List<DataTransferSource<ResumeWorkConfirmInfo>> useSources=sources.stream().filter(source->{
            if(existsApplys.containsKey(source.getBusinessId())){
                return false;
            }
            return true;
        }).collect(Collectors.toList());
        if(CollectionUtils.isEmpty(useSources)){
            log.info("停复工_复工确认申请,待迁移的来源数据为空,退出处理");
            return new ArrayList<>();
        }

        //2.获取流程信息(流程信息会与申请信息一同写入,不需判断重复迁移).结构:Map<业务Id,WorkflowInstance>
        List<String> useApplyIds=useSources.stream().map(DataTransferSource::getBusinessId).collect(Collectors.toList());
        Map<String, WorkflowInstance> workflowMap=this.workflowInstanceService.getByBusinessIds(useApplyIds);
    
        //3.获取项目信息.结构:Map<项目Id,Project>
        List<String> projectIds=useSources.stream().map(source->{
            return source.getData().getProjectID();
        }).collect(Collectors.toList());
        Map<String, OldProject> projectMap=this.projectService.getByIds(projectIds);
        
        //4.获取旧平台项目所属区域信息(目的区域编码).结构:Map<区域Id,SysOrg>
        List<String> oldRegionIds=new ArrayList<>();
        projectMap.forEach((projectId,project)->{
            oldRegionIds.add(project.getRegionID());
        });
        Map<String, SysOrg> oldRegionMap=this.sysOrgService.getByOrgIds(oldRegionIds);

        //5.获取所属新平台区域信息(目的是获取新平台区域Id).结构:Map<区域编码,BasicDataTree>
        List<String> regionCodes=new ArrayList<>();
        oldRegionMap.forEach((regionId,SysOrg)->{
            regionCodes.add(SysOrg.getOrgCode());
        });
        Map<String, BasicDataTree> newRegionMap=this.basicDataTreeService.findByMdgId(regionCodes);

        //6.构建处理结果
        List<DataTransferResult<WorkResumeApplyResult>> result=useSources.stream().map(source->{
            //获取申请信息
            ResumeWorkConfirmInfo apply = source.getData();
            //获取流程信息
            WorkflowInstance workflowInstance = workflowMap.get(apply.getId());
            //获取项目信息
            OldProject project=projectMap.get(apply.getProjectID());
            //获取旧平台区域信息
            SysOrg oldRegion = null;
            if(project!=null){
                oldRegion=oldRegionMap.get(project.getRegionID());
            }
            //获取新平台区域信息
            BasicDataTree newRegion=null;
            if(oldRegion!=null){
                newRegion=newRegionMap.get(oldRegion.getOrgCode());
            }

            //构建处理结果
            DataTransferResult<WorkResumeApplyResult> processResult=buildProcessResult(apply,workflowInstance,oldRegion,newRegion);
            return  processResult;
        }).collect(Collectors.toList());
        return result;
    }

    /**
     * 构建处理结果
     * @param apply 申请信息
     * @param workflowInstance 流程实例信息
     * @param oldRegion 旧平台区域信息
     * @param newRegion 新平台区域信息
     * @return 处理结果
     */
    private DataTransferResult<WorkResumeApplyResult> buildProcessResult(ResumeWorkConfirmInfo apply,WorkflowInstance workflowInstance
            ,SysOrg oldRegion,BasicDataTree newRegion){
        DataTransferResult<WorkResumeApplyResult> processResult=new DataTransferResult<>();
        //申请Id
        String applyId=apply.getId();
        try{
            if (workflowInstance == null) {
                throw new SystemRuntimeException("缺少流程信息");
            }
            if (oldRegion == null) {
                throw new SystemRuntimeException("缺少旧平台区域信息");
            }
            if(newRegion==null){
                throw new SystemRuntimeException("缺少新平台区域信息,"+oldRegion.getOrgCode()+","+oldRegion.getOrgName());
            }
            if(StringUtils.length(apply.getOverview()) >= lengthLimit){
                throw new SystemRuntimeException("overview字段字符串长度超过"+lengthLimit);
            }
            //构建申请处理结果
            WorkResumeApplyResult applyResult = new WorkResumeApplyResult();
            applyResult.setApply(WorkResumeUtils.buildConfirmApplyByOld(apply, workflowInstance, oldRegion,newRegion));
            applyResult.setFlow(WorkResumeUtils.buildFlowBillByOld(new OldProject().setProjectID(apply.getProjectID()).setProjectName(apply.getProjectName()), workflowInstance));

            //定义处理成功结果
            processResult.setData(applyResult);
            processResult.setBusinessId(applyId);
            processResult.setError("");
            processResult.setTraceId("");
            processResult.setStatus(DataTransferItemStatusEnum.SUCCESS);
        }catch (Exception e){
            //生成追踪Id,并对异常日志输出
            String traceId=idGenerator.nextId("");
            //输出异常日志,并返回处理状态
            DataTransferItemStatusEnum status=super.handleException(e,traceId,"停复工_复工确认申请","数据处理");

            //定义处理失败结果
            processResult.setData(null);
            processResult.setBusinessId(applyId);
            processResult.setError(e.getMessage());
            processResult.setTraceId(traceId);
            processResult.setStatus(status);
        }
        return processResult;
    }

    /**
     * 写入目标数据
     * @param targets 目标数据列表.返回数量比入参数量少的,则为跳过处理.空:没有待处理数据
     */
    @Override
    @Transactional(rollbackFor = RuntimeException.class,transactionManager = "plan-rebuildTransactionManager")
    public List<DataTransferResult> write(List<WorkResumeApplyResult> targets){
        //使用事务写入,使申请与流程信息同时整批写入成功或失败

        //1.保存申请信息
        //获取需保存的申请信息
        List<WorkResumeApply> applys=targets.stream().map(WorkResumeApplyResult::getApply).collect(Collectors.toList());
        this.workResumeApplyService.batchSave(applys);

        //2.保存流程信息
        List<FlowBill> flowBills=targets.stream().map(WorkResumeApplyResult::getFlow).collect(Collectors.toList());
        this.flowBillService.batchSave(flowBills);

        //3.对整批次的目标写入数据,构建整批成功写入结果
        //获取写入目标数据业务Id
        List<String> businessIds=targets.stream().map(applyResult->{
            return applyResult.getApply().getId();
        }).collect(Collectors.toList());
        List<DataTransferResult> result=buildBatchWriteResultSuccess(businessIds);
        return result;
    }
}

WorkResumeDtStopPlanDetailHandler

/**
 * 停复工申请明细迁移处理器,实现类
 * @author xiejinwei02
 * @date 2023/8/10 16:39
 */
@RequiredArgsConstructor(onConstructor_={@Autowired})
@Slf4j
@Component
public class WorkResumeDtStopPlanDetailHandler extends DataTransferHandlerSuper implements DataTransferHandler<BuildingStopOrResumeDTO, BuildingStopOrResumeDTO> {
	@Resource
	private DataTransferBusinessProperties dataTransferBusinessProperties;
	@Resource
	private BuildingStopOrResumeService buildingStopOrResumeService;
	@Resource
	private OldProjectService projectService;
	@Resource
	private SysOrgService sysOrgService;
	@Resource
	private WorkResumeRecordService workResumeRecordService;
	@Resource
	private WorkResumePlanService workResumePlanService;
	@Resource
	private BasicDataTreeService basicDataTreeService;
	@Resource
	@Qualifier("plan-rebuildTransactionManager")
	private PlatformTransactionManager planRebuildTransactionManager;
	private List<StopTypeEnum> springWinterBreakList = Arrays.asList(StopTypeEnum.SPRING_BREAK,StopTypeEnum.WINTER_BREAK);
	private final int longStopYear = 9990;
	
	@Autowired
	public void setIdGenerator(IdGenerator idGenerator) {
		this.idGenerator = idGenerator;
	}
	
	@Override
	public int readTotal() {
		return buildingStopOrResumeService.getTotal(dataTransferBusinessProperties.getWorkResumeStopPlanDetail().getQuery());
	}
	
	/**
	 * 读取来源数据(用于首次启动任务场景)
	 * @param batchSize 批次大小
	 * @param batchNum 批次序号
	 * @return
	 */
	@Override
	public List<DataTransferSource<BuildingStopOrResumeDTO>> read(int batchSize, int batchNum) {
		List<BuildingStopOrResumeDTO> dtoList = buildingStopOrResumeService.pageQuery(batchSize,batchNum,dataTransferBusinessProperties.getWorkResumeStopPlanDetail().getQuery());
		if(CollectionUtils.isEmpty(dtoList)){
			log.info("停复工申请明细来源数据为空,退出处理");
			return new ArrayList<>();
		}
		//定义返回结果
		List<DataTransferSource<BuildingStopOrResumeDTO>> result = dtoList.stream().map(dto->{
			DataTransferSource<BuildingStopOrResumeDTO> source = new DataTransferSource<>();
			source.setBusinessId(dto.getDetailId()); // 明细id
			source.setData(dto);
			return source;
		}).collect(Collectors.toList());
		return result;
	}
	
	/**
	 * 读取来源数据(用于重启任务场景)
	 * @param businessIds 业务Id列表
	 * @return
	 */
	@Override
	public List<DataTransferSource<BuildingStopOrResumeDTO>> read(List<String> businessIds) {
		if(CollectionUtils.isEmpty(businessIds)){
			log.info("停复工申请明细业务id为空,退出处理");
			return new ArrayList<>();
		}
		//固定使用id查询
		List<BuildingStopOrResumeDTO> dtoList = buildingStopOrResumeService.findByIds(businessIds);
		if(CollectionUtils.isEmpty(dtoList)){
			log.info("停复工申请明细来源数据为空,退出处理");
			return new ArrayList<>();
		}
		//定义返回结果
		List<DataTransferSource<BuildingStopOrResumeDTO>> result = dtoList.stream().map(dto->{
			DataTransferSource<BuildingStopOrResumeDTO> source = new DataTransferSource<>();
			source.setBusinessId(dto.getDetailId()); // 明细id
			source.setData(dto);
			return source;
		}).collect(Collectors.toList());
		return result;
	}
	
	/**
	 * 数据处理
	 * @param sources
	 * @return
	 */
	@Override
	public List<DataTransferResult<BuildingStopOrResumeDTO>> process(List<DataTransferSource<BuildingStopOrResumeDTO>> sources) {
		// 1、排除已迁移的数据
		List<String> sourceDetailIds = sources.stream().map(s->s.getBusinessId()).collect(Collectors.toList());
		List<String> exitsDetailIds = workResumeRecordService.listObjs(new LambdaQueryWrapper<WorkResumeRecord>().select(WorkResumeRecord::getId).in(WorkResumeRecord::getId,sourceDetailIds),v->v.toString());
		// 过滤得到未迁移的来源数据
		List<DataTransferSource<BuildingStopOrResumeDTO>> useSources = sources.stream().filter(s-> !exitsDetailIds.contains(s.getBusinessId())).collect(Collectors.toList());
		if(org.apache.commons.collections4.CollectionUtils.isEmpty(useSources)){
			log.info("停复工申请明细,待迁移的来源数据为空,退出处理");
			return new ArrayList<>();
		}
		
		// 2、查询项目信息,结构:Map<项目Id,Project>
		List<String> projectIds = useSources.stream().map(s->s.getData().getProjectId()).collect(Collectors.toList());
		Map<String, OldProject> projectMap = projectService.getByIds(projectIds);
		// 3、查询项目所属区域信息,结构:Map<区域Id,SysOrg>
		List<String> regionIds=new ArrayList<>();
		projectMap.forEach((projectId,project)->{
			regionIds.add(project.getRegionID());
		});
		Map<String, SysOrg> oldRegionMap = sysOrgService.getByOrgIds(regionIds);
		// 4、获取所属新平台区域信息(目的是获取新平台区域Id).结构:Map<区域编码,BasicDataTree>
		List<String> regionCodes=new ArrayList<>();
		oldRegionMap.forEach((regionId,SysOrg)->{
			regionCodes.add(SysOrg.getOrgCode());
		});
		Map<String, BasicDataTree> newRegionMap=this.basicDataTreeService.findByMdgId(regionCodes);
		
		// 5、构建处理结果
		List<DataTransferResult<BuildingStopOrResumeDTO>> result = useSources.stream().map(s ->{
			BuildingStopOrResumeDTO detail = s.getData();
			OldProject project = projectMap.get(detail.getProjectId());
			SysOrg oldRegion = Optional.ofNullable(project).map(p->oldRegionMap.get(p.getRegionID())).orElse(null);
			BasicDataTree newRegion = Optional.ofNullable(oldRegion).map(r -> newRegionMap.get(r.getOrgCode())).orElse(null);
			
			DataTransferResult<BuildingStopOrResumeDTO> processResult = buildProcessResult(detail, project, oldRegion, newRegion);
			return processResult;
		}).collect(Collectors.toList());
		
		return result;
	}
	
	/**
	 * 写入目标数据
	 * @param targets 目标数据列表.返回数量比入参数量少的,则为跳过处理.空:没有待处理数据
	 */
	@Override
	public List<DataTransferResult> write(List<BuildingStopOrResumeDTO> targets) {
		// 查询原因说明字典值
		List<StopReasonDTO> stopReasons = buildingStopOrResumeService.listStopReason();
		List<WorkResumeRecord> resumeRecordList = new ArrayList<>();
		List<WorkResumePlan> resumePlanInsertList = new ArrayList<>();
		List<WorkResumePlan> resumePlanUpdateList = new ArrayList<>();
		LocalDate now = LocalDate.now();
		
		for (BuildingStopOrResumeDTO target : targets) {
			WorkResumeRecord resumeRecord = buildWorkRecordByOld(target,stopReasons);
			resumeRecordList.add(resumeRecord);
			// 查询insert集合楼栋的停工计划是否已存在
			WorkResumePlan resumePlan = resumePlanInsertList.stream().filter(p->Objects.equals(p.getPlanBuildingCode(),target.getPlanBuildingCode()) && Objects.equals(p.getStopPlanTime(),target.getStopStartOn()))
					.findFirst().orElseGet(()->{
						// 查询数据库楼栋的停工计划是否已存在
						return workResumePlanService.getOne(new LambdaQueryWrapper<WorkResumePlan>().select(WorkResumePlan::getId,WorkResumePlan::getResumePlanTime)
								.eq(WorkResumePlan::getPlanBuildingCode, target.getPlanBuildingCode()).eq(WorkResumePlan::getStopPlanTime, target.getStopStartOn())
								.eq(WorkResumePlan::getInvalid, 0).notIn(WorkResumePlan::getStopType, springWinterBreakList).last("limit 1"));
					});
			if(Objects.isNull(resumePlan)){
				resumePlan = buildResumePlanByOld(target,now);
				resumePlanInsertList.add(resumePlan);
				resumeRecord.setResumePlanId(resumePlan.getId());
				resumePlan.setReason(resumeRecord.getReason());
				resumePlan.setReasonType(resumeRecord.getReasonType());
				// 停工类型,判断是否与春节/冬歇期停工计划重叠
				OverlapPutVO putVO = new OverlapPutVO();
				putVO.setProjectId(resumePlan.getProjectId());
				putVO.setPlanBuildingCode(resumePlan.getPlanBuildingCode());
				putVO.setAdjustStopPlanTime(resumePlan.getStopPlanTime());
				putVO.setAdjustResumePlanTime(resumePlan.getResumePlanTime());
				List<OverlapRetVO> overlapList = workResumePlanService.overlap(putVO);
				if(!CollectionUtils.isEmpty(overlapList)){
					if(Objects.equals(overlapList.get(0).getStopType(),StopTypeEnum.SPRING_BREAK)){
						resumePlan.setStopType(StopTypeEnum.INCLUDE_SPRING_BREAK);
					}else{
						resumePlan.setStopType(StopTypeEnum.INCLUDE_WINTER_BREAK);
					}
					for (OverlapRetVO retVO : overlapList) {
						WorkResumePlan invalidPlan = new WorkResumePlan();
						invalidPlan.setId(retVO.getId());
						invalidPlan.setInvalid(InvalidEnum.OVERLAP_INVALID);
						resumePlanUpdateList.add(invalidPlan);
					}
				}
			}else{
				resumeRecord.setResumePlanId(resumePlan.getId());
				// 判断是否停工申请,是则不更新,因为存在的停工计划是复工申请,已包含停复工的计划停工复工时间和实际停工复工时间
				if(0==target.getStopOrResumeChose()){
					// 复工申请,更新计划复工时间,实际复工时间
					updateResumePlanByOld(target,resumePlan);
					resumePlanUpdateList.add(resumePlan);
					// 停工类型,判断是否与春节/冬歇期停工计划重叠
					OverlapPutVO putVO = new OverlapPutVO();
					putVO.setProjectId(target.getProjectId());
					putVO.setPlanBuildingCode(target.getPlanBuildingCode());
					putVO.setAdjustStopPlanTime(target.getStopStartOn());
					putVO.setAdjustResumePlanTime(Optional.ofNullable(target.getApplyResumeOn()).orElse(target.getStopEndOn()));
					List<OverlapRetVO> overlapList = workResumePlanService.overlap(putVO);
					if(!CollectionUtils.isEmpty(overlapList)){
						if(Objects.equals(overlapList.get(0).getStopType(),StopTypeEnum.SPRING_BREAK)){
							resumePlan.setStopType(StopTypeEnum.INCLUDE_SPRING_BREAK);
						}else{
							resumePlan.setStopType(StopTypeEnum.INCLUDE_WINTER_BREAK);
						}
						for (OverlapRetVO retVO : overlapList) {
							WorkResumePlan invalidPlan = new WorkResumePlan();
							invalidPlan.setId(retVO.getId());
							invalidPlan.setInvalid(InvalidEnum.OVERLAP_INVALID);
							resumePlanUpdateList.add(invalidPlan);
						}
					}
				}
			}
		}
		// 同一事务控制保存
		DefaultTransactionDefinition def1 = new DefaultTransactionDefinition();
		def1.setName("WorkResumeDtStopPlanDetailHandler");
		def1.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
		TransactionStatus status = planRebuildTransactionManager.getTransaction(def1);
		try{
			workResumeRecordService.saveBatch(resumeRecordList);
			workResumePlanService.saveBatch(resumePlanInsertList);
			workResumePlanService.updateBatchById(resumePlanUpdateList);
		}catch (Exception e){
			planRebuildTransactionManager.rollback(status);
			throw e;
		}
		planRebuildTransactionManager.commit(status);
		//3.对整批次的目标写入数据,构建整批成功写入结果
		//获取写入目标数据业务Id
		List<String> businessIds=targets.stream().map(t->t.getDetailId()).collect(Collectors.toList());
		List<DataTransferResult> result=buildBatchWriteResultSuccess(businessIds);
		return result;
	}
	
	private DataTransferResult<BuildingStopOrResumeDTO> buildProcessResult(BuildingStopOrResumeDTO detail,OldProject project, SysOrg oldRegion,BasicDataTree newRegion){
		DataTransferResult<BuildingStopOrResumeDTO> processResult = new DataTransferResult<>();
		String detailId = detail.getDetailId();
		try {
			if(project==null){
				throw new SystemRuntimeException("缺少项目信息");
			}
			if (oldRegion == null) {
				throw new SystemRuntimeException("缺少旧平台区域信息");
			}
			if(newRegion==null){
				throw new SystemRuntimeException("缺少新平台区域信息,"+oldRegion.getOrgCode()+","+oldRegion.getOrgName());
			}
			if(Objects.isNull(detail.getStopStartOn())){
				throw new SystemRuntimeException("计划停工时间为空");
			}
			if(Objects.isNull(detail.getStopEndOn())){
				throw new SystemRuntimeException("计划复工时间为空");
			}
			if(!detail.getStopStartOn().isBefore(Optional.ofNullable(detail.getApplyResumeOn()).orElse(detail.getStopEndOn()))){
				throw new SystemRuntimeException("计划停工时间必须小于计划复工时间");
			}
			
			detail.setAreaId(newRegion.getWwId());
			detail.setAreaCode(oldRegion.getOrgCode());
			detail.setAreaName(oldRegion.getOrgName());
			detail.setProjectName(project.getProjectName());
			
			//定义处理成功结果
			processResult.setData(detail);
			processResult.setBusinessId(detailId);
			processResult.setError("");
			processResult.setTraceId("");
			processResult.setStatus(DataTransferItemStatusEnum.SUCCESS);
		}catch (Exception e) {
			//生成追踪Id,并对异常日志输出
			String traceId=idGenerator.nextId("");
			//输出异常日志,并返回处理状态
			DataTransferItemStatusEnum status=super.handleException(e,traceId,"停复工_申请明细","数据处理");
			
			//定义处理失败结果
			processResult.setData(null);
			processResult.setBusinessId(detailId);
			processResult.setError(e.getMessage());
			processResult.setTraceId(traceId);
			processResult.setStatus(status);
		}
		return processResult;
	}
	
	private WorkResumeRecord buildWorkRecordByOld(BuildingStopOrResumeDTO target,List<StopReasonDTO> stopReasons){
		WorkResumeRecord resumeRecord = new WorkResumeRecord();
		resumeRecord.setId(target.getDetailId());
		resumeRecord.setApplyId(target.getWorkId());
		resumeRecord.setPlanBuildingCode(target.getPlanBuildingCode());
		resumeRecord.setAdjustStopPlanTime(target.getStopStartOn());
		resumeRecord.setAdjustResumePlanTime(target.getStopEndOn());
		resumeRecord.setOperation(1);
		if(Objects.nonNull(target.getSubWorkflow())){
			stopReasons.stream().filter(s-> Objects.equals(s.getAttr1(),target.getSubWorkflow())).findFirst().ifPresent(s->{
				resumeRecord.setReason(s.getNarrationCn());
				resumeRecord.setReasonType(1==target.getStopOrResumeChose()?1:2);
			});
		}
		resumeRecord.setCreateTime(target.getInsertOn());
		resumeRecord.setCreateUser(OldPlanUtils.getUserAccount(target.getInsertBy()));
		resumeRecord.setTenantId(TenantEnum.DEFAULT.getValue());
		return resumeRecord;
	}
	
	private WorkResumePlan buildResumePlanByOld(BuildingStopOrResumeDTO target,LocalDate now){
		WorkResumePlan resumePlan = new WorkResumePlan();
		resumePlan.setId(IdWorker.getIdStr());
		resumePlan.setAreaId(target.getAreaId());
		resumePlan.setAreaCode(target.getAreaCode());
		resumePlan.setAreaName(OldPlanUtils.getRegionName(target.getAreaName()));
		resumePlan.setProjectId(target.getProjectId());
		resumePlan.setProjectName(target.getProjectName());
		resumePlan.setPlanBuildingCode(target.getPlanBuildingCode());
		resumePlan.setPlanBuildingName(target.getPlanBuildingName());
		resumePlan.setStopPlanTime(target.getStopStartOn());
		resumePlan.setResumePlanTime(Optional.ofNullable(target.getApplyResumeOn()).orElse(target.getStopEndOn()));
		// 计划停工时间<=迁移当天,实际停工时间=计划停工时间
		if(now.compareTo(target.getStopStartOn())>=0){
			resumePlan.setStopRealTime(target.getStopStartOn());
		}
		resumePlan.setResumeRealTime(target.getResumeOn());
		resumePlan.setStopType(StopTypeEnum.OTHER_BREAK);
		// 计划状态
		resumePlan.setStatus(ResumeStatusEnum.PASS_RESUME_PLAN);
		if(Objects.nonNull(resumePlan.getResumeRealTime())){
			// 6-已复工,有实际复工时间
			resumePlan.setStatus(ResumeStatusEnum.RESUMED);
		}else if(Objects.nonNull(resumePlan.getStopRealTime())){
			// 4-停工:有实际停工时间,没有实际复工时间
			resumePlan.setStatus(ResumeStatusEnum.STOPPED);
		}
		resumePlan.setCreateTime(target.getInsertOn());
		resumePlan.setCreateUser(OldPlanUtils.getUserAccount(target.getInsertBy()));
		resumePlan.setUpdateTime(target.getUpdateOn());
		resumePlan.setUpdateUser(OldPlanUtils.getUserAccount(target.getUpdateBy()));
		resumePlan.setTenantId(TenantEnum.DEFAULT.getValue());
		// 停复工记录增加一个“是否长期停工”字段,根据计划复工时间是否为9990-01-01来初始化
		if(Objects.isNull(resumePlan.getResumeRealTime()) && Objects.nonNull(resumePlan.getResumePlanTime()) && resumePlan.getResumePlanTime().getYear() == longStopYear){
			resumePlan.setLongStop(YesOrNoEnum.YES);
		}
		return resumePlan;
	}
	
	private void updateResumePlanByOld(BuildingStopOrResumeDTO target,WorkResumePlan resumePlan){
		resumePlan.setResumePlanTime(target.getApplyResumeOn());
		resumePlan.setResumeRealTime(target.getResumeOn());
		resumePlan.setStopType(StopTypeEnum.OTHER_BREAK);
		// 计划状态
		resumePlan.setStatus(ResumeStatusEnum.PASS_RESUME_PLAN);
		if(Objects.nonNull(resumePlan.getResumeRealTime())){
			// 6-已复工,有实际复工时间
			resumePlan.setStatus(ResumeStatusEnum.RESUMED);
		}
		resumePlan.setUpdateTime(target.getUpdateOn());
		resumePlan.setUpdateUser(OldPlanUtils.getUserAccount(target.getUpdateBy()));
		// 停复工记录增加一个“是否长期停工”字段,根据计划复工时间是否为9990-01-01来初始化
		if(Objects.isNull(resumePlan.getResumeRealTime()) && Objects.nonNull(resumePlan.getResumePlanTime()) && resumePlan.getResumePlanTime().getYear() == longStopYear){
			resumePlan.setLongStop(YesOrNoEnum.YES);
		}
	}
}

WorkResumeDtConfirmDetailHandler

/**
 * 复工确认明细迁移处理器,实现类
 * @author xiejinwei02
 * @date 2023/8/11 8:48
 */
@RequiredArgsConstructor(onConstructor_={@Autowired})
@Slf4j
@Component
public class WorkResumeDtConfirmDetailHandler extends DataTransferHandlerSuper implements DataTransferHandler<ResumeWorkConfirmDetail, WorkResumeRecord> {
	@Resource
	private DataTransferBusinessProperties dataTransferBusinessProperties;
	@Resource
	ResumeWorkConfirmDetailService resumeWorkConfirmDetailService;
	@Resource
	private WorkResumeRecordService workResumeRecordService;
	
	@Autowired
	public void setIdGenerator(IdGenerator idGenerator) {
		this.idGenerator = idGenerator;
	}
	
	/**
	 * 读取来源处理事项总数(用于首次启动任务场景)
	 * @return 总数量
	 */
	@Override
	public int readTotal() {
		return resumeWorkConfirmDetailService.getTotal(dataTransferBusinessProperties.getWorkResumeConfirmDetail().getQuery());
	}
	
	/**
	 * 读取来源数据(用于首次启动任务场景)
	 * @param batchSize 批次大小
	 * @param batchNum 批次序号
	 * @return 来源数据列表.返回数量比批次数量少的,则为跳过处理,空:没有待处理数据
	 */
	@Override
	public List<DataTransferSource<ResumeWorkConfirmDetail>> read(int batchSize, int batchNum) {
		List<ResumeWorkConfirmDetail> details = resumeWorkConfirmDetailService.pageQuery(batchSize,batchNum,dataTransferBusinessProperties.getWorkResumeConfirmDetail().getQuery());
		if(CollectionUtils.isEmpty(details)){
			log.info("复工确认明细来源数据为空,退出处理");
			return new ArrayList<>();
		}
		//定义返回结果
		List<DataTransferSource<ResumeWorkConfirmDetail>> result = details.stream().map(d->{
			DataTransferSource<ResumeWorkConfirmDetail> source = new DataTransferSource<>();
			source.setBusinessId(d.getId());
			source.setData(d);
			return source;
		}).collect(Collectors.toList());
		return result;
	}
	
	/**
	 * 读取来源数据(用于重启任务场景)
	 * @param businessIds 业务Id列表
	 * @return 来源数据列表.返回数量比入参数量少的,则为跳过处理,空:没有待处理数据
	 */
	@Override
	public List<DataTransferSource<ResumeWorkConfirmDetail>> read(List<String> businessIds) {
		if(CollectionUtils.isEmpty(businessIds)){
			log.info("复工确认明细业务id为空,退出处理");
			return new ArrayList<>();
		}
		//固定使用id查询
		List<ResumeWorkConfirmDetail> details = resumeWorkConfirmDetailService.findByIds(businessIds);
		if(CollectionUtils.isEmpty(details)){
			log.info("复工确认明细来源数据为空,退出处理");
			return new ArrayList<>();
		}
		//定义返回结果
		List<DataTransferSource<ResumeWorkConfirmDetail>> result = details.stream().map(d->{
			DataTransferSource<ResumeWorkConfirmDetail> source = new DataTransferSource<>();
			source.setBusinessId(d.getId());
			source.setData(d);
			return source;
		}).collect(Collectors.toList());
		return result;
	}
	
	/**
	 * 数据处理
	 * @param sources 来源数据列表
	 * @return 目标数据列表.返回数量比入参数量少的,则为跳过处理.空:没有待处理数据
	 */
	@Override
	public List<DataTransferResult<WorkResumeRecord>> process(List<DataTransferSource<ResumeWorkConfirmDetail>> sources) {
		// 1、排除已迁移的数据
		List<String> sourceDetailIds = sources.stream().map(s->s.getBusinessId()).collect(Collectors.toList());
		List<String> exitsDetailIds = workResumeRecordService.listObjs(new LambdaQueryWrapper<WorkResumeRecord>().select(WorkResumeRecord::getId).in(WorkResumeRecord::getId,sourceDetailIds), v->v.toString());
		// 过滤得到未迁移的来源数据
		List<DataTransferSource<ResumeWorkConfirmDetail>> useSources = sources.stream().filter(s-> !exitsDetailIds.contains(s.getBusinessId())).collect(Collectors.toList());
		if(org.apache.commons.collections4.CollectionUtils.isEmpty(useSources)){
			log.info("复工确认明细,待迁移的来源数据为空,退出处理");
			return new ArrayList<>();
		}
		
		// 2、构建处理结果
		List<DataTransferResult<WorkResumeRecord>> result = new ArrayList<>();
		useSources.stream().forEach(s->{
			ResumeWorkConfirmDetail detail = s.getData();
			// 查询停复工申请明细是否已存在
			WorkResumeRecord one = workResumeRecordService.getOne(new LambdaQueryWrapper<WorkResumeRecord>().select(WorkResumeRecord::getId, WorkResumeRecord::getResumePlanId)
					.eq(WorkResumeRecord::getId, detail.getBuildingStopOrResumeId()));
			result.add(buildProcessResult(detail, Optional.ofNullable(one).map(r->r.getResumePlanId()).orElse(null)));
		});
		
		return result;
	}
	
	/**
	 * 写入目标数据
	 * @param targets 目标数据列表.返回数量比入参数量少的,则为跳过处理.空:没有待处理数据
	 */
	@Override
	@Transactional(rollbackFor = RuntimeException.class,transactionManager = "plan-rebuildTransactionManager")
	public List<DataTransferResult> write(List<WorkResumeRecord> targets) {
		workResumeRecordService.saveBatch(targets);
		
		//2.对整批次的目标写入数据,构建整批成功写入结果
		//获取写入目标数据业务Id
		List<String> businessIds=targets.stream().map(t->t.getId()).collect(Collectors.toList());
		List<DataTransferResult> result=buildBatchWriteResultSuccess(businessIds);
		return result;
	}
	
	private DataTransferResult<WorkResumeRecord> buildProcessResult(ResumeWorkConfirmDetail detail,String resumePlanId){
		DataTransferResult<WorkResumeRecord> processResult = new DataTransferResult<>();
		String detailId = detail.getId();
		try{
			if(Objects.isNull(resumePlanId)){
				throw new SystemRuntimeException("找不到对应的停复工申请明细,workResumeRecordId:"+detail.getBuildingStopOrResumeId());
			}
			
			WorkResumeRecord resumeRecord = new WorkResumeRecord();
			resumeRecord.setId(detailId);
			resumeRecord.setApplyId(detail.getResumeWorkConfirmId());
			resumeRecord.setResumePlanId(resumePlanId);
			resumeRecord.setPlanBuildingCode(detail.getPlanBuildingCode());
			resumeRecord.setOperation(ResumeOperationEnum.RESUME_CONFIRM.getValue());
			resumeRecord.setCreateTime(detail.getInsertOn());
			resumeRecord.setCreateUser(OldPlanUtils.getUserAccount(detail.getInsertBy()));
			resumeRecord.setTenantId(TenantEnum.DEFAULT.getValue());
			
			//定义处理成功结果
			processResult.setData(resumeRecord);
			processResult.setBusinessId(detailId);
			processResult.setError("");
			processResult.setTraceId("");
			processResult.setStatus(DataTransferItemStatusEnum.SUCCESS);
		}catch (Exception e){
			//生成追踪Id,并对异常日志输出
			String traceId=idGenerator.nextId("");
			//输出异常日志,并返回处理状态
			DataTransferItemStatusEnum status=super.handleException(e,traceId,"复工确认明细","数据处理");
			
			//定义处理失败结果
			processResult.setData(null);
			processResult.setBusinessId(detailId);
			processResult.setError(e.getMessage());
			processResult.setTraceId(traceId);
			processResult.setStatus(status);
		}
		return processResult;
	}
}

时间段类TimeSegment

/**
 * @author xiejinwei02
 * @date 2023/7/11 20:09
 * 时间段重叠比较类,重写Comparable#compartTo()接口方法,定义规则,segment1在segment2的左测返回-1,segment1在segment2的右侧返回1,其他返回0,表示重叠
 */
public class TimeSegment implements Comparable {
	private Long start;
	private Long end;
	// 与其他时间段的重叠次数
	private Integer overlapCount = 0;
	
	public TimeSegment(Long start,Long end){
		this.start = start;
		this.end = end;
	}
	
	public Integer getOverlapCount(){
		return overlapCount;
	}
	
	
	@Override
	public int compareTo(Object o) {
		TimeSegment other =(TimeSegment) o;
		if(end < other.start){
			return -1;
		}else if(start > other.end){
			return 1;
		}
		overlapCount++;
		return 0;
	}
	
	/**
	 * 是否重叠
	 * @param other 另一个时间段
	 * @return
	 */
	public boolean isOverlap(TimeSegment other){
		return compareTo(other) == 0;
	}
}

表结构ER图

Post Directory