-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
resolve #1645 | stop subscription immediately even when there are pen… #1755
base: master
Are you sure you want to change the base?
Conversation
…are pending retries
@@ -116,6 +116,7 @@ private void process(Signal signal) { | |||
case STOP: | |||
logger.info("Stopping main loop for consumer {}. {}", signal.getTarget(), signal.getLogWithIdAndType()); | |||
this.running = false; | |||
stop(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pkasperowicz are you sure we need this ? The stop()
method is actually called in a finally
block in the run()
method. The main try
block in the run()
method is stopped when running.flag
is set to false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the stop signal is received, we need to inform the consumer to stop consuming events.
stop()
invokes consumer.tearDown()
and it sets BatchConsumer.consuming = false
.
It informs Retryer
to stop retrying messages. Without that we need to wait for all reties to be finished and it is blocking that way the main loop.
Alternative approach might be extending the API of BatchConsumer to receive stop signal and mark it as some boolean e.g. stopSignalReceived
. This flag would be informative for Retryer
to stop the process of retries.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok I see it will work, can we avoid duplicated calling stop()
method ? For example we can write method like this:
private void stopWithExceptionHandling() {
if (!running) {
return;
}
try {
stop();
} catch (Exception exceptionWhileStopping) {
logger.error("An error occurred while stopping consumer process of subscription {}",
getSubscriptionName(), exceptionWhileStopping);
} finally {
onConsumerStopped.accept(getSubscriptionName());
Thread.currentThread().setName("consumer-released-thread");
}
}
``` and use it in both `finally` block of run method and in `process` method:
...
case STOP:
logger.info("Stopping main loop for consumer {}. {}", signal.getTarget(), signal.getLogWithIdAndType());
stopWithExceptionHandling();
this.running = false;
break;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@faderskd ok changed it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pkasperowicz thank you, +1
…pite-of-pending-message-retries
…en stopping the subscription
…pite-of-pending-message-retries
…pite-of-pending-message-retries
…ding retries