dataX中TaskGroupContainer向JobContianer汇报详解

汇报过程

汇报源码逻辑是在TaskGroupContainer#reportTaskGroupCommunication这个方法中,方法的两个形参分别为lastTaskGroupContainerCommunication为上次汇报的信息,每次做数据统计的时候需要将当前communication的数据和lastTaskGroupContainerCommunication进行合并;taskCount为该TaskGroup的所有的任务数。具体不走位

  1. 收集当前TaskGroupContainer对应所有Task的的communication,然后将其合并成一个communication。
    具体合并代码为步骤1,主要逻辑是在Communication#mergeFrom,它的主要功能将两communication的变量合并。主要关注下两个communication的状态合并,可以看到只要该TaskGroup中有一个Task的状态是FAILED或者KILLED就会将整个TaskGroup的状态标记为FAILED,当且仅当所有的任务的状态是SUCCEEDED,该TaskGroup的状态才能标记为SUCCEEDED(这很重要,这很重要,这很重要)。
  2. 生成新的reportCommunication作为该TaskGroupContainer上报给JobContianer的communication,
    主要是生成一些技术统计,比方说当前已经导入的记录数和字节数等。
  3. 上报给JobContianer,主要代码见步骤3,将该TaskGroupContainer最新的communication更新到StandAloneJobContainerCommunicator 能够够的得到的地方,即全局变量LocalTGCommunicationManager#taskGroupCommunicationMap中。
// TaskGroupContainer#reportTaskGroupCommunication
private Communication reportTaskGroupCommunication(Communication lastTaskGroupContainerCommunication, int taskCount){
    Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
    nowTaskGroupContainerCommunication.setTimestamp(System.currentTimeMillis());
    Communication reportCommunication = CommunicationTool.getReportCommunication(nowTaskGroupContainerCommunication,
        lastTaskGroupContainerCommunication, taskCount);
    this.containerCommunicator.report(reportCommunication);
    return reportCommunication;
}
步骤1
//AbstractCollector#collectFromTask
public Communication collectFromTask() {
    Communication communication = new Communication();
    communication.setState(State.SUCCEEDED);

    for (Communication taskCommunication :
            this.taskCommunicationMap.values()) {
    communication.mergeFrom(taskCommunication);
    }
    return communication;
}
Communication#mergeStateFrom
public synchronized State mergeStateFrom(final Communication otherComm) {
        State retState = this.getState();
        if (otherComm == null) {
            return retState;
        }

        if (this.state == State.FAILED || otherComm.getState() == State.FAILED
                || this.state == State.KILLED || otherComm.getState() == State.KILLED) {
            retState = State.FAILED;
        } else if (this.state.isRunning() || otherComm.state.isRunning()) {
            retState = State.RUNNING;
        }

        this.setState(retState);
        return retState;
}
步骤3
// StandaloneTGContainerCommunicator#report
public void report(Communication communication) {
    super.getReporter().reportTGCommunication(super.taskGroupId, communication);
}

// ProcessInnerReporter
public void reportTGCommunication(Integer taskGroupId, Communication communication) {
    LocalTGCommunicationManager.updateTaskGroupCommunication(taskGroupId, communication);
}

// LocalTGCommunicationManager#updateTaskGroupCommunication
public static void updateTaskGroupCommunication(final int taskGroupId,
                                                    final Communication communication) {
        Validate.isTrue(taskGroupCommunicationMap.containsKey(
                taskGroupId), String.format("taskGroupCommunicationMap中没有注册taskGroupId[%d]的Communication," +
                "无法更新该taskGroup的信息", taskGroupId));
        taskGroupCommunicationMap.put(taskGroupId, communication);
}

汇报时机

TaskGroupContainer向JobContainer汇报该TaskGroupContainer的执行情况的时机均在TaskGroupContainer#start中。

1、当前TaskGroup中有状态为FAILED或者KILLED的Task

如果一个Task只能执行一次(默认是1次,没有做重试)且该Task被标记为FAILED或者KILLED,马上将failedOrKilled这个变量标记为true并执行汇报逻辑。这种情况下除了汇报之后,还会抛出一个运行时异常,结束执行当前TaskGroupContainer的线程(TaskGroupContianer是在线程池中执行的)。

if (failedOrKilled) {
    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);

    throw DataXException.asDataXException(
        FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
}
2、上次失败的Task仍未结束

