Skip to content

Commit

Permalink
fix flink sql properties key style (streamnative#329)
Browse files Browse the repository at this point in the history
* fix flink sql properties key style

* fix code style

* compatible with existing LOWER_HYPHEN format configuration
  • Loading branch information
Jianyun Zhao authored Jun 5, 2021
1 parent cd06798 commit 7abc462
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,7 @@ public PulsarFetcher(
this.runtimeContext = runtimeContext;
this.clientConf = clientConf;
this.readerConf = readerConf == null ? new HashMap<>() : readerConf;

String failOnDataLossVal = this.readerConf.getOrDefault(PulsarOptions.FAIL_ON_DATA_LOSS_OPTION_KEY, "true").toString();
this.failOnDataLoss = Boolean.parseBoolean(failOnDataLossVal);
this.readerConf.remove(PulsarOptions.FAIL_ON_DATA_LOSS_OPTION_KEY);

this.failOnDataLoss = SourceSinkUtils.getFailOnDataLossAndRemoveKey(this.readerConf);
this.pollTimeoutMs = pollTimeoutMs;
this.commitMaxRetries = commitMaxRetries;
this.deserializer = deserializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
public class PulsarOptions {

// option key prefix for different modules
public static final String PULSAR_CLIENT_OPTION_KEY_PREFIX = "pulsar.client.";
public static final String PULSAR_PRODUCER_OPTION_KEY_PREFIX = "pulsar.producer.";
public static final String PULSAR_READER_OPTION_KEY_PREFIX = "pulsar.reader.";
public static final String PULSAR_OPTION_KEY_PREFIX = "pulsar.";
public static final String PULSAR_CLIENT_OPTION_KEY_PREFIX = PULSAR_OPTION_KEY_PREFIX + "client.";
public static final String PULSAR_PRODUCER_OPTION_KEY_PREFIX = PULSAR_OPTION_KEY_PREFIX + "producer.";
public static final String PULSAR_READER_OPTION_KEY_PREFIX = PULSAR_OPTION_KEY_PREFIX + "reader.";

// topic options
public static final String TOPIC_SINGLE_OPTION_KEY = "topic";
Expand Down Expand Up @@ -56,10 +57,10 @@ public class PulsarOptions {
public static final String SEND_TIMEOUT_MS = "send-timeout-ms";
public static final String SUBSCRIPTION_ROLE_OPTION_KEY = "subscription-role-prefix";
public static final String COMMIT_MAX_RETRIES = "commit-max-retries";
public static final String FAIL_ON_DATA_LOSS_OPTION_KEY = "fail-on-data-loss";
public static final String ENABLE_KEY_HASH_RANGE_KEY = "enable-key-hash-range";
public static final String KEY_DISABLED_METRICS = "key-disable-metrics";
public static final String OLD_STATE_VERSION = "old-state-version";
public static final String FAIL_ON_DATA_LOSS_OPTION_KEY = "failOnDataLoss";

public static final String INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
"Some data may have been lost because they are not available in Pulsar any more; either the\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.apache.flink.streaming.connectors.pulsar.internal;

import org.apache.flink.shaded.guava18.com.google.common.base.CaseFormat;

import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -144,7 +146,7 @@ public static Map<String, Object> getReaderParams(Map<String, String> parameters

public static Map<String, String> toCaceInsensitiveParams(Map<String, String> parameters) {
return parameters.entrySet().stream()
.collect(Collectors.toMap(t -> t.getKey().toLowerCase(Locale.ROOT), t -> t.getValue()));
.collect(Collectors.toMap(t -> t.getKey().toLowerCase(Locale.ROOT), t -> t.getValue()));
}

public static Map<String, Object> getProducerParams(Map<String, String> parameters) {
Expand Down Expand Up @@ -187,9 +189,20 @@ public static Range distributeRange(int countOfSubTasks, int indexOfSubTasks) {

public static int getOldStateVersion(Map<String, String> caseInsensitiveParams, int defaultValue) {
final String value = caseInsensitiveParams.get(PulsarOptions.OLD_STATE_VERSION);
if (StringUtils.isBlank(value)){
if (StringUtils.isBlank(value)) {
return defaultValue;
}
return Integer.parseInt(value);
}

public static boolean getFailOnDataLossAndRemoveKey(Map<String, Object> readerConf) {
String key = PulsarOptions.FAIL_ON_DATA_LOSS_OPTION_KEY;
if (!readerConf.containsKey(key)) {
key = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_HYPHEN, key);
}
String failOnDataLossVal = readerConf.getOrDefault(key, "true").toString();
final boolean value = Boolean.parseBoolean(failOnDataLossVal);
readerConf.remove(key);
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.guava18.com.google.common.base.CaseFormat;

import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
Expand All @@ -49,6 +51,7 @@
import java.util.Set;
import java.util.stream.IntStream;

import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.PULSAR_OPTION_KEY_PREFIX;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic.AT_LEAST_ONCE;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic.EXACTLY_ONCE;
import static org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic.NONE;
Expand Down Expand Up @@ -378,7 +381,10 @@ public static Properties getPulsarProperties(Map<String, String> tableOptions) {
.filter(key -> key.startsWith(PROPERTIES_PREFIX))
.forEach(key -> {
final String value = tableOptions.get(key);
final String subKey = key.substring((PROPERTIES_PREFIX).length());
String subKey = key.substring((PROPERTIES_PREFIX).length());
if (subKey.startsWith(PULSAR_OPTION_KEY_PREFIX)) {
subKey = CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, subKey);
}
pulsarProperties.put(subKey, value);
});
}
Expand Down

0 comments on commit 7abc462

Please sign in to comment.