Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task input type #87

Merged
merged 274 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
274 commits
Select commit Hold shift + click to select a range
f902dba
do run AnswerTaskRunner and AnswerTaskDispatcher
Sep 12, 2024
53d7611
rename answer task runner
Sep 12, 2024
10d7584
add icon for answer task
Sep 12, 2024
7bb4e04
add tolerance logic for answer task runner
Sep 12, 2024
448b35a
fix wrong resourceProtocol
Sep 12, 2024
20dd01b
schedule answer task
Sep 12, 2024
81b2588
schedule answer task
Sep 12, 2024
73f6aaa
add comments
Sep 13, 2024
4a40e62
Improve task filtering logic for ready-to-run tasks
Sep 13, 2024
fb326aa
extract static final variable
Sep 14, 2024
7517d4b
use uriBuilder to refactor SseProtocolDispatcher to improve error han…
Sep 18, 2024
00e6f5c
remove useless params
Sep 18, 2024
e5503ca
use uribuilder to build url in SseProtocolDispatcher
Sep 18, 2024
12333ba
use uribuilder to build url in SseProtocolDispatcher
Sep 18, 2024
3f51eb2
fix nullpoint exception
Sep 18, 2024
c634eaf
add unit test for AnswerTaskRunnegr
Sep 18, 2024
d24dd28
add unit test for updateDagStatus update invoke time info
Sep 18, 2024
e47bd74
add unit test for DAGDescriptorFacade.generateResourceProtocol
Sep 18, 2024
69bdfd9
add unit test for DAGDescriptorFacade.generateResourceProtocol
Sep 18, 2024
4f9ca23
add unit test for DAGWalkHelper to test without answer node
Sep 18, 2024
25cfe96
check whether is key mode
Sep 19, 2024
b95dd25
check whether is key mode
Sep 19, 2024
722886e
check whether is key mode
Sep 19, 2024
461de1d
allow callback_info in header
Sep 19, 2024
b0480fb
optimit code
Sep 19, 2024
bc59bb9
add unit test to test when answer node is key callback in key mode
Sep 19, 2024
09c9284
add comments
Sep 19, 2024
cf1bd4d
add unit test
Sep 19, 2024
bc64c4b
add unit test to test buildUrl in SseProtocolDispatcher
Sep 19, 2024
7f0a512
optimit code
Sep 19, 2024
5d60471
add unit test to test buildHttpEntity in SseProtocolDispatcher
Sep 19, 2024
4fa5411
add unit test to test handle in SseProtocolDispatcher
Sep 19, 2024
e85c32e
test buildHttpEntity without callback info
Sep 19, 2024
d6c8423
test getReadyToRunTasks answer after answer
Sep 19, 2024
43e7c17
add unit test to test AnswerTaskRunner.doRun when answerTask is null
Sep 19, 2024
343d26d
add unit test to test AnswerTaskRunner.doRun when answerTask is null
Sep 19, 2024
84dc6d9
add unit test to test AnswerTaskRunner.doRun when dispatch error
Sep 20, 2024
e74c0a1
fix sonar
Sep 20, 2024
d7aa645
fix sonar
Sep 20, 2024
19cdd8a
fix sonar
Sep 20, 2024
bae24b2
add unit test to test AnswerTaskDispatcher
Sep 20, 2024
b3da7b6
add unit test to test AnswerTask
Sep 20, 2024
70105b1
add unit test to test AnswerTask
Sep 20, 2024
194a143
add copyright comments
Sep 23, 2024
8c8b60b
add header
Sep 23, 2024
357d717
remove header
Sep 23, 2024
18d7de3
change enter
Sep 23, 2024
81516b5
change enter
Sep 23, 2024
ee63700
change answer task to stream input task
Sep 29, 2024
d5bdef3
fix unit test
Sep 29, 2024
89a68c5
fix unit test
Sep 29, 2024
2d8b59f
fix issue
Sep 29, 2024
ba0b696
add rill_flow_host parameter when calling aggregate/execute.json
Sep 30, 2024
9d38c9f
remove useless parameter
Sep 30, 2024
f0fd068
add X-Flow-Server header
Sep 30, 2024
c1781f3
fix sonar
Oct 9, 2024
064733d
remove sseProtocolDispatcher
Oct 9, 2024
3b2668d
remove useless code
Oct 10, 2024
4760720
remove useless code
Oct 10, 2024
103f4cf
remove useless code
Oct 10, 2024
6f9c409
remove useless code
Oct 10, 2024
9b6f30c
remove useless code
Oct 10, 2024
1bb5827
add output type
Oct 11, 2024
5274e7d
fix unit test
Oct 11, 2024
a0e2c4c
modify comments
Oct 15, 2024
556a19c
modify comments
Oct 15, 2024
6effb21
modify comments
Oct 15, 2024
e2fbd7c
optimit code
Oct 15, 2024
7c3118b
optimit code
Oct 15, 2024
56945bc
Fix the issue where a stream task depends on another stream task that…
Oct 15, 2024
f9eb1cf
fix sonar:Reduce the total number of break and continue statements in…
Oct 15, 2024
a887d01
modify comments
Oct 15, 2024
f830844
remove hasStreamInputTask
Oct 15, 2024
8a45c15
add todo mark
Oct 16, 2024
e360e8c
add unit test to drive the development
Oct 16, 2024
c6959d8
auto generate output mappings
Oct 16, 2024
cff7e49
modify gitignore
Oct 16, 2024
c3bc1da
add the situation that contains dot in mapping elements
Oct 16, 2024
1f0b816
remove useless variable
Oct 16, 2024
638ee3b
optimit code
Oct 16, 2024
2918395
adapt v2.0 when get descriptors
Oct 16, 2024
7cc01d4
adapt v2.0 when get descriptors
Oct 16, 2024
ff5a68a
remove context keyword in v2.0 version
Oct 16, 2024
e48f401
process inputMappings when set and get descriptor
Oct 17, 2024
03442df
fix index error in processInputMappingsWhenGetDescriptor and add unit…
Oct 17, 2024
a8ea584
optimit code
Oct 17, 2024
70a27d7
add input keyword to generate inputMappings and outputMappings
Oct 17, 2024
4d66de3
fix unit test
Oct 17, 2024
a84e51a
not limit version
Oct 17, 2024
d7384f5
remove useless variable
Oct 17, 2024
558d063
optimit code
Oct 17, 2024
0bf14af
generate end pass task
Oct 17, 2024
3a551e4
generate end pass task
Oct 17, 2024
1dfa9ad
fix unit test
Oct 17, 2024
7a6000f
fix unit test
Oct 17, 2024
a8ff214
fix sonar
Oct 17, 2024
e5fe076
fix sonar
Oct 17, 2024
1b2d113
change endtaskname
Oct 17, 2024
2181dfd
add unit test for generating end pass task
Oct 18, 2024
66a026d
optimit code
Oct 18, 2024
3147b08
optimit code
Oct 18, 2024
9e2c73e
optimit code
Oct 18, 2024
739a839
extract code into DescriptorParseService and implement
Oct 18, 2024
a051061
extract code into DescriptorParseService and implement
Oct 18, 2024
442b26f
add unit test for processWhenGetDescriptor and only use this method f…
Oct 18, 2024
e0f9612
fix error when element is number
Oct 18, 2024
611339f
allow array in json path when mapping
Oct 18, 2024
477daf2
allow array in json path when mapping
Oct 18, 2024
456cf81
allow array in json path when mapping
Oct 18, 2024
8d551cf
recovery useless modify
Oct 18, 2024
67509ed
remove logic to merge common prefixes and add logic to remove duplica…
Oct 21, 2024
b436b9d
remove useless modify
Oct 21, 2024
6d5bde5
modify unit test
Oct 21, 2024
db9994a
optimit code
Oct 21, 2024
f583634
optimit code
Oct 21, 2024
2c9309e
add comments
Oct 21, 2024
ef28da5
add comments
Oct 21, 2024
47ed905
add comments
Oct 21, 2024
7a560b2
add comments
Oct 21, 2024
878671f
add comments
Oct 21, 2024
5808911
recovery useless modify
Oct 21, 2024
49aca15
recovery useless modify
Oct 21, 2024
0172462
remove useless modify
Oct 21, 2024
fc33979
modify comments
Oct 21, 2024
e2ddfcd
complete processInputMappingsWhenGetDescriptor
Oct 21, 2024
21cbac1
complete processInputMappingsWhenGetDescriptor
Oct 21, 2024
d230d82
complete processInputMappingsWhenGetDescriptor
Oct 21, 2024
605d76e
complete processInputMappingsWhenGetDescriptor
Oct 21, 2024
fd188b0
optimit code
Oct 21, 2024
a5b288d
add comments
Oct 21, 2024
d448a64
add comments
Oct 21, 2024
7dbd534
add comments
Oct 21, 2024
35bd99a
add comments
Oct 21, 2024
20edf5f
modify return type from array to List
Oct 21, 2024
a51f3ea
merge generate_output_mappings and resolve conflicts
Oct 21, 2024
10c60b6
rename InputType to TaskInputOutputType
Oct 21, 2024
d1af60b
add output type into function.json
Oct 21, 2024
de6d430
modify walkhelper
Oct 22, 2024
96080f4
modify walkhelper
Oct 22, 2024
44374ef
fix unit test
Oct 22, 2024
5c66b19
add unit tests
Oct 22, 2024
9741249
move all the calls of descriptorParseService into DescriptorManager
Oct 23, 2024
d9b721c
use strategy design pattern to redesign dag process
Oct 23, 2024
ad0be5f
optimit code
Oct 23, 2024
f41980a
add unit tests
Oct 23, 2024
5d1dac1
fix sonar
Oct 23, 2024
8f96160
fix sonar
Oct 23, 2024
2999042
fix sonar
Oct 23, 2024
f89dba3
optimit code
Oct 23, 2024
eaa760f
optimit code
Oct 23, 2024
cf97496
optimit code
Oct 23, 2024
525ab96
optimit code
Oct 23, 2024
1e1efaf
optimit code
Oct 23, 2024
4cf5de7
optimit code
Oct 23, 2024
9fec5d7
add license
Oct 23, 2024
f88c090
modify enter
Oct 24, 2024
0ed46a7
modify enter
Oct 24, 2024
23440b1
add license and comments
Oct 24, 2024
1dc06fb
add license and comments
Oct 24, 2024
d58cbe4
rename method name
Oct 24, 2024
420b4fd
rename method name
Oct 24, 2024
6b1f0f8
redesign DescriptorManager method
Oct 24, 2024
bd0b905
fix unit test
Oct 24, 2024
e9ee796
modify method name
Oct 24, 2024
d88ac16
modify method name
Oct 24, 2024
6a6af2b
add comments
Oct 24, 2024
87700b3
Merge branch 'generate_output_mappings' of github.com:weibocom/rill-f…
Oct 24, 2024
efb2c42
create dag descriptor converter
Oct 24, 2024
7790437
create dag descriptor converter
Oct 24, 2024
6022894
create dag descriptor converter
Oct 24, 2024
b0a2c02
fix unit test
Oct 24, 2024
5b8b6fa
move converter into converter package
Oct 24, 2024
8748aef
modify DO to PO
Oct 24, 2024
587d375
modify DO to PO
Oct 24, 2024
33de274
remove type id comment after polymorphism serialize
Oct 25, 2024
a5e5d35
add unit tests
Oct 25, 2024
743be89
add unit tests
Oct 25, 2024
1ee9504
add unit tests to cover if conditions in DescriptorManager
Oct 25, 2024
db25f4d
add unit test for createDAGDescriptor method in DescriptorManage
Oct 25, 2024
8d61a80
Merge branch 'dag_descriptor_converter' of github.com:weibocom/rill-f…
Oct 25, 2024
569bbf5
regard input as output directly in PassTaskRunner
Oct 25, 2024
b3e8348
regard input as output directly in PassTaskRunner
Oct 25, 2024
a816361
Merge branch 'generate_output_mappings' of github.com:weibocom/rill-f…
Oct 25, 2024
d6edbf6
Merge branch 'dag_descriptor_converter' of github.com:weibocom/rill-f…
Oct 25, 2024
8e5d239
use output keyword in outputMappings in pass task
Oct 25, 2024
b0fe87f
fix unit test
Oct 25, 2024
685cc5e
Merge branch 'dag_descriptor_converter' of github.com:weibocom/rill-f…
Oct 25, 2024
d2461fa
add unit test for JSONPathInputOutputMapping and fix problem
Oct 28, 2024
d73eefe
add unit test for JSONPathInputOutputMapping and fix problem
Oct 28, 2024
72e9486
Merge branch 'dag_descriptor_converter' of github.com:weibocom/rill-f…
Oct 28, 2024
269ed2f
Merge branch 'task_input_type' of github.com:weibocom/rill-flow into …
Oct 28, 2024
d4cd047
remove useless comments
Oct 28, 2024
bb21e6c
add unit test for JSONPathInputOutputMapping
Oct 28, 2024
96d6fe3
use LinkedHashMultimap to rebuild DAGDescriptorConverterImpl
Oct 28, 2024
887a214
optimit code
Oct 28, 2024
75efbbf
modify comments
Oct 28, 2024
94a5683
optimit code
Oct 28, 2024
36d657f
optimit code
Oct 28, 2024
b63b69d
Merge branch 'dag_descriptor_converter' of github.com:weibocom/rill-f…
Oct 28, 2024
5c7a535
rename method name
Oct 28, 2024
e3dfd2d
rename method name
Oct 28, 2024
625c419
rename DescriptorManager into DAGDescriptorManager
Oct 28, 2024
f34a9e8
extract DAGDescriptorManager into daos
Oct 28, 2024
3ed690d
add license and comments
Oct 28, 2024
fae9479
add license and comments
Oct 28, 2024
d9a6a07
extract dao
Oct 28, 2024
9550ff0
add unit test for DAGABTestDAOTest
Oct 28, 2024
46904d6
rename DAGDescriptorUtil to DAGStorageKeysUtil
Oct 28, 2024
6b7fbbf
add unit test for DAGAliasDAO
Oct 28, 2024
2bb3699
add unit test for DAGDescriptorDAO
Oct 29, 2024
e2d6eb4
add unit test for daos
Oct 29, 2024
f4cebcf
add license and comments
Oct 29, 2024
362abc8
add unit test for daos
Oct 29, 2024
585fbe6
add unit test for DAGDescriptorFacade
Oct 29, 2024
2a2118f
add unit test for DAGDescriptorFacade
Oct 29, 2024
4bd5ebb
add unit test for DAGDescriptorService
Oct 29, 2024
6c248a7
add license
Oct 29, 2024
be150bc
add license
Oct 29, 2024
879233b
optimit code
Oct 29, 2024
6c9b07f
add unit test for FunctionTaskService
Oct 29, 2024
2c3ef8b
use getDAG instead of getDescriptorDO
Oct 29, 2024
e55c732
resolve conflicts
Oct 29, 2024
e9b6979
add comments
Oct 29, 2024
c954559
Merge branch 'extract_dag_descriptor_dao' of github.com:weibocom/rill…
Oct 29, 2024
87f530e
resolve conflicts
Oct 29, 2024
874afdd
recovery useless modify
Oct 29, 2024
0481548
recovery useless modify
Oct 29, 2024
80c1792
recovery useless modify
Oct 29, 2024
b3fbc2c
recovery useless modify
Oct 29, 2024
25808f6
add @JsonProperty
Oct 29, 2024
66102f6
add @JsonProperty
Oct 29, 2024
ae5f68e
add @JsonAlias("key_callback")
Oct 29, 2024
e2b4603
add @JsonAlias
Oct 29, 2024
e9fe038
rename field
Oct 29, 2024
f722697
add @JsonAlias
Oct 29, 2024
38947b6
add @JsonAlias
Oct 29, 2024
9a9bff0
add @JsonAlias
Oct 29, 2024
8996c0f
remove useless import
Oct 30, 2024
ac4a8cb
add comments
Oct 30, 2024
3ad3aab
modify comments
Oct 30, 2024
1bac582
split judgement to let code easy to be read
Oct 30, 2024
86e3109
optimit code
Oct 30, 2024
5d4108e
modify unit test
Oct 30, 2024
fad5abe
modify unit test
Oct 30, 2024
7cab690
extract method
Oct 30, 2024
47df362
modify comments
Oct 30, 2024
c669b70
modify comments
Oct 30, 2024
78489ea
optimit code
Oct 30, 2024
ea1f168
add unit test to test getReadyToRunTasks when there are multiple stre…
Oct 30, 2024
734bf66
optimit code
Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.weibo.rill.flow.olympicene.core.model.dag.DAGInfo;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus;
import com.weibo.rill.flow.olympicene.core.model.strategy.CallbackConfig;
import com.weibo.rill.flow.interfaces.model.task.FunctionTask;
import com.weibo.rill.flow.olympicene.core.model.task.TaskCategory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
Expand All @@ -53,22 +52,82 @@ public static DAGWalkHelper getInstance() {
}

