Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete Kafka consumer group on Hermes subscription deletion #1921

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public enum ErrorCode {
INVALID_QUERY(BAD_REQUEST),
IMPLEMENTATION_ABSENT(NOT_FOUND),
MOVING_SUBSCRIPTION_OFFSETS_VALIDATION_ERROR(BAD_REQUEST),
CONSUMER_GROUP_DELETION_ERROR(INTERNAL_SERVER_ERROR),
SENDING_TO_KAFKA_TIMEOUT(SERVICE_UNAVAILABLE);

private final int httpCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,14 @@ public String toString() {
return "Subscription(" + getQualifiedName() + ")";
}

public static SubscriptionName getSubscriptionNameFromString(String input) {
if (!input.startsWith("Subscription(") || !input.endsWith(")")) {
throw new IllegalArgumentException("Invalid input: " + input);
}
return SubscriptionName.fromString(
input.replaceFirst("^Subscription\\(", "").replaceFirst("\\)$", ""));
}

public enum State {
PENDING,
ACTIVE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package pl.allegro.tech.hermes.management.api;

import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
import static pl.allegro.tech.hermes.management.infrastructure.audit.AuditEventType.REMOVED;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
import org.springframework.beans.factory.annotation.Autowired;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionService;
import pl.allegro.tech.hermes.management.infrastructure.audit.AuditEvent;

@Path("subscriptions/termination-cleanup")
public class SubscriptionTerminationCleanupEndpoint {

private final SubscriptionService subscriptionService;

@Autowired
public SubscriptionTerminationCleanupEndpoint(SubscriptionService subscriptionService) {
this.subscriptionService = subscriptionService;
}

@POST
@Consumes(APPLICATION_JSON)
@Path("/from-audit-event")
public Response subscriptionTerminationCleanup(AuditEvent auditEvent) {
if (auditEvent.getEventType() != REMOVED
|| !auditEvent.getPayloadClass().equals(Subscription.class.getName())) {
return Response.ok().build();
}

SubscriptionName subscriptionName =
Subscription.getSubscriptionNameFromString(auditEvent.getResourceName());
subscriptionService.subscriptionTerminationCleanup(subscriptionName);
return Response.ok().build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,10 @@ public SubscriptionService subscriptionService(
@Bean
public SubscriptionRemover subscriptionRemover(
Auditor auditor,
MultiDatacenterRepositoryCommandExecutor multiDatacenterRepositoryCommandExecutor,
MultiDCAwareService multiDCAwareService,
SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
return new SubscriptionRemover(
auditor,
multiDatacenterRepositoryCommandExecutor,
subscriptionOwnerCache,
subscriptionRepository);
auditor, multiDCAwareService, subscriptionOwnerCache, subscriptionRepository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ MultiDCAwareService multiDCAwareService(
new OffsetsAvailableChecker(consumerPool, storage),
new LogEndOffsetChecker(consumerPool),
brokerAdminClient,
createConsumerGroupManager(kafkaProperties, kafkaNamesMapper),
createConsumerGroupManager(
kafkaProperties, kafkaNamesMapper, brokerAdminClient),
createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper));
})
.collect(toList());
Expand All @@ -126,13 +127,16 @@ MultiDCAwareService multiDCAwareService(
}

private ConsumerGroupManager createConsumerGroupManager(
KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper) {
KafkaProperties kafkaProperties,
KafkaNamesMapper kafkaNamesMapper,
AdminClient kafkaAdminClient) {
return subscriptionProperties.isCreateConsumerGroupManuallyEnabled()
? new KafkaConsumerGroupManager(
kafkaNamesMapper,
kafkaProperties.getQualifiedClusterName(),
kafkaProperties.getBrokerList(),
kafkaProperties)
kafkaProperties,
kafkaAdminClient)
: new NoOpConsumerGroupManager();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package pl.allegro.tech.hermes.management.domain.subscription;

import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;

public interface ConsumerGroupManager {

void createConsumerGroup(Topic topic, Subscription subscription);

void deleteConsumerGroup(SubscriptionName subscriptionName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,30 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.domain.subscription.SubscriptionRepository;
import pl.allegro.tech.hermes.domain.topic.TopicNotEmptyException;
import pl.allegro.tech.hermes.management.domain.Auditor;
import pl.allegro.tech.hermes.management.domain.auth.RequestUser;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.subscription.commands.RemoveSubscriptionRepositoryCommand;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;

public class SubscriptionRemover {

private static final Logger logger = LoggerFactory.getLogger(SubscriptionRemover.class);
private final Auditor auditor;
private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor;
private final MultiDCAwareService multiDCAwareService;
private final SubscriptionOwnerCache subscriptionOwnerCache;
private final SubscriptionRepository subscriptionRepository;

public SubscriptionRemover(
Auditor auditor,
MultiDatacenterRepositoryCommandExecutor multiDcExecutor,
MultiDCAwareService multiDCAwareService,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really scope of this PR but MultiDCAwareService vs MultiDatacenterRepositoryCommandExecutor is a mess. What is the difference between these two? MultiDCAwareService even uses MultiDatacenterRepositoryCommandExecutor so using both of them in the same class is confusing

SubscriptionOwnerCache subscriptionOwnerCache,
SubscriptionRepository subscriptionRepository) {
this.auditor = auditor;
this.multiDcExecutor = multiDcExecutor;
this.multiDCAwareService = multiDCAwareService;
this.subscriptionOwnerCache = subscriptionOwnerCache;
this.subscriptionRepository = subscriptionRepository;
}
Expand All @@ -38,8 +38,7 @@ public void removeSubscription(
removedBy.getUsername(), Subscription.class.getSimpleName(), subscriptionName);
Subscription subscription =
subscriptionRepository.getSubscriptionDetails(topicName, subscriptionName);
multiDcExecutor.executeByUser(
new RemoveSubscriptionRepositoryCommand(topicName, subscriptionName), removedBy);
multiDCAwareService.removeSubscription(topicName, subscriptionName, removedBy);
auditor.objectRemoved(removedBy.getUsername(), subscription);
subscriptionOwnerCache.onRemovedSubscription(subscriptionName, topicName);
}
Expand All @@ -57,6 +56,10 @@ public void removeSubscriptionRelatedToTopic(Topic topic, RequestUser removedBy)
System.currentTimeMillis() - start);
}

public void cleanupAfterSubscription(SubscriptionName subscriptionName) {
multiDCAwareService.deleteConsumerGroups(subscriptionName);
}

private void ensureSubscriptionsHaveAutoRemove(
List<Subscription> subscriptions, TopicName topicName) {
boolean anySubscriptionWithoutAutoRemove =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,18 @@ public void removeSubscription(
subscriptionRemover.removeSubscription(topicName, subscriptionName, removedBy);
}

public void subscriptionTerminationCleanup(SubscriptionName subscriptionName) {
if (subscriptionRepository.subscriptionExists(
subscriptionName.getTopicName(), subscriptionName.getName())) {
logger.info(
"Aborting subscription termination cleanup for subscription {} as it still exists.",
subscriptionName);
return;
}

subscriptionRemover.cleanupAfterSubscription(subscriptionName);
}

public void updateSubscription(
TopicName topicName, String subscriptionName, PatchData patch, RequestUser modifiedBy) {
auditor.beforeObjectUpdate(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package pl.allegro.tech.hermes.management.infrastructure.kafka;

import static java.lang.String.format;
import static pl.allegro.tech.hermes.api.ErrorCode.CONSUMER_GROUP_DELETION_ERROR;

import pl.allegro.tech.hermes.api.ErrorCode;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.management.domain.ManagementException;

public class ConsumerGroupDeletionException extends ManagementException {

public ConsumerGroupDeletionException(SubscriptionName subscriptionName, Throwable e) {
super(format("Failed to delete consumer group, for subscription %s ", subscriptionName), e);
}

@Override
public ErrorCode getCode() {
return CONSUMER_GROUP_DELETION_ERROR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.management.domain.auth.RequestUser;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand;
import pl.allegro.tech.hermes.management.domain.subscription.commands.RemoveSubscriptionRepositoryCommand;
import pl.allegro.tech.hermes.management.domain.topic.BrokerTopicManagement;
import pl.allegro.tech.hermes.management.domain.topic.UnableToMoveOffsetsException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
Expand Down Expand Up @@ -113,10 +115,20 @@ public void removeTopicByName(String topicName) {
clusters.forEach(brokersClusterService -> brokersClusterService.removeTopicByName(topicName));
}

public void removeSubscription(
TopicName topicName, String subscriptionName, RequestUser removedBy) {
multiDcExecutor.executeByUser(
new RemoveSubscriptionRepositoryCommand(topicName, subscriptionName), removedBy);
}

public void createConsumerGroups(Topic topic, Subscription subscription) {
clusters.forEach(clusterService -> clusterService.createConsumerGroup(topic, subscription));
}

public void deleteConsumerGroups(SubscriptionName subscriptionName) {
clusters.forEach(clusterService -> clusterService.deleteConsumerGroup(subscriptionName));
}

private void waitUntilOffsetsAreMoved(Topic topic, String subscriptionName) {
Instant abortAttemptsInstant = clock.instant().plus(offsetsMovedTimeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ public void createConsumerGroup(Topic topic, Subscription subscription) {
consumerGroupManager.createConsumerGroup(topic, subscription);
}

public void deleteConsumerGroup(SubscriptionName subscriptionName) {
consumerGroupManager.deleteConsumerGroup(subscriptionName);
}

public Optional<ConsumerGroup> describeConsumerGroup(Topic topic, String subscriptionName) {
return consumerGroupsDescriber.describeConsumerGroup(topic, subscriptionName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,27 @@
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.management.config.kafka.KafkaProperties;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;
import pl.allegro.tech.hermes.management.infrastructure.kafka.ConsumerGroupDeletionException;

public class KafkaConsumerGroupManager implements ConsumerGroupManager {

Expand All @@ -25,15 +32,18 @@ public class KafkaConsumerGroupManager implements ConsumerGroupManager {
private final KafkaNamesMapper kafkaNamesMapper;
private final String clusterName;
private final KafkaConsumerManager consumerManager;
private final AdminClient kafkaAdminClient;

public KafkaConsumerGroupManager(
KafkaNamesMapper kafkaNamesMapper,
String clusterName,
String brokerList,
KafkaProperties kafkaProperties) {
KafkaProperties kafkaProperties,
AdminClient kafkaAdminClient) {
this.kafkaNamesMapper = kafkaNamesMapper;
this.clusterName = clusterName;
this.consumerManager = new KafkaConsumerManager(kafkaProperties, kafkaNamesMapper, brokerList);
this.kafkaAdminClient = kafkaAdminClient;
}

@Override
Expand Down Expand Up @@ -80,4 +90,40 @@ public void createConsumerGroup(Topic topic, Subscription subscription) {
e);
}
}

@Override
public void deleteConsumerGroup(SubscriptionName subscriptionName)
throws ConsumerGroupDeletionException {
logger.info(
"Deleting consumer group for subscription {}, cluster: {}", subscriptionName, clusterName);

try {
ConsumerGroupId groupId = kafkaNamesMapper.toConsumerGroupId(subscriptionName);
kafkaAdminClient
.deleteConsumerGroups(Collections.singletonList(groupId.asString()))
.all()
.get();

logger.info(
"Successfully deleted consumer group for subscription {}, cluster: {}",
subscriptionName,
clusterName);

} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof GroupIdNotFoundException) {
logger.info(
"Consumer group for subscription {} not found, cluster: {}",
subscriptionName,
clusterName);
return;
}

logger.error(
"Failed to delete consumer group for subscription {}, cluster: {}",
subscriptionName,
clusterName,
e);
throw new ConsumerGroupDeletionException(subscriptionName, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.hermes.management.infrastructure.kafka.service;

import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.management.domain.subscription.ConsumerGroupManager;

Expand All @@ -9,4 +10,9 @@ public class NoOpConsumerGroupManager implements ConsumerGroupManager {
public void createConsumerGroup(Topic topic, Subscription subscription) {
// no operation
}

@Override
public void deleteConsumerGroup(SubscriptionName subscriptionName) {
// no operation
}
}
Loading
Loading