Skip to content

Commit

Permalink
fix switch task in sub task (#82)
Browse files Browse the repository at this point in the history
* fix switch task in sub task

---------

Co-authored-by: zeyu10 <[email protected]>
  • Loading branch information
techloghub and zeyu10 authored Aug 29, 2024
1 parent fbe6082 commit 7b981e0
Show file tree
Hide file tree
Showing 3 changed files with 609 additions and 439 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.interfaces.model.task.TaskInvokeMsg;
import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
import com.weibo.rill.flow.olympicene.core.helper.DAGWalkHelper;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.core.model.task.Switch;
import com.weibo.rill.flow.olympicene.core.model.task.SwitchTask;
Expand Down Expand Up @@ -131,8 +132,12 @@ private static class DefaultSwitch {
*/
private static boolean calculateCondition(TaskInfo taskInfo, Map<String, Object> input, Switch switchObj,
Set<String> skipTaskNames, Set<String> runTaskNames, DefaultSwitch defaultSwitch) {
Set<String> nextTaskNames = Arrays.stream(switchObj.getNext().split(",")).map(String::trim)
.filter(StringUtils::isNotBlank).collect(Collectors.toSet());
DAGWalkHelper dagWalkHelper = DAGWalkHelper.getInstance();
boolean isAncestorTask = dagWalkHelper.isAncestorTask(taskInfo.getName());
String rootName = isAncestorTask? null: dagWalkHelper.getRootName(taskInfo.getName());
Set<String> nextTaskNames = Arrays.stream(switchObj.getNext().split(",")).map(String::trim).filter(StringUtils::isNotBlank)
.map(it -> isAncestorTask ? it: dagWalkHelper.buildTaskInfoName(rootName, it))
.collect(Collectors.toSet());
boolean condition = judgeCondition(taskInfo, input, switchObj, defaultSwitch);

if (!condition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,92 @@ class ReturnTaskTraversalTest extends Specification {
})
}

def "if return task is in sub task"() {
given:
String text = "workspace: default\n" +
"dagName: switchSubTask\n" +
"alias: release\n" +
"type: flow\n" +
"inputSchema: '[]'\n" +
"tasks:\n" +
" - name: foreach\n" +
" description: ''\n" +
" synchronization:\n" +
" conditions: []\n" +
" iterationMapping:\n" +
" item: info\n" +
" index: index\n" +
" collection: \$.input.infos_array\n" +
" inputMappings:\n" +
" - transform: return seq.list(0, 1, 2);\n" +
" target: \$.input.infos_array\n" +
" category: foreach\n" +
" tasks:\n" +
" - name: A\n" +
" description: ''\n" +
" category: pass\n" +
" title: ''\n" +
" - name: B\n" +
" description: ''\n" +
" category: pass\n" +
" title: ''\n" +
" - name: returnA\n" +
" category: return\n" +
" title: ''\n" +
" description: ''\n" +
" inputMappings:\n" +
" - source: \$.context.index\n" +
" target: \$.input.index\n" +
" conditions:\n" +
" - \$.input.[?(@.index == 1)]\n" +
" next: A\n" +
" - name: returnB\n" +
" category: return\n" +
" title: ''\n" +
" description: ''\n" +
" inputMappings:\n" +
" - source: \$.context.index\n" +
" target: \$.input.index\n" +
" conditions:\n" +
" - \$.input.[?(@.index == 0)]\n" +
" next: B\n"
DAG dag = dagParser.parse(text)

when:
olympicene.submit("executionIdSuccess", dag, ['input':15])

then:
noExceptionThrown()
1 * callback.onEvent({
Event event -> {
event.eventCode == DAGEvent.TASK_FINISH.getCode() && event.getData() instanceof DAGCallbackInfo
&& ((DAGCallbackInfo) event.getData()).getTaskInfo().getName().equals("foreach_0-A")
&& ((DAGCallbackInfo) event.getData()).getTaskInfo().getTaskStatus() == TaskStatus.SUCCEED
}
})
1 * callback.onEvent({
Event event -> {
event.eventCode == DAGEvent.TASK_SKIPPED.getCode() && event.getData() instanceof DAGCallbackInfo
&& ((DAGCallbackInfo) event.getData()).getTaskInfo().getName().equals("foreach_0-B")
&& ((DAGCallbackInfo) event.getData()).getTaskInfo().getTaskStatus() == TaskStatus.SKIPPED
}
})
1 * callback.onEvent({
Event event -> {
event.eventCode == DAGEvent.TASK_SKIPPED.getCode() && event.getData() instanceof DAGCallbackInfo
&& ((DAGCallbackInfo) event.getData()).getTaskInfo().getName().equals("foreach_1-A")
&& ((DAGCallbackInfo) event.getData()).getTaskInfo().getTaskStatus() == TaskStatus.SKIPPED
}
})
1 * callback.onEvent({
Event event -> {
event.eventCode == DAGEvent.TASK_FINISH.getCode() && event.getData() instanceof DAGCallbackInfo
&& ((DAGCallbackInfo) event.getData()).getTaskInfo().getName().equals("foreach_1-B")
&& ((DAGCallbackInfo) event.getData()).getTaskInfo().getTaskStatus() == TaskStatus.SUCCEED
}
})
}

def "if return task status is skip then next tasks status should not be skip"() {
given:
String text = "version: 0.0.1\n" +
Expand Down
Loading

0 comments on commit 7b981e0

Please sign in to comment.