From 9453349b1631848ea9608c155d89b4e29c2fbc75 Mon Sep 17 00:00:00 2001 From: gavingaozhangmin Date: Fri, 11 Jun 2021 12:07:40 +0800 Subject: [PATCH] emit Pulsar source and sink configuration property values to logs #335 --- .../connectors/pulsar/FlinkPulsarSinkBase.java | 10 ++++++++++ .../connectors/pulsar/FlinkPulsarSource.java | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java index 5d1887c2..21d98ec3 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java @@ -40,6 +40,8 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.MessageId; @@ -271,6 +273,14 @@ public void open(Configuration parameters) throws Exception { topic2Producer = new HashMap<>(); } //super.open(parameters); + + try { + ObjectMapper m = new ObjectMapper(); + ObjectWriter w = m.writerWithDefaultPrettyPrinter(); + log.info("Pulsar sink config: {}", w.writeValueAsString(properties)); + } catch (IOException e) { + log.error("Failed to dump sink config info", e); + } } protected void initializeSendCallback() { diff --git a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java index 3edd5ce4..26dfd7fa 100644 --- a/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java +++ b/pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java @@ -62,6 +62,8 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; @@ -69,6 +71,7 @@ import org.apache.pulsar.shade.com.google.common.collect.Maps; import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -501,6 +504,14 @@ public void open(Configuration parameters) throws Exception { taskIndex, ownedTopicStarts.size(), ownedTopicStarts); } } + + try { + ObjectMapper m = new ObjectMapper(); + ObjectWriter w = m.writerWithDefaultPrettyPrinter(); + log.info("Pulsar source config: {}", w.writeValueAsString(properties)); + } catch (IOException e) { + log.error("Failed to dump source config info", e); + } } protected String getSubscriptionName() {