Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolve #1738 | Add jetty http client request processing time metrics #1756

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;
import pl.allegro.tech.hermes.metrics.HermesTimer;

import java.util.function.ToDoubleFunction;

Expand Down Expand Up @@ -66,4 +67,46 @@ public <T> void registerHttp2SerialClientConnectionsGauge(T obj, ToDoubleFunctio
public <T> void registerHttp2SerialClientPendingConnectionsGauge(T obj, ToDoubleFunction<T> f) {
gaugeRegistrar.registerGauge(CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_PENDING_CONNECTIONS, obj, f);
}

public HermesTimer http1SerialClientRequestQueueWaitingTimer() {
return HermesTimer.from(
meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME),
hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME)
);
}

public HermesTimer http1BatchClientRequestQueueWaitingTimer() {
return HermesTimer.from(
meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_1_BATCH_CLIENT_REQUEST_QUEUE_WAITING_TIME),
hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_1_BATCH_CLIENT_REQUEST_QUEUE_WAITING_TIME)
);
}

public HermesTimer http2SerialClientRequestQueueWaitingTimer() {
return HermesTimer.from(
meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME),
hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME)
);
}

public HermesTimer http1SerialClientRequestProcessingTimer() {
return HermesTimer.from(
meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_PROCESSING_TIME),
hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_PROCESSING_TIME)
);
}

public HermesTimer http1BatchClientRequestProcessingTimer() {
return HermesTimer.from(
meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_1_BATCH_CLIENT_REQUEST_PROCESSING_TIME),
hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_1_BATCH_CLIENT_REQUEST_PROCESSING_TIME)
);
}

