Skip to content

Commit

Permalink
update case
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Sep 11, 2024
1 parent 766b690 commit e78a600
Show file tree
Hide file tree
Showing 23 changed files with 65 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;

import org.apache.doris.flink.table.DorisConfigOptions;
import org.junit.Test;

import java.util.HashMap;
Expand All @@ -33,7 +34,7 @@ public class DorisCatalogFactoryTest {
@Test
public void testCreateCatalog() {
final Map<String, String> options = new HashMap<>();
options.put("type", "doris");
options.put("type", DorisConfigOptions.IDENTIFIER);
options.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
options.put("fenodes", "127.0.0.1:8030");
options.put("default-database", "doris_db");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.collect.Lists;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.container.AbstractITCaseService;
import org.apache.doris.flink.table.DorisConfigOptions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
Expand Down Expand Up @@ -402,7 +403,7 @@ private static CatalogTable createTable() {
TABLE_SCHEMA,
new HashMap<String, String>() {
{
put("connector", "doris");
put("connector", DorisConfigOptions.IDENTIFIER);
put("table.properties.replication_num", "1");
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.doris.flink.container.AbstractContainerTestBase;
import org.apache.doris.flink.container.ContainerUtils;
import org.apache.doris.flink.table.DorisConfigOptions;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
Expand Down Expand Up @@ -72,7 +73,9 @@ public void testDoris2Doris() throws Exception {
+ "c16 ROW<name String, age int>, \n"
+ "c17 STRING \n"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'sink.label-prefix' = '"
Expand Down Expand Up @@ -109,7 +112,9 @@ public void testDoris2Doris() throws Exception {
+ "c16 ROW<name String, age int>, \n"
+ "c17 STRING \n"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'sink.label-prefix' = '"
+ UUID.randomUUID()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ private List<String> setMysql2DorisDefaultConfig(List<String> argList) {
argList.add(PASSWORD + "=" + getMySQLPassword());
argList.add(MYSQL_CONF);
argList.add(DATABASE_NAME + "=" + DATABASE);
// argList.add(MYSQL_CONF);
// argList.add("server-time-zone=UTC");

// set doris database
argList.add(DORIS_DATABASE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.catalog;
package org.apache.doris.flink.example;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink;
package org.apache.doris.flink.example;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.lookup;
package org.apache.doris.flink.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.lookup;
package org.apache.doris.flink.example;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.doris.flink.sink.DorisSink.Builder;
import org.apache.doris.flink.sink.batch.DorisBatchSink;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.doris.flink.table.DorisConfigOptions;
import org.apache.doris.flink.utils.MockSource;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -172,7 +173,9 @@ public void testTableSinkJsonFormat() throws Exception {
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'benodes' = '%s',"
+ " 'auto-redirect' = 'false',"
Expand Down Expand Up @@ -221,7 +224,9 @@ public void testTableBatch() throws Exception {
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
Expand Down Expand Up @@ -309,7 +314,9 @@ public void testTableGroupCommit() throws Exception {
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
Expand Down Expand Up @@ -358,7 +365,9 @@ public void testTableGzFormat() throws Exception {
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.flink.container.ContainerUtils;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.doris.flink.table.DorisConfigOptions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -143,7 +144,9 @@ public void testTableSource() throws Exception {
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
Expand Down Expand Up @@ -191,7 +194,9 @@ public void testTableSourceOldApi() throws Exception {
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'source.use-old-api' = 'true',"
Expand Down Expand Up @@ -228,7 +233,9 @@ public void testTableSourceAllOptions() throws Exception {
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'source.use-old-api' = 'true',"
Expand Down Expand Up @@ -274,7 +281,9 @@ public void testTableSourceFilterAndProjectionPushDown() throws Exception {
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
Expand Down Expand Up @@ -314,7 +323,9 @@ public void testTableSourceFilterWithUnionAll() throws Exception {
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
Expand Down Expand Up @@ -357,7 +368,9 @@ public void testJobManagerFailoverSource() throws Exception {
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
Expand Down Expand Up @@ -436,7 +449,9 @@ public void testTaskManagerFailoverSource() throws Exception {
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = 'doris',"
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public void testDorisSinkProperties() {

private Map<String, String> getAllOptions() {
Map<String, String> options = new HashMap<>();
options.put("connector", "doris");
options.put("connector", DorisConfigOptions.IDENTIFIER);
options.put("fenodes", "127.0.0.1:8030");
options.put("benodes", "127.0.0.1:8040");
options.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
Expand Down

0 comments on commit e78a600

Please sign in to comment.