public Set<TaskInfo> getReadyToRunTasks(Collection<TaskInfo> taskInfos) {
Set<TaskInfo> readyToRunTasks = taskInfos.stream()
.filter(taskInfo -> taskInfo != null && taskInfo.getTaskStatus() == TaskStatus.NOT_STARTED)
.filter(taskInfo -> !taskInfo.getTask().isKeyCallback())
.filter(taskInfo -> CollectionUtils.isEmpty(taskInfo.getDependencies()) || taskInfo.getDependencies().stream().allMatch(i -> i.getTaskStatus().isSuccessOrSkip()))
boolean isKeyMode = isKeyMode(taskInfos);
// 根据依赖关系获取准备运行的非流式输入任务
Set<TaskInfo> readyToRunTasks = getReadyToRunBlockInputTasks(taskInfos, isKeyMode);
// 添加准备运行的流式输入任务
addReadyToRunStreamInputTasks(taskInfos, readyToRunTasks, isKeyMode);
return readyToRunTasks;
}

/**
* 筛选出准备运行的任务:
* 1. 当前任务不为空且状态为未开始
* 2. 依赖任务全部完成(如果在关键路径模式下,则包括关键路径完成)
*/
private Set<TaskInfo> getReadyToRunBlockInputTasks(Collection<TaskInfo> taskInfos, boolean isKeyMode) {
return taskInfos.stream()
.filter(Objects::nonNull)
.filter(taskInfo -> taskInfo.getTaskStatus() == TaskStatus.NOT_STARTED)
.filter(taskInfo -> TaskInputOutputType.getTypeByValue(taskInfo.getTask().getInputType()) == TaskInputOutputType.BLOCK)
.filter(taskInfo -> isDependenciesAllSuccessOrSkip(taskInfo, isKeyMode))
.collect(Collectors.toSet());
}

