From 79c2f629535b2e4008ed63140f6c37990eef91bf Mon Sep 17 00:00:00 2001 From: mmaxiaolei <412171946@qq.com> Date: Fri, 7 Jan 2022 21:09:11 +0800 Subject: [PATCH 1/3] fix: sink close blocking --- .../pulsar/FlinkPulsarSinkBase.java | 61 +++++++++---------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java index acede80b..0bd4e157 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java @@ -14,11 +14,13 @@ package org.apache.flink.streaming.connectors.pulsar; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -28,10 +30,6 @@ import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializableObject; - -import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; - -import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -44,6 +42,7 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -74,10 +73,14 @@ abstract class FlinkPulsarSinkBase extends RichSinkFunction implements Che protected boolean failOnWrite; - /** Lock for accessing the pending records. */ + /** + * Lock for accessing the pending records. + */ protected final SerializableObject pendingRecordsLock = new SerializableObject(); - /** Number of unacknowledged records. */ + /** + * Number of unacknowledged records. + */ protected long pendingRecords = 0L; protected final boolean forcedTopic; @@ -86,7 +89,7 @@ abstract class FlinkPulsarSinkBase extends RichSinkFunction implements Che protected final TopicKeyExtractor topicKeyExtractor; - protected transient volatile Throwable failedWrite; + private final transient AtomicReference failedWrite = new AtomicReference<>(); protected transient PulsarAdmin admin; @@ -155,12 +158,6 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { if (flushOnCheckpoint) { producerFlush(); - synchronized (pendingRecordsLock) { - if (pendingRecords != 0) { - throw new IllegalStateException("Pending record count must be zero at this point " + pendingRecords); - } - checkErroneous(); - } } } @@ -194,22 +191,25 @@ protected void initializeSendCallback() { if (failOnWrite) { this.sendCallback = (t, u) -> { - if (failedWrite == null && u == null) { - acknowledgeMessage(); - } else if (failedWrite == null && u != null) { - failedWrite = u; - } else { // failedWrite != null + if (failedWrite.get() != null) { // do nothing and wait next checkForError to throw exception + return; } - }; - } else { - this.sendCallback = (t, u) -> { - if (failedWrite == null && u != null) { - log.error("Error while sending message to Pulsar: {}", ExceptionUtils.stringifyException(u)); + if (u == null) { + acknowledgeMessage(); + return; } - acknowledgeMessage(); + failedWrite.compareAndSet(null, u); }; + return; } + + this.sendCallback = (t, u) -> { + if (u != null) { + log.error("Error while sending message to Pulsar: {}", ExceptionUtils.stringifyException(u)); + } + acknowledgeMessage(); + }; } private void uploadSchema(String topic) { @@ -273,7 +273,8 @@ public void producerFlush() throws Exception { synchronized (pendingRecordsLock) { while (pendingRecords > 0) { try { - pendingRecordsLock.wait(); + pendingRecordsLock.wait(100); + checkErroneous(); } catch (InterruptedException e) { // this can be interrupted when the Task has been cancelled. // by throwing an exception, we ensure that this checkpoint doesn't get confirmed @@ -284,7 +285,6 @@ public void producerFlush() throws Exception { } protected void producerClose() throws Exception { - producerFlush(); if (admin != null) { admin.close(); } @@ -301,11 +301,10 @@ protected void producerClose() throws Exception { } protected void checkErroneous() throws Exception { - Throwable e = failedWrite; - if (e != null) { - // prevent double throwing - failedWrite = null; - throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e); + Throwable t = failedWrite.get(); + if (t != null) { + log.error("Failed to send data to Pulsar: " + t.getMessage(), t); + throw new Exception("Failed to send data to Pulsar: " + t.getMessage(), t); } } From e729c7dcbffe80ab517f810f3f6792ac9f207c5f Mon Sep 17 00:00:00 2001 From: mmaxiaolei <412171946@qq.com> Date: Sat, 8 Jan 2022 20:41:22 +0800 Subject: [PATCH 2/3] chore: import fmt --- .../streaming/connectors/pulsar/FlinkPulsarSinkBase.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java index 0bd4e157..98820b94 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java @@ -14,13 +14,11 @@ package org.apache.flink.streaming.connectors.pulsar; -import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -30,6 +28,10 @@ import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializableObject; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; + +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; From 62e57cc5c32fcf7c971e67c5956f35af5332c64c Mon Sep 17 00:00:00 2001 From: mmaxiaolei <412171946@qq.com> Date: Fri, 21 Jan 2022 20:42:14 +0800 Subject: [PATCH 3/3] fix npe --- .../flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java index 98820b94..485ed28a 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java @@ -91,7 +91,7 @@ abstract class FlinkPulsarSinkBase extends RichSinkFunction implements Che protected final TopicKeyExtractor topicKeyExtractor; - private final transient AtomicReference failedWrite = new AtomicReference<>(); + protected final AtomicReference failedWrite = new AtomicReference<>(); protected transient PulsarAdmin admin;