Skip to content

Commit

Permalink
Flag 'fallback to remote datacenter' (#1825)
Browse files Browse the repository at this point in the history
This is a preliminary step to introduce fallback to a remote datacenter in case of any failures with the local Kafka cluster while publishing. The flag introduced here, when set to true, will enable the fallback for a given topic and disable persistent buffers for it. The mechanism respecting this flag will be added as a follow-up.
  • Loading branch information
piotrrzysko authored Feb 29, 2024
1 parent 07f33db commit f29ae97
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 19 deletions.
21 changes: 15 additions & 6 deletions hermes-api/src/main/java/pl/allegro/tech/hermes/api/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class Topic {
private boolean jsonToAvroDryRunEnabled = false;
@NotNull
private Ack ack;
private boolean fallbackToRemoteDatacenterEnabled;
@NotNull
private ContentType contentType;
@Min(MIN_MESSAGE_SIZE)
Expand All @@ -56,7 +57,8 @@ public class Topic {
private Instant modifiedAt;

public Topic(TopicName name, String description, OwnerId owner, RetentionTime retentionTime,
boolean migratedFromJsonType, Ack ack, boolean trackingEnabled, ContentType contentType, boolean jsonToAvroDryRunEnabled,
boolean migratedFromJsonType, Ack ack, boolean fallbackToRemoteDatacenterEnabled,
boolean trackingEnabled, ContentType contentType, boolean jsonToAvroDryRunEnabled,
@JacksonInject(value = DEFAULT_SCHEMA_ID_SERIALIZATION_ENABLED_KEY, useInput = OptBoolean.TRUE)
Boolean schemaIdAwareSerializationEnabled,
int maxMessageSize, PublishingAuth publishingAuth, boolean subscribingRestricted,
Expand All @@ -66,6 +68,7 @@ public Topic(TopicName name, String description, OwnerId owner, RetentionTime re
this.owner = owner;
this.retentionTime = retentionTime;
this.ack = (ack == null ? Ack.LEADER : ack);
this.fallbackToRemoteDatacenterEnabled = fallbackToRemoteDatacenterEnabled;
this.trackingEnabled = trackingEnabled;
this.migratedFromJsonType = migratedFromJsonType;
this.contentType = contentType;
Expand All @@ -88,6 +91,7 @@ public Topic(
@JsonProperty("retentionTime") RetentionTime retentionTime,
@JsonProperty("jsonToAvroDryRun") boolean jsonToAvroDryRunEnabled,
@JsonProperty("ack") Ack ack,
@JsonProperty("fallbackToRemoteDatacenterEnabled") boolean fallbackToRemoteDatacenterEnabled,
@JsonProperty("trackingEnabled") boolean trackingEnabled,
@JsonProperty("migratedFromJsonType") boolean migratedFromJsonType,
@JsonProperty("schemaIdAwareSerializationEnabled")
Expand All @@ -103,8 +107,8 @@ public Topic(
@JsonProperty("modifiedAt") Instant modifiedAt
) {
this(TopicName.fromQualifiedName(qualifiedName), description, owner, retentionTime, migratedFromJsonType, ack,
trackingEnabled, contentType, jsonToAvroDryRunEnabled, schemaIdAwareSerializationEnabled,
maxMessageSize == null ? DEFAULT_MAX_MESSAGE_SIZE : maxMessageSize,
fallbackToRemoteDatacenterEnabled, trackingEnabled, contentType, jsonToAvroDryRunEnabled,
schemaIdAwareSerializationEnabled, maxMessageSize == null ? DEFAULT_MAX_MESSAGE_SIZE : maxMessageSize,
publishingAuth == null ? PublishingAuth.disabled() : publishingAuth,
subscribingRestricted,
offlineStorage == null ? TopicDataOfflineStorage.defaultOfflineStorage() : offlineStorage,
Expand All @@ -119,9 +123,9 @@ public RetentionTime getRetentionTime() {

@Override
public int hashCode() {
return Objects.hash(name, description, owner, retentionTime, migratedFromJsonType, trackingEnabled, ack, contentType,
jsonToAvroDryRunEnabled, schemaIdAwareSerializationEnabled, maxMessageSize,
publishingAuth, subscribingRestricted, offlineStorage, labels);
return Objects.hash(name, description, owner, retentionTime, migratedFromJsonType, trackingEnabled, ack,
fallbackToRemoteDatacenterEnabled, contentType, jsonToAvroDryRunEnabled, schemaIdAwareSerializationEnabled,
maxMessageSize, publishingAuth, subscribingRestricted, offlineStorage, labels);
}

@Override
Expand All @@ -143,6 +147,7 @@ public boolean equals(Object obj) {
&& Objects.equals(this.migratedFromJsonType, other.migratedFromJsonType)
&& Objects.equals(this.schemaIdAwareSerializationEnabled, other.schemaIdAwareSerializationEnabled)
&& Objects.equals(this.ack, other.ack)
&& Objects.equals(this.fallbackToRemoteDatacenterEnabled, other.fallbackToRemoteDatacenterEnabled)
&& Objects.equals(this.contentType, other.contentType)
&& Objects.equals(this.maxMessageSize, other.maxMessageSize)
&& Objects.equals(this.subscribingRestricted, other.subscribingRestricted)
Expand Down Expand Up @@ -178,6 +183,10 @@ public Ack getAck() {
return ack;
}

public boolean isFallbackToRemoteDatacenterEnabled() {
return fallbackToRemoteDatacenterEnabled;
}

public ContentType getContentType() {
return contentType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ public class TopicWithSchema extends Topic {

public TopicWithSchema(Topic topic, String schema) {
this(schema, topic.getQualifiedName(), topic.getDescription(), topic.getOwner(), topic.getRetentionTime(),
topic.isJsonToAvroDryRunEnabled(), topic.getAck(), topic.isTrackingEnabled(), topic.wasMigratedFromJsonType(),
topic.isSchemaIdAwareSerializationEnabled(), topic.getContentType(), topic.getMaxMessageSize(),
topic.getPublishingAuth(), topic.isSubscribingRestricted(), topic.getOfflineStorage(), topic.getLabels(),
topic.getCreatedAt(), topic.getModifiedAt());
topic.isJsonToAvroDryRunEnabled(), topic.getAck(), topic.isFallbackToRemoteDatacenterEnabled(),
topic.isTrackingEnabled(), topic.wasMigratedFromJsonType(), topic.isSchemaIdAwareSerializationEnabled(),
topic.getContentType(), topic.getMaxMessageSize(), topic.getPublishingAuth(), topic.isSubscribingRestricted(),
topic.getOfflineStorage(), topic.getLabels(), topic.getCreatedAt(), topic.getModifiedAt());
}

@JsonCreator
Expand All @@ -32,6 +32,7 @@ public TopicWithSchema(@JsonProperty("schema") String schema,
@JsonProperty("retentionTime") RetentionTime retentionTime,
@JsonProperty("jsonToAvroDryRun") boolean jsonToAvroDryRunEnabled,
@JsonProperty("ack") Ack ack,
@JsonProperty("fallbackToRemoteDatacenterEnabled") boolean fallbackToRemoteDatacenterEnabled,
@JsonProperty("trackingEnabled") boolean trackingEnabled,
@JsonProperty("migratedFromJsonType") boolean migratedFromJsonType,
@JsonProperty("schemaIdAwareSerializationEnabled")
Expand All @@ -45,9 +46,10 @@ public TopicWithSchema(@JsonProperty("schema") String schema,
@JsonProperty("labels") Set<TopicLabel> labels,
@JsonProperty("createdAt") Instant createdAt,
@JsonProperty("modifiedAt") Instant modifiedAt) {
super(qualifiedName, description, owner, retentionTime, jsonToAvroDryRunEnabled, ack, trackingEnabled,
migratedFromJsonType, schemaIdAwareSerializationEnabled, contentType, maxMessageSize,
publishingAuth, subscribingRestricted, offlineStorage, labels, createdAt, modifiedAt);
super(qualifiedName, description, owner, retentionTime, jsonToAvroDryRunEnabled, ack,
fallbackToRemoteDatacenterEnabled, trackingEnabled, migratedFromJsonType, schemaIdAwareSerializationEnabled,
contentType, maxMessageSize, publishingAuth, subscribingRestricted, offlineStorage, labels, createdAt,
modifiedAt);
this.topic = convertToTopic();
this.schema = schema;
}
Expand All @@ -62,10 +64,10 @@ public static TopicWithSchema topicWithSchema(Topic topic) {

private Topic convertToTopic() {
return new Topic(this.getQualifiedName(), this.getDescription(), this.getOwner(), this.getRetentionTime(),
this.isJsonToAvroDryRunEnabled(), this.getAck(), this.isTrackingEnabled(), this.wasMigratedFromJsonType(),
this.isSchemaIdAwareSerializationEnabled(), this.getContentType(), this.getMaxMessageSize(),
this.getPublishingAuth(), this.isSubscribingRestricted(), this.getOfflineStorage(), this.getLabels(),
this.getCreatedAt(), this.getModifiedAt());
this.isJsonToAvroDryRunEnabled(), this.getAck(), this.isFallbackToRemoteDatacenterEnabled(),
this.isTrackingEnabled(), this.wasMigratedFromJsonType(), this.isSchemaIdAwareSerializationEnabled(),
this.getContentType(), this.getMaxMessageSize(), this.getPublishingAuth(), this.isSubscribingRestricted(),
this.getOfflineStorage(), this.getLabels(), this.getCreatedAt(), this.getModifiedAt());
}

public String getSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public void ensureCreatedTopicIsValid(Topic created, RequestUser createdBy, Crea
checkContentType(created);
checkTopicLabels(created);

if (created.isFallbackToRemoteDatacenterEnabled() && !createdBy.isAdmin()) {
throw new TopicValidationException("User is not allowed to enable fallback to remote datacenter");
}

if (created.wasMigratedFromJsonType()) {
throw new TopicValidationException("Newly created topic cannot have migratedFromJsonType flag set to true");
}
Expand All @@ -54,6 +58,10 @@ public void ensureUpdatedTopicIsValid(Topic updated, Topic previous, RequestUser
checkOwner(updated);
checkTopicLabels(updated);

if (!previous.isFallbackToRemoteDatacenterEnabled() && updated.isFallbackToRemoteDatacenterEnabled() && !modifiedBy.isAdmin()) {
throw new TopicValidationException("User is not allowed to enable fallback to remote datacenter");
}

if (migrationFromJsonTypeFlagChangedToTrue(updated, previous)) {
if (updated.getContentType() != ContentType.AVRO) {
throw new TopicValidationException("Change content type to AVRO together with setting migratedFromJsonType flag");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class TopicBuilder {

private Topic.Ack ack = Topic.Ack.LEADER;

private boolean fallbackToRemoteDatacenterEnabled = false;

private ContentType contentType = ContentType.JSON;

private RetentionTime retentionTime = RetentionTime.of(1, TimeUnit.DAYS);
Expand Down Expand Up @@ -85,8 +87,8 @@ public static TopicBuilder topic(String qualifiedName) {

public Topic build() {
return new Topic(
name, description, owner, retentionTime, migratedFromJsonType, ack, trackingEnabled, contentType,
jsonToAvroDryRunEnabled, schemaIdAwareSerialization, maxMessageSize,
name, description, owner, retentionTime, migratedFromJsonType, ack, fallbackToRemoteDatacenterEnabled,
trackingEnabled, contentType, jsonToAvroDryRunEnabled, schemaIdAwareSerialization, maxMessageSize,
new PublishingAuth(publishers, authEnabled, unauthenticatedAccessEnabled), subscribingRestricted,
offlineStorage, labels, null, null
);
Expand Down Expand Up @@ -122,6 +124,11 @@ public TopicBuilder withAck(Topic.Ack ack) {
return this;
}

public TopicBuilder withFallbackToRemoteDatacenterEnabled() {
this.fallbackToRemoteDatacenterEnabled = true;
return this;
}

public TopicBuilder withTrackingEnabled(boolean enabled) {
this.trackingEnabled = enabled;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.jayway.awaitility.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.test.web.reactive.server.WebTestClient;
Expand All @@ -16,6 +17,7 @@
import pl.allegro.tech.hermes.api.TopicLabel;
import pl.allegro.tech.hermes.api.TopicWithSchema;
import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension;
import pl.allegro.tech.hermes.management.TestSecurityProvider;
import pl.allegro.tech.hermes.test.helper.avro.AvroUserSchemaLoader;

import java.util.Arrays;
Expand Down Expand Up @@ -43,6 +45,11 @@ public class TopicManagementTest {

private static final String SCHEMA = AvroUserSchemaLoader.load().toString();

@AfterEach
public void cleanup() {
TestSecurityProvider.reset();
}

@Test
public void shouldEmitAuditEventWhenTopicCreated() {
//when
Expand Down Expand Up @@ -591,6 +598,93 @@ public void shouldCreateTopicEvenIfExistsInBrokers() {
hermes.api().getTopicResponse(qualifiedTopicName).expectStatus().isOk();
}

@Test
public void shouldNotAllowNonAdminUserCreateTopicWithFallbackToRemoteDatacenterEnabled() {
// given
TestSecurityProvider.setUserIsAdmin(false);
TopicWithSchema topic = topicWithSchema(
topicWithRandomName()
.withFallbackToRemoteDatacenterEnabled()
.build()
);
hermes.initHelper().createGroup(Group.from(topic.getName().getGroupName()));

// when
WebTestClient.ResponseSpec response = hermes.api().createTopic(topic);

//then
response.expectStatus().isBadRequest();
assertThat(response.expectBody(String.class).returnResult().getResponseBody())
.contains("User is not allowed to enable fallback to remote datacenter");
}

@Test
public void shouldAllowAdminUserCreateTopicWithFallbackToRemoteDatacenterEnabled() {
// given
TestSecurityProvider.setUserIsAdmin(true);
TopicWithSchema topic = topicWithSchema(
topicWithRandomName()
.withFallbackToRemoteDatacenterEnabled()
.build()
);
hermes.initHelper().createGroup(Group.from(topic.getName().getGroupName()));

// when
WebTestClient.ResponseSpec response = hermes.api().createTopic(topic);

//then
response.expectStatus().isCreated();
}

@Test
public void shouldNotAllowNonAdminUserToEnableFallbackToRemoteDatacenter() {
// given
Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build());
TestSecurityProvider.setUserIsAdmin(false);
PatchData patchData = PatchData.from(ImmutableMap.of("fallbackToRemoteDatacenterEnabled", true));

// when
WebTestClient.ResponseSpec response = hermes.api().updateTopic(topic.getQualifiedName(), patchData);

//then
response.expectStatus().isBadRequest();
assertThat(response.expectBody(String.class).returnResult().getResponseBody())
.contains("User is not allowed to enable fallback to remote datacenter");
}

@Test
public void shouldAllowAdminUserToEnableFallbackToRemoteDatacenter() {
// given
Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build());
TestSecurityProvider.setUserIsAdmin(true);
PatchData patchData = PatchData.from(ImmutableMap.of("fallbackToRemoteDatacenterEnabled", true));

// when
WebTestClient.ResponseSpec response = hermes.api().updateTopic(topic.getQualifiedName(), patchData);

//then
response.expectStatus().isOk();
}

@Test
public void shouldAllowNonAdminUserToModifyTopicWithFallbackToRemoteDatacenterEnabled() {
// given
TestSecurityProvider.setUserIsAdmin(true);
Topic topic = hermes.initHelper().createTopic(
topicWithRandomName()
.withFallbackToRemoteDatacenterEnabled()
.build()
);
TestSecurityProvider.setUserIsAdmin(false);
PatchData patchData = PatchData.from(ImmutableMap.of("description", "new description"));

// when
WebTestClient.ResponseSpec response = hermes.api().updateTopic(topic.getQualifiedName(), patchData);

//then
response.expectStatus().isOk();
}

private static List<String> getGroupTopicsList(String groupName) {
return Arrays.stream(Objects.requireNonNull(hermes.api().listTopics(groupName)
.expectStatus()
Expand Down

0 comments on commit f29ae97

Please sign in to comment.