if (isKeyMode(taskInfos)) {
Set<TaskInfo> keyCallbackTasks = taskInfos.stream()
.filter(taskInfo -> taskInfo != null && taskInfo.getTaskStatus() == TaskStatus.NOT_STARTED)
.filter(taskInfo -> taskInfo.getTask().isKeyCallback()) // 此类型任务只需前置依赖节点关键路径完成即可执行
.filter(taskInfo -> CollectionUtils.isEmpty(taskInfo.getDependencies()) || taskInfo.getDependencies().stream().allMatch(i -> i.getTaskStatus().isSuccessOrKeySuccessOrSkip()))
.collect(Collectors.toSet());
readyToRunTasks.addAll(keyCallbackTasks);
/**
* 非流式输入任务,所有依赖的 block 输出任务都已经完成,或在关键路径模式下关键路径完成或跳过,则任务可以运行
*/
private boolean isDependenciesAllSuccessOrSkip(TaskInfo taskInfo, boolean isKeyMode) {
// 1. 没有依赖视为依赖均已完成
if (CollectionUtils.isEmpty(taskInfo.getDependencies())) {
return true;
}
boolean isTaskKeyMode = isKeyMode && taskInfo.getTask().isKeyCallback();
// 2. 非流式输入任务,所有依赖任务是否都已完成(非关键路径模式下完成或跳过,或者在关键路径模式下关键路径完成或跳过)
return taskInfo.getDependencies().stream().allMatch(dependency -> isTaskSuccessOrSkip(dependency, isTaskKeyMode));
}

return readyToRunTasks;
private boolean isTaskSuccessOrSkip(TaskInfo taskInfo, boolean isTaskKeyMode) {
return taskInfo.getTaskStatus().isSuccessOrSkip()
|| (isTaskKeyMode && taskInfo.getTaskStatus().isSuccessOrKeySuccessOrSkip());
}

/**
* 获取准备运行的流式输入任务,并添加到 readyToRunTasks
*
* @param taskInfos 所有任务的集合
* @param readyToRunTasks 已准备运行的任务集合
*/
private void addReadyToRunStreamInputTasks(Collection<TaskInfo> taskInfos, Set<TaskInfo> readyToRunTasks, boolean isKeyMode) {
Set<TaskInfo> readyToRunStreamTasks = new HashSet<>();
taskInfos.stream().filter(Objects::nonNull).filter(taskInfo -> taskInfo.getTaskStatus() == TaskStatus.NOT_STARTED)
.filter(taskInfo -> TaskInputOutputType.getTypeByValue(taskInfo.getTask().getInputType()) == TaskInputOutputType.STREAM)
.forEach(taskInfo -> {
boolean isTaskKeyMode = isKeyMode && taskInfo.getTask().isKeyCallback();
boolean needRun = taskInfo.getDependencies().stream()
.anyMatch(dependency -> isStreamTaskReadyToRun(readyToRunTasks, dependency, isTaskKeyMode));
if (needRun) {
readyToRunStreamTasks.add(taskInfo);
}
});
readyToRunTasks.addAll(readyToRunStreamTasks);
}

/**
* 判断流式输入任务是否可执行,只要有任何依赖符合以下任一条件,流式输入任务就可以执行
* 1. 任意依赖的流式输出任务开始执行或者准备被执行
* 2. 任意依赖的非流式输出任务执行完成(包括关键路径模式下关键路径执行完成)
*/
private boolean isStreamTaskReadyToRun(Set<TaskInfo> readyToRunTasks, TaskInfo dependency, boolean isTaskKeyMode) {
TaskInputOutputType dependencyOutputType = TaskInputOutputType.getTypeByValue(dependency.getTask().getOutputType());
boolean streamDependencyStarted = false;
boolean blockDependencyFinished = false;
if (dependencyOutputType == TaskInputOutputType.STREAM) {
streamDependencyStarted = dependency.getTaskStatus() != TaskStatus.NOT_STARTED || readyToRunTasks.contains(dependency);
} else {
blockDependencyFinished = isTaskSuccessOrSkip(dependency, isTaskKeyMode);
}
return streamDependencyStarted || blockDependencyFinished;
}

private boolean isKeyMode(Collection<TaskInfo> allTasks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.weibo.rill.flow.olympicene.core.model.dag;

import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
Expand Down Expand Up @@ -43,6 +44,7 @@ public class DAG {
private List<BaseTask> tasks;
@Setter
private List<BaseResource> resources;
@JsonProperty("callback")
private CallbackConfig callbackConfig;
@Setter
private Map<String, String> defaultContext;
Expand All @@ -61,7 +63,7 @@ public DAG(@JsonProperty("workspace") String workspace,
@JsonProperty("timer") Timeline timeline,
@JsonProperty("tasks") List<BaseTask> tasks,
@JsonProperty("resources") List<BaseResource> resources,
@JsonProperty("callback") CallbackConfig callbackConfig,
@JsonProperty("callback") @JsonAlias({"callback_config", "callbackConfig"}) CallbackConfig callbackConfig,
@JsonProperty("defaultContext") Map<String, String> defaultContext,
@JsonProperty("commonMapping") Map<String, List<Mapping>> commonMapping,
@JsonProperty("namespace") String namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@
@JsonProperty("progress") Progress progress,
@JsonProperty("degrade") Degrade degrade,
@JsonProperty("timeline") Timeline timeline,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyCallback") boolean keyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade,
timeline, isKeyCallback, keyExp, parameters, templateId, input);
@JsonProperty("templateId") String templateId,
@JsonProperty("inputType") String inputType,
@JsonProperty("outputType") String outputType) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress,

Check warning on line 61 in rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ChoiceTask.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/ChoiceTask.java#L61

Added line #L61 was not covered by tests
degrade, timeline, keyCallback, keyExp, parameters, templateId, input, inputType, outputType);
this.choices = choices;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ public ForeachTask(
@JsonProperty("progress") Progress progress,
@JsonProperty("degrade") Degrade degrade,
@JsonProperty("timeline") Timeline timeline,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyCallback") boolean keyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
@JsonProperty("input") Map<String, Object> input,
@JsonProperty("inputType") String inputType,
@JsonProperty("outputType") String outputType) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade,
timeline, isKeyCallback, keyExp, parameters, templateId, input);
timeline, keyCallback, keyExp, parameters, templateId, input, inputType, outputType);
this.synchronization = synchronization;
this.iterationMapping = iterationMapping;
this.tasks = tasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@
@JsonProperty("progress") Progress progress,
@JsonProperty("degrade") Degrade degrade,
@JsonProperty("timeline") Timeline timeline,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyCallback") boolean keyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade,
timeline, isKeyCallback, keyExp, parameters, templateId, input);
@JsonProperty("input") Map<String, Object> input,
@JsonProperty("inputType") String inputType,
@JsonProperty("outputType") String outputType) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline,

