Skip to content

Commit

Permalink
[flink] Fix FlinkGenericCatalog can not get procedure (#4321)
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Oct 14, 2024
1 parent 3a7f790 commit eaa4a24
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,11 @@ public List<String> listProcedures(String dbName)
*/
public Procedure getProcedure(ObjectPath procedurePath)
throws ProcedureNotExistException, CatalogException {
return ProcedureUtil.getProcedure(paimon.catalog(), procedurePath)
.orElse(flink.getProcedure(procedurePath));
Optional<Procedure> procedure = ProcedureUtil.getProcedure(paimon.catalog(), procedurePath);
if (procedure.isPresent()) {
return procedure.get();
} else {
return flink.getProcedure(procedurePath);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,20 @@ public void testReadPaimonAllProcedures() {
assertThat(result)
.contains(Row.of("compact"), Row.of("merge_into"), Row.of("migrate_table"));
}

@Test
public void testCreateTag() {
sql(
"CREATE TABLE paimon_t ( "
+ "f0 INT, "
+ "f1 INT "
+ ") WITH ('connector'='paimon', 'file.format' = 'avro' )");
sql("INSERT INTO paimon_t VALUES (1, 1), (2, 2)");
assertThat(sql("SELECT * FROM paimon_t"))
.containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));
sql("CALL sys.create_tag('test_db.paimon_t', 'tag_1')");

List<Row> result = sql("SELECT tag_name FROM paimon_t$tags");
assertThat(result).contains(Row.of("tag_1"));
}
}

0 comments on commit eaa4a24

Please sign in to comment.