Skip to content

Commit

Permalink
check dag length when add descriptor (#48)
Browse files Browse the repository at this point in the history
* check dag length when add descriptor

---------

Co-authored-by: techloghub <[email protected]>
Co-authored-by: zeyu10 <[email protected]>
  • Loading branch information
3 people authored Jun 26, 2024
1 parent cf32306 commit f8f0490
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,6 @@ public DAGInfoDAO(RedisClient redisClient, DAGInfoDeserializeService dagInfoDese
this.finishStatusReserveTimeInSecond = finishStatusReserveTimeInSecond;
}

// ------------------------------------------------------
// 若想动态修改属性值 如: 不同业务设置不同值
// 需要继承该类 重写该方法 添加动态设置业务逻辑
protected void checkDAGInfoLength(String executionId, List<byte[]> contents) {
log.debug("checkDAGInfoLength executionId:{} contents size empty:{}", executionId, CollectionUtils.isEmpty(contents));
}

protected int getFinishStatusReserveTimeInSecond(String executionId) {
log.debug("getFinishStatusReserveTimeInSecond executionId:{}, time:{}", executionId, finishStatusReserveTimeInSecond);
return finishStatusReserveTimeInSecond;
Expand All @@ -160,13 +153,6 @@ public DAGInfo getDagInfo(String executionId, boolean needSubTasks) {
return null;
}

List<byte[]> contents = dagInfos.stream()
.map(array -> array.get(1))
.filter(CollectionUtils::isNotEmpty)
.flatMap(Collection::stream)
.toList();
checkDAGInfoLength(executionId, contents);

return deserializeDagInfo(dagInfos);
} catch (Exception e) {
log.warn("getDagInfo fails, executionId:{}", executionId, e);
Expand Down Expand Up @@ -304,12 +290,6 @@ private TaskInfo getTaskInfo(String executionId, String taskName, String subTask
}
List<List<byte[]>> ret = (List<List<byte[]>>) redisClient.eval(RedisScriptManager.dagInfoGetByFieldScript(), executionId, keys, argv);

List<byte[]> contents = ret.stream()
.filter(CollectionUtils::isNotEmpty)
.flatMap(Collection::stream)
.toList();
checkDAGInfoLength(executionId, contents);

// dag描述符
DAG dag = DagStorageSerializer.deserialize(ret.get(0).get(0), DAG.class);
Map<String, BaseTask> baseTaskMap = getBaseTask(1, Optional.ofNullable(dag).map(DAG::getTasks).orElse(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient;
import com.weibo.rill.flow.service.manager.DescriptorManager;
import com.weibo.rill.flow.service.service.ProtocolPluginService;
import com.weibo.rill.flow.service.statistic.DAGSubmitChecker;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
Expand All @@ -46,6 +47,7 @@
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -73,6 +75,8 @@ public class DAGDescriptorFacade {
private RedisClient redisClient;
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@Autowired
private DAGSubmitChecker dagSubmitChecker;

public Map<String, Object> modifyBusiness(boolean add, String businessId) {
boolean ret = add ? descriptorManager.createBusiness(businessId) : descriptorManager.remBusiness(businessId);
Expand Down Expand Up @@ -172,6 +176,9 @@ public Map<String, Object> getVersion(String businessId, String featureName, Str

public Map<String, Object> addDescriptor(String identity, String businessId, String featureName, String alias, String descriptor) {
try {
if (StringUtils.isNotEmpty(descriptor)) {
dagSubmitChecker.checkDAGInfoLengthByBusinessId(businessId, List.of(descriptor.getBytes(StandardCharsets.UTF_8)));
}
String descriptorId = descriptorManager.createDAGDescriptor(businessId, featureName, alias, descriptor);

Map<String, String> attachments = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ private String getDescriptorRedisKeyByAlias(String businessId, String featureNam
Set<String> redisRet = redisClient.zrange(businessId, buildVersionRedisKey(businessId, featureName, alias), -1, -1);
if (CollectionUtils.isEmpty(redisRet)) {
log.info("getDescriptorRedisKeyByAlias redisRet empty");
throw new TaskException(BizError.ERROR_PROCESS_FAIL.getCode(), String.format("alias %s value empty", alias));
throw new TaskException(BizError.ERROR_PROCESS_FAIL.getCode(), String.format("cannot find descriptor: %s:%s:%s", businessId, featureName, alias));
}

String md5 = redisRet.iterator().next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import com.weibo.rill.flow.common.function.ResourceStatus;
import com.weibo.rill.flow.common.model.BizError;
import com.weibo.rill.flow.common.util.SerializerUtil;
import com.weibo.rill.flow.olympicene.storage.constant.StorageErrorCode;
import com.weibo.rill.flow.olympicene.storage.exception.StorageException;
import com.weibo.rill.flow.service.configuration.BeanConfig;
import com.weibo.rill.flow.service.dconfs.BizDConfs;
import com.weibo.rill.flow.service.dconfs.DynamicClientConfs;
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.service.util.ExecutionIdUtil;
import com.weibo.rill.flow.service.util.ValueExtractor;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -61,6 +64,8 @@ public class DAGSubmitChecker {
@Autowired
private SwitcherManager switcherManagerImpl;

private static final int DAG_INFO_MAX_LENGTH_CONFIG = 30 * 1024 + 600 * 1024; // dag 描述符最大为 30K 每个任务大小最大为 600B 最大存 1000 个任务

public ResourceCheckConfig getCheckConfig(String resourceCheck) {
if (StringUtils.isBlank(resourceCheck)) {
return null;
Expand Down Expand Up @@ -294,6 +299,25 @@ private FlowCheck flowRuntimeCheck(String businessId, String serviceId) {
return flowCheck;
}

public void checkDAGInfoLength(String executionId, List<byte[]> contents) {
String businessId = ExecutionIdUtil.getBusinessId(executionId);
checkDAGInfoLengthByBusinessId(businessId, contents);
}

public void checkDAGInfoLengthByBusinessId(String businessId, List<byte[]> contents) {
if (!switcherManagerImpl.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") || CollectionUtils.isEmpty(contents)) {
return;
}

int length = contents.stream().filter(Objects::nonNull).mapToInt(content -> content.length).sum();
int maxLength = ValueExtractor.getConfiguredValueByBusinessId(businessId, bizDConfs.getRedisBusinessIdToDAGInfoMaxLength(), DAG_INFO_MAX_LENGTH_CONFIG);
if (length > maxLength) {
throw new StorageException(
StorageErrorCode.DAG_LENGTH_LIMITATION.getCode(),
String.format("dag info length:%s exceed the limit", length));
}
}

@Getter
@Setter
@Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public RuntimeStorage(RedisClient redisClient, Map<String, RedisClient> clientId
DAGInfoDeserializeService dagInfoDeserializeService, SwitcherManager switcherManagerImpl) {
this.bizDConfs = bizDConfs;

DAGInfoRedisDAO dagInfoRedisDAO = new DAGInfoRedisDAO(redisClient, bizDConfs, dagInfoDeserializeService, switcherManagerImpl);
DAGInfoRedisDAO dagInfoRedisDAO = new DAGInfoRedisDAO(redisClient, bizDConfs, dagInfoDeserializeService);
ContextRedisDAO contextRedisDAO = new ContextRedisDAO(redisClient, bizDConfs, switcherManagerImpl);
this.runtimeRedisStorage = new DAGRedisStorage(dagInfoRedisDAO, contextRedisDAO);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,20 @@

package com.weibo.rill.flow.service.storage.dao;

import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.olympicene.storage.constant.StorageErrorCode;
import com.weibo.rill.flow.olympicene.storage.exception.StorageException;
import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient;
import com.weibo.rill.flow.olympicene.storage.save.impl.DAGInfoDAO;
import com.weibo.rill.flow.olympicene.storage.save.impl.DAGInfoDeserializeService;
import com.weibo.rill.flow.service.dconfs.BizDConfs;
import com.weibo.rill.flow.service.util.ValueExtractor;
import org.apache.commons.collections.CollectionUtils;

import java.util.List;
import java.util.Objects;


public class DAGInfoRedisDAO extends DAGInfoDAO {
private SwitcherManager switcherManagerImpl;

private static final int DAG_INFO_MAX_LENGTH_CONFIG = 30 * 1024 + 600 * 1024; // dag描述符最大为30K 每个任务大小最大为600B最大存1000个任务

private final BizDConfs bizDConfs;

public DAGInfoRedisDAO(RedisClient redisClient, BizDConfs bizDConfs,
DAGInfoDeserializeService dagInfoDeserializeService, SwitcherManager switcherManagerImpl) {
DAGInfoDeserializeService dagInfoDeserializeService) {
super(redisClient, dagInfoDeserializeService);
this.bizDConfs = bizDConfs;
this.switcherManagerImpl = switcherManagerImpl;
}

@Override
Expand All @@ -53,19 +41,4 @@ public int getFinishStatusReserveTimeInSecond(String executionId) {
protected int getUnfinishedStatusReserveTimeInSecond(String executionId) {
return ValueExtractor.getConfiguredValue(executionId, bizDConfs.getRedisBusinessIdToUnfinishedReserveSecond(), 259200);
}

@Override
protected void checkDAGInfoLength(String executionId, List<byte[]> contents) {
if (!switcherManagerImpl.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") || CollectionUtils.isEmpty(contents)) {
return;
}

int length = contents.stream().filter(Objects::nonNull).mapToInt(content -> content.length).sum();
int maxLength = ValueExtractor.getConfiguredValue(executionId, bizDConfs.getRedisBusinessIdToDAGInfoMaxLength(), DAG_INFO_MAX_LENGTH_CONFIG);
if (length > maxLength) {
throw new StorageException(
StorageErrorCode.DAG_LENGTH_LIMITATION.getCode(),
String.format("dag info length:%s exceed the limit", length));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ public static int getConfiguredValue(String executionId, Map<String, Integer> ti
}
}

public static int getConfiguredValueByBusinessId(String businessId, Map<String, Integer> timeConfigMap, int defaultValue) {
try {
return timeConfigMap.getOrDefault(businessId, defaultValue);
} catch (Exception e) {
return defaultValue;
}
}

private ValueExtractor() {

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.weibo.rill.flow.service.facade

import com.weibo.rill.flow.common.exception.TaskException
import com.weibo.rill.flow.olympicene.storage.constant.StorageErrorCode
import com.weibo.rill.flow.olympicene.storage.exception.StorageException
import com.weibo.rill.flow.service.manager.DescriptorManager
import com.weibo.rill.flow.service.statistic.DAGSubmitChecker
import org.springframework.context.ApplicationEventPublisher
import spock.lang.Specification

class DAGDescriptorFacadeTest extends Specification {
DAGSubmitChecker dagSubmitChecker = Mock(DAGSubmitChecker)
DescriptorManager descriptorManager = Mock(DescriptorManager)
ApplicationEventPublisher applicationEventPublisher = Mock(ApplicationEventPublisher)
DAGDescriptorFacade facade = new DAGDescriptorFacade(
dagSubmitChecker: dagSubmitChecker,
applicationEventPublisher: applicationEventPublisher,
descriptorManager: descriptorManager
)

def "test addDescriptor"() {
given:
var descriptor_id = "testBusiness:testFeatureName_c_8921a32f"
applicationEventPublisher.publishEvent(*_) >> null
dagSubmitChecker.checkDAGInfoLengthByBusinessId(*_) >> null
descriptorManager.createDAGDescriptor(*_) >> descriptor_id
expect:
facade.addDescriptor(null, "testBusiness", "testFeatureName", "release", "hello world") == [ret: true, descriptor_id: descriptor_id]
facade.addDescriptor(null, "testBusiness", "testFeatureName", "release", "") == [ret: true, descriptor_id: descriptor_id]
}

def "test addDescriptor when check error"() {
given:
var descriptor_id = "testBusiness:testFeatureName_c_8921a32f"
applicationEventPublisher.publishEvent(*_) >> null
dagSubmitChecker.checkDAGInfoLengthByBusinessId(*_) >> {throw new StorageException(StorageErrorCode.DAG_LENGTH_LIMITATION.getCode(), "DAG length limitation")}
descriptorManager.createDAGDescriptor(*_) >> descriptor_id
when:
facade.addDescriptor(null, "testBusiness", "testFeatureName", "release", "hello world")
then:
thrown TaskException
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.weibo.rill.flow.service.statistic

import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager
import com.weibo.rill.flow.olympicene.storage.exception.StorageException
import com.weibo.rill.flow.service.dconfs.BizDConfs
import spock.lang.Specification

class DAGSubmitCheckerTest extends Specification {
SwitcherManager switcherManager = Mock(SwitcherManager)
BizDConfs bizDConfs = Mock(BizDConfs)
DAGSubmitChecker dagSubmitChecker = new DAGSubmitChecker(switcherManagerImpl: switcherManager, bizDConfs: bizDConfs)

def "test checkDAGInfoLength when switcher is off"() {
given:
switcherManager.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") >> false
expect:
dagSubmitChecker.checkDAGInfoLength("testBusiness1:testFeatureName1_c_0dc48c1d-32a2", ["hello world".bytes])
}

def "test checkDAGInfoLength when switcher is on and be limited"() {
given:
switcherManager.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") >> true
bizDConfs.getRedisBusinessIdToDAGInfoMaxLength() >> ["testBusiness1": 5]
when:
dagSubmitChecker.checkDAGInfoLength("testBusiness1:testFeatureName1_c_0dc48c1d-32a2", ["hello world".bytes])
then:
thrown StorageException
}

def "test checkDAGInfoLength when switcher is on"() {
given:
switcherManager.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") >> true
bizDConfs.getRedisBusinessIdToDAGInfoMaxLength() >> ["testBusiness1": 5]
expect:
dagSubmitChecker.checkDAGInfoLength("testBusiness1:testFeatureName1_c_0dc48c1d-32a2", null)
dagSubmitChecker.checkDAGInfoLength("testBusiness2:testFeatureName1_c_0dc48c1d-32a2", ["hello world".bytes])
}

def "test checkDAGInfoLength when switcher is on and dconfs return null"() {
given:
switcherManager.getSwitcherState("ENABLE_DAG_INFO_LENGTH_CHECK") >> true
bizDConfs.getRedisBusinessIdToDAGInfoMaxLength() >> null
expect:
dagSubmitChecker.checkDAGInfoLength("testBusiness2:testFeatureName1_c_0dc48c1d-32a2", ["hello world".bytes])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public RuntimeStorage runtimeStorage(
@Autowired @Qualifier("runtimeRedisClients") RedisClient redisClient,
@Autowired DAGInfoDeserializeService dagInfoDeserializeService,
@Autowired SwitcherManager switcherManagerImpl) {
return new RuntimeStorage(redisClient, dagClientPool.getRuntimeStorageClientIdToRedisClient(), bizDConfs, dagInfoDeserializeService, switcherManagerImpl);
return new RuntimeStorage(redisClient, dagClientPool.getRuntimeStorageClientIdToRedisClient(), bizDConfs,
dagInfoDeserializeService, switcherManagerImpl);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@ import com.weibo.rill.flow.task.template.service.TaskTemplateService
import spock.lang.Specification

class TaskTemplateControllerTest extends Specification {
TaskTemplateController taskTemplateController = new TaskTemplateController()
TaskTemplateService taskTemplateService = Mock(TaskTemplateService)
def setup() {
taskTemplateController.taskTemplateService = taskTemplateService
}
TaskTemplateController taskTemplateController = new TaskTemplateController(taskTemplateService: taskTemplateService)

def "test getMetaDataList"() {
given:
Expand Down

0 comments on commit f8f0490

Please sign in to comment.