diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java index da62e4092..e20612913 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperations.java @@ -51,7 +51,7 @@ import java.util.function.Supplier; @Slf4j -public class DAGOperations { +public class DAGOperations implements DAGOperationsInterface { private static final String EXECUTION_ID = "executionId"; private final ExecutorService runnerExecutor; diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperationsInterface.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperationsInterface.java new file mode 100644 index 000000000..e4ea2c464 --- /dev/null +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGOperationsInterface.java @@ -0,0 +1,18 @@ +package com.weibo.rill.flow.olympicene.traversal; + +import com.weibo.rill.flow.interfaces.model.task.TaskInfo; +import com.weibo.rill.flow.olympicene.core.model.NotifyInfo; +import com.weibo.rill.flow.olympicene.core.model.dag.DAGInfo; +import com.weibo.rill.flow.olympicene.core.model.dag.DAGInvokeMsg; +import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.Collection; +import java.util.Map; + +public interface DAGOperationsInterface { + void finishDAG(String executionId, DAGInfo dagInfo, DAGStatus dagStatus, DAGInvokeMsg dagInvokeMsg); + void finishTaskAsync(String executionId, String taskCategory, NotifyInfo notifyInfo, Map output); + void finishTaskSync(String executionId, String taskCategory, NotifyInfo notifyInfo, Map output); + void runTasks(String executionId, Collection>> taskInfoToContexts); +} diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java index 73b16e4db..c350133e8 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/DAGTraversal.java @@ -56,7 +56,7 @@ public class DAGTraversal { private final DAGStorageProcedure dagStorageProcedure; private final ExecutorService traversalExecutor; @Setter - private DAGOperations dagOperations; + private DAGOperationsInterface dagOperations; @Setter private Stasher stasher;