public HermesTimer http2SerialClientRequestProcessingTimer() {
return HermesTimer.from(
meterRegistry.timer(Timers.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_PROCESSING_TIME),
hermesMetrics.timer(Timers.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_PROCESSING_TIME)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,17 @@ public class Timers {
public static final String CONSUMER_IDLE_TIME = "idle-time." + GROUP + "." + TOPIC + "." + SUBSCRIPTION;

public static final String OAUTH_PROVIDER_TOKEN_REQUEST_LATENCY = "oauth.provider." + OAUTH_PROVIDER_NAME + ".token-request-latency";

public static final String CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME =
"http-clients.serial.http1.request-queue-waiting-time";
public static final String CONSUMER_SENDER_HTTP_1_SERIAL_CLIENT_REQUEST_PROCESSING_TIME =
"http-clients.serial.http1.request-processing-time";
public static final String CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_QUEUE_WAITING_TIME =
"http-clients.serial.http2.request-queue-waiting-time";
public static final String CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_PROCESSING_TIME =
"http-clients.serial.http2.request-processing-time";
public static final String CONSUMER_SENDER_HTTP_1_BATCH_CLIENT_REQUEST_QUEUE_WAITING_TIME =
"http-clients.batch.http1.request-queue-waiting-time";
public static final String CONSUMER_SENDER_HTTP_1_BATCH_CLIENT_REQUEST_PROCESSING_TIME =
"http-clients.batch.http1.request-processing-time";
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import pl.allegro.tech.hermes.consumers.consumer.sender.http.HttpHeadersProvidersFactory;
import pl.allegro.tech.hermes.consumers.consumer.sender.http.HttpMessageBatchSenderFactory;
import pl.allegro.tech.hermes.consumers.consumer.sender.http.HttpRequestFactoryProvider;
import pl.allegro.tech.hermes.consumers.consumer.sender.http.JettyHttpClientMetrics;
import pl.allegro.tech.hermes.consumers.consumer.sender.http.JettyHttpMessageSenderProvider;
import pl.allegro.tech.hermes.consumers.consumer.sender.http.SendingResultHandlers;
import pl.allegro.tech.hermes.consumers.consumer.sender.http.SslContextFactoryProvider;
Expand All @@ -52,6 +53,7 @@
import pl.allegro.tech.hermes.consumers.consumer.sender.resolver.InterpolatingEndpointAddressResolver;
import pl.allegro.tech.hermes.consumers.consumer.sender.timeout.FutureAsyncTimeout;
import pl.allegro.tech.hermes.consumers.consumer.trace.MetadataAppender;
import pl.allegro.tech.hermes.metrics.HermesTimer;

import java.io.IOException;
import java.util.List;
Expand All @@ -61,10 +63,10 @@

@Configuration
@EnableConfigurationProperties({
SslContextProperties.class,
HttpClientsMonitoringProperties.class,
SenderAsyncTimeoutProperties.class,
BatchProperties.class
SslContextProperties.class,
HttpClientsMonitoringProperties.class,
SenderAsyncTimeoutProperties.class,
BatchProperties.class
})
public class ConsumerSenderConfiguration {

Expand All @@ -76,9 +78,19 @@ public Http1ClientProperties http1SerialClientProperties() {

@Bean(name = "http1-serial-client")
public HttpClient http1SerialClient(
HttpClientsFactory httpClientsFactory,
@Named("http1-serial-client-parameters") Http1ClientParameters http1ClientParameters) {
return httpClientsFactory.createClientForHttp1("jetty-http1-serial-client", http1ClientParameters);
HttpClientsFactory httpClientsFactory,
@Named("http1-serial-client-parameters") Http1ClientParameters http1ClientParameters,
MetricsFacade metricsFacade
) {
HttpClient client = httpClientsFactory.createClientForHttp1("jetty-http1-serial-client", http1ClientParameters);
if (http1ClientParameters.isRequestProcessingMonitoringEnabled()) {
var metrics = metricsFacade.consumerSender();
enrichWithMetrics(
client, metrics.http1SerialClientRequestQueueWaitingTimer(),
metrics.http1SerialClientRequestProcessingTimer()
);
}
return client;
}

@Bean(name = "http2-serial-client-parameters")
Expand All @@ -89,12 +101,22 @@ public Http2ClientProperties http2SerialClientProperties() {

@Bean
public Http2ClientHolder http2ClientHolder(
HttpClientsFactory httpClientsFactory,
@Named("http2-serial-client-parameters") Http2ClientProperties http2ClientProperties) {
HttpClientsFactory httpClientsFactory,
@Named("http2-serial-client-parameters") Http2ClientProperties http2ClientProperties,
MetricsFacade metricsFacade
) {
if (!http2ClientProperties.isEnabled()) {
return new Http2ClientHolder(null);
} else {
return new Http2ClientHolder(httpClientsFactory.createClientForHttp2("jetty-http2-serial-client", http2ClientProperties));
HttpClient client = httpClientsFactory.createClientForHttp2("jetty-http2-serial-client", http2ClientProperties);
if (http2ClientProperties.isRequestProcessingMonitoringEnabled()) {
var metrics = metricsFacade.consumerSender();
enrichWithMetrics(
client, metrics.http2SerialClientRequestQueueWaitingTimer(),
metrics.http2SerialClientRequestProcessingTimer()
);
}
return new Http2ClientHolder(client);
}
}

Expand All @@ -106,9 +128,18 @@ public Http1ClientProperties http1BatchClientProperties() {

@Bean(name = "http1-batch-client")
public HttpClient http1BatchClient(
HttpClientsFactory httpClientsFactory,
@Named("http1-batch-client-parameters") Http1ClientParameters http1ClientParameters) {
return httpClientsFactory.createClientForHttp1("jetty-http1-batch-client", http1ClientParameters);
HttpClientsFactory httpClientsFactory,
@Named("http1-batch-client-parameters") Http1ClientParameters http1ClientParameters,
MetricsFacade metricsFacade) {
HttpClient client = httpClientsFactory.createClientForHttp1("jetty-http1-batch-client", http1ClientParameters);
if (http1ClientParameters.isRequestProcessingMonitoringEnabled()) {
var metrics = metricsFacade.consumerSender();
enrichWithMetrics(
client, metrics.http1BatchClientRequestQueueWaitingTimer(),
metrics.http1BatchClientRequestProcessingTimer()
);
}
return client;
}

@Bean(name = "oauth-http-client")
Expand All @@ -128,8 +159,8 @@ public MessageBatchSenderFactory httpMessageBatchSenderFactory(SendingResultHand
BatchHttpRequestFactory batchHttpRequestFactory
) {
return new HttpMessageBatchSenderFactory(
resultHandlers,
batchHttpRequestFactory);
resultHandlers,
batchHttpRequestFactory);
}

@Bean(initMethod = "start")
Expand All @@ -139,12 +170,12 @@ public HttpClientsWorkloadReporter httpClientsWorkloadReporter(MetricsFacade met
Http2ClientHolder http2ClientHolder,
HttpClientsMonitoringProperties monitoringProperties) {
return new HttpClientsWorkloadReporter(
metrics,
http1SerialClient,
http1BatchClient,
http2ClientHolder,
monitoringProperties.isRequestQueueMonitoringEnabled(),
monitoringProperties.isConnectionPoolMonitoringEnabled());
metrics,
http1SerialClient,
http1BatchClient,
http2ClientHolder,
monitoringProperties.isRequestQueueMonitoringEnabled(),
monitoringProperties.isConnectionPoolMonitoringEnabled());
}

@Bean(destroyMethod = "closeProviders")
Expand All @@ -162,15 +193,15 @@ public ProtocolMessageSenderProvider jettyHttpMessageSenderProvider(@Named("http
SendingResultHandlers sendingResultHandlers,
HttpRequestFactoryProvider requestFactoryProvider) {
return new JettyHttpMessageSenderProvider(
httpClient,
http2ClientHolder,
endpointAddressResolver,
metadataAppender,
authorizationProviderFactory,
httpHeadersProviderFactory,
sendingResultHandlers,
requestFactoryProvider,
ImmutableSet.of("http", "https")
httpClient,
http2ClientHolder,
endpointAddressResolver,
metadataAppender,
authorizationProviderFactory,
httpHeadersProviderFactory,
sendingResultHandlers,
requestFactoryProvider,
ImmutableSet.of("http", "https")
);
}

Expand Down Expand Up @@ -224,21 +255,21 @@ public MetadataAppender<Message> jmsMetadataAppender() {

@Bean(name = "defaultPubSubMessageSenderProvider")
public ProtocolMessageSenderProvider pubSubMessageSenderProvider(
GooglePubSubSenderTargetResolver targetResolver,
CredentialsProvider credentialsProvider,
ExecutorProvider executorProvider,
RetrySettings retrySettings,
BatchingSettings batchingSettings,
GooglePubSubMessageTransformerCreator googlePubSubMessageTransformerCreator,
TransportChannelProvider transportChannelProvider) {
GooglePubSubSenderTargetResolver targetResolver,
CredentialsProvider credentialsProvider,
ExecutorProvider executorProvider,
RetrySettings retrySettings,
BatchingSettings batchingSettings,
GooglePubSubMessageTransformerCreator googlePubSubMessageTransformerCreator,
TransportChannelProvider transportChannelProvider) {
return new GooglePubSubMessageSenderProvider(
targetResolver,
credentialsProvider,
executorProvider,
retrySettings,
batchingSettings,
transportChannelProvider,
googlePubSubMessageTransformerCreator
targetResolver,
credentialsProvider,
executorProvider,
retrySettings,
batchingSettings,
transportChannelProvider,
googlePubSubMessageTransformerCreator
);
}

Expand All @@ -263,10 +294,18 @@ public EndpointAddressResolver interpolatingEndpointAddressResolver(UriInterpola
public FutureAsyncTimeout futureAsyncTimeoutFactory(InstrumentedExecutorServiceFactory executorFactory,
SenderAsyncTimeoutProperties senderAsyncTimeoutProperties) {
ScheduledExecutorService timeoutExecutorService = executorFactory.getScheduledExecutorService(
"async-timeout",
senderAsyncTimeoutProperties.getThreadPoolSize(),
senderAsyncTimeoutProperties.isThreadPoolMonitoringEnabled()
"async-timeout",
senderAsyncTimeoutProperties.getThreadPoolSize(),
senderAsyncTimeoutProperties.isThreadPoolMonitoringEnabled()
);
return new FutureAsyncTimeout(timeoutExecutorService);
}

private static void enrichWithMetrics(
HttpClient client, HermesTimer requestQueueWaitingTimer, HermesTimer requestProcessingTimer
) {
client.getRequestListeners().addListener(
new JettyHttpClientMetrics(requestQueueWaitingTimer, requestProcessingTimer)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class Http1ClientProperties implements Http1ClientParameters {

private Duration connectionTimeout = Duration.ofSeconds(15);

private boolean requestProcessingMonitoringEnabled = false;

@Override
public int getThreadPoolSize() {
Expand Down Expand Up @@ -83,4 +84,13 @@ public Duration getConnectionTimeout() {
public void setConnectionTimeout(Duration connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}

@Override
public boolean isRequestProcessingMonitoringEnabled() {
return requestProcessingMonitoringEnabled;
}

public void setRequestProcessingMonitoringEnabled(boolean requestProcessingMonitoringEnabled) {
this.requestProcessingMonitoringEnabled = requestProcessingMonitoringEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class Http2ClientProperties implements Http2ClientParameters {

private Duration connectionTimeout = Duration.ofSeconds(15);

private boolean requestProcessingMonitoringEnabled = false;

public boolean isEnabled() {
return enabled;
}
Expand Down Expand Up @@ -81,4 +83,13 @@ public Duration getConnectionTimeout() {
public void setConnectionTimeout(Duration connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}

@Override
public boolean isRequestProcessingMonitoringEnabled() {
return this.requestProcessingMonitoringEnabled;
}

public void setRequestProcessingMonitoringEnabled(boolean requestProcessingMonitoringEnabled) {
this.requestProcessingMonitoringEnabled = requestProcessingMonitoringEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ public interface HttpClientParameters {
int getMaxRequestsQueuedPerDestination();

Duration getConnectionTimeout();

boolean isRequestProcessingMonitoringEnabled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,24 @@ public class HttpClientsFactory {
private final SslContextFactoryProvider sslContextFactoryProvider;

public HttpClientsFactory(
InstrumentedExecutorServiceFactory executorFactory,
SslContextFactoryProvider sslContextFactoryProvider) {
InstrumentedExecutorServiceFactory executorFactory,
SslContextFactoryProvider sslContextFactoryProvider
) {
this.executorFactory = executorFactory;
this.sslContextFactoryProvider = sslContextFactoryProvider;
}

public HttpClient createClientForHttp1(String name, Http1ClientParameters http1ClientParameters) {
ClientConnector clientConnector = new ClientConnector();
sslContextFactoryProvider.provideSslContextFactory()
.ifPresent(clientConnector::setSslContextFactory);
.ifPresent(clientConnector::setSslContextFactory);
HttpClientTransportOverHTTP transport = new HttpClientTransportOverHTTP(clientConnector);
HttpClient client = new HttpClient(transport);

ExecutorService executor = executorFactory.getExecutorService(
name,
http1ClientParameters.getThreadPoolSize(),
http1ClientParameters.isThreadPoolMonitoringEnabled());
name,
http1ClientParameters.getThreadPoolSize(),
http1ClientParameters.isThreadPoolMonitoringEnabled());
client.setExecutor(executor);
client.setMaxConnectionsPerDestination(http1ClientParameters.getMaxConnectionsPerDestination());
client.setMaxRequestsQueuedPerDestination(http1ClientParameters.getMaxRequestsQueuedPerDestination());
Expand All @@ -46,16 +47,16 @@ public HttpClient createClientForHttp1(String name, Http1ClientParameters http1C
public HttpClient createClientForHttp2(String name, Http2ClientParameters http2ClientParameters) {
ClientConnector clientConnector = new ClientConnector();
sslContextFactoryProvider.provideSslContextFactory()
.ifPresentOrElse(clientConnector::setSslContextFactory,
() -> {
throw new IllegalStateException("Cannot create http/2 client due to lack of ssl context factory");
});
.ifPresentOrElse(clientConnector::setSslContextFactory,
() -> {
throw new IllegalStateException("Cannot create http/2 client due to lack of ssl context factory");
});
HTTP2Client http2Client = new HTTP2Client(clientConnector);

ExecutorService executor = executorFactory.getExecutorService(
name,
http2ClientParameters.getThreadPoolSize(),
http2ClientParameters.isThreadPoolMonitoringEnabled());
name,
http2ClientParameters.getThreadPoolSize(),
http2ClientParameters.isThreadPoolMonitoringEnabled());
http2Client.setExecutor(executor);

HttpClientTransportOverHTTP2 transport = new HttpClientTransportOverHTTP2(http2Client);
Expand Down
Loading
Loading