Skip to content

Commit

Permalink
improve example for doc
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Nov 18, 2024
1 parent 498a7fa commit 6c95c88
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,49 @@
public class DorisSinkExample {

/**
 * Entry point of the sink example; runs the JSON-format write job.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if the delegated job throws
 */
public static void main(final String[] args) throws Exception {
    // Only the JSON example is wired in here.
    JSONFormatWrite();
}

/**
 * Writes two JSON-encoded records to a Doris table through a {@link DorisSink} configured in
 * batch mode.
 *
 * <p>The job runs with parallelism 1 and checkpointing enabled with an interval argument of
 * 30000, reads its input from an in-memory collection, and is submitted under the job name
 * {@code "doris test"}.
 *
 * @throws Exception if building or executing the job throws
 */
public static void JSONFormatWrite() throws Exception {
    // Stream load properties: payload is JSON, one object per line.
    Properties streamLoadProps = new Properties();
    streamLoadProps.setProperty("read_json_by_line", "true");
    streamLoadProps.setProperty("format", "json");

    // Connection settings for the target table.
    // NOTE(review): the other examples in this change target "test.students" — confirm
    // that "test.student" is the intended table name here.
    DorisOptions connectionOptions =
            DorisOptions.builder()
                    .setFenodes("127.0.0.1:8030")
                    .setTableIdentifier("test.student")
                    .setUsername("root")
                    .setPassword("")
                    .build();

    // Write-side settings: fixed label prefix, deletes disabled, batch mode enabled.
    DorisExecutionOptions writeOptions =
            DorisExecutionOptions.builder()
                    .setLabelPrefix("label-doris")
                    .setDeletable(false)
                    .setBatchMode(true)
                    .setStreamLoadProp(streamLoadProps)
                    .build();

    DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
    sinkBuilder
            .setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(writeOptions)
            .setSerializer(new SimpleStringSerializer())
            .setDorisOptions(connectionOptions);

    // Sample input: each element is one JSON document.
    List<String> records = new ArrayList<>();
    records.add("{\"id\":3,\"name\":\"Michael\",\"age\":28}");
    records.add("{\"id\":4,\"name\":\"David\",\"age\":38}");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(30000);

    env.fromCollection(records).sinkTo(sinkBuilder.build());
    env.execute("doris test");
}

public static void CSVFormatWrite() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
Expand All @@ -49,24 +92,13 @@ public static void main(String[] args) throws Exception {
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
DorisSink.Builder<String> builder = DorisSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
readOptionBuilder
.setDeserializeArrowAsync(false)
.setDeserializeQueueSize(64)
.setExecMemLimit(2147483648L)
.setRequestQueryTimeoutS(3600)
.setRequestBatchSize(1000)
.setRequestConnectTimeoutMs(10000)
.setRequestReadTimeoutMs(10000)
.setRequestRetries(3)
.setRequestTabletSize(1024 * 1024);
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("127.0.0.1:8040")
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("db.table")
.setUsername("test")
.setPassword("test");
Expand All @@ -77,7 +109,7 @@ public static void main(String[] args) throws Exception {
.setBufferSize(8 * 1024)
.setBufferCount(3);

builder.setDorisReadOptions(readOptionBuilder.build())
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(new SimpleStringSerializer())
.setDorisOptions(dorisBuilder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,8 @@

package org.apache.doris.flink.example;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
Expand All @@ -45,40 +40,37 @@ public class DorisSinkExampleRowData {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.enableCheckpointing(10000);
env.setParallelism(1);
env.getCheckpointConfig()
.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));

DorisSink.Builder<RowData> builder = DorisSink.builder();

Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
// properties.setProperty("read_json_by_line", "true");
// properties.setProperty("format", "json");
properties.setProperty("format", "csv");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("db.tbl")
.setTableIdentifier("test.students")
.setUsername("root")
.setPassword("");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
executionBuilder
.setLabelPrefix(UUID.randomUUID().toString())
.setDeletable(false)
.setStreamLoadProp(properties);

// Flink RowData's schema
String[] fields = {"name", "age"};
DataType[] types = {DataTypes.VARCHAR(256), DataTypes.INT()};
String[] fields = {"id", "name", "age"};
DataType[] types = {DataTypes.INT(), DataTypes.VARCHAR(256), DataTypes.INT()};

builder.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(
RowDataSerializer.builder() // serialize according to rowdata
.setType(LoadConstants.CSV) // .setType(LoadConstants.CSV)
.setType(LoadConstants.CSV)
.setFieldDelimiter(",")
.setFieldNames(fields) // .setFieldDelimiter(",")
.setFieldNames(fields)
.setFieldType(types)
.build())
.setDorisOptions(dorisBuilder.build());
Expand All @@ -91,16 +83,17 @@ public static void main(String[] args) throws Exception {
@Override
public void flatMap(String s, Collector<RowData> out)
throws Exception {
GenericRowData genericRowData = new GenericRowData(2);
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, 1);
genericRowData.setField(
0, StringData.fromString("beijing"));
genericRowData.setField(1, 123);
1, StringData.fromString("Michael"));
genericRowData.setField(2, 18);
out.collect(genericRowData);

GenericRowData genericRowData2 = new GenericRowData(2);
genericRowData2.setField(
0, StringData.fromString("shanghai"));
genericRowData2.setField(1, 1234);
GenericRowData genericRowData2 = new GenericRowData(3);
genericRowData2.setField(0, 2);
genericRowData2.setField(1, StringData.fromString("David"));
genericRowData2.setField(2, 38);
out.collect(genericRowData2);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,34 @@

package org.apache.doris.flink.example;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.batch.DorisBatchSink;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.batch.RecordWithMeta;
import org.apache.doris.flink.sink.writer.serializer.RecordWithMetaSerializer;

import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;

public class DorisSinkMultiTableExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(15000);

DorisBatchSink.Builder<RecordWithMeta> builder = DorisBatchSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();

readOptionBuilder
.setDeserializeArrowAsync(false)
.setDeserializeQueueSize(64)
.setExecMemLimit(2147483648L)
.setRequestQueryTimeoutS(3600)
.setRequestBatchSize(1000)
.setRequestConnectTimeoutMs(10000)
.setRequestReadTimeoutMs(10000)
.setRequestRetries(3)
.setRequestTabletSize(1024 * 1024);

DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
Properties properties = new Properties();
properties.setProperty("column_separator", ",");
properties.setProperty("line_delimiter", "\n");
properties.setProperty("format", "csv");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder
.setFenodes("127.0.0.1:8030")
.setTableIdentifier("test.test_flink_tmp")
.setTableIdentifier("")
.setUsername("root")
.setPassword("");

Expand All @@ -66,21 +54,24 @@ public static void main(String[] args) throws Exception {
.setLabelPrefix("label")
.setStreamLoadProp(properties)
.setDeletable(false)
.setBufferFlushMaxBytes(8 * 1024)
.setBufferFlushMaxRows(10)
.setBatchMode(true)
.setBufferFlushMaxBytes(10 * 1024 * 1024)
.setBufferFlushMaxRows(10000)
.setBufferFlushIntervalMs(1000 * 10);

builder.setDorisReadOptions(readOptionBuilder.build())
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisBuilder.build())
.setSerializer(new RecordWithMetaSerializer());

// RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
// RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
// DataStreamSource<RecordWithMeta> stringDataStreamSource = env.fromCollection(
// Arrays.asList(record, record1));
// stringDataStreamSource.sinkTo(builder.build());
RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
DataStreamSource<RecordWithMeta> stringDataStreamSource =
env.fromCollection(Arrays.asList(record, record1));
stringDataStreamSource.sinkTo(builder.build());

/*
// mock unbounded streaming source
env.addSource(
new SourceFunction<RecordWithMeta>() {
private Long id = 1000000L;
Expand All @@ -90,6 +81,12 @@ public void run(SourceContext<RecordWithMeta> out) throws Exception {
while (true) {
id = id + 1;
RecordWithMeta record =
new RecordWithMeta(
"test",
"test_flink_tmp",
UUID.randomUUID() + ",1");
out.collect(record);
record =
new RecordWithMeta(
"test",
"test_flink_tmp1",
Expand All @@ -98,18 +95,18 @@ public void run(SourceContext<RecordWithMeta> out) throws Exception {
record =
new RecordWithMeta(
"test",
"test_flink_tmp",
"test_flink_tmp2",
UUID.randomUUID() + ",1");
out.collect(record);
Thread.sleep(3000);
Thread.sleep(1000);
}
}
@Override
public void cancel() {}
})
.sinkTo(builder.build());

**/
env.execute("doris multi table test");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,84 @@

package org.apache.doris.flink.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.doris.flink.source.DorisSource;

import java.util.List;
import java.util.Properties;

public class DorisSourceDataStream {

/**
 * Entry point of the source example; runs the Arrow Flight SQL read job.
 *
 * @param args command-line arguments (not used)
 * @throws Exception if the delegated job throws
 */
public static void main(final String[] args) throws Exception {
    // Only the Arrow Flight SQL variant is wired in here.
    useArrowFlightSQLRead();
}

/**
 * Reads rows from the Doris table {@code test.students} with Flight SQL enabled on the read
 * options and prints each row.
 *
 * <p>The read options set the Flight SQL port to 29747 and apply the filter
 * {@code "age > 1"}. Rows are deserialized with {@link SimpleListDeserializationSchema} and the
 * job is submitted under the name {@code "Doris Source Test"}.
 *
 * @throws Exception if building or executing the job throws
 */
public static void useArrowFlightSQLRead() throws Exception {
    // Connection settings for the table to read.
    DorisOptions connectionOptions =
            DorisOptions.builder()
                    .setFenodes("127.0.0.1:8030")
                    .setTableIdentifier("test.students")
                    .setUsername("root")
                    .setPassword("")
                    .build();

    // Read settings: turn on Flight SQL, point it at its port, and filter the rows.
    DorisReadOptions flightSqlReadOptions =
            DorisReadOptions.builder()
                    .setUseFlightSql(true)
                    .setFlightSqlPort(29747)
                    .setFilterQuery("age > 1")
                    .build();

    DorisSource<List<?>> source =
            DorisSource.<List<?>>builder()
                    .setDorisOptions(connectionOptions)
                    .setDorisReadOptions(flightSqlReadOptions)
                    .setDeserializer(new SimpleListDeserializationSchema())
                    .build();

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "doris source").print();
    env.execute("Doris Source Test");
}

/**
 * Reads rows from the Doris table {@code test.students} using default read options and prints
 * each row.
 *
 * <p>No read option is customized here; presumably the defaults select the Thrift-based read
 * path the method name refers to (not visible in this file — confirm against
 * {@link DorisReadOptions}). Rows are deserialized with
 * {@link SimpleListDeserializationSchema} and the job is submitted under the name
 * {@code "Doris Source Test"}.
 *
 * @throws Exception if building or executing the job throws
 */
public static void useThriftRead() throws Exception {
    // Connection settings for the table to read.
    DorisOptions connectionOptions =
            DorisOptions.builder()
                    .setFenodes("127.0.0.1:8030")
                    .setTableIdentifier("test.students")
                    .setUsername("root")
                    .setPassword("")
                    .build();

    // Default read options are passed straight into the source builder.
    DorisSource<List<?>> source =
            DorisSource.<List<?>>builder()
                    .setDorisOptions(connectionOptions)
                    .setDorisReadOptions(DorisReadOptions.builder().build())
                    .setDeserializer(new SimpleListDeserializationSchema())
                    .build();

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "doris source").print();
    env.execute("Doris Source Test");
}

public static void useSourceFunctionRead() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.put("fenodes", "FE_IP:8030");
properties.put("fenodes", "127.0.0.1:8030");
properties.put("username", "root");
properties.put("password", "");
properties.put("table.identifier", "db.table");
properties.put("doris.read.field", "id,code,name");
properties.put("doris.filter.query", "name='doris'");
properties.put("table.identifier", "test.students");
DorisStreamOptions options = new DorisStreamOptions(properties);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.addSource(new DorisSourceFunction(options, new SimpleListDeserializationSchema()))
.print();
Expand Down

0 comments on commit 6c95c88

Please sign in to comment.