From 66ccaf1b18eaf29375d5ad8ba3791fba753be880 Mon Sep 17 00:00:00 2001 From: gavingaozhangmin Date: Fri, 11 Jun 2021 12:07:40 +0800 Subject: [PATCH] emit Pulsar source and sink configuration property values to logs #335 --- .../connectors/pulsar/FlinkPulsarSinkBase.java | 10 ++++++++++ .../connectors/pulsar/FlinkPulsarSource.java | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java index aadc15187..bd1049414 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java @@ -40,6 +40,8 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.MessageId; @@ -271,6 +273,14 @@ public void open(Configuration parameters) throws Exception { topic2Producer = new HashMap<>(); } //super.open(parameters); + + try { + ObjectMapper m = new ObjectMapper(); + ObjectWriter w = m.writerWithDefaultPrettyPrinter(); + log.info("Pulsar sink config: {}", w.writeValueAsString(properties)); + } catch (IOException e) { + log.error("Failed to dump sink config info", e); + } } protected void initializeSendCallback() { diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java index c09790ce4..839d18b20 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java @@ -62,6 +62,8 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; @@ -69,6 +71,7 @@ import org.apache.pulsar.shade.com.google.common.collect.Maps; import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -496,6 +499,14 @@ public void open(Configuration parameters) throws Exception { taskIndex, ownedTopicStarts.size(), ownedTopicStarts); } } + + try { + ObjectMapper m = new ObjectMapper(); + ObjectWriter w = m.writerWithDefaultPrettyPrinter(); + log.info("Pulsar source config: {}", w.writeValueAsString(properties)); + } catch (IOException e) { + log.error("Failed to dump source config info", e); + } } protected String getSubscriptionName() {