Skip to content

Commit

Permalink
Fix sending the messageId header to subscribers (#1794)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrrzysko authored Dec 13, 2023
1 parent c489a86 commit 803d100
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,8 @@ public Message convertToMessage(ConsumerRecord<byte[], byte[]> record, long part
KafkaTopic kafkaTopic = topics.get(record.topic());
UnwrappedMessageContent unwrappedContent = messageContentReader.read(record, kafkaTopic.contentType());

Map<String, String> externalMetadata = kafkaHeaderExtractor.extractHTTPHeadersIfEnabled(record.headers());
// compatibility condition, can be removed when all messages have HTTP headers propagated via Kafka headers.
if (externalMetadata.isEmpty()) {
externalMetadata = unwrappedContent.getMessageMetadata().getExternalMetadata();
}
Map<String, String> externalMetadataFromBody = unwrappedContent.getMessageMetadata().getExternalMetadata();
Map<String, String> externalMetadata = kafkaHeaderExtractor.extractExternalMetadata(record.headers(), externalMetadataFromBody);

return new Message(
kafkaHeaderExtractor.extractMessageId(record.headers()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import pl.allegro.tech.hermes.common.kafka.HTTPHeadersPropagationAsKafkaHeadersProperties;
import pl.allegro.tech.hermes.common.message.wrapper.AvroMetadataMarker;
import pl.allegro.tech.hermes.consumers.config.KafkaHeaderNameProperties;

import java.util.HashMap;
import java.util.Map;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.StreamSupport.stream;

Expand Down Expand Up @@ -53,15 +54,24 @@ public String extractMessageId(Headers headers) {
return new String(header.value(), UTF_8);
}

public Map<String, String> extractHTTPHeadersIfEnabled(Headers headers) {
return isHTTPheadersPropagationAsKafkaHeadersEnabled
?
stream(headers.spliterator(), false)
.filter(h -> h.key().startsWith(httpHeadersPrefix))
.collect(toMap(
h -> h.key().substring(httpHeadersPrefix.length()),
h -> new String(h.value(), UTF_8)))
:
emptyMap();
public Map<String, String> extractExternalMetadata(Headers headers, Map<String, String> defaultExternalMetadata) {
if (isHTTPheadersPropagationAsKafkaHeadersEnabled) {
Map<String, String> httpHeaders = stream(headers.spliterator(), false)
.filter(h -> h.key().startsWith(httpHeadersPrefix))
.collect(toMap(
h -> h.key().substring(httpHeadersPrefix.length()),
h -> new String(h.value(), UTF_8))
);
if (httpHeaders.isEmpty()) {
// After completing the migration to the approach with Kafka headers, we should remove this condition.
return defaultExternalMetadata;
}
// The following is necessary to be compatible with building external metadata based on the message body.
// See: pl.allegro.tech.hermes.common.message.wrapper.AvroMessageContentWrapper.getMetadata
Map<String, String> externalMetadata = new HashMap<>(httpHeaders);
externalMetadata.put(AvroMetadataMarker.METADATA_MESSAGE_ID_KEY.toString(), extractMessageId(headers));
return externalMetadata;
}
return defaultExternalMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

Expand All @@ -25,7 +26,7 @@ public BackupMessage(String messageId, byte[] data, long timestamp, String quali
this.partitionKey = partitionKey;
this.schemaVersion = schemaVersion;
this.schemaId = schemaId;
this.propagatedHTTPHeaders = propagatedHTTPHeaders;
this.propagatedHTTPHeaders = propagatedHTTPHeaders == null ? Collections.emptyMap() : propagatedHTTPHeaders;
}

public String getMessageId() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.allegro.tech.hermes.integration;

import com.github.tomakehurst.wiremock.http.HttpHeaders;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.WebTarget;
Expand Down Expand Up @@ -446,6 +447,32 @@ public void shouldUpdateSchemaAndUseItImmediately() {
});
}

@Test
public void shouldSendMessageIdHeaderToSubscriber() {
// given
Topic topic = randomTopic("sendMessageIdHeaderToSubscriber", "topic").withContentType(AVRO).build();
operations.buildTopicWithSchema(topicWithSchema(topic, user.getSchemaAsString()));
operations.createSubscription(topic, "subscription", remoteService.getUrl());
remoteService.expectMessages(user.asJson());
WebTarget client = ClientBuilder.newClient().target(FRONTEND_URL).path("topics").path(topic.getQualifiedName());
String traceId = UUID.randomUUID().toString();

// when
Response response = client
.request()
.header("Trace-Id", traceId)
.post(Entity.entity(user.asBytes(), MediaType.valueOf("avro/binary")));

// then
assertThat(response).hasStatus(Response.Status.CREATED);
remoteService.waitUntilRequestReceived(request -> {
String expectedMessageId = response.getHeaderString("Hermes-Message-Id");
HttpHeaders headers = request.getHeaders();
assertThat(headers.getHeader("messageId").firstValue()).isEqualTo(expectedMessageId);
assertThat(headers.getHeader("Trace-Id").firstValue()).isEqualTo(traceId);
});
}

private void assertBodyDeserializesIntoUser(String body, AvroUser user) {
AvroUser avroUser = AvroUser.create(user.getCompiledSchema(), body.getBytes());
assertThat(avroUser.getName()).isEqualTo(user.getName());
Expand Down

0 comments on commit 803d100

Please sign in to comment.