Check warning on line 60 in rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/PassTask.java

View check run for this annotation

Codecov / codecov/patch

rill-flow-dag/olympicene-core/src/main/java/com/weibo/rill/flow/olympicene/core/model/task/PassTask.java#L60

Added line #L60 was not covered by tests
keyCallback, keyExp, parameters, templateId, input, inputType, outputType);
Optional.ofNullable(timeline).ifPresent(it -> it.setTimeoutInSeconds(null));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ public ReturnTask(@JsonProperty("name") String name,
@JsonProperty("progress") Progress progress,
@JsonProperty("degrade") Degrade degrade,
@JsonProperty("timeline") Timeline timeline,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyCallback") boolean keyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
@JsonProperty("input") Map<String, Object> input,
@JsonProperty("inputType") String inputType,
@JsonProperty("outputType") String outputType) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade,
timeline, isKeyCallback, keyExp, parameters, templateId, input);
timeline, keyCallback, keyExp, parameters, templateId, input, inputType, outputType);
Optional.ofNullable(timeline).ifPresent(it -> it.setTimeoutInSeconds(null));
this.conditions = conditions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ public SuspenseTask(@JsonProperty("name") String name,
@JsonProperty("progress") Progress progress,
@JsonProperty("degrade") Degrade degrade,
@JsonProperty("timeline") Timeline timeline,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyCallback") boolean keyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
@JsonProperty("input") Map<String, Object> input,
@JsonProperty("inputType") String inputType,
@JsonProperty("outputType") String outputType) {
super(name, title, description, category, next, tolerance, inputMappings, outputMappings, progress, degrade,
timeline, isKeyCallback, keyExp, parameters, templateId, input);
timeline, keyCallback, keyExp, parameters, templateId, input, inputType, outputType);
this.conditions = conditions;
this.interruptions = interruptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import lombok.Getter;
import lombok.Setter;

