-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Delete Kafka consumer group on Hermes subscription deletion #1921
base: master
Are you sure you want to change the base?
Conversation
@@ -80,4 +86,31 @@ public void createConsumerGroup(Topic topic, Subscription subscription) { | |||
e); | |||
} | |||
} | |||
|
|||
@Override | |||
public void deleteConsumerGroup(Topic topic, Subscription subscription) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a need to pass the whole Topic
and Subscription
as a parameters?
@@ -151,6 +157,68 @@ class KafkaConsumerGroupManagerSpec extends Specification { | |||
kafkaContainer.dockerClient.unpauseContainerCmd(containerId).exec() | |||
} | |||
|
|||
def "Should delete specified consumer group and retain others"() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. Can we also add some integration test for that mechanism? I mean the one which is creating topic and subscription (with validation of creation of consumer-group), deleting it and then checking whether consumer group was deleted too. What do You think about it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, good idea
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Please add a short summary in the description on how consumer group removal worked before this change and why the changes was introduced.
@Override | ||
public void deleteConsumerGroup(Topic topic, Subscription subscription) { | ||
logger.info( | ||
"Deleting consumer group for subscription {}, cluster: {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that this was already done in createConsumerGroup
but I would suggest we use DC instead of clusterName for logging purposes. It makes debugging easier
private final MultiDatacenterRepositoryCommandExecutor multiDcExecutor; | ||
private final SubscriptionOwnerCache subscriptionOwnerCache; | ||
private final SubscriptionRepository subscriptionRepository; | ||
|
||
public SubscriptionRemover( | ||
Auditor auditor, | ||
MultiDCAwareService multiDCAwareService, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not really scope of this PR but MultiDCAwareService
vs MultiDatacenterRepositoryCommandExecutor
is a mess. What is the difference between these two? MultiDCAwareService
even uses MultiDatacenterRepositoryCommandExecutor
so using both of them in the same class is confusing
…N-3271-KafkaConumerGroupDeletion
No description provided.