-
Notifications
You must be signed in to change notification settings - Fork 120
#386: Implemented passing of CryptoKeyReader
.
#387
base: master
Are you sure you want to change the base?
#386: Implemented passing of CryptoKeyReader
.
#387
Conversation
As requested [here](streamnative#386), it's now possible to pass a `CryptoKeyReader` (and encryption keys) to `FlinkPulsarSource` and `FlinkPulsarSink`. Used builder pattern for easy extensibility without breaking or excessively overloading public c'tors. Added integration test. (Maybe it can be moved to one of the other tests to avoid overhead.)
@jianyun8023 @syhily could you help take a look? |
Hi @objecttrouve, tks for your contribution. Using a builder pattern on internal Fetcher don't sounds like a Good design. The detailed review advice would be given in weekend. |
private final PulsarSerializationSchema<T> serializationSchema; | ||
|
||
public FlinkPulsarSink(final Builder<T> builder) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this should be public for a Builder pattern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Absolutely. Must have missed it. Will fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made c'tors private.
private final PulsarSerializationSchema<T> serializationSchema; | ||
|
||
public FlinkPulsarSink(final Builder<T> builder) { | ||
super( | ||
new FlinkPulsarSinkBase.Config<T>() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be boilerplate. Why not simply use ctor? Since it's a builder pattern. The end user would never touch this ctor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The end user would never touch this ctor.
I wasn't sure about which classes an end user could touch. Therefore, I tried hard not to change the public c'tors plus not to add more than absolutely necessary. But if the class is just for internal use, right, it's unnecessary boilerplate. Will change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Back to the old c'tor.
|
||
super(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, semantic); | ||
this.serializationSchema = serializationSchema; | ||
this(new Builder<T>() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor old ctor doen't look like a good choice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goal was basically to delegate as much as possible to a single c'tor that contains the interesting logic, instead of repeating variants of the logic inside the c'tor all over the place.
But to be honest, I don't mind very much. So I'll just revert to the old c'tor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Back to the old c'tor.
if (clientConf == null){ | ||
throw new IllegalStateException("Client conf must be set."); | ||
} | ||
if ((cryptoKeyReader == null) != (encryptionKeys.isEmpty())){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cryptoKeyReader
requires extra serialization check. Use Preconditions.checkState(InstantiationUtil.isSerializable(cryptoKeyReader))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, will do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the check.
@@ -86,6 +89,64 @@ | |||
@Slf4j | |||
abstract class FlinkPulsarSinkBase<T> extends TwoPhaseCommitSinkFunction<T, FlinkPulsarSinkBase.PulsarTransactionState<T>, Void> implements CheckpointedFunction { | |||
|
|||
public static class Config<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need builder for this base class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much the same reasoning as implemented throughout. A builder or config object makes it easier to add parameters in the future, without breaking an API or resulting in more and more overloaded c'tors. I have to admit I'm not sure which classes exactly were intended strictly for private purposes, so I just applied the pattern all over the place.
(Knowing that builders and config objects imply a lot of boilerplate.)
If overloading c'tors or changing the API of the class is OK, I'm completely fine with it though. So I'll change this piece, too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Back to the old c'tor.
PulsarSerializationSchema<T> serializationSchema, | ||
MessageRouter messageRouter, | ||
PulsarSinkSemantic semantic) { | ||
public FlinkPulsarSinkBase(final Config<T> config) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we may just keep this ctor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much the same as above, I'll change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Back to the old c'tor.
if (clientConf == null){ | ||
throw new IllegalStateException("Client conf mustn't be null. Either provide a client conf or a service URL plus properties."); | ||
} | ||
return new FlinkPulsarSource<>(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This cryptoKeyReader
doesn't require encryptionKeys
here. Right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still need to check the serialization for encryptionKeys
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I think it doesn't require the encryptionKeys
. But I'll double check.
OK, will check serialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the source, no encryption keys are required. Added serialization check.
streamingRuntime.getMetricGroup().addGroup(PULSAR_SOURCE_METRICS_GROUP), | ||
useMetrics | ||
); | ||
return new PulsarFetcher.Builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the PulsarFetcher
is an internal API. Plz just keep the ctor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Back to old c'tor.
@syhily, thanks for the feedback! I'll make the requested changes in my next free time slot. |
Turned out builder pattern wasn't necessary here. There's one path now for the CryptoKeyReader (plus the encryption keys) and possibly others where it's null (and the keys are empty).
@syhily, I made the changes as I understood your requests. Please let me know if there's anything else. Also, if I should squash. (Left multiple commits for better relatability to your remarks. For the time being.) |
Thanks for your contribution. The PR looks reasonable on my side. I will merge this on the weekend. |
Thanks @syhily! |
Implemented passing a
CryptoKeyReader
(and encryption keys) toFlinkPulsarSource
andFlinkPulsarSink
, as requested here.Used builder pattern for easy extensibility without breaking or excessively overloading public c'tors.
Added test. (Maybe it can be moved to one of the other tests to avoid overhead.)