Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

fix: sink close blocking #499

Open
wants to merge 3 commits into
base: release-1.9
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number | Diff line number | Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -74,10 +75,14 @@ abstract class FlinkPulsarSinkBase<T> extends RichSinkFunction<T> implements Che

protected boolean failOnWrite;

/** Lock for accessing the pending records. */
/**
* Lock for accessing the pending records.
*/
protected final SerializableObject pendingRecordsLock = new SerializableObject();

/** Number of unacknowledged records. */
/**
* Number of unacknowledged records.
*/
protected long pendingRecords = 0L;

protected final boolean forcedTopic;
Expand All @@ -86,7 +91,7 @@ abstract class FlinkPulsarSinkBase<T> extends RichSinkFunction<T> implements Che

protected final TopicKeyExtractor<T> topicKeyExtractor;

protected transient volatile Throwable failedWrite;
protected final AtomicReference<Throwable> failedWrite = new AtomicReference<>();

protected transient PulsarAdmin admin;

Expand Down Expand Up @@ -155,12 +160,6 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {

if (flushOnCheckpoint) {
producerFlush();
synchronized (pendingRecordsLock) {
if (pendingRecords != 0) {
throw new IllegalStateException("Pending record count must be zero at this point " + pendingRecords);
}
checkErroneous();
}
}
}

Expand Down Expand Up @@ -194,22 +193,25 @@ protected void initializeSendCallback() {

if (failOnWrite) {
this.sendCallback = (t, u) -> {
if (failedWrite == null && u == null) {
acknowledgeMessage();
} else if (failedWrite == null && u != null) {
failedWrite = u;
} else { // failedWrite != null
if (failedWrite.get() != null) {
// do nothing; the next checkErroneous() call will throw the recorded exception
return;
}
};
} else {
this.sendCallback = (t, u) -> {
if (failedWrite == null && u != null) {
log.error("Error while sending message to Pulsar: {}", ExceptionUtils.stringifyException(u));
if (u == null) {
acknowledgeMessage();
return;
}
acknowledgeMessage();
failedWrite.compareAndSet(null, u);
};
return;
}

this.sendCallback = (t, u) -> {
if (u != null) {
log.error("Error while sending message to Pulsar: {}", ExceptionUtils.stringifyException(u));
}
acknowledgeMessage();
};
}

private void uploadSchema(String topic) {
Expand Down Expand Up @@ -273,7 +275,8 @@ public void producerFlush() throws Exception {
synchronized (pendingRecordsLock) {
while (pendingRecords > 0) {
try {
pendingRecordsLock.wait();
pendingRecordsLock.wait(100);
checkErroneous();
} catch (InterruptedException e) {
// this can be interrupted when the Task has been cancelled.
// by throwing an exception, we ensure that this checkpoint doesn't get confirmed
Expand All @@ -284,7 +287,6 @@ public void producerFlush() throws Exception {
}

protected void producerClose() throws Exception {
producerFlush();
if (admin != null) {
admin.close();
}
Expand All @@ -301,11 +303,10 @@ protected void producerClose() throws Exception {
}

protected void checkErroneous() throws Exception {
Throwable e = failedWrite;
if (e != null) {
// prevent double throwing
failedWrite = null;
throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e);
Throwable t = failedWrite.get();
if (t != null) {
log.error("Failed to send data to Pulsar: " + t.getMessage(), t);
throw new Exception("Failed to send data to Pulsar: " + t.getMessage(), t);
}
}

Expand Down