Skip to content

Commit

Permalink
emit Pulsar source and sink configuration property values to logs str…
Browse files Browse the repository at this point in the history
  • Loading branch information
gavingaozhangmin committed Aug 30, 2021
1 parent 0280311 commit 9453349
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -271,6 +273,14 @@ public void open(Configuration parameters) throws Exception {
topic2Producer = new HashMap<>();
}
//super.open(parameters);

try {
ObjectMapper m = new ObjectMapper();
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
log.info("Pulsar sink config: {}", w.writeValueAsString(properties));
} catch (IOException e) {
log.error("Failed to dump sink config info", e);
}
}

protected void initializeSendCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@

import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -501,6 +504,14 @@ public void open(Configuration parameters) throws Exception {
taskIndex, ownedTopicStarts.size(), ownedTopicStarts);
}
}

try {
ObjectMapper m = new ObjectMapper();
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
log.info("Pulsar source config: {}", w.writeValueAsString(properties));
} catch (IOException e) {
log.error("Failed to dump source config info", e);
}
}

protected String getSubscriptionName() {
Expand Down

0 comments on commit 9453349

Please sign in to comment.