如果一个Task标记为FAILED或者KILLED,但是有重试逻辑就不会执行上面第1步的逻辑,而是会调用当前的Task对应TaskExecutor#shutdown,关闭当前的TaskExecutor。在调用TaskExecutor#shutdown一段时间发发现给TaskExecutor还没有关闭,触发下面逻辑,进行汇报的同时抛出异常。

if(now - failedTime > taskMaxWaitInMsec){
    markCommunicationFailed(taskId);
    reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
    throw DataXException.asDataXException(CommonErrorCode.WAIT_TIME_EXCEED, "task failover等待超时");
}
3、TaskGroupContainer任务列表为空,所有任务都是成功执行, 搜集状态为SUCCEEDED

这个没什么好说的,该TaskGroup中所有的任务执行成功,该Job执行成功。

4、如果当前时间已经超出汇报时间的interval,那么我们需要马上汇报

可以理解为心跳了

5、TaskGroupContainer所在的线程正常结束时汇报一次

这个真没什么好说的了

JobContainer收到汇报之后怎么处理

JobContainer的处理逻辑是在dataX所在JVM的主线程中,具体是在AbstractScheduler#schedule中。

  1. 每隔一段时间,合并所有TaskGoupContianer汇报的信息,具体合并的逻辑和TaskGoupContianer合并Task的汇报信息差不多;
  2. 正常结束就正常退出;
  3. 处理isJobKilling,StandAloneScheduler并没有提供kill接口,咱不管;
  4. 重点关注下FAILED的逻辑,直接关闭当前Scheduler的线程池并在主线程中抛出异常,整个dataX进程退出。
// AbstractScheduler#schedule
public void schedule(List<Configuration> configurations) {
        ...
        ...
        Communication lastJobContainerCommunication = new Communication();

        long lastReportTimeStamp = System.currentTimeMillis();
        try {
            while (true) {
                Communication nowJobContainerCommunication = this.containerCommunicator.collect();
                nowJobContainerCommunication.setTimestamp(System.currentTimeMillis());
                LOG.debug(nowJobContainerCommunication.toString());

                //汇报周期
                long now = System.currentTimeMillis();
                if (now - lastReportTimeStamp > jobReportIntervalInMillSec) {
                    Communication reportCommunication = CommunicationTool
                            .getReportCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);

                    this.containerCommunicator.report(reportCommunication);
                    lastReportTimeStamp = now;
                    lastJobContainerCommunication = nowJobContainerCommunication;
                }

                errorLimit.checkRecordLimit(nowJobContainerCommunication);

                if (nowJobContainerCommunication.getState() == State.SUCCEEDED) {
                    LOG.info("Scheduler accomplished all tasks.");
                    break;
                }

                if (isJobKilling(this.getJobId())) {
                    dealKillingStat(this.containerCommunicator, totalTasks);
                } else if (nowJobContainerCommunication.getState() == State.FAILED) {
                    dealFailedStat(this.containerCommunicator, nowJobContainerCommunication.getThrowable());
                }

                Thread.sleep(jobSleepIntervalInMillSec);
            }
        } catch (InterruptedException e) {
            // 以 failed 状态退出
            LOG.error("捕获到InterruptedException异常!", e);

            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        }

    }
    
    // ProcessInnerScheduler#dealFailedStat
    public void dealFailedStat(AbstractContainerCommunicator frameworkCollector, Throwable throwable) {
        this.taskGroupContainerExecutorService.shutdownNow();
        throw DataXException.asDataXException(
                FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, throwable);
    }

dataX进程退出逻辑

从上面的汇报逻辑可以梳理一下dataX以StandAlone模式运行时的退出逻辑。

  1. 如果所有任务正常结束那么dataX正常退出;
  2. 如果JobContianer中所有的任务组中有任何一个任务的状态为FAILED或者KILLED那么JobContainer在适当的时间点会感知到这一点,感知到之后会在主线程中强制关闭线程池并抛出异常退出JVM。

这种关闭相对来说比较暴力了,已经导入到目的库的数据无法rollback,其它在线程池中正常执行的TaskGroupContainer也会被强制关闭。如果想重新导入只有人工删数据了(当然可以在Wirter配置中配置preSql来执行删表了),当然其它类似的工具也没有应对这种问题的解决方法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容