Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit

Permalink
enable pulsar topic offset auto commit
Browse files Browse the repository at this point in the history
  • Loading branch information
zouyunhe committed Jan 7, 2022
1 parent f79d868 commit f15bab0
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@
import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscription;
import org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer;
import org.apache.flink.streaming.connectors.pulsar.serialization.PulsarDeserializationSchema;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TimeUtils;

import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;

Expand All @@ -73,6 +75,7 @@
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -85,6 +88,9 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -238,6 +244,12 @@ public class FlinkPulsarSource<T> extends RichParallelSourceFunction<T>

private long startupOffsetsTimestamp = -1L;

private boolean enableOffsetAutoCommit;

private Duration offsetAutoCommitInterval;

private ScheduledExecutorService offsetCommitScheduler;

public FlinkPulsarSource(
String adminUrl,
ClientConfigurationData clientConf,
Expand All @@ -264,6 +276,22 @@ public FlinkPulsarSource(
}
this.oldStateVersion =
SourceSinkUtils.getOldStateVersion(caseInsensitiveParams, oldStateVersion);

this.enableOffsetAutoCommit =
Boolean.parseBoolean(
properties
.getOrDefault(
PulsarTableOptions.ENABLE_OFFSET_AUTO_COMMIT.key(), "true")
.toString());
if (enableOffsetAutoCommit) {
this.offsetAutoCommitInterval =
TimeUtils.parseDuration(
properties
.getOrDefault(
PulsarTableOptions.OFFSET_AUTO_COMMIT_INTERVAL.key(),
"60 s")
.toString());
}
}

public FlinkPulsarSource(
Expand Down Expand Up @@ -551,6 +579,27 @@ public void open(Configuration parameters) throws Exception {
ownedTopicStarts);
}
}

if (!((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()
&& enableOffsetAutoCommit) {
this.offsetCommitScheduler = Executors.newScheduledThreadPool(1);
this.offsetCommitScheduler.scheduleAtFixedRate(
() -> {
if (pulsarFetcher != null) {
Map<TopicRange, MessageId> consumedOffsets =
pulsarFetcher.snapshotCurrentState();
try {
pulsarFetcher.commitOffsetToPulsar(
consumedOffsets, offsetCommitCallback);
} catch (InterruptedException e) {

}
}
},
0,
offsetAutoCommitInterval != null ? offsetAutoCommitInterval.getSeconds() : 60,
TimeUnit.SECONDS);
}
}

protected String getSubscriptionName() {
Expand Down Expand Up @@ -742,6 +791,10 @@ public void close() throws Exception {
}
}

if (offsetCommitScheduler != null) {
offsetCommitScheduler.shutdown();
}

try {
super.close();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.impl.MessageIdImpl;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -245,6 +246,18 @@ public class PulsarTableOptions {
.defaultValue(Collections.emptyMap())
.withDescription("Optional pulsar config.");

public static final ConfigOption<Boolean> ENABLE_OFFSET_AUTO_COMMIT =
ConfigOptions.key("enable.auto.commit")
.booleanType()
.defaultValue(true)
.withDescription("enable offset auto commit by pulsar connector");

public static final ConfigOption<Duration> OFFSET_AUTO_COMMIT_INTERVAL =
ConfigOptions.key("offset.auto.commit.interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription("offset auto commit interval for pulsar source.");

// --------------------------------------------------------------------------------------------
// Option enumerations
// --------------------------------------------------------------------------------------------
Expand Down

0 comments on commit f15bab0

Please sign in to comment.