Skip to content

Commit

Permalink
add switch task and switch task runner (#72)
Browse files Browse the repository at this point in the history
* add switch task and switch task runner

---------

Co-authored-by: zeyu10 <[email protected]>
  • Loading branch information
techloghub and zeyu10 authored Aug 7, 2024
1 parent 33f3270 commit c09f3ec
Show file tree
Hide file tree
Showing 17 changed files with 959 additions and 36 deletions.
45 changes: 45 additions & 0 deletions docs/samples/switch-sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
version: 0.0.1
type: flow
workspace: rillFlowSample
dagName: switchSample
tasks:
- category: pass
name: startNode
next: randomNode
- category: pass
name: randomNode
outputMappings:
- transform: " return rand(source) + 1; "
target: $.context.random_num
source: $.context.input_num
next: choice
- category: switch
name: choice
switches:
- next: choiceOnePass
condition: $.input.[?(@.num >= 5)]
break: true
- next: choiceTwoPass
condition: default
inputMappings:
- target: $.input.num
source: $.context.random_num
- category: pass
name: choiceOnePass
outputMappings:
- target: $.context.choice_num
transform: "return 10 * source;"
source: $.context.random_num
next: printNode
- category: pass
name: choiceTwoPass
next: printNode
outputMappings:
- target: $.context.choice_num
transform: "return -1 * source;"
source: $.context.random_num
- category: pass
name: printNode
outputMappings:
- target: $.context.result
source: $.context.choice_num
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2021-2023 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weibo.rill.flow.olympicene.core.model.task;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.Objects;

@Getter
@Setter
@NoArgsConstructor
public class Switch {
String condition;
String next;
boolean isBreak;

@JsonCreator
public Switch(
@JsonProperty("condition") String condition,
@JsonProperty("next") String next,
@JsonProperty("break") Boolean isBreak
) {
this.condition = condition;
this.next = next;
this.isBreak = Objects.requireNonNullElse(isBreak, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2021-2023 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weibo.rill.flow.olympicene.core.model.task;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.strategy.Degrade;
import com.weibo.rill.flow.interfaces.model.strategy.Progress;
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
import com.weibo.rill.flow.interfaces.model.task.BaseTask;
import lombok.Getter;
import lombok.Setter;

import java.util.*;
import java.util.stream.Collectors;

@Getter
@Setter
@JsonTypeName("switch")
public class SwitchTask extends BaseTask {

List<Switch> switches;

@JsonCreator
public SwitchTask(
@JsonProperty("name") String name,
@JsonProperty("title") String title,
@JsonProperty("description") String description,
@JsonProperty("category") String category,
@JsonProperty("inputMappings") List<Mapping> inputMappings,
@JsonProperty("switches") List<Switch> switches,
@JsonProperty("progress") Progress progress,
@JsonProperty("degrade") Degrade degrade,
@JsonProperty("timeline") Timeline timeline,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, title, description, category, null, false, inputMappings, null, progress,
degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
this.switches = switches;
this.setNext(switches.stream().map(Switch::getNext).collect(Collectors.joining(",")));
}

@Override
public List<BaseTask> subTasks() {
return new ArrayList<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public enum TaskCategory {
// 流程控制Task,执行分支语句
CHOICE("choice", 1),

// 流程控制Task,执行分支语句
SWITCH("switch", 1),

// 流程控制Task,执行循环语句
FOREACH("foreach", 1),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public class ObjectMapperFactory {
new NamedType(ForeachTask.class, "foreach"),
new NamedType(PassTask.class, "pass"),
new NamedType(SuspenseTask.class, "suspense"),
new NamedType(ReturnTask.class, "return")
new NamedType(ReturnTask.class, "return"),
new NamedType(SwitchTask.class, "switch")
);
}

Expand All @@ -69,7 +70,8 @@ public class ObjectMapperFactory {
new NamedType(ForeachTask.class, "foreach"),
new NamedType(PassTask.class, "pass"),
new NamedType(SuspenseTask.class, "suspense"),
new NamedType(ReturnTask.class, "return")
new NamedType(ReturnTask.class, "return"),
new NamedType(SwitchTask.class, "switch")
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import java.util.Map;

public class NotSupportedTaskValidator implements TaskValidator<BaseTask> {
private final List<Class<?>> supportedTasks = List.of(FunctionTask.class, SuspenseTask.class, PassTask.class, ReturnTask.class, ForeachTask.class, ChoiceTask.class);
private final List<Class<?>> supportedTasks = List.of(FunctionTask.class, SuspenseTask.class, PassTask.class,
ReturnTask.class, ForeachTask.class, ChoiceTask.class, SwitchTask.class);

@Override
public void validate(BaseTask task, Map<String, BaseResource> resourceMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,18 @@ public SuspenseTaskRunner suspenseTaskRunner(
return new SuspenseTaskRunner(inputOutputMapping, dagInfoStorage, dagContextStorage, dagStorageProcedure, switcherManager);
}

@Bean
@ConditionalOnMissingBean(name = "switchTaskRunner")
public SwitchTaskRunner switchTaskRunner(
@Autowired @Qualifier("dagContextStorage") DAGContextStorage dagContextStorage,
@Autowired @Qualifier("dagInfoStorage") DAGInfoStorage dagInfoStorage,
@Autowired @Qualifier("dagStorageProcedure") DAGStorageProcedure dagStorageProcedure,
@Autowired @Qualifier("inputOutputMapping") JSONPathInputOutputMapping inputOutputMapping,
@Autowired SwitcherManager switcherManager) {
log.info("begin to init default switchTaskRunner bean");
return new SwitchTaskRunner(inputOutputMapping, dagInfoStorage, dagContextStorage, dagStorageProcedure, switcherManager);
}

@Bean
@ConditionalOnMissingBean(name = "returnTaskRunner")
public ReturnTaskRunner returnTaskRunner(
Expand Down Expand Up @@ -240,7 +252,8 @@ public Map<String, TaskRunner> taskRunners(
@Autowired @Qualifier("suspenseTaskRunner") SuspenseTaskRunner suspenseTaskRunner,
@Autowired @Qualifier("returnTaskRunner") ReturnTaskRunner returnTaskRunner,
@Autowired @Qualifier("foreachTaskRunner") ForeachTaskRunner foreachTaskRunner,
@Autowired @Qualifier("choiceTaskRunner") ChoiceTaskRunner choiceTaskRunner) {
@Autowired @Qualifier("choiceTaskRunner") ChoiceTaskRunner choiceTaskRunner,
@Autowired @Qualifier("switchTaskRunner") SwitchTaskRunner switchTaskRunner) {
log.info("begin to init default Map<TaskCategory, TaskRunner> bean");
Map<String, TaskRunner> taskRunners = Maps.newConcurrentMap();
taskRunners.put(TaskCategory.FUNCTION.getValue(), functionTaskRunner);
Expand All @@ -249,6 +262,7 @@ public Map<String, TaskRunner> taskRunners(
taskRunners.put(TaskCategory.RETURN.getValue(), returnTaskRunner);
taskRunners.put(TaskCategory.FOREACH.getValue(), foreachTaskRunner);
taskRunners.put(TaskCategory.CHOICE.getValue(), choiceTaskRunner);
taskRunners.put(TaskCategory.SWITCH.getValue(), switchTaskRunner);
return taskRunners;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class DAGInfoDAOTest extends Specification {
args.subList(0, 13) == ['172800', '_placeholder_', '{"workspace":"workspace","dagName":"dagName","version":"1.0.0","type":"flow","tasks":[]}', '_placeholder_', 'execution_id', '"executionId"', 'dag_status', '"not_started"', '@class_#A', 'com.weibo.rill.flow.interfaces.model.task.TaskInfo', '@class_dag_status', 'com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus', 'dag'] &&
args.get(13).startsWith('"dag_descriptor_') &&
args.subList(14, 17) == ['@class_dag', 'java.lang.String', '#A'] &&
args.get(17).endsWith('"name":"A","next":[],"children":{},"dependencies":[]}') &&
args.get(17).endsWith('"name":"A","next":[],"children":{},"dependencies":[],"skip_next_task_names":[]}') &&
args.subList(18, 20) == ['@class_execution_id', 'java.lang.String']
}
)
Expand Down Expand Up @@ -104,9 +104,9 @@ class DAGInfoDAOTest extends Specification {
args.subList(0, 13) == ['172800', '_placeholder_', '{"workspace":"workspace","dagName":"dagName","version":"1.0.0","type":"flow","tasks":[]}', '_placeholder_', 'execution_id', '"executionId"', 'dag_status', '"not_started"', '@class_#A', 'com.weibo.rill.flow.interfaces.model.task.TaskInfo', '@class_dag_status', 'com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus', 'dag'] &&
args.get(13).startsWith('"dag_descriptor_') &&
args.subList(14, 17) == ['@class_dag', 'java.lang.String', '#A'] &&
args.get(17).endsWith('"name":"A","next":[],"children":{},"dependencies":[]}') &&
args.get(17).endsWith('"name":"A","next":[],"children":{},"dependencies":[],"skip_next_task_names":[]}') &&
args.subList(18, 24) == ['@class_execution_id', 'java.lang.String', '_placeholder_', '@class_#A1', 'com.weibo.rill.flow.interfaces.model.task.TaskInfo', '#A1'] &&
args.get(24).endsWith('"name":"A1","next":[],"children":{},"dependencies":[]}') &&
args.get(24).endsWith('"name":"A1","next":[],"children":{},"dependencies":[],"skip_next_task_names":[]}') &&
args.subList(25, 28) == ['_placeholder_', 'A', 'sub_task_executionId_A']
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
package com.weibo.rill.flow.olympicene.traversal.config;

import com.google.common.collect.Maps;
import com.weibo.rill.flow.olympicene.core.model.task.TaskCategory;
import com.weibo.rill.flow.olympicene.core.event.Callback;
import com.weibo.rill.flow.olympicene.core.model.task.TaskCategory;
import com.weibo.rill.flow.olympicene.core.result.DAGResultHandler;
import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient;
import com.weibo.rill.flow.olympicene.traversal.DAGOperations;
import com.weibo.rill.flow.olympicene.traversal.DAGTraversal;
import com.weibo.rill.flow.olympicene.traversal.Olympicene;
Expand Down Expand Up @@ -88,6 +87,7 @@ public static Map<String, TaskRunner> buildTaskRunners(DAGInfoStorage dagInfoSto
ForeachTaskRunner foreachTaskRunner = new ForeachTaskRunner(mapping, jsonPath, dagContextStorage, dagInfoStorage, dagStorageProcedure, switcherManager);
foreachTaskRunner.setStasher(stasher);
ChoiceTaskRunner choiceTaskRunner = new ChoiceTaskRunner(mapping, dagContextStorage, dagInfoStorage, dagStorageProcedure, switcherManager);
SwitchTaskRunner switchTaskRunner = new SwitchTaskRunner(mapping, dagInfoStorage, dagContextStorage, dagStorageProcedure, switcherManager);

Map<String, TaskRunner> runners = Maps.newConcurrentMap();
runners.put(TaskCategory.FUNCTION.getValue(), functionTaskRunner);
Expand All @@ -96,6 +96,7 @@ public static Map<String, TaskRunner> buildTaskRunners(DAGInfoStorage dagInfoSto
runners.put(TaskCategory.SUSPENSE.getValue(), suspenseTaskRunner);
runners.put(TaskCategory.PASS.getValue(), passTaskRunner);
runners.put(TaskCategory.RETURN.getValue(), returnTaskRunner);
runners.put(TaskCategory.SWITCH.getValue(), switchTaskRunner);
return runners;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,14 @@ public void mapping(Map<String, Object> context, Map<String, Object> input, Map<
map.put("output", output);

List<Mapping> mappingRules = rules.stream()
.filter(rule -> StringUtils.isNoneBlank(rule.getSource()) && StringUtils.isNoneBlank(rule.getTarget()))
.filter(rule -> (StringUtils.isNotBlank(rule.getSource()) || StringUtils.isNotBlank(rule.getTransform()))
&& StringUtils.isNotBlank(rule.getTarget()))
.toList();
for (Mapping mapping : mappingRules) {
boolean intolerance = mapping.getTolerance() != null && !mapping.getTolerance();
try {
String source = mapping.getSource();
Object sourceValue = null;
String[] infos = source.split("\\.");
if (source.startsWith("$.tasks.") && infos.length > 3) {
String taskName = infos[2];
String key = infos[3];
if (key.equals("trigger_url") || key.startsWith("trigger_url?")) {
sourceValue = serverHost + rillFlowFunctionTriggerUri + "?execution_id=" + context.get("flow_execution_id") + "&task_name=" + taskName;
String[] queryInfos = source.split("\\?");
if (queryInfos.length > 0) {
sourceValue += '&' + queryInfos[1];
}
}
} else {
sourceValue = source.startsWith("$") ? getValue(map, source) : parseSource(source);
}

Object sourceValue = calculateSourceValue((String) context.get("flow_execution_id"), source, map);
Object transformedValue = transformSourceValue(sourceValue, context, input, output, mapping.getTransform());

if (transformedValue != null) {
Expand All @@ -97,6 +83,32 @@ public void mapping(Map<String, Object> context, Map<String, Object> input, Map<
}
}

private Object calculateSourceValue(String executionId, String source, Map<String, Object> map) {
if (StringUtils.isBlank(source)) {
return null;
}
String[] infos = source.split("\\.");
if (source.startsWith("$.tasks.") && infos.length > 3) {
String taskName = infos[2];
String key = infos[3];
return tryToGenerateTriggerUrl(executionId, source, key, taskName);
} else {
return source.startsWith("$") ? getValue(map, source) : parseSource(source);
}
}

private Object tryToGenerateTriggerUrl(String executionId, String source, String key, String taskName) {
if (!key.equals("trigger_url") && !key.startsWith("trigger_url?")) {
return null;
}
Object sourceValue = serverHost + rillFlowFunctionTriggerUri + "?execution_id=" + executionId + "&task_name=" + taskName;
String[] queryInfos = source.split("\\?");
if (queryInfos.length > 1) {
sourceValue += '&' + queryInfos[1];
}
return sourceValue;
}

public Object transformSourceValue(Object sourceValue, Map<String, Object> context, Map<String, Object> input,
Map<String, Object> output, String transform) {
if (StringUtils.isBlank(transform)) {
Expand Down
Loading

0 comments on commit c09f3ec

Please sign in to comment.