From f29ae97be27ea93951d12d9fbb47958082dee5d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20R=C5=BCysko?= Date: Thu, 29 Feb 2024 11:16:00 +0100 Subject: [PATCH] Flag 'fallback to remote datacenter' (#1825) This is a preliminary step to introduce fallback to a remote datacenter in case of any failures with the local Kafka cluster while publishing. The flag introduced here, when set to true, will enable the fallback for a given topic and disable persistent buffers for it. The mechanism respecting this flag will be added as a follow-up. --- .../pl/allegro/tech/hermes/api/Topic.java | 21 +++-- .../tech/hermes/api/TopicWithSchema.java | 24 ++--- .../topic/validator/TopicValidator.java | 8 ++ .../test/helper/builder/TopicBuilder.java | 11 ++- .../management/TopicManagementTest.java | 94 +++++++++++++++++++ 5 files changed, 139 insertions(+), 19 deletions(-) diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java index 069511a0f5..2b1353ccfa 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java @@ -37,6 +37,7 @@ public class Topic { private boolean jsonToAvroDryRunEnabled = false; @NotNull private Ack ack; + private boolean fallbackToRemoteDatacenterEnabled; @NotNull private ContentType contentType; @Min(MIN_MESSAGE_SIZE) @@ -56,7 +57,8 @@ public class Topic { private Instant modifiedAt; public Topic(TopicName name, String description, OwnerId owner, RetentionTime retentionTime, - boolean migratedFromJsonType, Ack ack, boolean trackingEnabled, ContentType contentType, boolean jsonToAvroDryRunEnabled, + boolean migratedFromJsonType, Ack ack, boolean fallbackToRemoteDatacenterEnabled, + boolean trackingEnabled, ContentType contentType, boolean jsonToAvroDryRunEnabled, @JacksonInject(value = DEFAULT_SCHEMA_ID_SERIALIZATION_ENABLED_KEY, useInput = OptBoolean.TRUE) Boolean schemaIdAwareSerializationEnabled, int maxMessageSize, PublishingAuth publishingAuth, boolean subscribingRestricted, @@ -66,6 +68,7 @@ public Topic(TopicName name, String description, OwnerId owner, RetentionTime re this.owner = owner; this.retentionTime = retentionTime; this.ack = (ack == null ? Ack.LEADER : ack); + this.fallbackToRemoteDatacenterEnabled = fallbackToRemoteDatacenterEnabled; this.trackingEnabled = trackingEnabled; this.migratedFromJsonType = migratedFromJsonType; this.contentType = contentType; @@ -88,6 +91,7 @@ public Topic( @JsonProperty("retentionTime") RetentionTime retentionTime, @JsonProperty("jsonToAvroDryRun") boolean jsonToAvroDryRunEnabled, @JsonProperty("ack") Ack ack, + @JsonProperty("fallbackToRemoteDatacenterEnabled") boolean fallbackToRemoteDatacenterEnabled, @JsonProperty("trackingEnabled") boolean trackingEnabled, @JsonProperty("migratedFromJsonType") boolean migratedFromJsonType, @JsonProperty("schemaIdAwareSerializationEnabled") @@ -103,8 +107,8 @@ public Topic( @JsonProperty("modifiedAt") Instant modifiedAt ) { this(TopicName.fromQualifiedName(qualifiedName), description, owner, retentionTime, migratedFromJsonType, ack, - trackingEnabled, contentType, jsonToAvroDryRunEnabled, schemaIdAwareSerializationEnabled, - maxMessageSize == null ? DEFAULT_MAX_MESSAGE_SIZE : maxMessageSize, + fallbackToRemoteDatacenterEnabled, trackingEnabled, contentType, jsonToAvroDryRunEnabled, + schemaIdAwareSerializationEnabled, maxMessageSize == null ? DEFAULT_MAX_MESSAGE_SIZE : maxMessageSize, publishingAuth == null ? PublishingAuth.disabled() : publishingAuth, subscribingRestricted, offlineStorage == null ? TopicDataOfflineStorage.defaultOfflineStorage() : offlineStorage, @@ -119,9 +123,9 @@ public RetentionTime getRetentionTime() { @Override public int hashCode() { - return Objects.hash(name, description, owner, retentionTime, migratedFromJsonType, trackingEnabled, ack, contentType, - jsonToAvroDryRunEnabled, schemaIdAwareSerializationEnabled, maxMessageSize, - publishingAuth, subscribingRestricted, offlineStorage, labels); + return Objects.hash(name, description, owner, retentionTime, migratedFromJsonType, trackingEnabled, ack, + fallbackToRemoteDatacenterEnabled, contentType, jsonToAvroDryRunEnabled, schemaIdAwareSerializationEnabled, + maxMessageSize, publishingAuth, subscribingRestricted, offlineStorage, labels); } @Override @@ -143,6 +147,7 @@ public boolean equals(Object obj) { && Objects.equals(this.migratedFromJsonType, other.migratedFromJsonType) && Objects.equals(this.schemaIdAwareSerializationEnabled, other.schemaIdAwareSerializationEnabled) && Objects.equals(this.ack, other.ack) + && Objects.equals(this.fallbackToRemoteDatacenterEnabled, other.fallbackToRemoteDatacenterEnabled) && Objects.equals(this.contentType, other.contentType) && Objects.equals(this.maxMessageSize, other.maxMessageSize) && Objects.equals(this.subscribingRestricted, other.subscribingRestricted) @@ -178,6 +183,10 @@ public Ack getAck() { return ack; } + public boolean isFallbackToRemoteDatacenterEnabled() { + return fallbackToRemoteDatacenterEnabled; + } + public ContentType getContentType() { return contentType; } diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicWithSchema.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicWithSchema.java index c3c1a75059..8dafae5495 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicWithSchema.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicWithSchema.java @@ -18,10 +18,10 @@ public class TopicWithSchema extends Topic { public TopicWithSchema(Topic topic, String schema) { this(schema, topic.getQualifiedName(), topic.getDescription(), topic.getOwner(), topic.getRetentionTime(), - topic.isJsonToAvroDryRunEnabled(), topic.getAck(), topic.isTrackingEnabled(), topic.wasMigratedFromJsonType(), - topic.isSchemaIdAwareSerializationEnabled(), topic.getContentType(), topic.getMaxMessageSize(), - topic.getPublishingAuth(), topic.isSubscribingRestricted(), topic.getOfflineStorage(), topic.getLabels(), - topic.getCreatedAt(), topic.getModifiedAt()); + topic.isJsonToAvroDryRunEnabled(), topic.getAck(), topic.isFallbackToRemoteDatacenterEnabled(), + topic.isTrackingEnabled(), topic.wasMigratedFromJsonType(), topic.isSchemaIdAwareSerializationEnabled(), + topic.getContentType(), topic.getMaxMessageSize(), topic.getPublishingAuth(), topic.isSubscribingRestricted(), + topic.getOfflineStorage(), topic.getLabels(), topic.getCreatedAt(), topic.getModifiedAt()); } @JsonCreator @@ -32,6 +32,7 @@ public TopicWithSchema(@JsonProperty("schema") String schema, @JsonProperty("retentionTime") RetentionTime retentionTime, @JsonProperty("jsonToAvroDryRun") boolean jsonToAvroDryRunEnabled, @JsonProperty("ack") Ack ack, + @JsonProperty("fallbackToRemoteDatacenterEnabled") boolean fallbackToRemoteDatacenterEnabled, @JsonProperty("trackingEnabled") boolean trackingEnabled, @JsonProperty("migratedFromJsonType") boolean migratedFromJsonType, @JsonProperty("schemaIdAwareSerializationEnabled") @@ -45,9 +46,10 @@ public TopicWithSchema(@JsonProperty("schema") String schema, @JsonProperty("labels") Set labels, @JsonProperty("createdAt") Instant createdAt, @JsonProperty("modifiedAt") Instant modifiedAt) { - super(qualifiedName, description, owner, retentionTime, jsonToAvroDryRunEnabled, ack, trackingEnabled, - migratedFromJsonType, schemaIdAwareSerializationEnabled, contentType, maxMessageSize, - publishingAuth, subscribingRestricted, offlineStorage, labels, createdAt, modifiedAt); + super(qualifiedName, description, owner, retentionTime, jsonToAvroDryRunEnabled, ack, + fallbackToRemoteDatacenterEnabled, trackingEnabled, migratedFromJsonType, schemaIdAwareSerializationEnabled, + contentType, maxMessageSize, publishingAuth, subscribingRestricted, offlineStorage, labels, createdAt, + modifiedAt); this.topic = convertToTopic(); this.schema = schema; } @@ -62,10 +64,10 @@ public static TopicWithSchema topicWithSchema(Topic topic) { private Topic convertToTopic() { return new Topic(this.getQualifiedName(), this.getDescription(), this.getOwner(), this.getRetentionTime(), - this.isJsonToAvroDryRunEnabled(), this.getAck(), this.isTrackingEnabled(), this.wasMigratedFromJsonType(), - this.isSchemaIdAwareSerializationEnabled(), this.getContentType(), this.getMaxMessageSize(), - this.getPublishingAuth(), this.isSubscribingRestricted(), this.getOfflineStorage(), this.getLabels(), - this.getCreatedAt(), this.getModifiedAt()); + this.isJsonToAvroDryRunEnabled(), this.getAck(), this.isFallbackToRemoteDatacenterEnabled(), + this.isTrackingEnabled(), this.wasMigratedFromJsonType(), this.isSchemaIdAwareSerializationEnabled(), + this.getContentType(), this.getMaxMessageSize(), this.getPublishingAuth(), this.isSubscribingRestricted(), + this.getOfflineStorage(), this.getLabels(), this.getCreatedAt(), this.getModifiedAt()); } public String getSchema() { diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/validator/TopicValidator.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/validator/TopicValidator.java index 38d86d54bc..39bed768b0 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/validator/TopicValidator.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/validator/TopicValidator.java @@ -40,6 +40,10 @@ public void ensureCreatedTopicIsValid(Topic created, RequestUser createdBy, Crea checkContentType(created); checkTopicLabels(created); + if (created.isFallbackToRemoteDatacenterEnabled() && !createdBy.isAdmin()) { + throw new TopicValidationException("User is not allowed to enable fallback to remote datacenter"); + } + if (created.wasMigratedFromJsonType()) { throw new TopicValidationException("Newly created topic cannot have migratedFromJsonType flag set to true"); } @@ -54,6 +58,10 @@ public void ensureUpdatedTopicIsValid(Topic updated, Topic previous, RequestUser checkOwner(updated); checkTopicLabels(updated); + if (!previous.isFallbackToRemoteDatacenterEnabled() && updated.isFallbackToRemoteDatacenterEnabled() && !modifiedBy.isAdmin()) { + throw new TopicValidationException("User is not allowed to enable fallback to remote datacenter"); + } + if (migrationFromJsonTypeFlagChangedToTrue(updated, previous)) { if (updated.getContentType() != ContentType.AVRO) { throw new TopicValidationException("Change content type to AVRO together with setting migratedFromJsonType flag"); diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/builder/TopicBuilder.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/builder/TopicBuilder.java index 3e0818b759..b4cc5d9e3a 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/builder/TopicBuilder.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/builder/TopicBuilder.java @@ -32,6 +32,8 @@ public class TopicBuilder { private Topic.Ack ack = Topic.Ack.LEADER; + private boolean fallbackToRemoteDatacenterEnabled = false; + private ContentType contentType = ContentType.JSON; private RetentionTime retentionTime = RetentionTime.of(1, TimeUnit.DAYS); @@ -85,8 +87,8 @@ public static TopicBuilder topic(String qualifiedName) { public Topic build() { return new Topic( - name, description, owner, retentionTime, migratedFromJsonType, ack, trackingEnabled, contentType, - jsonToAvroDryRunEnabled, schemaIdAwareSerialization, maxMessageSize, + name, description, owner, retentionTime, migratedFromJsonType, ack, fallbackToRemoteDatacenterEnabled, + trackingEnabled, contentType, jsonToAvroDryRunEnabled, schemaIdAwareSerialization, maxMessageSize, new PublishingAuth(publishers, authEnabled, unauthenticatedAccessEnabled), subscribingRestricted, offlineStorage, labels, null, null ); @@ -122,6 +124,11 @@ public TopicBuilder withAck(Topic.Ack ack) { return this; } + public TopicBuilder withFallbackToRemoteDatacenterEnabled() { + this.fallbackToRemoteDatacenterEnabled = true; + return this; + } + public TopicBuilder withTrackingEnabled(boolean enabled) { this.trackingEnabled = enabled; return this; diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/TopicManagementTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/TopicManagementTest.java index c1cc5a085f..0d6574d2be 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/TopicManagementTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/TopicManagementTest.java @@ -3,6 +3,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.jayway.awaitility.Duration; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.springframework.test.web.reactive.server.WebTestClient; @@ -16,6 +17,7 @@ import pl.allegro.tech.hermes.api.TopicLabel; import pl.allegro.tech.hermes.api.TopicWithSchema; import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension; +import pl.allegro.tech.hermes.management.TestSecurityProvider; import pl.allegro.tech.hermes.test.helper.avro.AvroUserSchemaLoader; import java.util.Arrays; @@ -43,6 +45,11 @@ public class TopicManagementTest { private static final String SCHEMA = AvroUserSchemaLoader.load().toString(); + @AfterEach + public void cleanup() { + TestSecurityProvider.reset(); + } + @Test public void shouldEmitAuditEventWhenTopicCreated() { //when @@ -591,6 +598,93 @@ public void shouldCreateTopicEvenIfExistsInBrokers() { hermes.api().getTopicResponse(qualifiedTopicName).expectStatus().isOk(); } + @Test + public void shouldNotAllowNonAdminUserCreateTopicWithFallbackToRemoteDatacenterEnabled() { + // given + TestSecurityProvider.setUserIsAdmin(false); + TopicWithSchema topic = topicWithSchema( + topicWithRandomName() + .withFallbackToRemoteDatacenterEnabled() + .build() + ); + hermes.initHelper().createGroup(Group.from(topic.getName().getGroupName())); + + // when + WebTestClient.ResponseSpec response = hermes.api().createTopic(topic); + + //then + response.expectStatus().isBadRequest(); + assertThat(response.expectBody(String.class).returnResult().getResponseBody()) + .contains("User is not allowed to enable fallback to remote datacenter"); + } + + @Test + public void shouldAllowAdminUserCreateTopicWithFallbackToRemoteDatacenterEnabled() { + // given + TestSecurityProvider.setUserIsAdmin(true); + TopicWithSchema topic = topicWithSchema( + topicWithRandomName() + .withFallbackToRemoteDatacenterEnabled() + .build() + ); + hermes.initHelper().createGroup(Group.from(topic.getName().getGroupName())); + + // when + WebTestClient.ResponseSpec response = hermes.api().createTopic(topic); + + //then + response.expectStatus().isCreated(); + } + + @Test + public void shouldNotAllowNonAdminUserToEnableFallbackToRemoteDatacenter() { + // given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + TestSecurityProvider.setUserIsAdmin(false); + PatchData patchData = PatchData.from(ImmutableMap.of("fallbackToRemoteDatacenterEnabled", true)); + + // when + WebTestClient.ResponseSpec response = hermes.api().updateTopic(topic.getQualifiedName(), patchData); + + //then + response.expectStatus().isBadRequest(); + assertThat(response.expectBody(String.class).returnResult().getResponseBody()) + .contains("User is not allowed to enable fallback to remote datacenter"); + } + + @Test + public void shouldAllowAdminUserToEnableFallbackToRemoteDatacenter() { + // given + Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); + TestSecurityProvider.setUserIsAdmin(true); + PatchData patchData = PatchData.from(ImmutableMap.of("fallbackToRemoteDatacenterEnabled", true)); + + // when + WebTestClient.ResponseSpec response = hermes.api().updateTopic(topic.getQualifiedName(), patchData); + + //then + response.expectStatus().isOk(); + } + + @Test + public void shouldAllowNonAdminUserToModifyTopicWithFallbackToRemoteDatacenterEnabled() { + // given + TestSecurityProvider.setUserIsAdmin(true); + Topic topic = hermes.initHelper().createTopic( + topicWithRandomName() + .withFallbackToRemoteDatacenterEnabled() + .build() + ); + TestSecurityProvider.setUserIsAdmin(false); + PatchData patchData = PatchData.from(ImmutableMap.of("description", "new description")); + + // when + WebTestClient.ResponseSpec response = hermes.api().updateTopic(topic.getQualifiedName(), patchData); + + //then + response.expectStatus().isOk(); + } + private static List getGroupTopicsList(String groupName) { return Arrays.stream(Objects.requireNonNull(hermes.api().listTopics(groupName) .expectStatus()