Skip to content

Commit

Permalink
streamnative#386: Restored private API c'tors.
Browse files Browse the repository at this point in the history
Turned out builder pattern wasn't necessary here. There's one path now for the CryptoKeyReader (plus the encryption keys) and possibly others where it's null (and the keys are empty).
  • Loading branch information
objecttrouve committed Sep 15, 2021
1 parent 05e3b78 commit 50b9e9b
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 397 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ public Builder<T> withEncryptionKeys(String... encryptionKeys) {
return this;
}

private Optional<String> getDefaultTopicName() {
return Optional.ofNullable(defaultTopicName);
}

public FlinkPulsarSink<T> build(){
if (adminUrl == null) {
throw new IllegalStateException("Admin URL must be set.");
Expand Down Expand Up @@ -140,17 +144,7 @@ public FlinkPulsarSink<T> build(){
private final PulsarSerializationSchema<T> serializationSchema;

private FlinkPulsarSink(final Builder<T> builder) {
super(
new FlinkPulsarSinkBase.Config<T>()
.withAdminUrl(builder.adminUrl)
.withDefaultTopicName(builder.defaultTopicName)
.withClientConf(builder.clientConf)
.withProperties(builder.properties)
.withSerializationSchema(builder.serializationSchema)
.withMessageRouter(builder.messageRouter)
.withSemantic(builder.semantic)
.withCryptoKeyReader(builder.cryptoKeyReader)
.withEncryptionKeys(builder.encryptionKeys));
super(builder.adminUrl, builder.getDefaultTopicName(), builder.clientConf, builder.properties, builder.serializationSchema, builder.messageRouter, builder.semantic, builder.cryptoKeyReader, builder.encryptionKeys);
this.serializationSchema = builder.serializationSchema;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,64 +89,6 @@
@Slf4j
abstract class FlinkPulsarSinkBase<T> extends TwoPhaseCommitSinkFunction<T, FlinkPulsarSinkBase.PulsarTransactionState<T>, Void> implements CheckpointedFunction {

public static class Config<T> {
private String adminUrl;
private Optional<String> defaultTopicName;
private ClientConfigurationData clientConf;
private Properties properties;
private PulsarSerializationSchema<T> serializationSchema;
private MessageRouter messageRouter;
private PulsarSinkSemantic semantic = PulsarSinkSemantic.AT_LEAST_ONCE;
private CryptoKeyReader cryptoKeyReader;
private Set<String> encryptionKeys = new HashSet<>();

public Config<T> withAdminUrl(final String adminUrl) {
this.adminUrl = adminUrl;
return this;
}

public Config<T> withDefaultTopicName(final String defaultTopicName) {
this.defaultTopicName = Optional.ofNullable(defaultTopicName);
return this;
}

public Config<T> withClientConf(ClientConfigurationData clientConf) {
this.clientConf = clientConf;
return this;
}

public Config<T> withProperties(final Properties properties) {
this.properties = properties;
return this;
}

public Config<T> withSerializationSchema(final PulsarSerializationSchema<T> serializationSchema) {
this.serializationSchema = serializationSchema;
return this;
}

public Config<T> withMessageRouter(final MessageRouter messageRouter) {
this.messageRouter = messageRouter;
return this;
}

public Config<T> withSemantic(final PulsarSinkSemantic semantic) {
this.semantic = semantic;
return this;
}

public Config<T> withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
return this;
}

public Config<T> withEncryptionKeys(final Set<String> encryptionKeys) {
this.encryptionKeys = encryptionKeys;
return this;
}

}

protected String adminUrl;

protected ClientConfigurationData clientConfigurationData;
Expand Down Expand Up @@ -215,32 +157,39 @@ public FlinkPulsarSinkBase(
Properties properties,
PulsarSerializationSchema<T> serializationSchema,
MessageRouter messageRouter) {
this(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, PulsarSinkSemantic.AT_LEAST_ONCE);
this(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, PulsarSinkSemantic.AT_LEAST_ONCE, null, new HashSet<>());
}

public FlinkPulsarSinkBase(final Config<T> config) {
public FlinkPulsarSinkBase(
String adminUrl,
Optional<String> defaultTopicName,
ClientConfigurationData clientConf,
Properties properties,
PulsarSerializationSchema<T> serializationSchema,
MessageRouter messageRouter,
PulsarSinkSemantic semantic,
final CryptoKeyReader cryptoKeyReader,
final Set<String> encryptionKeys) {
super(new TransactionStateSerializer(), VoidSerializer.INSTANCE);

this.adminUrl = checkNotNull(config.adminUrl);
this.semantic = config.semantic;
this.cryptoKeyReader = config.cryptoKeyReader;
this.encryptionKeys = config.encryptionKeys;
this.adminUrl = checkNotNull(adminUrl);
this.semantic = semantic;

if (config.defaultTopicName.isPresent()) {
if (defaultTopicName.isPresent()) {
this.forcedTopic = true;
this.defaultTopic = config.defaultTopicName.get();
this.defaultTopic = defaultTopicName.get();
} else {
this.forcedTopic = false;
this.defaultTopic = null;
}

this.serializationSchema = config.serializationSchema;
this.serializationSchema = serializationSchema;

this.messageRouter = config.messageRouter;
this.messageRouter = messageRouter;

this.clientConfigurationData = config.clientConf;
this.clientConfigurationData = clientConf;

this.properties = checkNotNull(config.properties);
this.properties = checkNotNull(properties);

this.caseInsensitiveParams =
SourceSinkUtils.toCaceInsensitiveParams(Maps.fromProperties(properties));
Expand Down Expand Up @@ -274,40 +223,9 @@ public FlinkPulsarSinkBase(final Config<T> config) {
if (this.clientConfigurationData.getServiceUrl() == null) {
throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration");
}
}

public FlinkPulsarSinkBase(
String adminUrl,
Optional<String> defaultTopicName,
ClientConfigurationData clientConf,
Properties properties,
PulsarSerializationSchema<T> serializationSchema,
MessageRouter messageRouter,
PulsarSinkSemantic semantic) {
this(new Config<T>()
.withAdminUrl(adminUrl)
.withDefaultTopicName(defaultTopicName.orElse(null))
.withClientConf(clientConf)
.withProperties(properties)
.withSerializationSchema(serializationSchema)
.withMessageRouter(messageRouter)
.withSemantic(semantic)
);
}

public FlinkPulsarSinkBase(
String serviceUrl,
String adminUrl,
Optional<String> defaultTopicName,
Properties properties,
PulsarSerializationSchema serializationSchema,
MessageRouter messageRouter) {
this(adminUrl,
defaultTopicName,
PulsarClientUtils.newClientConf(checkNotNull(serviceUrl), properties),
properties,
serializationSchema,
messageRouter);
this.cryptoKeyReader = cryptoKeyReader;
this.encryptionKeys = encryptionKeys;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,25 +677,28 @@ protected PulsarFetcher<T> createFetcher(
StreamingRuntimeContext streamingRuntime,
boolean useMetrics,
Set<TopicRange> excludeStartMessageIds) throws Exception {
return new PulsarFetcher.Builder()
.withSourceContext(sourceContext)
.withSeedTopicsWithInitialOffsets(seedTopicsWithInitialOffsets)
.withExcludeStartMessageIds(excludeStartMessageIds)
.withWatermarkStrategy(watermarkStrategy)
.withProcessingTimeProvider(processingTimeProvider)
.withAutoWatermarkInterval(autoWatermarkInterval)
.withUserCodeClassLoader(userCodeClassLoader)
.withRuntimeContext(streamingRuntime)
.withClientConf(clientConfigurationData)
.withReaderConf(readerConf)
.withPollTimeoutMs(pollTimeoutMs)
.withCommitMaxRetries(commitMaxRetries)
.withDeserializer(deserializer)
.withMetadataReader(metadataReader)
.withConsumerMetricGroup(streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP))
.withUseMetrics(useMetrics)
.withCryptoKeyReader(cryptoKeyReader)
.build();

//readerConf.putIfAbsent(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY, getSubscriptionName());

return new PulsarFetcher<>(
sourceContext,
seedTopicsWithInitialOffsets,
excludeStartMessageIds,
watermarkStrategy,
processingTimeProvider,
autoWatermarkInterval,
userCodeClassLoader,
streamingRuntime,
clientConfigurationData,
readerConf,
pollTimeoutMs,
commitMaxRetries,
deserializer,
metadataReader,
streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP),
useMetrics,
cryptoKeyReader
);
}

public void joinDiscoveryLoopThread() throws InterruptedException {
Expand Down
Loading

0 comments on commit 50b9e9b

Please sign in to comment.