diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarSerializationSchemaWrapper.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarSerializationSchemaWrapper.java index 2511d24e..f1bc30aa 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarSerializationSchemaWrapper.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/util/serialization/PulsarSerializationSchemaWrapper.java @@ -45,6 +45,7 @@ public class PulsarSerializationSchemaWrapper implements PulsarSerializationS private final SchemaMode schemaMode; private final SerializableFunction topicExtractor; private final SerializableFunction keyExtractor; + private final SerializableFunction> deliverAtExtractor; private PulsarSerializationSchemaWrapper(SerializationSchema serializationSchema, RecordSchemaType recordSchemaType, @@ -53,7 +54,8 @@ private PulsarSerializationSchemaWrapper(SerializationSchema serializationSch DataType dataType, SchemaMode schemaMode, SerializableFunction topicExtractor, - SerializableFunction keyExtractor) { + SerializableFunction keyExtractor, + SerializableFunction> deliverAtExtractor) { this.serializationSchema = serializationSchema; this.recordSchemaType = recordSchemaType; this.schema = schema; @@ -62,6 +64,7 @@ private PulsarSerializationSchemaWrapper(SerializationSchema serializationSch this.schemaMode = checkNotNull(schemaMode); this.topicExtractor = topicExtractor; this.keyExtractor = keyExtractor; + this.deliverAtExtractor = deliverAtExtractor; } @Override @@ -79,6 +82,9 @@ public void serialize(T element, TypedMessageBuilder messageBuilder) { if (keyExtractor != null) { messageBuilder.keyBytes(keyExtractor.apply(element)); } + if (deliverAtExtractor != null) { + deliverAtExtractor.apply(element).ifPresent(deliverAt -> messageBuilder.deliverAt(deliverAt)); + } messageBuilder.value(element); } @@ -147,6 +153,7 @@ public static class Builder { private SchemaMode mode; private SerializableFunction topicExtractor = (T) -> null; private SerializableFunction keyExtractor; + private SerializableFunction> deliverAtExtractor; public Builder(SerializationSchema serializationSchema) { this.serializationSchema = serializationSchema; @@ -199,6 +206,12 @@ public PulsarSerializationSchemaWrapper.Builder setKeyExtractor( return this; } + public PulsarSerializationSchemaWrapper.Builder setDeliverAtExtractor( + SerializableFunction> deliverAtExtractor) { + this.deliverAtExtractor = deliverAtExtractor; + return this; + } + public PulsarSerializationSchemaWrapper build() { checkNotNull(mode, "Must set mode " + "use useSpecialMode or useAtomicMode or usePojoMode or useRowMode"); @@ -210,7 +223,8 @@ public PulsarSerializationSchemaWrapper build() { dataType, mode, topicExtractor, - keyExtractor); + keyExtractor, + deliverAtExtractor); } } }