Skip to content

Commit

Permalink
add output into TaskInvokeMsg when function finish (#77)
Browse files Browse the repository at this point in the history
* add output into TaskInvokeMsg when function finish


---------

Co-authored-by: zeyu10 <[email protected]>
  • Loading branch information
techloghub and zeyu10 authored Aug 7, 2024
1 parent 98a347f commit 33f3270
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,29 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.weibo.rill.flow.olympicene.core.lock.LockerKey;
import com.weibo.rill.flow.interfaces.model.strategy.DispatchInfo;
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.strategy.DispatchInfo;
import com.weibo.rill.flow.interfaces.model.strategy.Retry;
import com.weibo.rill.flow.interfaces.model.task.*;
import com.weibo.rill.flow.olympicene.core.lock.LockerKey;
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo;
import com.weibo.rill.flow.olympicene.core.model.strategy.RetryContext;
import com.weibo.rill.flow.olympicene.traversal.strategy.RetryPolicy;
import com.weibo.rill.flow.olympicene.traversal.strategy.SimpleRetryPolicy;
import com.weibo.rill.flow.interfaces.model.task.FunctionPattern;
import com.weibo.rill.flow.interfaces.model.task.FunctionTask;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.core.model.task.TaskCategory;
import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.olympicene.traversal.utils.ConditionsUtil;
import com.weibo.rill.flow.olympicene.ddl.serialize.ObjectMapperFactory;
import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode;
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher;
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException;
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping;
import com.weibo.rill.flow.olympicene.traversal.serialize.DAGTraversalSerializer;
import com.weibo.rill.flow.interfaces.model.task.*;
import com.weibo.rill.flow.olympicene.traversal.strategy.RetryPolicy;
import com.weibo.rill.flow.olympicene.traversal.strategy.SimpleRetryPolicy;
import com.weibo.rill.flow.olympicene.traversal.utils.ConditionsUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
Expand Down Expand Up @@ -295,6 +294,7 @@ public ExecutionResult finish(String executionId, NotifyInfo notifyInfo, Map<Str

FunctionTask functionTask = (FunctionTask) taskInfo.getTask();
notifyInfo.setTaskStatus(taskTypeStatus(output, functionTask, notifyInfo.getTaskStatus()));
setOutputIntoTaskInvokeMsg(notifyInfo, output);
Function<TaskStatus, Boolean> needUpdateContext = t -> !t.isFailed()
|| CollectionUtils.isNotEmpty(functionTask.getSuccessConditions())
|| CollectionUtils.isNotEmpty(functionTask.getFailConditions());
Expand All @@ -304,6 +304,19 @@ public ExecutionResult finish(String executionId, NotifyInfo notifyInfo, Map<Str
return executionRef.get();
}

private void setOutputIntoTaskInvokeMsg(NotifyInfo notifyInfo, Map<String, Object> output) {
if (!switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT")) {
return;
}
TaskInvokeMsg taskInvokeMsg = notifyInfo.getTaskInvokeMsg();
if (taskInvokeMsg == null) {
taskInvokeMsg = buildInvokeMsg(ObjectMapperFactory.getJSONMapper().convertValue(output, JsonNode.class));
} else {
taskInvokeMsg.setOutput(output);
}
notifyInfo.setTaskInvokeMsg(taskInvokeMsg);
}

private ExecutionResult executionCallback(String executionId, TaskInfo taskInfo, NotifyInfo notifyInfo,
Map<String, Object> output, Function<TaskStatus, Boolean> needUpdateContext) {
RetryContext retryContext = RetryContext.builder().retryConfig(((FunctionTask) taskInfo.getTask()).getRetry())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class InvokeMsgTest extends Specification {
SwitcherManager switcherManager = Mock(SwitcherManager.class)
Olympicene olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, Mock(DefaultTimeChecker.class), switcherManager)

def setup() {
switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT") >> true
}

def "big flow taskInfo and small flow dagInfo and small flow taskInfo invokeMsg"() {
given:
String bigFlowYaml = "version: 0.0.1\n" +
Expand Down Expand Up @@ -122,7 +126,8 @@ class InvokeMsgTest extends Specification {
((DAGCallbackInfo) event.getData()).getDagInfo().getExecutionId() == "bigFlow" &&
((DAGCallbackInfo) event.getData()).getDagInfo().getDagInvokeMsg().getCode() == "code" &&
((DAGCallbackInfo) event.getData()).getDagInfo().getDagInvokeMsg().getMsg() == "msg" &&
((DAGCallbackInfo) event.getData()).getDagInfo().getDagInvokeMsg().getExt() == ['ext':'info']
((DAGCallbackInfo) event.getData()).getDagInfo().getDagInvokeMsg().getExt() == ['ext':'info'] &&
((DAGCallbackInfo) event.getData()).getDagInfo().getTask("A").getTaskInvokeMsg().getOutput() == ['flow_root_execution_id':'bigFlow', 'segments':['gopUrl']]
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ class TimeCheckerTest extends Specification {
DefaultTimeChecker timeChecker = new DefaultTimeChecker()
Olympicene olympicene
String executionId = 'executionId'
SwitcherManager switcherManager = Mock(SwitcherManager.class)

def setup() {
timeChecker.redisClient = Mock(RedisClient.class)
SwitcherManager switcherManager = Mock(SwitcherManager.class)
switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT") >> true
olympicene = OlympiceneFacade.build(dagStorage, dagStorage, callback, dispatcher, dagStorageProcedure, timeChecker, switcherManager)
timeChecker.timeCheckRunner = olympicene.dagOperations.timeCheckRunner
}
Expand Down Expand Up @@ -311,6 +312,7 @@ class TimeCheckerTest extends Specification {

def "dag timeout config value check"() {
given:
switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT") >> false
String text = "version: 0.0.1\n" +
"namespace: olympicene\n" +
"service: mca\n" +
Expand Down

0 comments on commit 33f3270

Please sign in to comment.