diff --git a/rill-flow-impl/src/main/java/com/weibo/rill/flow/impl/service/HttpInvokeHelperImpl.java b/rill-flow-impl/src/main/java/com/weibo/rill/flow/impl/service/HttpInvokeHelperImpl.java index 1321a8e06..9a529e3e1 100644 --- a/rill-flow-impl/src/main/java/com/weibo/rill/flow/impl/service/HttpInvokeHelperImpl.java +++ b/rill-flow-impl/src/main/java/com/weibo/rill/flow/impl/service/HttpInvokeHelperImpl.java @@ -16,6 +16,7 @@ package com.weibo.rill.flow.impl.service; +import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Maps; import com.weibo.rill.flow.common.exception.TaskException; import com.weibo.rill.flow.common.model.BizError; @@ -27,14 +28,17 @@ import com.weibo.rill.flow.service.auth.AuthHeaderGenerator; import com.weibo.rill.flow.service.invoke.HttpInvokeHelper; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; import org.apache.http.client.utils.URIBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; -import org.springframework.web.client.RestClientResponseException; import org.springframework.web.client.RestTemplate; import java.net.URI; @@ -51,6 +55,8 @@ public class HttpInvokeHelperImpl implements HttpInvokeHelper { @Qualifier("authHeaderGenerator") private AuthHeaderGenerator authHeaderGenerator; + private final Logger httpAccessLogger = LoggerFactory.getLogger("httpclientaccess"); + @Override public void appendRequestHeader(HttpHeaders httpHeaders, String executionId, TaskInfo task, Map input) { authHeaderGenerator.appendRequestHeader(httpHeaders, executionId, task, input); @@ -128,20 +134,47 @@ public String invokeRequest(String executionId, String taskInfoName, String url, RestTemplate restTemplate = defaultRestTemplate; String cause = null; for (int i = 1; i <= maxInvokeTime; i++) { + long startTime = System.currentTimeMillis(); + ResponseEntity responseEntity = null; try { - String result; if (method == HttpMethod.GET) { - result = restTemplate.getForObject(url, String.class); + responseEntity = restTemplate.exchange(new URI(url), method, requestEntity, String.class); } else { - result = restTemplate.postForObject(new URI(url), requestEntity, String.class); + responseEntity = restTemplate.postForEntity(new URI(url), requestEntity, String.class); } - return result; - } catch (RestClientResponseException e) { - throw e; + return responseEntity.getBody(); } catch (Exception e) { cause = e.getMessage(); + } finally { + postHttpProcess(url, requestEntity, method, System.currentTimeMillis() - startTime, responseEntity); } } throw new TaskException(BizError.ERROR_INVOKE_URI.getCode(), String.format("dispatchTask http fails due to %s", cause)); } + + + private void postHttpProcess(String url, HttpEntity requestEntity, HttpMethod method, long timeout, ResponseEntity responseEntity) { + try { + int code = responseEntity == null ? 500 : responseEntity.getStatusCode().value(); + // 打印 http_access 日志 + Object requestBody = parseBodyForHttpAccessLog(requestEntity.getBody()); + String responseBody = (responseEntity == null || responseEntity.getBody() == null) ? "" : responseEntity.getBody(); + int responseLength = responseBody.length(); + responseBody = StringUtils.substring(responseBody, 0, 1000); + httpAccessLogger.info("{} {} {} {} {} {} {}", timeout, method, code, responseLength, url, requestBody, responseBody); + } catch (Exception e) { + log.warn("httpAccess log error", e); + } + } + + private Object parseBodyForHttpAccessLog(Object body) { + try { + if (body instanceof Map) { + return new JSONObject((Map) body).toJSONString(); + } + } catch (Exception e) { + log.warn("parseBodyForHttpAccessLog error: ", e); + } + return body; + } } diff --git a/rill-flow-impl/src/test/groovy/com/weibo/rill/flow/impl/service/HttpInvokeHelperImplTest.groovy b/rill-flow-impl/src/test/groovy/com/weibo/rill/flow/impl/service/HttpInvokeHelperImplTest.groovy index fee9d16d5..ccf10f9d8 100644 --- a/rill-flow-impl/src/test/groovy/com/weibo/rill/flow/impl/service/HttpInvokeHelperImplTest.groovy +++ b/rill-flow-impl/src/test/groovy/com/weibo/rill/flow/impl/service/HttpInvokeHelperImplTest.groovy @@ -1,12 +1,19 @@ package com.weibo.rill.flow.impl.service - +import com.weibo.rill.flow.common.exception.TaskException import com.weibo.rill.flow.service.invoke.HttpInvokeHelper +import org.springframework.http.HttpEntity +import org.springframework.http.HttpMethod +import org.springframework.http.HttpStatus +import org.springframework.http.ResponseEntity +import org.springframework.web.client.RestClientResponseException +import org.springframework.web.client.RestTemplate import spock.lang.Specification import spock.lang.Unroll class HttpInvokeHelperImplTest extends Specification { - HttpInvokeHelper httpInvokeHelper = new HttpInvokeHelperImpl() + RestTemplate defaultRestTemplate = Mock(RestTemplate) + HttpInvokeHelper httpInvokeHelper = new HttpInvokeHelperImpl(defaultRestTemplate: defaultRestTemplate) @Unroll def "functionRequestParams test"() { @@ -30,4 +37,45 @@ class HttpInvokeHelperImplTest extends Specification { ['query': ['user': 'Bob'], 'header': ['content-type': 'application/x-www-form-urlencoded'], 'body': ['title': 'first post']] || ['execution_id': "xxx", 'name': "testFunctionRequestParams", 'user': 'Bob'] | ['execution_id': "xxx", 'title': 'first post'] | ['content-type': 'application/x-www-form-urlencoded'] ['input_num': 10, 'query': ['user': 'Bob'], 'header': ['content-type': 'application/x-www-form-urlencoded'], 'body': ['title': 'first post']] || ['execution_id': "xxx", 'name': "testFunctionRequestParams", 'user': 'Bob'] | ['execution_id': "xxx", 'input_num': 10, 'title': 'first post'] | ['content-type': 'application/x-www-form-urlencoded'] } + + @Unroll + def "test invokeRequest"() { + when: + ResponseEntity responseEntity = new ResponseEntity<>("response body", HttpStatus.OK) + HttpEntity requestEntity = new HttpEntity<>(Map.of("data", "hello world"), null) + defaultRestTemplate.exchange(*_) >> responseEntity + defaultRestTemplate.postForEntity(*_) >> responseEntity + then: + httpInvokeHelper.invokeRequest("testExecutionId", "testTaskName", + "http://localhost:8080/testurl", requestEntity, HttpMethod.GET, 1) == "response body" + httpInvokeHelper.invokeRequest("testExecutionId", "testTaskName", + "http://localhost:8080/testurl", requestEntity, HttpMethod.POST, 1) == "response body" + } + + @Unroll + def "test invokeRequest return null"() { + when: + ResponseEntity responseEntity = new ResponseEntity<>(null, HttpStatus.BAD_GATEWAY) + HttpEntity requestEntity = new HttpEntity<>(null, null) + defaultRestTemplate.exchange(*_) >> responseEntity + defaultRestTemplate.postForEntity(*_) >> responseEntity + then: + noExceptionThrown() + httpInvokeHelper.invokeRequest("testExecutionId", "testTaskName", + "http://localhost:8080/testurl", requestEntity, HttpMethod.GET, 1) == null + httpInvokeHelper.invokeRequest("testExecutionId", "testTaskName", + "http://localhost:8080/testurl", requestEntity, HttpMethod.POST, 1) == null + } + + @Unroll + def "test invokeRequest throw exception"() { + given: + HttpEntity requestEntity = new HttpEntity<>(null, null) + defaultRestTemplate.exchange(*_) >> { throw new RestClientResponseException("Bad Gateway Timeout", 504, "Bad Gateway Timeout", null, null, null) } + when: + httpInvokeHelper.invokeRequest("testExecutionId", "testTaskName", + "http://localhost:8080/testurl", requestEntity, HttpMethod.GET, 1) + then: + thrown(TaskException.class) + } }