diff --git a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/FunctionTaskRunner.java b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/FunctionTaskRunner.java index 8ca565044..95e2aa695 100644 --- a/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/FunctionTaskRunner.java +++ b/rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/FunctionTaskRunner.java @@ -22,30 +22,29 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.weibo.rill.flow.olympicene.core.lock.LockerKey; -import com.weibo.rill.flow.interfaces.model.strategy.DispatchInfo; -import com.weibo.rill.flow.olympicene.core.model.NotifyInfo; import com.weibo.rill.flow.interfaces.model.mapping.Mapping; +import com.weibo.rill.flow.interfaces.model.strategy.DispatchInfo; import com.weibo.rill.flow.interfaces.model.strategy.Retry; +import com.weibo.rill.flow.interfaces.model.task.*; +import com.weibo.rill.flow.olympicene.core.lock.LockerKey; +import com.weibo.rill.flow.olympicene.core.model.NotifyInfo; import com.weibo.rill.flow.olympicene.core.model.strategy.RetryContext; -import com.weibo.rill.flow.olympicene.traversal.strategy.RetryPolicy; -import com.weibo.rill.flow.olympicene.traversal.strategy.SimpleRetryPolicy; -import com.weibo.rill.flow.interfaces.model.task.FunctionPattern; -import com.weibo.rill.flow.interfaces.model.task.FunctionTask; +import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult; import com.weibo.rill.flow.olympicene.core.model.task.TaskCategory; import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage; import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage; import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure; import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager; -import com.weibo.rill.flow.olympicene.traversal.utils.ConditionsUtil; +import com.weibo.rill.flow.olympicene.ddl.serialize.ObjectMapperFactory; import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode; import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher; import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException; import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper; -import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult; import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping; import com.weibo.rill.flow.olympicene.traversal.serialize.DAGTraversalSerializer; -import com.weibo.rill.flow.interfaces.model.task.*; +import com.weibo.rill.flow.olympicene.traversal.strategy.RetryPolicy; +import com.weibo.rill.flow.olympicene.traversal.strategy.SimpleRetryPolicy; +import com.weibo.rill.flow.olympicene.traversal.utils.ConditionsUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; @@ -295,6 +294,7 @@ public ExecutionResult finish(String executionId, NotifyInfo notifyInfo, Map needUpdateContext = t -> !t.isFailed() || CollectionUtils.isNotEmpty(functionTask.getSuccessConditions()) || CollectionUtils.isNotEmpty(functionTask.getFailConditions()); @@ -304,6 +304,19 @@ public ExecutionResult finish(String executionId, NotifyInfo notifyInfo, Map output) { + if (!switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT")) { + return; + } + TaskInvokeMsg taskInvokeMsg = notifyInfo.getTaskInvokeMsg(); + if (taskInvokeMsg == null) { + taskInvokeMsg = buildInvokeMsg(ObjectMapperFactory.getJSONMapper().convertValue(output, JsonNode.class)); + } else { + taskInvokeMsg.setOutput(output); + } + notifyInfo.setTaskInvokeMsg(taskInvokeMsg); + } + private ExecutionResult executionCallback(String executionId, TaskInfo taskInfo, NotifyInfo notifyInfo, Map output, Function needUpdateContext) { RetryContext retryContext = RetryContext.builder().retryConfig(((FunctionTask) taskInfo.getTask()).getRetry()) diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy index a533fe416..2b56121de 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/InvokeMsgTest.groovy @@ -32,6 +32,10 @@ class InvokeMsgTest extends Specification { SwitcherManager switcherManager = Mock(SwitcherManager.class) Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager) + def setup() { + switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT") >> true + } + def "big flow taskInfo and small flow dagInfo and small flow taskInfo invokeMsg"() { given: String bigFlowYaml = "version: 0.0.1\n" + @@ -122,7 +126,8 @@ class InvokeMsgTest extends Specification { ((DAGCallbackInfo) event.getData()).getDagInfo().getExecutionId() == "bigFlow" && ((DAGCallbackInfo) event.getData()).getDagInfo().getDagInvokeMsg().getCode() == "code" && ((DAGCallbackInfo) event.getData()).getDagInfo().getDagInvokeMsg().getMsg() == "msg" && - ((DAGCallbackInfo) event.getData()).getDagInfo().getDagInvokeMsg().getExt() == ['ext':'info'] + ((DAGCallbackInfo) event.getData()).getDagInfo().getDagInvokeMsg().getExt() == ['ext':'info'] && + ((DAGCallbackInfo) event.getData()).getDagInfo().getTask("A").getTaskInvokeMsg().getOutput() == ['flow_root_execution_id':'bigFlow', 'segments':['gopUrl']] }) } } diff --git a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TimeCheckerTest.groovy b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TimeCheckerTest.groovy index 4eb7e51b9..9d9363b82 100644 --- a/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TimeCheckerTest.groovy +++ b/rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/TimeCheckerTest.groovy @@ -37,10 +37,11 @@ class TimeCheckerTest extends Specification { DefaultTimeChecker timeChecker = new DefaultTimeChecker() Olympicene olympicene String executionId = 'executionId' + SwitcherManager switcherManager = Mock(SwitcherManager.class) def setup() { timeChecker.redisClient = Mock(RedisClient.class) - SwitcherManager switcherManager = Mock(SwitcherManager.class) + switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT") >> true olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager) timeChecker.timeCheckRunner = olympicene.dagOperations.timeCheckRunner } @@ -311,6 +312,7 @@ class TimeCheckerTest extends Specification { def "dag timeout config value check"() { given: + switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT") >> false String text = "version: 0.0.1\n" + "namespace: olympicene\n" + "service: mca\n" +