diff --git a/.github/workflows/checkstyle.yml b/.github/workflows/checkstyle.yml index 8d99e40474..afe90e1e74 100644 --- a/.github/workflows/checkstyle.yml +++ b/.github/workflows/checkstyle.yml @@ -10,17 +10,19 @@ jobs: checkstyle: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 - uses: reviewdog/action-setup@v1 with: reviewdog_version: latest - name: Set up JDK 17 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: 17 distribution: 'temurin' + - name: Setup Gradle + uses: gradle/gradle-build-action@v2 - name: Run check style # ignore lengthy console setup tasks run: ./gradlew --continue clean checkstyleMain checkstyleTest checkstyleIntegration checkstyleJmh -PmaxCheckstyleWarnings=0 -x attachHermesConsole -x prepareIndexTemplate diff --git a/.github/workflows/ci-console.yml b/.github/workflows/ci-console.yml index a4c6f41272..c104c881e9 100644 --- a/.github/workflows/ci-console.yml +++ b/.github/workflows/ci-console.yml @@ -12,9 +12,9 @@ jobs: working-directory: ./hermes-console steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup node - uses: actions/setup-node@v3 + uses: actions/setup-node@v4 with: node-version: 18 - name: Run linter diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2faf77b4af..badc1e1f12 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,22 +19,16 @@ jobs: fail-fast: false name: ${{ matrix.tasks.alias }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 - - uses: actions/cache@v3 - with: - path: | - ~/.gradle/caches - ~/.gradle/wrapper - key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }} - restore-keys: | - ${{ runner.os }}-gradle- - name: Set up JDK 17 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: 17 distribution: 'temurin' + - name: Setup Gradle + uses: gradle/gradle-build-action@v2 - name: Build with Gradle run: ./gradlew assemble - name: Run task with Gradle diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 56c5ff5084..c43d90c2db 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -35,17 +35,17 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up JDK 17 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: 17 distribution: 'temurin' # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@v1 + uses: github/codeql-action/init@v2 with: languages: ${{ matrix.language }} # If you wish to specify custom queries, you can do so here or in a config file. @@ -56,7 +56,7 @@ jobs: # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). # If this step fails, then you should remove it and run the build manually (see below) - name: Autobuild - uses: github/codeql-action/autobuild@v1 + uses: github/codeql-action/autobuild@v2 # ℹī¸ Command-line programs to run using the OS shell. # 📚 https://git.io/JvXDl @@ -70,4 +70,4 @@ jobs: # make release - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v1 + uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/gradle-wrapper-validation.yml b/.github/workflows/gradle-wrapper-validation.yml index d3c1263959..eda69f9d27 100644 --- a/.github/workflows/gradle-wrapper-validation.yml +++ b/.github/workflows/gradle-wrapper-validation.yml @@ -12,5 +12,5 @@ jobs: name: "Validation" runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: gradle/wrapper-validation-action@v1 diff --git a/.github/workflows/markdown-links-check.yml b/.github/workflows/markdown-links-check.yml index 6920411bd5..4d186efa1e 100644 --- a/.github/workflows/markdown-links-check.yml +++ b/.github/workflows/markdown-links-check.yml @@ -12,7 +12,7 @@ jobs: check-links: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: gaurav-nelson/github-action-markdown-link-check@v1 with: use-quiet-mode: 'yes' diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index c6485311cc..9d0bec195f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -16,15 +16,16 @@ jobs: environment: ci steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 - - uses: gradle/wrapper-validation-action@v1 - name: Set up JDK - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: '17' distribution: 'temurin' + - name: Setup Gradle + uses: gradle/gradle-build-action@v2 - name: Release if: github.ref == 'refs/heads/master' run: ./gradlew release -Prelease.customPassword=${GITHUB_TOKEN} -Prelease.customUsername=${GITHUB_ACTOR} -Prelease.forceVersion=${FORCE_VERSION} diff --git a/.github/workflows/test_report.yml b/.github/workflows/test_report.yml index 2b52cbedb8..20d78c3a41 100644 --- a/.github/workflows/test_report.yml +++ b/.github/workflows/test_report.yml @@ -8,14 +8,14 @@ jobs: name: "Test report" runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Grant execute permission for report downloader run: chmod +x ./.github/scripts/download_reports.sh - name: Download past reports run: ./.github/scripts/download_reports.sh env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - uses: actions/setup-python@v4 + - uses: actions/setup-python@v5 with: python-version: '3.10' - name: Aggregate reports diff --git a/build.gradle b/build.gradle index be812a0157..02013d9f32 100644 --- a/build.gradle +++ b/build.gradle @@ -52,22 +52,22 @@ allprojects { guava : '23.0', jackson : '2.15.2', jersey : '3.1.2', - jetty : '11.0.15', + jetty : '12.0.5', curator : '5.4.0', dropwizard_metrics: '4.1.0', micrometer_metrics: '1.11.1', - wiremock : '3.0.1', + wiremock : '3.3.1', spock : '2.4-M1-groovy-4.0', groovy : '4.0.12', avro : '1.9.1', json2avro : '0.2.14', okhttp : '3.9.1', undertow : '2.0.29.Final', - spring_web : '6.0.8', + spring_web : '6.1.2', failsafe : '2.3.1', junit_jupiter : '5.8.2', testcontainers : '1.18.1', - spring : '3.0.6', + spring : '3.2.1', assertj : '3.22.0' ] diff --git a/docs/docs/user/publishing.md b/docs/docs/user/publishing.md index 16ba94f0ff..034d2ce623 100644 --- a/docs/docs/user/publishing.md +++ b/docs/docs/user/publishing.md @@ -168,3 +168,7 @@ buffered events. Default implementation uses [OpenHFT ChronicleMap](https://github.com/OpenHFT/Chronicle-Map) to persist unsent messages to disk. Map structure is continuously persisted to disk, as it is stored in offheap memory as [memory mapped file](https://en.wikipedia.org/wiki/Memory-mapped_file). + +## Partition assignment +`Partition-Key` header can be used by publishers to specify Kafka `key` which will be used for partition assignment for a message. This will ensure +that all messages with given `Partition-Key` will be sent to the same Kafka partition. diff --git a/docs/docs/user/subscribing.md b/docs/docs/user/subscribing.md index df0321155e..15ce53350e 100644 --- a/docs/docs/user/subscribing.md +++ b/docs/docs/user/subscribing.md @@ -39,22 +39,17 @@ Minimal request: All options: -Option | Description | Default value ------------------------------------- | ----------------------------------------------------| ------------- -trackingMode | track outgoing messages | trackingOff -subscriptionPolicy.rate | maximum sending speed in rps (per DC) | 400 -subscriptionPolicy.messageTtl | inflight Time To Live in seconds | 3600 -subscriptionPolicy.messageBackoff | backoff time between retry attempts in millis | 1000 -subscriptionPolicy.retryClientErrors | retry on receiving 4xx status | false -subscriptionPolicy.requestTimeout | request timeout in millis | 1000 -subscriptionPolicy.socketTimeout | maximum time of inactivity between two data packets | infinity -subscriptionPolicy.inflightSize | max number of pending requests | 100 -subscriptionPolicy.backoffMultiplier | backoff multiplier for calculating message backoff | 1 -subscriptionPolicy.backoffMaxIntervalInSec | maximal retry backoff in seconds | 600 -headers | additional HTTP request headers | [] (array of headers) -filters | used for skipping unwanted messages | [] (array of filters) -endpointAddressResolverMetadata | additional address resolver metadata | {} (map) -subscriptionIdentityHeadersEnabled | attach HTTP headers with subscription identity | false +| Option | Description | Default value | +|------------------------------------|---------------------------------------------------------------------------------------|------------------------------------------------------| +| trackingMode | track outgoing messages | trackingOff | +| contentType | delivered message format (JSON or BATCH) | JSON | +| deliveryType | delivery type (SERIAL or BATCH) | SERIAL | +| subscriptionPolicy | see [delivery types](#delivery-type) | [serial](#serial-delivery), [batch](#batch-delivery) | +| mode | whether to send message to single (ANYCAST) or all (BROADCAST) subscription endpoints | ANYCAST | +| headers | additional HTTP request headers | [] (array of headers) | +| filters | used for skipping unwanted messages | [] (array of filters) | +| endpointAddressResolverMetadata | additional address resolver metadata | {} (map) | +| subscriptionIdentityHeadersEnabled | attach HTTP headers with subscription identity | false | Possible values for **trackingMode** are: @@ -76,6 +71,7 @@ Request that specifies all available options: "id": "My Team" }, "contact": "my-team@my-company.com", + "deliveryType": "SERIAL", "subscriptionPolicy": { "rate": 100, "messageTtl": 3600, @@ -87,6 +83,7 @@ Request that specifies all available options: "backoffMultiplier": 1.0, "backoffMaxIntervalInSec": 600 }, + "mode": "ANYCAST", "headers": [ {"name": "SOME_HEADER", "value": "ABC"}, {"name": "OTHER_HEADER", "value": "123"} @@ -141,6 +138,66 @@ no matter how many times Hermes would try to deliver it. This behavior can be ch flag on subscription. When this flag is set to true, message with response code **4xx** will be retried, also causing slowing down overall sending speed. See [back pressure section](#back-pressure) for more details. +## Content type + +Hermes can deliver messages to subscribers in two formats: + +- JSON - if topic content type was of type JSON or AVRO +- AVRO - if topic content type was of type AVRO + +## Delivery type + +Hermes supports two delivery types: SERIAL and BATCH. + +### Serial delivery +With serial delivery, each hermes consumer will process at most `inflightSize` messages concurrently. Messages are sent individually +(number of published messages = number of messages sent to subscriber). + + +Options for `subscriptionPolicy`: + +| Option | Description | Default value | +|-------------------------|-----------------------------------------------------|---------------| +| rate | maximum sending speed in rps (per DC) | 400 | +| messageTtl | inflight Time To Live in seconds | 3600 | +| messageBackoff | backoff time between retry attempts in millis | 1000 | +| retryClientErrors | retry on receiving 4xx status | false | +| requestTimeout | request timeout in millis | 1000 | +| socketTimeout | maximum time of inactivity between two data packets | infinity | +| inflightSize | max number of pending requests | 100 | +| backoffMultiplier | backoff multiplier for calculating message backoff | 1 | +| backoffMaxIntervalInSec | maximal retry backoff in seconds | 600 | + + +### Batch delivery +With batch delivery hermes consumer aggregates messages in a batch before sending the batch to the subscriber. There are 3 +configurable thresholds that determine when the batch will be ready to be sent - number of messages in the batch, duration of the batch +and size of the batch in bytes. Batch is considered ready whenever one of these thresholds is surpassed. + +Messages will be sent as an array of JSON messages, e.g. + +```json +[{"foo": "bar1"}, {"foo": "bar2"}, ...] +``` + +Options for `subscriptionPolicy`: + +| Option | Description | Default value | +|-------------------------|-----------------------------------------------------------------|---------------| +| messageTtl | inflight Time To Live in seconds | 3600 | +| messageBackoff | backoff time between retry attempts in millis | 500 | +| retryClientErrors | retry on receiving 4xx status | false | +| requestTimeout | request timeout in millis | 30000 | +| batchSize | maximum number of messages in a batch | 100 | +| batchTime | maximum duration in millis for which messages can be aggregated | 30000 | +| batchVolume | maximum batch size in bytes | 64000 | + +#### Limitations +Following subscription options are not available with batch delivery: + +- AVRO subscription contentType +- BROADCAST mode + ## Retries Hermes Consumers have been optimized towards maximizing chances of successful message delivery. Retry policy is @@ -178,12 +235,11 @@ current_backoff = previous_backoff * backoff_multiplier ``` This has the following consequences: -Backoff multiplier | Retry policy type ----------------------------------|-------------------------------- -1 | Constant retry backoff - above 1 | Exponential retry backoff - - +| Backoff multiplier | Retry policy type | +|--------------------|---------------------------| +| 1 | Constant retry backoff | +| above 1 | Exponential retry backoff | + The hard limit to current backoff is defined by maximum backoff parameter and by default is equal to 600 s. It is worth mentioning that the calculation of current backoff is ignored when the **Retry-After** header is used. @@ -261,6 +317,17 @@ It's ignored by the default implementation. See [console integration](../configuration/console.md#subscription-configuration) for more information. +## Mode + +Hermes supports two delivery modes: + +- ANYCAST - messages will be sent to endpoint returned by `EndpointAddressResolver#resolve` +- BROADCAST - messages will be sent to endpoint returned by `EndpointAddressResolver#resolveAll` + +Example usage of this feature would be to provide `EndpointAddressResolver` implementation which returns any subscriber address (e.g. single +service instance) for `resolve` and all subscriber addresses for `resolveAll` (e.g. all instances of a service). ANYCAST subscription messages +would then be delivered to any subscribing service instance and BROADCAST subscription messages would then be delivered to all subscribing service instances. + ## Message filtering Each subscription can define set of filters that are going to be applied after receiving message from kafka in order @@ -271,10 +338,10 @@ of their declaration. This mainly concerns message content type. Filtering is done *before* any conversion takes place so all messages have the same content type as topic on which they were published. -Topic content-type | Filter type ---------------------- | ----------- -avro | avropath -json | jsonpath +| Topic content-type | Filter type | +|--------------------|-------------| +| avro | avropath | +| json | jsonpath | ### Matching strategy @@ -304,12 +371,12 @@ In case when `matchingStrategy` would be set to `any` then all messages with *GB JsonPath filter is based on popular [library](https://github.com/jayway/JsonPath) of the same name that can query json documents. In this case it is used as a selector to retrieve value that is later matched by regexp. -Option | Description ---------------------- | --------------------------------------------------- -type | type of filter -path | JsonPath expression to query json document -matcher | regexp expression to match value from json document -matchingStrategy | type of matching strategy. Default is `all` +| Option | Description | +|------------------|-----------------------------------------------------| +| type | type of filter | +| path | JsonPath expression to query json document | +| matcher | regexp expression to match value from json document | +| matchingStrategy | type of matching strategy. Default is `all` | Example: ``` @@ -323,12 +390,12 @@ avro so we decided to introduce very simple dotted path format without any advan understand if you're familiar with JsonPath. Right now array and basic selectors that point to specific fields are supported. -Option | Description ---------------------- | --------------------------------------------------- -type | type of filter -path | dotted expression to query avro document. When array selector is used then wildcard sign `*` can be used as index -matcher | regexp expression to match value from avro document -matchingStrategy | type of matching strategy. Default is `all` +| Option | Description | +|------------------|-------------------------------------------------------------------------------------------------------------------| +| type | type of filter | +| path | dotted expression to query avro document. When array selector is used then wildcard sign `*` can be used as index | +| matcher | regexp expression to match value from avro document | +| matchingStrategy | type of matching strategy. Default is `all` | Example: ``` @@ -664,3 +731,14 @@ It returns array of message tracking information in following format: Sending delay can be defined for each serial subscription. Consumers will wait for a given time before trying to deliver a message. This might be useful in situations when there are multiple topics that sends events in the same time, but you want to increase chance that events from one topic will be delivered later than events from another topic. + +## Ordering guarantees +For subscriptions with `SERIAL` deliveryType hermes will deliver `inflightSize` messages concurrently. +Because of that messages may be delivered out of partition order (unless `inflightSize=1` but this can have poor performance). + +With `BATCH` deliveryType messages are guaranteed to be delivered in partition order (batches are sent sequentially). + +Note that by default Hermes does not give any guarantees about assigning messages to partitions. To do that, publishers must specify [partition key explicitly](publishing.md#partition-assignment). + +When messages are published with `parition-key` and consumed with `BATCH` mode (or `SERIAL` with `inflightSize=1`) they will be ordered as long as they were published to one DC. +Publishing messages with same `parition-key` to multiple DCs does not guarantee ordering because messages are stored in separate kafka clusters. diff --git a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java index c6dc6d21df..c0961fa63c 100644 --- a/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java +++ b/hermes-benchmark/src/jmh/java/pl/allegro/tech/hermes/benchmark/environment/HermesServerFactory.java @@ -15,6 +15,7 @@ import pl.allegro.tech.hermes.frontend.config.SchemaProperties; import pl.allegro.tech.hermes.frontend.config.SslProperties; import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners; +import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter; import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer; import pl.allegro.tech.hermes.frontend.publishing.handlers.HandlersChainFactory; import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter; @@ -85,6 +86,9 @@ private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimite HandlersChainProperties handlersChainProperties = new HandlersChainProperties(); TrackingHeadersExtractor trackingHeadersExtractor = new DefaultTrackingHeaderExtractor(); SchemaProperties schemaProperties = new SchemaProperties(); + BrokerLatencyReporter brokerLatencyReporter = new BrokerLatencyReporter( + false, null, null, null + ); return new HandlersChainFactory( topicsCache, @@ -107,7 +111,8 @@ private static HttpHandler provideHttpHandler(ThroughputLimiter throughputLimite throughputLimiter, null, false, - handlersChainProperties + handlersChainProperties, + brokerLatencyReporter ).provide(); } } diff --git a/hermes-client/build.gradle b/hermes-client/build.gradle index 4839a6e378..01d5625a15 100644 --- a/hermes-client/build.gradle +++ b/hermes-client/build.gradle @@ -25,6 +25,7 @@ dependencies { testImplementation group: 'org.glassfish.jersey.core', name: 'jersey-client', version: versions.jersey testImplementation group: 'org.glassfish.jersey.inject', name: 'jersey-hk2', version: versions.jersey testImplementation group: 'org.springframework', name: 'spring-web', version: versions.spring_web + testImplementation group: 'org.springframework', name: 'spring-context', version: versions.spring_web testImplementation group: 'org.springframework', name: 'spring-webflux', version: versions.spring_web testImplementation group: 'com.squareup.okhttp3', name: 'okhttp', version: versions.okhttp testImplementation group: 'io.projectreactor.netty', name: 'reactor-netty', version: '1.0.25' diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/BrokerMetrics.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/BrokerMetrics.java new file mode 100644 index 0000000000..bbfeac1554 --- /dev/null +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/BrokerMetrics.java @@ -0,0 +1,25 @@ +package pl.allegro.tech.hermes.common.metric; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import pl.allegro.tech.hermes.api.Topic; + +import java.time.Duration; + +public class BrokerMetrics { + private final MeterRegistry meterRegistry; + + public BrokerMetrics(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + } + + public void recordBrokerLatency(String broker, Topic.Ack ack, Duration duration) { + Timer.builder("broker.latency") + .tag("broker", broker) + .tag("ack", ack.name()) + .publishPercentileHistogram() + .maximumExpectedValue(Duration.ofSeconds(5)) + .register(meterRegistry) + .record(duration); + } +} diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java index 43ace4994b..e3adfe3abc 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/MetricsFacade.java @@ -42,6 +42,7 @@ public class MetricsFacade { private final ConsumerSenderMetrics consumerSenderMetrics; private final OffsetCommitsMetrics offsetCommitsMetrics; private final MaxRateMetrics maxRateMetrics; + private final BrokerMetrics brokerMetrics; public MetricsFacade(MeterRegistry meterRegistry, HermesMetrics hermesMetrics) { this.meterRegistry = meterRegistry; @@ -60,6 +61,7 @@ public MetricsFacade(MeterRegistry meterRegistry, HermesMetrics hermesMetrics) { this.consumerSenderMetrics = new ConsumerSenderMetrics(hermesMetrics, meterRegistry); this.offsetCommitsMetrics = new OffsetCommitsMetrics(hermesMetrics, meterRegistry); this.maxRateMetrics = new MaxRateMetrics(hermesMetrics, meterRegistry); + this.brokerMetrics = new BrokerMetrics(meterRegistry); } public TopicMetrics topics() { @@ -118,6 +120,10 @@ public MaxRateMetrics maxRate() { return maxRateMetrics; } + public BrokerMetrics broker() { + return brokerMetrics; + } + public void unregisterAllMetricsRelatedTo(SubscriptionName subscription) { Collection meters = Search.in(meterRegistry) .tags(subscriptionTags(subscription)) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/executor/InstrumentedExecutorServiceFactory.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/executor/InstrumentedExecutorServiceFactory.java index a9b03796b3..f137e1cf7f 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/executor/InstrumentedExecutorServiceFactory.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/executor/InstrumentedExecutorServiceFactory.java @@ -11,6 +11,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + public class InstrumentedExecutorServiceFactory { private final ThreadPoolMetrics threadPoolMetrics; @@ -21,8 +22,12 @@ public InstrumentedExecutorServiceFactory(ThreadPoolMetrics threadPoolMetrics) { } public ExecutorService getExecutorService(String name, int size, boolean monitoringEnabled) { + return getExecutorService(name, size, monitoringEnabled, Integer.MAX_VALUE); + } + + public ExecutorService getExecutorService(String name, int size, boolean monitoringEnabled, int queueCapacity) { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(name + "-executor-%d").build(); - ThreadPoolExecutor executor = newFixedThreadPool(name, size, threadFactory); + ThreadPoolExecutor executor = newFixedThreadPool(name, size, threadFactory, queueCapacity); executor.prestartAllCoreThreads(); if (monitoringEnabled) { @@ -51,23 +56,21 @@ private void monitor(String threadPoolName, ThreadPoolExecutor executor) { threadPoolMetrics.createGauges(threadPoolName, executor); } - /** - * Copy of {@link java.util.concurrent.Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)}. + * Copy of {@link java.util.concurrent.Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)} + * with configurable queue capacity. */ - private ThreadPoolExecutor newFixedThreadPool(String executorName, int size, ThreadFactory threadFactory) { + private ThreadPoolExecutor newFixedThreadPool(String executorName, int size, ThreadFactory threadFactory, int queueCapacity) { ThreadPoolExecutor executor = new ThreadPoolExecutor( size, size, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), + new LinkedBlockingQueue<>(queueCapacity), threadFactory, getMeteredRejectedExecutionHandler(executorName) ); - return executor; - } RejectedExecutionHandler getMeteredRejectedExecutionHandler(String executorName) { diff --git a/hermes-consumers/build.gradle b/hermes-consumers/build.gradle index 44c66d5dc3..292efde6fe 100644 --- a/hermes-consumers/build.gradle +++ b/hermes-consumers/build.gradle @@ -17,7 +17,7 @@ dependencies { api group: 'org.springframework.boot', name: 'spring-boot-starter', version: versions.spring api group: 'org.eclipse.jetty', name: 'jetty-alpn-java-client', version: versions.jetty - api group: 'org.eclipse.jetty.http2', name: 'http2-http-client-transport', version: versions.jetty + api group: 'org.eclipse.jetty.http2', name: 'jetty-http2-client-transport', version: versions.jetty implementation group: 'org.jctools', name: 'jctools-core', version: '1.2' diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerSenderConfiguration.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerSenderConfiguration.java index ee80dfab6e..668b8ef8a3 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerSenderConfiguration.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/ConsumerSenderConfiguration.java @@ -11,7 +11,7 @@ import com.google.common.collect.ImmutableSet; import jakarta.inject.Named; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.Request; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/oauth/client/OAuthHttpClient.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/oauth/client/OAuthHttpClient.java index 338085a1b0..cc0dc18849 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/oauth/client/OAuthHttpClient.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/oauth/client/OAuthHttpClient.java @@ -2,9 +2,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.entity.ContentType; +import org.eclipse.jetty.client.ContentResponse; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.Request; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; @@ -40,21 +40,28 @@ private Request createHttpRequest(OAuthTokenRequest request) { .timeout(request.getRequestTimeout(), TimeUnit.MILLISECONDS) .idleTimeout(request.getSocketTimeout(), TimeUnit.MILLISECONDS) .method(HttpMethod.POST) - .header(HttpHeader.KEEP_ALIVE, "true") - .header(HttpHeader.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.toString()) - .param(OAuthTokenRequest.Param.GRANT_TYPE, request.getGrantType()) - .param(OAuthTokenRequest.Param.SCOPE, request.getScope()) - .param(OAuthTokenRequest.Param.CLIENT_ID, request.getClientId()) - .param(OAuthTokenRequest.Param.CLIENT_SECRET, request.getClientSecret()); + .headers(headers -> { + headers.add(HttpHeader.KEEP_ALIVE, "true"); + headers.add(HttpHeader.CONTENT_TYPE, ContentType.APPLICATION_FORM_URLENCODED.toString()); + }); + addParamIfNotNull(httpRequest, OAuthTokenRequest.Param.GRANT_TYPE, request.getGrantType()); + addParamIfNotNull(httpRequest, OAuthTokenRequest.Param.SCOPE, request.getScope()); + addParamIfNotNull(httpRequest, OAuthTokenRequest.Param.CLIENT_ID, request.getClientId()); + addParamIfNotNull(httpRequest, OAuthTokenRequest.Param.CLIENT_SECRET, request.getClientSecret()); if (OAuthTokenRequest.GrantTypeValue.RESOURCE_OWNER_USERNAME_PASSWORD.equals(request.getGrantType())) { - httpRequest - .param(OAuthTokenRequest.Param.USERNAME, request.getUsername()) - .param(OAuthTokenRequest.Param.PASSWORD, request.getPassword()); + addParamIfNotNull(httpRequest, OAuthTokenRequest.Param.USERNAME, request.getUsername()); + addParamIfNotNull(httpRequest, OAuthTokenRequest.Param.PASSWORD, request.getPassword()); } return httpRequest; } + private void addParamIfNotNull(Request request, String name, String value) { + if (value != null) { + request.param(name, value); + } + } + private ContentResponse performHttpRequest(OAuthTokenRequest request) { try { return createHttpRequest(request).send(); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/MessageSendingResult.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/MessageSendingResult.java index c5458fbaee..6ce56cfa0a 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/MessageSendingResult.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/MessageSendingResult.java @@ -1,6 +1,6 @@ package pl.allegro.tech.hermes.consumers.consumer.sender; -import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.Result; import pl.allegro.tech.hermes.consumers.consumer.sender.resolver.EndpointAddressResolutionException; import java.net.URI; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/SingleMessageSendingResult.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/SingleMessageSendingResult.java index 0690740b76..5ca3c26236 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/SingleMessageSendingResult.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/SingleMessageSendingResult.java @@ -3,7 +3,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import jakarta.ws.rs.core.Response; -import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.Result; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import pl.allegro.tech.hermes.common.exception.InternalProcessingException; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/BatchHttpRequestFactory.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/BatchHttpRequestFactory.java index b5d31e5f07..737576ba15 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/BatchHttpRequestFactory.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/BatchHttpRequestFactory.java @@ -1,6 +1,6 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.Request; import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch; import pl.allegro.tech.hermes.consumers.consumer.sender.http.headers.HttpRequestHeaders; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultBatchHttpRequestFactory.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultBatchHttpRequestFactory.java index a2bd0ab2f8..5a76d1646d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultBatchHttpRequestFactory.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultBatchHttpRequestFactory.java @@ -1,8 +1,8 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; +import org.eclipse.jetty.client.ByteBufferRequestContent; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.util.ByteBufferContentProvider; +import org.eclipse.jetty.client.Request; import org.eclipse.jetty.http.HttpMethod; import pl.allegro.tech.hermes.common.exception.InternalProcessingException; import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch; @@ -19,16 +19,11 @@ public DefaultBatchHttpRequestFactory(HttpClient client) { } public Request buildRequest(MessageBatch message, URI uri, HttpRequestHeaders headers, int requestTimeout) { - Request request = client.newRequest(uri) + return client.newRequest(uri) .method(HttpMethod.POST) .timeout(requestTimeout, TimeUnit.MILLISECONDS) - .content(new ByteBufferContentProvider(message.getContent())); - - - headers.asMap() - .forEach(request::header); - - return request; + .body(new ByteBufferRequestContent(message.getContent())) + .headers(httpHeaders -> headers.asMap().forEach(httpHeaders::add)); } private static HttpClient started(HttpClient httpClient) { diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultHttpMetadataAppender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultHttpMetadataAppender.java index a982c5c814..804eebe17c 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultHttpMetadataAppender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultHttpMetadataAppender.java @@ -1,6 +1,6 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.Request; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.trace.MetadataAppender; @@ -8,7 +8,7 @@ public class DefaultHttpMetadataAppender implements MetadataAppender { @Override public Request append(Request target, Message message) { - message.getExternalMetadata().forEach(target::header); + target.headers(httpFields -> message.getExternalMetadata().forEach(httpFields::add)); return target; } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultHttpRequestFactory.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultHttpRequestFactory.java index 65e3886f7b..db1a88d7c9 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultHttpRequestFactory.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultHttpRequestFactory.java @@ -1,8 +1,8 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; +import org.eclipse.jetty.client.BytesRequestContent; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.client.Request; import org.eclipse.jetty.http.HttpMethod; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.sender.http.headers.HttpRequestHeaders; @@ -46,12 +46,8 @@ private class HttpRequestBuilder { } Request build() { - Request request = baseRequest(); - - headers.asMap() - .forEach(request::header); - - return request; + return baseRequest() + .headers(httpHeaders -> headers.asMap().forEach(httpHeaders::add)); } private Request baseRequest() { @@ -59,7 +55,7 @@ private Request baseRequest() { .method(HttpMethod.POST) .timeout(timeout, TimeUnit.MILLISECONDS) .idleTimeout(socketTimeout, TimeUnit.MILLISECONDS) - .content(new BytesContentProvider(message.getData())); + .body(new BytesRequestContent(message.getData())); metadataAppender.append(baseRequest, message); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultHttpRequestFactoryProvider.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultHttpRequestFactoryProvider.java index 6b34c2a983..6cba998e03 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultHttpRequestFactoryProvider.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultHttpRequestFactoryProvider.java @@ -1,7 +1,7 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.Request; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.consumers.consumer.trace.MetadataAppender; @@ -19,6 +19,4 @@ public HttpRequestFactory provideRequestFactory(Subscription subscription, metadataAppender ); } - - } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultSendingResultHandlers.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultSendingResultHandlers.java index d7c25c94ed..1d435a62f2 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultSendingResultHandlers.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/DefaultSendingResultHandlers.java @@ -1,7 +1,7 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; -import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.ContentResponse; +import org.eclipse.jetty.client.Response; import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult; import pl.allegro.tech.hermes.consumers.consumer.sender.SingleMessageSendingResult; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsFactory.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsFactory.java index 362c018496..fb99474a4d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsFactory.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsFactory.java @@ -1,9 +1,10 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; +import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP; +import org.eclipse.jetty.http.HttpCookieStore; import org.eclipse.jetty.http2.client.HTTP2Client; -import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; +import org.eclipse.jetty.http2.client.transport.HttpClientTransportOverHTTP2; import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.util.HttpCookieStore; import pl.allegro.tech.hermes.common.metric.ConsumerSenderMetrics; @@ -17,8 +18,8 @@ public class HttpClientsFactory { private final SslContextFactoryProvider sslContextFactoryProvider; public HttpClientsFactory( - InstrumentedExecutorServiceFactory executorFactory, - SslContextFactoryProvider sslContextFactoryProvider + InstrumentedExecutorServiceFactory executorFactory, + SslContextFactoryProvider sslContextFactoryProvider ) { this.executorFactory = executorFactory; this.sslContextFactoryProvider = sslContextFactoryProvider; @@ -27,18 +28,18 @@ public HttpClientsFactory( public HttpClient createClientForHttp1(String name, Http1ClientParameters http1ClientParameters) { ClientConnector clientConnector = new ClientConnector(); sslContextFactoryProvider.provideSslContextFactory() - .ifPresent(clientConnector::setSslContextFactory); + .ifPresent(clientConnector::setSslContextFactory); HttpClientTransportOverHTTP transport = new HttpClientTransportOverHTTP(clientConnector); HttpClient client = new HttpClient(transport); ExecutorService executor = executorFactory.getExecutorService( - name, - http1ClientParameters.getThreadPoolSize(), - http1ClientParameters.isThreadPoolMonitoringEnabled()); + name, + http1ClientParameters.getThreadPoolSize(), + http1ClientParameters.isThreadPoolMonitoringEnabled()); client.setExecutor(executor); client.setMaxConnectionsPerDestination(http1ClientParameters.getMaxConnectionsPerDestination()); client.setMaxRequestsQueuedPerDestination(http1ClientParameters.getMaxRequestsQueuedPerDestination()); - client.setCookieStore(new HttpCookieStore.Empty()); + client.setHttpCookieStore(new HttpCookieStore.Empty()); client.setIdleTimeout(http1ClientParameters.getIdleTimeout().toMillis()); client.setFollowRedirects(http1ClientParameters.isFollowRedirectsEnabled()); client.setConnectTimeout(http1ClientParameters.getConnectionTimeout().toMillis()); @@ -48,23 +49,23 @@ public HttpClient createClientForHttp1(String name, Http1ClientParameters http1C public HttpClient createClientForHttp2(String name, Http2ClientParameters http2ClientParameters) { ClientConnector clientConnector = new ClientConnector(); sslContextFactoryProvider.provideSslContextFactory() - .ifPresentOrElse(clientConnector::setSslContextFactory, - () -> { - throw new IllegalStateException("Cannot create http/2 client due to lack of ssl context factory"); - }); + .ifPresentOrElse(clientConnector::setSslContextFactory, + () -> { + throw new IllegalStateException("Cannot create http/2 client due to lack of ssl context factory"); + }); HTTP2Client http2Client = new HTTP2Client(clientConnector); ExecutorService executor = executorFactory.getExecutorService( - name, - http2ClientParameters.getThreadPoolSize(), - http2ClientParameters.isThreadPoolMonitoringEnabled()); + name, + http2ClientParameters.getThreadPoolSize(), + http2ClientParameters.isThreadPoolMonitoringEnabled()); http2Client.setExecutor(executor); HttpClientTransportOverHTTP2 transport = new HttpClientTransportOverHTTP2(http2Client); HttpClient client = new HttpClient(transport); client.setMaxRequestsQueuedPerDestination(http2ClientParameters.getMaxRequestsQueuedPerDestination()); - client.setCookieStore(new HttpCookieStore.Empty()); + client.setHttpCookieStore(new HttpCookieStore.Empty()); client.setIdleTimeout(http2ClientParameters.getIdleTimeout().toMillis()); client.setFollowRedirects(http2ClientParameters.isFollowRedirectsEnabled()); client.setConnectTimeout(http2ClientParameters.getConnectionTimeout().toMillis()); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsWorkloadReporter.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsWorkloadReporter.java index f6d36ba6ec..2692c3d7c5 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsWorkloadReporter.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsWorkloadReporter.java @@ -3,8 +3,8 @@ import org.eclipse.jetty.client.ConnectionPool; import org.eclipse.jetty.client.DuplexConnectionPool; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.MultiplexConnectionPool; +import org.eclipse.jetty.client.transport.HttpDestination; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import java.util.Queue; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpRequestFactory.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpRequestFactory.java index dcd432d4c3..d31e7354a8 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpRequestFactory.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpRequestFactory.java @@ -1,6 +1,6 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.Request; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.sender.http.headers.HttpRequestHeaders; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpRequestFactoryProvider.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpRequestFactoryProvider.java index 7e0ffe4408..7cb84395a9 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpRequestFactoryProvider.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpRequestFactoryProvider.java @@ -1,7 +1,7 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.Request; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.consumers.consumer.trace.MetadataAppender; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSender.java index 07ec0a5536..3b1607d3be 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSender.java @@ -1,7 +1,7 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; import com.google.common.collect.ImmutableList; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.Request; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender; import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSender; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastResponseListener.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastResponseListener.java index 980ea7a3f8..a0be25467c 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastResponseListener.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastResponseListener.java @@ -1,7 +1,7 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; -import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.BufferingResponseListener; +import org.eclipse.jetty.client.Result; import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult; import pl.allegro.tech.hermes.consumers.consumer.sender.SingleMessageSendingResult; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyHttpMessageSenderProvider.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyHttpMessageSenderProvider.java index ab0eaf966b..809e456c4a 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyHttpMessageSenderProvider.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyHttpMessageSenderProvider.java @@ -2,7 +2,7 @@ import com.google.common.collect.ImmutableSet; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.EndpointAddress; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageBatchSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageBatchSender.java index 21ef73498b..386a449feb 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageBatchSender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageBatchSender.java @@ -2,8 +2,8 @@ import org.apache.http.entity.ContentType; import org.apache.http.protocol.HTTP; -import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.ContentResponse; +import org.eclipse.jetty.client.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.EndpointAddress; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSender.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSender.java index 03b7a36264..dc2690af06 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSender.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyMessageSender.java @@ -1,6 +1,6 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.Request; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.sender.CompletableFutureAwareMessageSender; import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyResponseListener.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyResponseListener.java index e6c2bdfc40..fd6d7bb6d4 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyResponseListener.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyResponseListener.java @@ -1,7 +1,7 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; -import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.BufferingResponseListener; +import org.eclipse.jetty.client.Result; import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult; import java.util.concurrent.CompletableFuture; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/SendingResultHandlers.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/SendingResultHandlers.java index d8b252f4bb..80fdc90f59 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/SendingResultHandlers.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/sender/http/SendingResultHandlers.java @@ -1,7 +1,7 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http; -import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.client.api.Response.CompleteListener; +import org.eclipse.jetty.client.ContentResponse; +import org.eclipse.jetty.client.Response.CompleteListener; import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult; import pl.allegro.tech.hermes.consumers.consumer.sender.SingleMessageSendingResult; diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsWorkloadReporterTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsWorkloadReporterTest.groovy index e573c84281..8f653948f8 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsWorkloadReporterTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/HttpClientsWorkloadReporterTest.groovy @@ -1,8 +1,8 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http import org.eclipse.jetty.client.HttpClient -import org.eclipse.jetty.client.HttpDestination -import org.eclipse.jetty.client.HttpExchange +import org.eclipse.jetty.client.transport.HttpDestination +import org.eclipse.jetty.client.transport.HttpExchange import pl.allegro.tech.hermes.common.metric.MetricsFacade import spock.lang.Specification diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy index b565f11801..7445574d47 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/consumer/sender/http/JettyBroadCastMessageSenderTest.groovy @@ -2,17 +2,17 @@ package pl.allegro.tech.hermes.consumers.consumer.sender.http import com.github.tomakehurst.wiremock.WireMockServer import org.eclipse.jetty.client.HttpClient -import org.eclipse.jetty.util.HttpCookieStore +import org.eclipse.jetty.http.HttpCookieStore import pl.allegro.tech.hermes.api.EndpointAddress import pl.allegro.tech.hermes.api.EndpointAddressResolverMetadata import pl.allegro.tech.hermes.api.Subscription import pl.allegro.tech.hermes.api.SubscriptionName import pl.allegro.tech.hermes.consumers.consumer.Message +import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender import pl.allegro.tech.hermes.consumers.consumer.rate.ConsumerRateLimiter import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSender import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSendingResult import pl.allegro.tech.hermes.consumers.consumer.sender.MultiMessageSendingResult -import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender import pl.allegro.tech.hermes.consumers.consumer.sender.http.headers.AuthHeadersProvider import pl.allegro.tech.hermes.consumers.consumer.sender.http.headers.HermesHeadersProvider import pl.allegro.tech.hermes.consumers.consumer.sender.http.headers.Http1HeadersProvider @@ -61,7 +61,7 @@ class JettyBroadCastMessageSenderTest extends Specification { wireMockServers.forEach { it.start() } client = new HttpClient() - client.setCookieStore(new HttpCookieStore.Empty()) + client.setHttpCookieStore(new HttpCookieStore.Empty()) client.setConnectTimeout(1000) client.setIdleTimeout(1000) client.start() diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/BrokerLatencyReporterConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/BrokerLatencyReporterConfiguration.java new file mode 100644 index 0000000000..6a257099f3 --- /dev/null +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/BrokerLatencyReporterConfiguration.java @@ -0,0 +1,35 @@ +package pl.allegro.tech.hermes.frontend.config; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import pl.allegro.tech.hermes.common.metric.MetricsFacade; +import pl.allegro.tech.hermes.common.metric.executor.InstrumentedExecutorServiceFactory; +import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter; + +import java.util.concurrent.ExecutorService; + + +@Configuration +@EnableConfigurationProperties(BrokerLatencyReporterProperties.class) +public class BrokerLatencyReporterConfiguration { + + @Bean + BrokerLatencyReporter brokerLatencyReporter(BrokerLatencyReporterProperties properties, + MetricsFacade metricsFacade, + InstrumentedExecutorServiceFactory executorServiceFactory) { + ExecutorService executorService = executorServiceFactory.getExecutorService( + "broker-latency-reporter", + properties.getThreadPoolSize(), + true, + properties.getThreadPoolQueueCapacity() + ); + + return new BrokerLatencyReporter( + properties.isEnabled(), + metricsFacade, + properties.getSlowResponseLoggingThreshold(), + executorService + ); + } +} diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/BrokerLatencyReporterProperties.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/BrokerLatencyReporterProperties.java new file mode 100644 index 0000000000..1688a8b271 --- /dev/null +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/BrokerLatencyReporterProperties.java @@ -0,0 +1,45 @@ +package pl.allegro.tech.hermes.frontend.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import java.time.Duration; + +@ConfigurationProperties(prefix = "frontend.broker-latency-reporter") +public class BrokerLatencyReporterProperties { + private boolean enabled; + private Duration slowResponseLoggingThreshold = Duration.ofMillis(100); + private int threadPoolSize = 8; + private int threadPoolQueueCapacity = 1_000_000; + + public int getThreadPoolSize() { + return threadPoolSize; + } + + public void setThreadPoolSize(int threadPoolSize) { + this.threadPoolSize = threadPoolSize; + } + + public int getThreadPoolQueueCapacity() { + return threadPoolQueueCapacity; + } + + public void setThreadPoolQueueCapacity(int threadPoolQueueCapacity) { + this.threadPoolQueueCapacity = threadPoolQueueCapacity; + } + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public Duration getSlowResponseLoggingThreshold() { + return slowResponseLoggingThreshold; + } + + public void setSlowResponseLoggingThreshold(Duration slowResponseLoggingThreshold) { + this.slowResponseLoggingThreshold = slowResponseLoggingThreshold; + } +} diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendPublishingConfiguration.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendPublishingConfiguration.java index 8931afc318..596ffc43ac 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendPublishingConfiguration.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FrontendPublishingConfiguration.java @@ -10,6 +10,7 @@ import pl.allegro.tech.hermes.domain.topic.preview.MessagePreviewRepository; import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache; import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners; +import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter; import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer; import pl.allegro.tech.hermes.frontend.publishing.handlers.HandlersChainFactory; import pl.allegro.tech.hermes.frontend.publishing.handlers.ThroughputLimiter; @@ -48,10 +49,11 @@ public HttpHandler httpHandler(TopicsCache topicsCache, MessageErrorProcessor me MessageEndProcessor messageEndProcessor, MessageFactory messageFactory, BrokerMessageProducer brokerMessageProducer, MessagePreviewLog messagePreviewLog, ThroughputLimiter throughputLimiter, Optional authConfig, - MessagePreviewProperties messagePreviewProperties, HandlersChainProperties handlersChainProperties) { + MessagePreviewProperties messagePreviewProperties, HandlersChainProperties handlersChainProperties, + BrokerLatencyReporter brokerLatencyReporter) { return new HandlersChainFactory(topicsCache, messageErrorProcessor, messageEndProcessor, messageFactory, brokerMessageProducer, messagePreviewLog, throughputLimiter, authConfig, messagePreviewProperties.isEnabled(), - handlersChainProperties).provide(); + handlersChainProperties, brokerLatencyReporter).provide(); } @Bean diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/BrokerLatencyReporter.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/BrokerLatencyReporter.java new file mode 100644 index 0000000000..797d5e6a8f --- /dev/null +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/BrokerLatencyReporter.java @@ -0,0 +1,66 @@ +package pl.allegro.tech.hermes.frontend.producer; + +import jakarta.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.common.metric.MetricsFacade; +import pl.allegro.tech.hermes.frontend.publishing.message.Message; +import pl.allegro.tech.hermes.frontend.publishing.metadata.ProduceMetadata; +import pl.allegro.tech.hermes.metrics.HermesTimerContext; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.function.Supplier; + +public class BrokerLatencyReporter { + + private static final Logger logger = LoggerFactory.getLogger(BrokerLatencyReporter.class); + + private final boolean perBrokerLatencyEnabled; + private final MetricsFacade metricsFacade; + private final Duration slowResponseThreshold; + private final ExecutorService reporterExecutorService; + + public BrokerLatencyReporter(boolean perBrokerLatencyEnabled, + MetricsFacade metricsFacade, + Duration slowResponseThreshold, + ExecutorService reporterExecutorService) { + this.perBrokerLatencyEnabled = perBrokerLatencyEnabled; + this.metricsFacade = metricsFacade; + this.slowResponseThreshold = slowResponseThreshold; + this.reporterExecutorService = reporterExecutorService; + } + + public void report(HermesTimerContext timerContext, + Message message, + Topic.Ack ack, + @Nullable Supplier produceMetadata) { + Duration duration = timerContext.closeAndGet(); + if (perBrokerLatencyEnabled) { + try { + reporterExecutorService.submit(() -> doReport(duration, message.getId(), ack, produceMetadata)); + } catch (RejectedExecutionException ignored) { + // don't propagate the exception - allow metrics to be dropped if executor is overloaded + // executor service should already be instrumented to meter rejected executions so no action is needed + } + } + + } + + private void doReport(Duration duration, + String messageId, + Topic.Ack ack, + @Nullable Supplier produceMetadata) { + String broker = Optional.ofNullable(produceMetadata).flatMap(metadata -> metadata.get().getBroker()).orElse("unknown"); + + if (duration.compareTo(slowResponseThreshold) > 0) { + logger.debug("Slow produce request, broker response time: {} ms, ackLevel: {}, messageId: {}, broker: {}", + duration.toMillis(), ack, messageId, broker); + } + + metricsFacade.broker().recordBrokerLatency(broker, ack, duration); + } +} diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducer.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducer.java index e400101425..d5461abd44 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducer.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaBrokerMessageProducer.java @@ -4,16 +4,19 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InterruptException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.frontend.metric.CachedTopic; import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer; import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback; import pl.allegro.tech.hermes.frontend.publishing.message.Message; +import pl.allegro.tech.hermes.frontend.publishing.metadata.ProduceMetadata; import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; @Singleton public class KafkaBrokerMessageProducer implements BrokerMessageProducer { @@ -41,7 +44,7 @@ public void send(Message message, CachedTopic cachedTopic, final PublishingCallb messageConverter.convertToProducerRecord(message, cachedTopic.getKafkaTopics().getPrimary().name()); try { - producers.get(cachedTopic.getTopic()).send(producerRecord, new SendCallback(message, cachedTopic.getTopic(), callback)); + producers.get(cachedTopic.getTopic()).send(producerRecord, new SendCallback(message, cachedTopic, callback)); } catch (Exception e) { // message didn't get to internal producer buffer and it will not be send to a broker callback.onUnpublished(message, cachedTopic.getTopic(), e); @@ -74,6 +77,28 @@ public boolean isTopicAvailable(CachedTopic cachedTopic) { return false; } + private Supplier produceMetadataSupplier(CachedTopic topic, RecordMetadata recordMetadata) { + return () -> { + String kafkaTopicName = topic.getKafkaTopics().getPrimary().name().asString(); + try { + List topicPartitions = producers.get(topic.getTopic()).partitionsFor(kafkaTopicName); + + Optional partitionInfo = topicPartitions.stream() + .filter(p -> p.partition() == recordMetadata.partition()) + .findFirst(); + + return partitionInfo.map(partition -> partition.leader().host()) + .map(ProduceMetadata::new) + .orElse(ProduceMetadata.empty()); + } catch (InterruptException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + logger.warn("Could not read information about partitions for topic {}. {}", kafkaTopicName, e.getMessage()); + } + return ProduceMetadata.empty(); + }; + } + private boolean anyPartitionWithoutLeader(List partitionInfos) { return partitionInfos.stream().anyMatch(p -> p.leader() == null); } @@ -86,10 +111,10 @@ private boolean anyUnderReplicatedPartition(List partitionInfos, private class SendCallback implements org.apache.kafka.clients.producer.Callback { private final Message message; - private final Topic topic; + private final CachedTopic topic; private final PublishingCallback callback; - public SendCallback(Message message, Topic topic, PublishingCallback callback) { + public SendCallback(Message message, CachedTopic topic, PublishingCallback callback) { this.message = message; this.topic = topic; this.callback = callback; @@ -97,11 +122,12 @@ public SendCallback(Message message, Topic topic, PublishingCallback callback) { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { + Supplier produceMetadata = produceMetadataSupplier(topic, recordMetadata); if (e == null) { - callback.onPublished(message, topic); + callback.onPublished(message, topic.getTopic(), produceMetadata); producers.maybeRegisterNodeMetricsGauges(metricsFacade); } else { - callback.onUnpublished(message, topic, e); + callback.onUnpublished(message, topic.getTopic(), produceMetadata, e); } } } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/PublishingCallback.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/PublishingCallback.java index 0742bea8f5..53a83ccf62 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/PublishingCallback.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/PublishingCallback.java @@ -2,11 +2,21 @@ import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.frontend.publishing.message.Message; +import pl.allegro.tech.hermes.frontend.publishing.metadata.ProduceMetadata; + +import java.util.function.Supplier; public interface PublishingCallback { void onUnpublished(Message message, Topic topic, Exception exception); + default void onUnpublished(Message message, Topic topic, Supplier produceMetadata, Exception exception) { + onUnpublished(message, topic, exception); + } + void onPublished(Message message, Topic topic); + default void onPublished(Message message, Topic topic, Supplier produceMetadata) { + onPublished(message, topic); + } } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/handlers/HandlersChainFactory.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/handlers/HandlersChainFactory.java index 3818ce95b4..953b9dd0d9 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/handlers/HandlersChainFactory.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/handlers/HandlersChainFactory.java @@ -8,6 +8,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache; +import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter; import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer; import pl.allegro.tech.hermes.frontend.publishing.handlers.end.MessageEndProcessor; import pl.allegro.tech.hermes.frontend.publishing.handlers.end.MessageErrorProcessor; @@ -32,12 +33,14 @@ public class HandlersChainFactory { private final ThroughputLimiter throughputLimiter; private final Optional authenticationConfiguration; private final HandlersChainParameters handlersChainParameters; + private final BrokerLatencyReporter brokerLatencyReporter; public HandlersChainFactory(TopicsCache topicsCache, MessageErrorProcessor messageErrorProcessor, MessageEndProcessor messageEndProcessor, MessageFactory messageFactory, BrokerMessageProducer brokerMessageProducer, MessagePreviewLog messagePreviewLog, ThroughputLimiter throughputLimiter, Optional authenticationConfiguration, - boolean messagePreviewEnabled, HandlersChainParameters handlersChainParameters) { + boolean messagePreviewEnabled, HandlersChainParameters handlersChainParameters, + BrokerLatencyReporter brokerLatencyReporter) { this.topicsCache = topicsCache; this.messageErrorProcessor = messageErrorProcessor; this.messageEndProcessor = messageEndProcessor; @@ -48,10 +51,12 @@ public HandlersChainFactory(TopicsCache topicsCache, MessageErrorProcessor messa this.throughputLimiter = throughputLimiter; this.authenticationConfiguration = authenticationConfiguration; this.handlersChainParameters = handlersChainParameters; + this.brokerLatencyReporter = brokerLatencyReporter; } public HttpHandler provide() { - HttpHandler publishing = new PublishingHandler(brokerMessageProducer, messageErrorProcessor, messageEndProcessor); + HttpHandler publishing = new PublishingHandler(brokerMessageProducer, messageErrorProcessor, + messageEndProcessor, brokerLatencyReporter); HttpHandler messageCreateHandler = new MessageCreateHandler(publishing, messageFactory, messageErrorProcessor); HttpHandler timeoutHandler = new TimeoutHandler(messageEndProcessor, messageErrorProcessor); HttpHandler handlerAfterRead = previewEnabled ? new PreviewHandler(messageCreateHandler, previewLog) : messageCreateHandler; diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/handlers/PublishingHandler.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/handlers/PublishingHandler.java index 40b5ee8dc3..4ca09c80a7 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/handlers/PublishingHandler.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/handlers/PublishingHandler.java @@ -3,14 +3,18 @@ import io.undertow.server.HttpHandler; import io.undertow.server.HttpServerExchange; import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter; import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer; import pl.allegro.tech.hermes.frontend.publishing.PublishingCallback; import pl.allegro.tech.hermes.frontend.publishing.handlers.end.MessageEndProcessor; import pl.allegro.tech.hermes.frontend.publishing.handlers.end.MessageErrorProcessor; import pl.allegro.tech.hermes.frontend.publishing.message.Message; import pl.allegro.tech.hermes.frontend.publishing.message.MessageState; +import pl.allegro.tech.hermes.frontend.publishing.metadata.ProduceMetadata; import pl.allegro.tech.hermes.metrics.HermesTimerContext; +import java.util.function.Supplier; + import static org.apache.commons.lang3.exception.ExceptionUtils.getRootCauseMessage; import static pl.allegro.tech.hermes.api.ErrorCode.INTERNAL_ERROR; import static pl.allegro.tech.hermes.api.ErrorDescription.error; @@ -20,12 +24,14 @@ class PublishingHandler implements HttpHandler { private final BrokerMessageProducer brokerMessageProducer; private final MessageErrorProcessor messageErrorProcessor; private final MessageEndProcessor messageEndProcessor; + private final BrokerLatencyReporter brokerBrokerLatencyReporter; PublishingHandler(BrokerMessageProducer brokerMessageProducer, MessageErrorProcessor messageErrorProcessor, - MessageEndProcessor messageEndProcessor) { + MessageEndProcessor messageEndProcessor, BrokerLatencyReporter brokerLatencyReporter) { this.brokerMessageProducer = brokerMessageProducer; this.messageErrorProcessor = messageErrorProcessor; this.messageEndProcessor = messageEndProcessor; + this.brokerBrokerLatencyReporter = brokerLatencyReporter; } @Override @@ -51,9 +57,9 @@ private void handle(HttpServerExchange exchange) { // called from kafka producer thread @Override - public void onPublished(Message message, Topic topic) { + public void onPublished(Message message, Topic topic, Supplier produceMetadata) { exchange.getConnection().getWorker().execute(() -> { - brokerLatencyTimers.close(); + brokerBrokerLatencyReporter.report(brokerLatencyTimers, message, topic.getAck(), produceMetadata); if (messageState.setSentToKafka()) { attachment.removeTimeout(); messageEndProcessor.sent(exchange, attachment); @@ -63,15 +69,26 @@ public void onPublished(Message message, Topic topic) { }); } + @Override + public void onPublished(Message message, Topic topic) { + onPublished(message, topic, null); + } + + // in most cases this method should be called from worker thread, // therefore there is no need to switch it to another worker thread @Override - public void onUnpublished(Message message, Topic topic, Exception exception) { + public void onUnpublished(Message message, Topic topic, Supplier produceMetadata, Exception exception) { messageState.setErrorInSendingToKafka(); - brokerLatencyTimers.close(); + brokerBrokerLatencyReporter.report(brokerLatencyTimers, message, topic.getAck(), produceMetadata); attachment.removeTimeout(); handleNotPublishedMessage(exchange, topic, attachment.getMessageId(), exception); } + + @Override + public void onUnpublished(Message message, Topic topic, Exception exception) { + onUnpublished(message, topic, null, exception); + } }); if (messageState.setSendingToKafka() && messageState.setDelayedProcessing()) { diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/metadata/ProduceMetadata.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/metadata/ProduceMetadata.java new file mode 100644 index 0000000000..f57e581d7d --- /dev/null +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/publishing/metadata/ProduceMetadata.java @@ -0,0 +1,22 @@ +package pl.allegro.tech.hermes.frontend.publishing.metadata; + +import jakarta.annotation.Nullable; + +import java.util.Optional; + +public class ProduceMetadata { + @Nullable + private final String broker; + + public ProduceMetadata(@Nullable String broker) { + this.broker = broker; + } + + public Optional getBroker() { + return Optional.ofNullable(broker); + } + + public static ProduceMetadata empty() { + return new ProduceMetadata(null); + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/console/ConsoleConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/console/ConsoleConfiguration.java index 9faa8cabc8..e9f589414e 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/console/ConsoleConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/console/ConsoleConfiguration.java @@ -1,6 +1,9 @@ package pl.allegro.tech.hermes.management.config.console; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; +import org.apache.hc.core5.http.io.SocketConfig; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.context.annotation.Bean; @@ -13,6 +16,8 @@ import pl.allegro.tech.hermes.management.infrastructure.console.HttpConsoleConfigurationRepository; import pl.allegro.tech.hermes.management.infrastructure.console.SpringConfigConsoleConfigurationRepository; +import java.util.concurrent.TimeUnit; + @Configuration @EnableConfigurationProperties({ConsoleConfigProperties.class, ConsoleProperties.class}) public class ConsoleConfiguration { @@ -40,9 +45,18 @@ ConsoleConfigurationRepository consoleConfigurationRepository( } private ConsoleConfigurationRepository httpConsoleConfigurationRepository(ConsoleConfigProperties properties) { - HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(); - requestFactory.setConnectTimeout((int) properties.getHttpClient().getConnectTimeout().toMillis()); - requestFactory.setReadTimeout((int) properties.getHttpClient().getReadTimeout().toMillis()); + var httpClientProperties = properties.getHttpClient(); + var socketConfig = SocketConfig.custom() + .setSoTimeout((int) httpClientProperties.getReadTimeout().toMillis(), TimeUnit.MILLISECONDS) + .build(); + var connectionManager = PoolingHttpClientConnectionManagerBuilder.create() + .setDefaultSocketConfig(socketConfig) + .build(); + var client = HttpClientBuilder.create() + .setConnectionManager(connectionManager) + .build(); + HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(client); + requestFactory.setConnectTimeout(properties.getHttpClient().getConnectTimeout()); RestTemplate restTemplate = new RestTemplate(requestFactory); return new HttpConsoleConfigurationRepository(properties, restTemplate); } diff --git a/hermes-metrics/src/main/java/pl/allegro/tech/hermes/metrics/HermesTimerContext.java b/hermes-metrics/src/main/java/pl/allegro/tech/hermes/metrics/HermesTimerContext.java index 451cd2aec6..b3a39a349b 100644 --- a/hermes-metrics/src/main/java/pl/allegro/tech/hermes/metrics/HermesTimerContext.java +++ b/hermes-metrics/src/main/java/pl/allegro/tech/hermes/metrics/HermesTimerContext.java @@ -4,6 +4,7 @@ import io.micrometer.core.instrument.Timer; import java.io.Closeable; +import java.time.Duration; import java.util.concurrent.TimeUnit; public class HermesTimerContext implements Closeable { @@ -26,8 +27,17 @@ public static HermesTimerContext from(Timer micrometerTimer, com.codahale.metric @Override public void close() { + reportTimer(); + } + + public Duration closeAndGet() { + return Duration.ofNanos(reportTimer()); + } + + private long reportTimer() { long amount = clock.monotonicTime() - startNanos; graphiteTimer.update(amount, TimeUnit.NANOSECONDS); micrometerTimer.record(amount, TimeUnit.NANOSECONDS); + return amount; } } diff --git a/hermes-mock/build.gradle b/hermes-mock/build.gradle index 1bb8e4c147..f88ff3671b 100644 --- a/hermes-mock/build.gradle +++ b/hermes-mock/build.gradle @@ -6,7 +6,7 @@ plugins { dependencies { implementation group: 'junit', name: 'junit', version: '4.11' - api group: 'org.wiremock', name: 'wiremock', version: versions.wiremock + api group: 'org.wiremock', name: 'wiremock-standalone', version: versions.wiremock implementation group: 'com.jayway.awaitility', name: 'awaitility', version: '1.6.1' api group: 'org.apache.avro', name: 'avro', version: versions.avro implementation group: 'tech.allegro.schema.json2avro', name: 'converter', version: versions.json2avro diff --git a/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/HermesMockDefine.java b/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/HermesMockDefine.java index 6d522b1f79..8fadfc3378 100644 --- a/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/HermesMockDefine.java +++ b/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/HermesMockDefine.java @@ -4,9 +4,9 @@ import com.github.tomakehurst.wiremock.matching.ValueMatcher; import com.github.tomakehurst.wiremock.stubbing.StubMapping; import org.apache.avro.Schema; -import org.apache.hc.core5.http.HttpStatus; import pl.allegro.tech.hermes.mock.exchange.Response; import pl.allegro.tech.hermes.mock.matching.ContentMatchers; +import wiremock.org.apache.hc.core5.http.HttpStatus; import java.util.function.Predicate; diff --git a/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/HermesMockQuery.java b/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/HermesMockQuery.java index f8afb82789..9b7af13069 100644 --- a/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/HermesMockQuery.java +++ b/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/HermesMockQuery.java @@ -1,9 +1,9 @@ package pl.allegro.tech.hermes.mock; import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder; -import com.google.common.collect.Lists; import org.apache.avro.Schema; import pl.allegro.tech.hermes.mock.exchange.Request; +import wiremock.com.google.common.collect.Lists; import java.util.List; import java.util.Optional; diff --git a/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/exchange/Response.java b/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/exchange/Response.java index 9703618549..725006af6e 100644 --- a/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/exchange/Response.java +++ b/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/exchange/Response.java @@ -1,8 +1,6 @@ package pl.allegro.tech.hermes.mock.exchange; - - -import org.apache.hc.core5.http.HttpStatus; +import wiremock.org.apache.hc.core5.http.HttpStatus; import java.time.Duration; diff --git a/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/matching/StartsWithPattern.java b/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/matching/StartsWithPattern.java index 64f31af075..2783d91c69 100644 --- a/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/matching/StartsWithPattern.java +++ b/hermes-mock/src/main/java/pl/allegro/tech/hermes/mock/matching/StartsWithPattern.java @@ -1,8 +1,8 @@ package pl.allegro.tech.hermes.mock.matching; -import com.fasterxml.jackson.annotation.JsonProperty; import com.github.tomakehurst.wiremock.matching.MatchResult; import com.github.tomakehurst.wiremock.matching.StringValuePattern; +import wiremock.com.fasterxml.jackson.annotation.JsonProperty; public class StartsWithPattern extends StringValuePattern { diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/frontend/FrontendConfigurationProperties.java b/integration/src/integration/java/pl/allegro/tech/hermes/frontend/FrontendConfigurationProperties.java index 09004a7358..a431f43a4d 100644 --- a/integration/src/integration/java/pl/allegro/tech/hermes/frontend/FrontendConfigurationProperties.java +++ b/integration/src/integration/java/pl/allegro/tech/hermes/frontend/FrontendConfigurationProperties.java @@ -31,4 +31,6 @@ public class FrontendConfigurationProperties { public static String ZOOKEEPER_CONNECTION_STRING = "frontend.zookeeper.clusters.[0].connectionString"; public static String KAFKA_PRODUCER_METADATA_MAX_AGE = "frontend.kafka.producer.metadataMaxAge"; public static String KAFKA_BROKER_LIST = "frontend.kafka.clusters.[0].brokerList"; + public static String BROKER_LATENCY_REPORTER_ENABLED = "frontend.brokerLatencyReporter.enabled"; + } diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/BrokerLatencyReportingTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/BrokerLatencyReportingTest.java new file mode 100644 index 0000000000..d65685f9ed --- /dev/null +++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/BrokerLatencyReportingTest.java @@ -0,0 +1,82 @@ +package pl.allegro.tech.hermes.integration; + +import jakarta.ws.rs.client.Client; +import jakarta.ws.rs.client.ClientBuilder; +import jakarta.ws.rs.core.Response; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.frontend.FrontendConfigurationProperties; +import pl.allegro.tech.hermes.integration.env.FrontendStarter; +import pl.allegro.tech.hermes.test.helper.endpoint.HermesPublisher; +import pl.allegro.tech.hermes.test.helper.message.TestMessage; +import pl.allegro.tech.hermes.test.helper.util.Ports; + +import java.util.Arrays; +import java.util.Optional; + +import static jakarta.ws.rs.core.Response.Status.CREATED; +import static pl.allegro.tech.hermes.integration.test.HermesAssertions.assertThat; +import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.randomTopic; + +public class BrokerLatencyReportingTest extends IntegrationTest { + + private static final int FRONTEND_PORT = Ports.nextAvailable(); + private static final String FRONTEND_URL = "http://127.0.0.1:" + FRONTEND_PORT; + private HermesPublisher publisher; + private FrontendStarter frontendStarter; + private final Client client = ClientBuilder.newClient(); + + + @BeforeClass + public void setup() throws Exception { + frontendStarter = FrontendStarter.withCommonIntegrationTestConfig(FRONTEND_PORT); + frontendStarter.overrideProperty( + FrontendConfigurationProperties.KAFKA_BROKER_LIST, kafkaClusterOne.getBootstrapServersForExternalClients() + ); + frontendStarter.overrideProperty( + FrontendConfigurationProperties.ZOOKEEPER_CONNECTION_STRING, hermesZookeeperOne.getConnectionString() + ); + frontendStarter.overrideProperty(FrontendConfigurationProperties.SCHEMA_REPOSITORY_SERVER_URL, schemaRegistry.getUrl()); + frontendStarter.overrideProperty(FrontendConfigurationProperties.METRICS_GRAPHITE_REPORTER_ENABLED, true); + frontendStarter.overrideProperty(FrontendConfigurationProperties.GRAPHITE_PORT, 18023); + frontendStarter.overrideProperty(FrontendConfigurationProperties.BROKER_LATENCY_REPORTER_ENABLED, true); + frontendStarter.start(); + + publisher = new HermesPublisher(FRONTEND_URL); + } + + @AfterClass + public void tearDown() throws Exception { + frontendStarter.stop(); + } + + @Test + public void shouldReportBrokerLatencyMetrics() { + // given + Topic topic = operations.buildTopic(randomTopic("brokerLatency", "topic").build()); + + TestMessage message = TestMessage.of("hello", "world"); + + // when + Response response = publisher.publish(topic.getQualifiedName(), message.body()); + + // then + assertThat(response).hasStatus(CREATED); + wait.until(() -> { + Double metricValue = getMetricValue("hermes_frontend_broker_latency_seconds_count{ack=\"LEADER\",broker=\"localhost\",") + .orElse(0.0); + assertThat(metricValue).isGreaterThan(0.0d); + }, 5); + } + + private Optional getMetricValue(String metricPrefix) { + String metricsResponse = client.target(FRONTEND_URL + "/status/prometheus").request().get(String.class); + return Arrays.stream(metricsResponse.split("\n")) + .filter(metricName -> metricName.startsWith(metricPrefix)) + .findFirst() + .map(line -> line.split(" ")[1]) // metrics have format " " + .map(Double::valueOf); + } +} diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/helper/Waiter.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/helper/Waiter.java index 597ac63708..9a437e7c02 100644 --- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/helper/Waiter.java +++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/helper/Waiter.java @@ -102,6 +102,10 @@ public void until(Runnable runnable) { awaitAtMost(adjust(new Duration(30, TimeUnit.SECONDS))).until(runnable); } + public void until(Runnable runnable, int seconds) { + awaitAtMost(adjust(new Duration(seconds, TimeUnit.SECONDS))).until(runnable); + } + public void untilTopicIsUpdatedAfter(final long currentTime, Topic topic, String subscription) { until(Duration.TEN_SECONDS, topic, subscription, sub -> sub.getSignalTimesheet().getOrDefault(UPDATE_TOPIC, 0L) > currentTime);