import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Getter
Expand All @@ -48,13 +50,15 @@ public SwitchTask(
@JsonProperty("progress") Progress progress,
@JsonProperty("degrade") Degrade degrade,
@JsonProperty("timeline") Timeline timeline,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyCallback") boolean keyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
@JsonProperty("input") Map<String, Object> input,
@JsonProperty("inputType") String inputType,
@JsonProperty("outputType") String outputType) {
super(name, title, description, category, null, false, inputMappings, null, progress,
degrade, timeline, isKeyCallback, keyExp, parameters, templateId, input);
degrade, timeline, keyCallback, keyExp, parameters, templateId, input, inputType, outputType);
this.switches = switches;
this.setNext(switches.stream().map(Switch::getNext).collect(Collectors.joining(",")));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ public enum TaskCategory {
// 流程控制Task,执行分支语句
CHOICE("choice", 1),

// 流程控制Task,执行分支语句
SWITCH("switch", 1),

// 流程控制Task,执行循环语句
FOREACH("foreach", 1),

Expand All @@ -43,6 +40,9 @@ public enum TaskCategory {

// return task
RETURN("return", 2),

// 执行分支语句
SWITCH("switch", 2),
;

private final String value;
Expand All @@ -66,6 +66,7 @@ public static TaskCategory getEnumByValue(String category) {
case "suspense" -> SUSPENSE;
case "pass" -> PASS;
case "return" -> RETURN;
case "switch" -> SWITCH;
default -> null;
};
}
Expand Down
Loading
Loading