diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ErrorCode.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ErrorCode.java index a000f436e1..c5f76e0d59 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ErrorCode.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ErrorCode.java @@ -63,6 +63,7 @@ public enum ErrorCode { INVALID_QUERY(BAD_REQUEST), IMPLEMENTATION_ABSENT(NOT_FOUND), MOVING_SUBSCRIPTION_OFFSETS_VALIDATION_ERROR(BAD_REQUEST), + CONSUMER_GROUP_DELETION_ERROR(INTERNAL_SERVER_ERROR), SENDING_TO_KAFKA_TIMEOUT(SERVICE_UNAVAILABLE); private final int httpCode; diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/Subscription.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/Subscription.java index 359206d487..ac9a863d2a 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/Subscription.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/Subscription.java @@ -531,6 +531,14 @@ public String toString() { return "Subscription(" + getQualifiedName() + ")"; } + public static SubscriptionName getSubscriptionNameFromString(String input) { + if (!input.startsWith("Subscription(") || !input.endsWith(")")) { + throw new IllegalArgumentException("Invalid input: " + input); + } + return SubscriptionName.fromString( + input.replaceFirst("^Subscription\\(", "").replaceFirst("\\)$", "")); + } + public enum State { PENDING, ACTIVE, diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionTerminationCleanupEndpoint.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionTerminationCleanupEndpoint.java new file mode 100644 index 0000000000..7384cc3a2c --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionTerminationCleanupEndpoint.java @@ -0,0 +1,40 @@ +package pl.allegro.tech.hermes.management.api; + +import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; +import static pl.allegro.tech.hermes.management.infrastructure.audit.AuditEventType.REMOVED; + +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.Response; +import org.springframework.beans.factory.annotation.Autowired; +import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.SubscriptionName; +import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionService; +import pl.allegro.tech.hermes.management.infrastructure.audit.AuditEvent; + +@Path("subscriptions/termination-cleanup") +public class SubscriptionTerminationCleanupEndpoint { + + private final SubscriptionService subscriptionService; + + @Autowired + public SubscriptionTerminationCleanupEndpoint(SubscriptionService subscriptionService) { + this.subscriptionService = subscriptionService; + } + + @POST + @Consumes(APPLICATION_JSON) + @Path("/from-audit-event") + public Response subscriptionTerminationCleanup(AuditEvent auditEvent) { + if (auditEvent.getEventType() != REMOVED + || !auditEvent.getPayloadClass().equals(Subscription.class.getName())) { + return Response.ok().build(); + } + + SubscriptionName subscriptionName = + Subscription.getSubscriptionNameFromString(auditEvent.getResourceName()); + subscriptionService.subscriptionTerminationCleanup(subscriptionName); + return Response.ok().build(); + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SubscriptionHealthConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SubscriptionHealthConfiguration.java index 7918c413b3..da6d284312 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SubscriptionHealthConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/SubscriptionHealthConfiguration.java @@ -121,13 +121,10 @@ public SubscriptionService subscriptionService( @Bean public SubscriptionRemover subscriptionRemover( Auditor auditor, - MultiDatacenterRepositoryCommandExecutor multiDatacenterRepositoryCommandExecutor, + MultiDCAwareService multiDCAwareService, SubscriptionOwnerCache subscriptionOwnerCache, SubscriptionRepository subscriptionRepository) { return new SubscriptionRemover( - auditor, - multiDatacenterRepositoryCommandExecutor, - subscriptionOwnerCache, - subscriptionRepository); + auditor, multiDCAwareService, subscriptionOwnerCache, subscriptionRepository); } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java index a09e205432..20e7104e33 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java @@ -112,7 +112,8 @@ MultiDCAwareService multiDCAwareService( new OffsetsAvailableChecker(consumerPool, storage), new LogEndOffsetChecker(consumerPool), brokerAdminClient, - createConsumerGroupManager(kafkaProperties, kafkaNamesMapper), + createConsumerGroupManager( + kafkaProperties, kafkaNamesMapper, brokerAdminClient), createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper)); }) .collect(toList()); @@ -126,13 +127,16 @@ MultiDCAwareService multiDCAwareService( } private ConsumerGroupManager createConsumerGroupManager( - KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper) { + KafkaProperties kafkaProperties, + KafkaNamesMapper kafkaNamesMapper, + AdminClient kafkaAdminClient) { return subscriptionProperties.isCreateConsumerGroupManuallyEnabled() ? new KafkaConsumerGroupManager( kafkaNamesMapper, kafkaProperties.getQualifiedClusterName(), kafkaProperties.getBrokerList(), - kafkaProperties) + kafkaProperties, + kafkaAdminClient) : new NoOpConsumerGroupManager(); } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/ConsumerGroupManager.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/ConsumerGroupManager.java index 9a8a99a1d3..0b6c505b49 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/ConsumerGroupManager.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/ConsumerGroupManager.java @@ -1,9 +1,12 @@ package pl.allegro.tech.hermes.management.domain.subscription; import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; public interface ConsumerGroupManager { void createConsumerGroup(Topic topic, Subscription subscription); + + void deleteConsumerGroup(SubscriptionName subscriptionName); } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionRemover.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionRemover.java index b83b57a72f..44d3e909c1 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionRemover.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionRemover.java @@ -4,30 +4,30 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository; import pl.allegro.tech.hermes.domain.topic.TopicNotEmptyException; import pl.allegro.tech.hermes.management.domain.Auditor; import pl.allegro.tech.hermes.management.domain.auth.RequestUser; -import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; -import pl.allegro.tech.hermes.management.domain.subscription.commands.RemoveSubscriptionRepositoryCommand; +import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService; public class SubscriptionRemover { private static final Logger logger = LoggerFactory.getLogger(SubscriptionRemover.class); private final Auditor auditor; - private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor; + private final MultiDCAwareService multiDCAwareService; private final SubscriptionOwnerCache subscriptionOwnerCache; private final SubscriptionRepository subscriptionRepository; public SubscriptionRemover( Auditor auditor, - MultiDatacenterRepositoryCommandExecutor multiDcExecutor, + MultiDCAwareService multiDCAwareService, SubscriptionOwnerCache subscriptionOwnerCache, SubscriptionRepository subscriptionRepository) { this.auditor = auditor; - this.multiDcExecutor = multiDcExecutor; + this.multiDCAwareService = multiDCAwareService; this.subscriptionOwnerCache = subscriptionOwnerCache; this.subscriptionRepository = subscriptionRepository; } @@ -38,8 +38,7 @@ public void removeSubscription( removedBy.getUsername(), Subscription.class.getSimpleName(), subscriptionName); Subscription subscription = subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName); - multiDcExecutor.executeByUser( - new RemoveSubscriptionRepositoryCommand(topicName, subscriptionName), removedBy); + multiDCAwareService.removeSubscription(topicName, subscriptionName, removedBy); auditor.objectRemoved(removedBy.getUsername(), subscription); subscriptionOwnerCache.onRemovedSubscription(subscriptionName, topicName); } @@ -57,6 +56,10 @@ public void removeSubscriptionRelatedToTopic(Topic topic, RequestUser removedBy) System.currentTimeMillis() - start); } + public void cleanupAfterSubscription(SubscriptionName subscriptionName) { + multiDCAwareService.deleteConsumerGroups(subscriptionName); + } + private void ensureSubscriptionsHaveAutoRemove( List subscriptions, TopicName topicName) { boolean anySubscriptionWithoutAutoRemove = diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java index c8d088e598..e43c976f50 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java @@ -194,6 +194,18 @@ public void removeSubscription( subscriptionRemover.removeSubscription(topicName, subscriptionName, removedBy); } + public void subscriptionTerminationCleanup(SubscriptionName subscriptionName) { + if (subscriptionRepository.subscriptionExists( + subscriptionName.getTopicName(), subscriptionName.getName())) { + logger.info( + "Aborting subscription termination cleanup for subscription {} as it still exists.", + subscriptionName); + return; + } + + subscriptionRemover.cleanupAfterSubscription(subscriptionName); + } + public void updateSubscription( TopicName topicName, String subscriptionName, PatchData patch, RequestUser modifiedBy) { auditor.beforeObjectUpdate( diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ConsumerGroupDeletionException.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ConsumerGroupDeletionException.java new file mode 100644 index 0000000000..c91967d456 --- /dev/null +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/ConsumerGroupDeletionException.java @@ -0,0 +1,20 @@ +package pl.allegro.tech.hermes.management.infrastructure.kafka; + +import static java.lang.String.format; +import static pl.allegro.tech.hermes.api.ErrorCode.CONSUMER_GROUP_DELETION_ERROR; + +import pl.allegro.tech.hermes.api.ErrorCode; +import pl.allegro.tech.hermes.api.SubscriptionName; +import pl.allegro.tech.hermes.management.domain.ManagementException; + +public class ConsumerGroupDeletionException extends ManagementException { + + public ConsumerGroupDeletionException(SubscriptionName subscriptionName, Throwable e) { + super(format("Failed to delete consumer group, for subscription %s ", subscriptionName), e); + } + + @Override + public ErrorCode getCode() { + return CONSUMER_GROUP_DELETION_ERROR; + } +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java index 17729e6d21..2383abc722 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java @@ -18,10 +18,12 @@ import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.common.exception.InternalProcessingException; import pl.allegro.tech.hermes.management.domain.auth.RequestUser; import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand; +import pl.allegro.tech.hermes.management.domain.subscription.commands.RemoveSubscriptionRepositoryCommand; import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement; import pl.allegro.tech.hermes.management.domain.topic.UnableToMoveOffsetsException; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; @@ -113,10 +115,20 @@ public void removeTopicByName(String topicName) { clusters.forEach(brokersClusterService -> brokersClusterService.removeTopicByName(topicName)); } + public void removeSubscription( + TopicName topicName, String subscriptionName, RequestUser removedBy) { + multiDcExecutor.executeByUser( + new RemoveSubscriptionRepositoryCommand(topicName, subscriptionName), removedBy); + } + public void createConsumerGroups(Topic topic, Subscription subscription) { clusters.forEach(clusterService -> clusterService.createConsumerGroup(topic, subscription)); } + public void deleteConsumerGroups(SubscriptionName subscriptionName) { + clusters.forEach(clusterService -> clusterService.deleteConsumerGroup(subscriptionName)); + } + private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) { Instant abortAttemptsInstant = clock.instant().plus(offsetsMovedTimeout); diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java index 2dcf3103f2..dda2cef147 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java @@ -147,6 +147,10 @@ public void createConsumerGroup(Topic topic, Subscription subscription) { consumerGroupManager.createConsumerGroup(topic, subscription); } + public void deleteConsumerGroup(SubscriptionName subscriptionName) { + consumerGroupManager.deleteConsumerGroup(subscriptionName); + } + public Optional describeConsumerGroup(Topic topic, String subscriptionName) { return consumerGroupsDescriber.describeConsumerGroup(topic, subscriptionName); } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManager.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManager.java index 8957d84e08..0527fb3b08 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManager.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManager.java @@ -3,20 +3,27 @@ import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; +import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId; import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper; import pl.allegro.tech.hermes.management.config.kafka.KafkaProperties; import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager; +import pl.allegro.tech.hermes.management.infrastructure.kafka.ConsumerGroupDeletionException; public class KafkaConsumerGroupManager implements ConsumerGroupManager { @@ -25,15 +32,18 @@ public class KafkaConsumerGroupManager implements ConsumerGroupManager { private final KafkaNamesMapper kafkaNamesMapper; private final String clusterName; private final KafkaConsumerManager consumerManager; + private final AdminClient kafkaAdminClient; public KafkaConsumerGroupManager( KafkaNamesMapper kafkaNamesMapper, String clusterName, String brokerList, - KafkaProperties kafkaProperties) { + KafkaProperties kafkaProperties, + AdminClient kafkaAdminClient) { this.kafkaNamesMapper = kafkaNamesMapper; this.clusterName = clusterName; this.consumerManager = new KafkaConsumerManager(kafkaProperties, kafkaNamesMapper, brokerList); + this.kafkaAdminClient = kafkaAdminClient; } @Override @@ -80,4 +90,40 @@ public void createConsumerGroup(Topic topic, Subscription subscription) { e); } } + + @Override + public void deleteConsumerGroup(SubscriptionName subscriptionName) + throws ConsumerGroupDeletionException { + logger.info( + "Deleting consumer group for subscription {}, cluster: {}", subscriptionName, clusterName); + + try { + ConsumerGroupId groupId = kafkaNamesMapper.toConsumerGroupId(subscriptionName); + kafkaAdminClient + .deleteConsumerGroups(Collections.singletonList(groupId.asString())) + .all() + .get(); + + logger.info( + "Successfully deleted consumer group for subscription {}, cluster: {}", + subscriptionName, + clusterName); + + } catch (ExecutionException | InterruptedException e) { + if (e.getCause() instanceof GroupIdNotFoundException) { + logger.info( + "Consumer group for subscription {} not found, cluster: {}", + subscriptionName, + clusterName); + return; + } + + logger.error( + "Failed to delete consumer group for subscription {}, cluster: {}", + subscriptionName, + clusterName, + e); + throw new ConsumerGroupDeletionException(subscriptionName, e); + } + } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/NoOpConsumerGroupManager.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/NoOpConsumerGroupManager.java index 9de89e9c81..c9084e7b04 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/NoOpConsumerGroupManager.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/NoOpConsumerGroupManager.java @@ -1,6 +1,7 @@ package pl.allegro.tech.hermes.management.infrastructure.kafka.service; import pl.allegro.tech.hermes.api.Subscription; +import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager; @@ -9,4 +10,9 @@ public class NoOpConsumerGroupManager implements ConsumerGroupManager { public void createConsumerGroup(Topic topic, Subscription subscription) { // no operation } + + @Override + public void deleteConsumerGroup(SubscriptionName subscriptionName) { + // no operation + } } diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManagerSpec.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManagerSpec.groovy index b43834d8ad..7bca887ce8 100644 --- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManagerSpec.groovy +++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/kafka/service/KafkaConsumerGroupManagerSpec.groovy @@ -69,7 +69,7 @@ class KafkaConsumerGroupManagerSpec extends Specification { } def setup() { - consumerGroupManager = new KafkaConsumerGroupManager(kafkaNamesMapper, "primary", kafkaContainer.bootstrapServers, new KafkaProperties()) + consumerGroupManager = new KafkaConsumerGroupManager(kafkaNamesMapper, "primary", kafkaContainer.bootstrapServers, new KafkaProperties(), adminClient) } def "should create consumer group with offset equal to last topic offset"() { @@ -100,6 +100,9 @@ class KafkaConsumerGroupManagerSpec extends Specification { and: output.toString().contains 'Creating consumer group for subscription pl.allegro.test.Foo$test-subscription, cluster: primary' output.toString().contains 'Successfully created consumer group for subscription pl.allegro.test.Foo$test-subscription, cluster: primary' + + cleanup: + deleteKafkaTopic(kafkaTopicName) } def "should override existing consumer group using offsets from the old consumer group"() { @@ -131,6 +134,9 @@ class KafkaConsumerGroupManagerSpec extends Specification { topicPartitionOffsets.get(new TopicPartition(kafkaTopicName, 0)).offset() == 10 topicPartitionOffsets.get(new TopicPartition(kafkaTopicName, 1)).offset() == 20 topicPartitionOffsets.get(new TopicPartition(kafkaTopicName, 2)).offset() == 15 + + cleanup: + deleteKafkaTopic(kafkaTopicName) } def "should not create consumer group and log exception in case of request timeout"() { @@ -151,6 +157,68 @@ class KafkaConsumerGroupManagerSpec extends Specification { kafkaContainer.dockerClient.unpauseContainerCmd(containerId).exec() } + def "Should delete specified consumer group and retain others"() { + given: + Topic topic = createAvroTopic("pl.allegro.test.DeletionOfConsumerGroup") + String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).primary.name().asString() + createTopicInKafka(kafkaTopicName, 3) + + and: + Subscription subscription = createTestSubscription(topic, "test-subscription") + ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscription.qualifiedName) + consumerGroupManager.createConsumerGroup(topic, subscription) + + and: + Subscription subscriptionToDelete = createTestSubscription(topic, "test-subscription-to-delete") + consumerGroupManager.createConsumerGroup(topic, subscriptionToDelete) + + when: + consumerGroupManager.deleteConsumerGroup(subscriptionToDelete.getQualifiedName()) + + then: + adminClient.listConsumerGroups().all().get().collect { it.groupId() } == [consumerGroupId.asString()] + + cleanup: + consumerGroupManager.deleteConsumerGroup(subscription.getQualifiedName()) + deleteKafkaTopic(kafkaTopicName) + } + + def "Should handle deletion attempt for non-existing consumer group gracefully"() { + given: + Topic topic = createAvroTopic("pl.allegro.test.DeletionOfNonExistingConsumerGroup") + String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).primary.name().asString() + createTopicInKafka(kafkaTopicName, 3) + + and: + Subscription subscription = createTestSubscription(topic, "test-subscription") + ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscription.qualifiedName) + consumerGroupManager.createConsumerGroup(topic, subscription) + + when: + consumerGroupManager.deleteConsumerGroup(createTestSubscription(topic, "test-subscription-to-delete").getQualifiedName()) + + then: + noExceptionThrown() + + and: + adminClient.listConsumerGroups().all().get().collect { it.groupId() } == [consumerGroupId.asString()] + + cleanup: + deleteKafkaTopic(kafkaTopicName) + } + + def "Should handle deletion attempt for non-existing topic gracefully"() { + given: + Topic topic = createAvroTopic("pl.allegro.test.Foo") + Subscription subscription = createTestSubscription(topic, "test-subscription-to-delete") + + when: + consumerGroupManager.deleteConsumerGroup(subscription.getQualifiedName()) + + then: + noExceptionThrown() + } + private def publishOnPartition(String kafkaTopicName, int partition, int messages) { CountDownLatch countDownLatch = new CountDownLatch(messages) for (int i = 0; i < messages; i++) { @@ -189,4 +257,8 @@ class KafkaConsumerGroupManagerSpec extends Specification { .collect { it.partition() } .containsAll(0..(partitionsNumber - 1)) } + + private def deleteKafkaTopic(String kafkaTopicName) { + adminClient.deleteTopics([kafkaTopicName]).all().get() + } } diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/containers/KafkaContainerCluster.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/containers/KafkaContainerCluster.java index 0af027a1ff..812985e550 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/containers/KafkaContainerCluster.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/containers/KafkaContainerCluster.java @@ -208,6 +208,17 @@ private int countPartitions(String option) throws IOException, InterruptedExcept return Integer.parseInt(sanitizedOutput); } + public int countConsumerGroups() throws java.io.IOException, InterruptedException { + KafkaContainer firstBroker = selectFirstRunningBroker(); + ExecResult result = + firstBroker.execInContainer( + "sh", + "-c", + "kafka-consumer-groups --bootstrap-server localhost:9092 --list | wc -l"); + String sanitizedOutput = result.getStdout().replaceAll("\"", "").replaceAll("\\s+", ""); + return Integer.parseInt(sanitizedOutput); + } + private KafkaContainer selectFirstRunningBroker() { return brokers.stream() .filter(KafkaContainer::isKafkaRunning) diff --git a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/setup/HermesExtension.java b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/setup/HermesExtension.java index 6f008480bd..0332f9b17d 100644 --- a/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/setup/HermesExtension.java +++ b/integration-tests/src/common/java/pl/allegro/tech/hermes/integrationtests/setup/HermesExtension.java @@ -106,6 +106,16 @@ public HermesInitHelper initHelper() { return hermesInitHelper; } + public int countConsumerGroups() { + try { + return kafka.countConsumerGroups(); + } catch (java.io.IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + public void cutOffConnectionsBetweenBrokersAndClients() { kafka.cutOffConnectionsBetweenBrokersAndClients(); } diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/TopicManagementTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/TopicManagementTest.java index df422b6c2f..b0c65ad9f4 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/TopicManagementTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/TopicManagementTest.java @@ -1,6 +1,7 @@ package pl.allegro.tech.hermes.integrationtests.management; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.waitAtMost; import static pl.allegro.tech.hermes.api.ContentType.AVRO; import static pl.allegro.tech.hermes.api.ContentType.JSON; @@ -186,6 +187,9 @@ public void shouldRemoveTopicWithRelatedSubscriptionsWhenAutoRemoveEnabled() { .withAutoDeleteWithTopicEnabled(true) .build()); + // then + assertThat(hermes.countConsumerGroups()).isEqualTo(1); + // when WebTestClient.ResponseSpec response = hermes.api().deleteTopic(topic.getQualifiedName()); @@ -198,6 +202,11 @@ public void shouldRemoveTopicWithRelatedSubscriptionsWhenAutoRemoveEnabled() { .getSubscriptionResponse(topic.getQualifiedName(), subscription.getName()) .expectStatus() .isBadRequest(); + + // and + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> + assertThat(hermes.countConsumerGroups()).isEqualTo(0) + ); } @Test