Skip to content

Commit

Permalink
streamnative#386: Implemented passing of CryptoKeyReader.
Browse files Browse the repository at this point in the history
As requested [here](streamnative#386), it's now possible to pass a `CryptoKeyReader` (and encryption keys) to `FlinkPulsarSource` and `FlinkPulsarSink`.

Used builder pattern for easy extensibility without breaking or excessively overloading public c'tors.

Added integration test. (Maybe it can be moved to one of the other tests to avoid overhead.)
  • Loading branch information
objecttrouve committed Aug 8, 2021
1 parent 0b70d91 commit 42f0d4f
Show file tree
Hide file tree
Showing 6 changed files with 816 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -41,8 +45,115 @@
@Slf4j
public class FlinkPulsarSink<T> extends FlinkPulsarSinkBase<T> {

public static class Builder<T> {
private String adminUrl;
private String defaultTopicName;
private ClientConfigurationData clientConf;
private Properties properties;
private PulsarSerializationSchema<T> serializationSchema;
private MessageRouter messageRouter = null;
private PulsarSinkSemantic semantic = PulsarSinkSemantic.AT_LEAST_ONCE;
private String serviceUrl;
private CryptoKeyReader cryptoKeyReader;
private final Set<String> encryptionKeys = new HashSet<>();

public Builder<T> withAdminUrl(final String adminUrl) {
this.adminUrl = adminUrl;
return this;
}

public Builder<T> withDefaultTopicName(final String defaultTopicName) {
this.defaultTopicName = defaultTopicName;
return this;
}

public Builder<T> withClientConf(final ClientConfigurationData clientConf) {
this.clientConf = clientConf;
return this;
}

public Builder<T> withProperties(final Properties properties) {
this.properties = properties;
return this;
}

public Builder<T> withPulsarSerializationSchema(final PulsarSerializationSchema<T> serializationSchema) {
this.serializationSchema = serializationSchema;
return this;
}

public Builder<T> withMessageRouter(final MessageRouter messageRouter) {
this.messageRouter = messageRouter;
return this;
}

public Builder<T> withSemantic(final PulsarSinkSemantic semantic) {
this.semantic = semantic;
return this;
}

public Builder<T> withServiceUrl(final String serviceUrl) {
this.serviceUrl = serviceUrl;
return this;
}

public Builder<T> withCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
return this;
}

public Builder<T> withEncryptionKeys(String... encryptionKeys) {
this.encryptionKeys.addAll(Arrays.asList(encryptionKeys));
return this;
}

public FlinkPulsarSink<T> build(){
if (adminUrl == null) {
throw new IllegalStateException("Admin URL must be set.");
}
if (serializationSchema == null) {
throw new IllegalStateException("Serialization schema must be set.");
}
if (semantic == null) {
throw new IllegalStateException("Semantic must be set.");
}
if (properties == null) {
throw new IllegalStateException("Properties must be set.");
}
if (serviceUrl != null && clientConf != null) {
throw new IllegalStateException("Set either client conf or service URL but not both.");
}
if (serviceUrl != null){
clientConf = PulsarClientUtils.newClientConf(checkNotNull(serviceUrl), properties);
}
if (clientConf == null){
throw new IllegalStateException("Client conf must be set.");
}
if ((cryptoKeyReader == null) != (encryptionKeys.isEmpty())){
throw new IllegalStateException("Set crypto key reader and encryption keys in conjunction.");
}
return new FlinkPulsarSink<>(this);
}

}

private final PulsarSerializationSchema<T> serializationSchema;

public FlinkPulsarSink(final Builder<T> builder) {
super(
new FlinkPulsarSinkBase.Config<T>()
.withAdminUrl(builder.adminUrl)
.withDefaultTopicName(builder.defaultTopicName)
.withClientConf(builder.clientConf)
.withProperties(builder.properties)
.withSerializationSchema(builder.serializationSchema)
.withMessageRouter(builder.messageRouter)
.withSemantic(builder.semantic)
.withCryptoKeyReader(builder.cryptoKeyReader)
.withEncryptionKeys(builder.encryptionKeys));
this.serializationSchema = builder.serializationSchema;
}

public FlinkPulsarSink(
String adminUrl,
Optional<String> defaultTopicName,
Expand All @@ -51,9 +162,14 @@ public FlinkPulsarSink(
PulsarSerializationSchema serializationSchema,
MessageRouter messageRouter,
PulsarSinkSemantic semantic) {

super(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, semantic);
this.serializationSchema = serializationSchema;
this(new Builder<T>()
.withAdminUrl(adminUrl)
.withDefaultTopicName(defaultTopicName.orElse(null))
.withClientConf(clientConf)
.withProperties(properties)
.withPulsarSerializationSchema(serializationSchema)
.withMessageRouter(messageRouter)
.withSemantic(semantic));
}

public FlinkPulsarSink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
Expand All @@ -62,10 +63,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand All @@ -86,6 +89,64 @@
@Slf4j
abstract class FlinkPulsarSinkBase<T> extends TwoPhaseCommitSinkFunction<T, FlinkPulsarSinkBase.PulsarTransactionState<T>, Void> implements CheckpointedFunction {

public static class Config<T> {
private String adminUrl;
private Optional<String> defaultTopicName;
private ClientConfigurationData clientConf;
private Properties properties;
private PulsarSerializationSchema<T> serializationSchema;
private MessageRouter messageRouter;
private PulsarSinkSemantic semantic = PulsarSinkSemantic.AT_LEAST_ONCE;
private CryptoKeyReader cryptoKeyReader;
private Set<String> encryptionKeys = new HashSet<>();

public Config<T> withAdminUrl(final String adminUrl) {
this.adminUrl = adminUrl;
return this;
}

public Config<T> withDefaultTopicName(final String defaultTopicName) {
this.defaultTopicName = Optional.ofNullable(defaultTopicName);
return this;
}

public Config<T> withClientConf(ClientConfigurationData clientConf) {
this.clientConf = clientConf;
return this;
}

public Config<T> withProperties(final Properties properties) {
this.properties = properties;
return this;
}

public Config<T> withSerializationSchema(final PulsarSerializationSchema<T> serializationSchema) {
this.serializationSchema = serializationSchema;
return this;
}

public Config<T> withMessageRouter(final MessageRouter messageRouter) {
this.messageRouter = messageRouter;
return this;
}

public Config<T> withSemantic(final PulsarSinkSemantic semantic) {
this.semantic = semantic;
return this;
}

public Config<T> withCryptoKeyReader(final CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
return this;
}

public Config<T> withEncryptionKeys(final Set<String> encryptionKeys) {
this.encryptionKeys = encryptionKeys;
return this;
}

}

protected String adminUrl;

protected ClientConfigurationData clientConfigurationData;
Expand Down Expand Up @@ -143,6 +204,10 @@ abstract class FlinkPulsarSinkBase<T> extends TwoPhaseCommitSinkFunction<T, Flin

protected transient Map<String, Producer<T>> topic2Producer;

private final CryptoKeyReader cryptoKeyReader;

private final Set<String> encryptionKeys;

public FlinkPulsarSinkBase(
String adminUrl,
Optional<String> defaultTopicName,
Expand All @@ -153,34 +218,29 @@ public FlinkPulsarSinkBase(
this(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, PulsarSinkSemantic.AT_LEAST_ONCE);
}

public FlinkPulsarSinkBase(
String adminUrl,
Optional<String> defaultTopicName,
ClientConfigurationData clientConf,
Properties properties,
PulsarSerializationSchema<T> serializationSchema,
MessageRouter messageRouter,
PulsarSinkSemantic semantic) {
public FlinkPulsarSinkBase(final Config<T> config) {
super(new TransactionStateSerializer(), VoidSerializer.INSTANCE);

this.adminUrl = checkNotNull(adminUrl);
this.semantic = semantic;
this.adminUrl = checkNotNull(config.adminUrl);
this.semantic = config.semantic;
this.cryptoKeyReader = config.cryptoKeyReader;
this.encryptionKeys = config.encryptionKeys;

if (defaultTopicName.isPresent()) {
if (config.defaultTopicName.isPresent()) {
this.forcedTopic = true;
this.defaultTopic = defaultTopicName.get();
this.defaultTopic = config.defaultTopicName.get();
} else {
this.forcedTopic = false;
this.defaultTopic = null;
}

this.serializationSchema = serializationSchema;
this.serializationSchema = config.serializationSchema;

this.messageRouter = messageRouter;
this.messageRouter = config.messageRouter;

this.clientConfigurationData = clientConf;
this.clientConfigurationData = config.clientConf;

this.properties = checkNotNull(properties);
this.properties = checkNotNull(config.properties);

this.caseInsensitiveParams =
SourceSinkUtils.toCaceInsensitiveParams(Maps.fromProperties(properties));
Expand Down Expand Up @@ -216,6 +276,25 @@ public FlinkPulsarSinkBase(
}
}

public FlinkPulsarSinkBase(
String adminUrl,
Optional<String> defaultTopicName,
ClientConfigurationData clientConf,
Properties properties,
PulsarSerializationSchema<T> serializationSchema,
MessageRouter messageRouter,
PulsarSinkSemantic semantic) {
this(new Config<T>()
.withAdminUrl(adminUrl)
.withDefaultTopicName(defaultTopicName.orElse(null))
.withClientConf(clientConf)
.withProperties(properties)
.withSerializationSchema(serializationSchema)
.withMessageRouter(messageRouter)
.withSemantic(semantic)
);
}

public FlinkPulsarSinkBase(
String serviceUrl,
String adminUrl,
Expand Down Expand Up @@ -340,6 +419,12 @@ protected Producer<T> createProducer(
// maximizing the throughput
.batchingMaxBytes(5 * 1024 * 1024)
.loadConf(producerConf);
if (cryptoKeyReader != null){
builder.cryptoKeyReader(cryptoKeyReader);
for (final String encryptionKey : this.encryptionKeys) {
builder.addEncryptionKey(encryptionKey);
}
}
if (messageRouter == null) {
return builder.create();
} else {
Expand Down
Loading

0 comments on commit 42f0d4f

Please sign in to comment.