From 803d10069e994606bc65bb61166be058301c0006 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20R=C5=BCysko?= Date: Wed, 13 Dec 2023 12:40:41 +0100 Subject: [PATCH] Fix sending the messageId header to subscribers (#1794) --- ...KafkaConsumerRecordToMessageConverter.java | 7 ++-- .../receiver/kafka/KafkaHeaderExtractor.java | 32 ++++++++++++------- .../hermes/frontend/buffer/BackupMessage.java | 3 +- .../integration/PublishingAvroTest.java | 27 ++++++++++++++++ 4 files changed, 52 insertions(+), 17 deletions(-) diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaConsumerRecordToMessageConverter.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaConsumerRecordToMessageConverter.java index 537d128de3..a6c6f2297a 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaConsumerRecordToMessageConverter.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaConsumerRecordToMessageConverter.java @@ -38,11 +38,8 @@ public Message convertToMessage(ConsumerRecord record, long part KafkaTopic kafkaTopic = topics.get(record.topic()); UnwrappedMessageContent unwrappedContent = messageContentReader.read(record, kafkaTopic.contentType()); - Map externalMetadata = kafkaHeaderExtractor.extractHTTPHeadersIfEnabled(record.headers()); - // compatibility condition, can be removed when all messages have HTTP headers propagated via Kafka headers. - if (externalMetadata.isEmpty()) { - externalMetadata = unwrappedContent.getMessageMetadata().getExternalMetadata(); - } + Map externalMetadataFromBody = unwrappedContent.getMessageMetadata().getExternalMetadata(); + Map externalMetadata = kafkaHeaderExtractor.extractExternalMetadata(record.headers(), externalMetadataFromBody); return new Message( kafkaHeaderExtractor.extractMessageId(record.headers()), diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaHeaderExtractor.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaHeaderExtractor.java index f283b52498..01d2952e73 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaHeaderExtractor.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaHeaderExtractor.java @@ -4,12 +4,13 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import pl.allegro.tech.hermes.common.kafka.HTTPHeadersPropagationAsKafkaHeadersProperties; +import pl.allegro.tech.hermes.common.message.wrapper.AvroMetadataMarker; import pl.allegro.tech.hermes.consumers.config.KafkaHeaderNameProperties; +import java.util.HashMap; import java.util.Map; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Collections.emptyMap; import static java.util.stream.Collectors.toMap; import static java.util.stream.StreamSupport.stream; @@ -53,15 +54,24 @@ public String extractMessageId(Headers headers) { return new String(header.value(), UTF_8); } - public Map extractHTTPHeadersIfEnabled(Headers headers) { - return isHTTPheadersPropagationAsKafkaHeadersEnabled - ? - stream(headers.spliterator(), false) - .filter(h -> h.key().startsWith(httpHeadersPrefix)) - .collect(toMap( - h -> h.key().substring(httpHeadersPrefix.length()), - h -> new String(h.value(), UTF_8))) - : - emptyMap(); + public Map extractExternalMetadata(Headers headers, Map defaultExternalMetadata) { + if (isHTTPheadersPropagationAsKafkaHeadersEnabled) { + Map httpHeaders = stream(headers.spliterator(), false) + .filter(h -> h.key().startsWith(httpHeadersPrefix)) + .collect(toMap( + h -> h.key().substring(httpHeadersPrefix.length()), + h -> new String(h.value(), UTF_8)) + ); + if (httpHeaders.isEmpty()) { + // After completing the migration to the approach with Kafka headers, we should remove this condition. + return defaultExternalMetadata; + } + // The following is necessary to be compatible with building external metadata based on the message body. + // See: pl.allegro.tech.hermes.common.message.wrapper.AvroMessageContentWrapper.getMetadata + Map externalMetadata = new HashMap<>(httpHeaders); + externalMetadata.put(AvroMetadataMarker.METADATA_MESSAGE_ID_KEY.toString(), extractMessageId(headers)); + return externalMetadata; + } + return defaultExternalMetadata; } } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessage.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessage.java index b21e6cc12b..81971e7d8d 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessage.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/buffer/BackupMessage.java @@ -2,6 +2,7 @@ import java.io.Serializable; import java.util.Arrays; +import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -25,7 +26,7 @@ public BackupMessage(String messageId, byte[] data, long timestamp, String quali this.partitionKey = partitionKey; this.schemaVersion = schemaVersion; this.schemaId = schemaId; - this.propagatedHTTPHeaders = propagatedHTTPHeaders; + this.propagatedHTTPHeaders = propagatedHTTPHeaders == null ? Collections.emptyMap() : propagatedHTTPHeaders; } public String getMessageId() { diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/PublishingAvroTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/PublishingAvroTest.java index c32d9b237a..5974fbc74e 100644 --- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/PublishingAvroTest.java +++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/PublishingAvroTest.java @@ -1,5 +1,6 @@ package pl.allegro.tech.hermes.integration; +import com.github.tomakehurst.wiremock.http.HttpHeaders; import jakarta.ws.rs.client.ClientBuilder; import jakarta.ws.rs.client.Entity; import jakarta.ws.rs.client.WebTarget; @@ -446,6 +447,32 @@ public void shouldUpdateSchemaAndUseItImmediately() { }); } + @Test + public void shouldSendMessageIdHeaderToSubscriber() { + // given + Topic topic = randomTopic("sendMessageIdHeaderToSubscriber", "topic").withContentType(AVRO).build(); + operations.buildTopicWithSchema(topicWithSchema(topic, user.getSchemaAsString())); + operations.createSubscription(topic, "subscription", remoteService.getUrl()); + remoteService.expectMessages(user.asJson()); + WebTarget client = ClientBuilder.newClient().target(FRONTEND_URL).path("topics").path(topic.getQualifiedName()); + String traceId = UUID.randomUUID().toString(); + + // when + Response response = client + .request() + .header("Trace-Id", traceId) + .post(Entity.entity(user.asBytes(), MediaType.valueOf("avro/binary"))); + + // then + assertThat(response).hasStatus(Response.Status.CREATED); + remoteService.waitUntilRequestReceived(request -> { + String expectedMessageId = response.getHeaderString("Hermes-Message-Id"); + HttpHeaders headers = request.getHeaders(); + assertThat(headers.getHeader("messageId").firstValue()).isEqualTo(expectedMessageId); + assertThat(headers.getHeader("Trace-Id").firstValue()).isEqualTo(traceId); + }); + } + private void assertBodyDeserializesIntoUser(String body, AvroUser user) { AvroUser avroUser = AvroUser.create(user.getCompiledSchema(), body.getBytes()); assertThat(avroUser.getName()).isEqualTo(user.getName());