From 475e48791b873c516c4c26774cda3b45a268cd70 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Fri, 29 Nov 2024 20:38:47 +0800 Subject: [PATCH] [core] Remove all lineage implementation (#4607) --- .../generated/catalog_configuration.html | 6 - .../org/apache/paimon/factories/Factory.java | 2 +- .../paimon/lineage/DataLineageEntity.java | 33 --- .../apache/paimon/lineage/LineageMeta.java | 102 --------- .../paimon/lineage/LineageMetaFactory.java | 37 ---- .../paimon/lineage/TableLineageEntity.java | 32 --- .../lineage/TableLineageEntityImpl.java | 56 ----- .../apache/paimon/options/CatalogOptions.java | 22 -- .../paimon/catalog/AbstractCatalog.java | 26 +-- .../paimon/table/CatalogEnvironment.java | 18 +- .../table/system/SinkTableLineageTable.java | 64 ------ .../table/system/SourceTableLineageTable.java | 64 ------ .../table/system/SystemTableLoader.java | 29 +-- .../table/system/TableLineageTable.java | 168 -------------- .../flink/AbstractFlinkTableFactory.java | 58 +---- .../paimon/flink/CatalogTableITCase.java | 6 +- .../paimon/flink/FlinkLineageITCase.java | 206 ------------------ .../org.apache.paimon.factories.Factory | 3 - .../org/apache/paimon/hive/HiveCatalog.java | 3 +- 19 files changed, 12 insertions(+), 923 deletions(-) delete mode 100644 paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java delete mode 100644 paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java delete mode 100644 paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java delete mode 100644 paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java delete mode 100644 paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java delete mode 100644 paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java delete mode 100644 paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java delete mode 100644 paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java delete mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index 6706d5c421a1..63f7adda1e0d 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -86,12 +86,6 @@ Boolean Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in the metastore and need to be manually added as separate partition operations. - -
lineage-meta
- (none) - String - The lineage meta to store table and data lineage information.

Possible values:
-
lock-acquire-timeout
8 min diff --git a/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java b/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java index b0f1ec84c170..74796879ef4b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java +++ b/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java @@ -20,7 +20,7 @@ /** * Base interface for all kind of factories that create object instances from a list of key-value - * pairs in Paimon's catalog, lineage. + * pairs in Paimon's catalog. * *

A factory is uniquely identified by {@link Class} and {@link #identifier()}. * diff --git a/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java b/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java deleted file mode 100644 index e7401a9be3b7..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/lineage/DataLineageEntity.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.lineage; - -import org.apache.paimon.data.Timestamp; - -/** - * Data lineage entity with table lineage, barrier id and snapshot id for table source and sink - * lineage. - */ -public interface DataLineageEntity extends TableLineageEntity { - long getBarrierId(); - - long getSnapshotId(); - - Timestamp getCreateTime(); -} diff --git a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java deleted file mode 100644 index 5d1c42daf6c8..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMeta.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.lineage; - -import org.apache.paimon.predicate.Predicate; - -import javax.annotation.Nullable; - -import java.util.Iterator; - -/** Metadata store will manage table lineage and data lineage information for the catalog. */ -public interface LineageMeta extends AutoCloseable { - /** - * Save the source table and job lineage. - * - * @param entity the table lineage entity - */ - void saveSourceTableLineage(TableLineageEntity entity); - - /** - * Delete the source table lineage for given job. - * - * @param job the job for table lineage - */ - void deleteSourceTableLineage(String job); - - /** - * Get source table and job lineages. - * - * @param predicate the predicate for the table lineages - * @return the iterator for source table and job lineages - */ - Iterator sourceTableLineages(@Nullable Predicate predicate); - - /** - * Save the sink table and job lineage. - * - * @param entity the table lineage entity - */ - void saveSinkTableLineage(TableLineageEntity entity); - - /** - * Get sink table and job lineages. - * - * @param predicate the predicate for the table lineages - * @return the iterator for sink table and job lineages - */ - Iterator sinkTableLineages(@Nullable Predicate predicate); - - /** - * Delete the sink table lineage for given job. - * - * @param job the job for table lineage - */ - void deleteSinkTableLineage(String job); - - /** - * Save the source table and job lineage. - * - * @param entity the data lineage entity - */ - void saveSourceDataLineage(DataLineageEntity entity); - - /** - * Get source data and job lineages. - * - * @param predicate the predicate for the table lineages - * @return the iterator for source table and job lineages - */ - Iterator sourceDataLineages(@Nullable Predicate predicate); - - /** - * Save the sink table and job lineage. - * - * @param entity the data lineage entity - */ - void saveSinkDataLineage(DataLineageEntity entity); - - /** - * Get sink data and job lineages. - * - * @param predicate the predicate for the table lineages - * @return the iterator for sink table and job lineages - */ - Iterator sinkDataLineages(@Nullable Predicate predicate); -} diff --git a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java b/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java deleted file mode 100644 index 11c6d3a1173c..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/lineage/LineageMetaFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.lineage; - -import org.apache.paimon.factories.Factory; -import org.apache.paimon.options.Options; - -import java.io.Serializable; - -/** Factory to create {@link LineageMeta}. Each factory should have a unique identifier. */ -public interface LineageMetaFactory extends Factory, Serializable { - - LineageMeta create(LineageMetaContext context); - - /** - * Context has all options in a catalog and is used in factory to create {@link LineageMeta}. - */ - interface LineageMetaContext { - Options options(); - } -} diff --git a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java deleted file mode 100644 index c4312c4eb080..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntity.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.lineage; - -import org.apache.paimon.data.Timestamp; - -/** Table lineage entity with database, table and job for table source and sink lineage. */ -public interface TableLineageEntity { - String getDatabase(); - - String getTable(); - - String getJob(); - - Timestamp getCreateTime(); -} diff --git a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java b/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java deleted file mode 100644 index ef11ee87f15c..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/lineage/TableLineageEntityImpl.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.lineage; - -import org.apache.paimon.data.Timestamp; - -/** Default implementation for {@link TableLineageEntity}. */ -public class TableLineageEntityImpl implements TableLineageEntity { - private final String database; - private final String table; - private final String job; - private final Timestamp timestamp; - - public TableLineageEntityImpl(String database, String table, String job, Timestamp timestamp) { - this.database = database; - this.table = table; - this.job = job; - this.timestamp = timestamp; - } - - @Override - public String getDatabase() { - return database; - } - - @Override - public String getTable() { - return table; - } - - @Override - public String getJob() { - return job; - } - - @Override - public Timestamp getCreateTime() { - return timestamp; - } -} diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index f69af2d59910..bb8cfae68284 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -18,8 +18,6 @@ package org.apache.paimon.options; -import org.apache.paimon.options.description.Description; -import org.apache.paimon.options.description.TextElement; import org.apache.paimon.table.CatalogTableType; import java.time.Duration; @@ -130,26 +128,6 @@ public class CatalogOptions { .withDescription( "Controls the max number for snapshots per table in the catalog are cached."); - public static final ConfigOption LINEAGE_META = - key("lineage-meta") - .stringType() - .noDefaultValue() - .withDescription( - Description.builder() - .text( - "The lineage meta to store table and data lineage information.") - .linebreak() - .linebreak() - .text("Possible values:") - .linebreak() - .list( - TextElement.text( - "\"jdbc\": Use standard jdbc to store table and data lineage information.")) - .list( - TextElement.text( - "\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage.")) - .build()); - public static final ConfigOption ALLOW_UPPER_CASE = ConfigOptions.key("allow-upper-case") .booleanType() diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 16b76513d7ef..2b277a29b835 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -24,7 +24,6 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; -import org.apache.paimon.lineage.LineageMetaFactory; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.FileStoreCommit; @@ -62,7 +61,6 @@ import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.CoreOptions.createCommitUser; import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE; -import static org.apache.paimon.options.CatalogOptions.LINEAGE_META; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; @@ -76,19 +74,14 @@ public abstract class AbstractCatalog implements Catalog { protected final Map tableDefaultOptions; protected final Options catalogOptions; - @Nullable protected final LineageMetaFactory lineageMetaFactory; - protected AbstractCatalog(FileIO fileIO) { this.fileIO = fileIO; - this.lineageMetaFactory = null; this.tableDefaultOptions = new HashMap<>(); this.catalogOptions = new Options(); } protected AbstractCatalog(FileIO fileIO, Options options) { this.fileIO = fileIO; - this.lineageMetaFactory = - findAndCreateLineageMeta(options, AbstractCatalog.class.getClassLoader()); this.tableDefaultOptions = Catalog.tableDefaultOptions(options.toMap()); this.catalogOptions = options; } @@ -377,27 +370,13 @@ public void alterTable( protected abstract void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException; - @Nullable - private LineageMetaFactory findAndCreateLineageMeta(Options options, ClassLoader classLoader) { - return options.getOptional(LINEAGE_META) - .map( - meta -> - FactoryUtil.discoverFactory( - classLoader, LineageMetaFactory.class, meta)) - .orElse(null); - } - @Override public Table getTable(Identifier identifier) throws TableNotExistException { if (isSystemDatabase(identifier.getDatabaseName())) { String tableName = identifier.getTableName(); Table table = SystemTableLoader.loadGlobal( - tableName, - fileIO, - this::allTablePaths, - catalogOptions, - lineageMetaFactory); + tableName, fileIO, this::allTablePaths, catalogOptions); if (table == null) { throw new TableNotExistException(identifier); } @@ -444,8 +423,7 @@ protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExist lockFactory().orElse(null), lockContext().orElse(null), identifier), - metastoreClientFactory(identifier, tableMeta.schema).orElse(null), - lineageMetaFactory)); + metastoreClientFactory(identifier, tableMeta.schema).orElse(null))); CoreOptions options = table.coreOptions(); if (options.type() == TableType.OBJECT_TABLE) { String objectLocation = options.objectLocation(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java index 9ff5f9b4f6a8..a722d9e21ada 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java @@ -19,7 +19,6 @@ package org.apache.paimon.table; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.lineage.LineageMetaFactory; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.Lock; @@ -27,10 +26,7 @@ import java.io.Serializable; -/** - * Catalog environment in table which contains log factory, metastore client factory and lineage - * meta. - */ +/** Catalog environment in table which contains log factory, metastore client factory. */ public class CatalogEnvironment implements Serializable { private static final long serialVersionUID = 1L; @@ -39,23 +35,20 @@ public class CatalogEnvironment implements Serializable { @Nullable private final String uuid; private final Lock.Factory lockFactory; @Nullable private final MetastoreClient.Factory metastoreClientFactory; - @Nullable private final LineageMetaFactory lineageMetaFactory; public CatalogEnvironment( @Nullable Identifier identifier, @Nullable String uuid, Lock.Factory lockFactory, - @Nullable MetastoreClient.Factory metastoreClientFactory, - @Nullable LineageMetaFactory lineageMetaFactory) { + @Nullable MetastoreClient.Factory metastoreClientFactory) { this.identifier = identifier; this.uuid = uuid; this.lockFactory = lockFactory; this.metastoreClientFactory = metastoreClientFactory; - this.lineageMetaFactory = lineageMetaFactory; } public static CatalogEnvironment empty() { - return new CatalogEnvironment(null, null, Lock.emptyFactory(), null, null); + return new CatalogEnvironment(null, null, Lock.emptyFactory(), null); } @Nullable @@ -76,9 +69,4 @@ public Lock.Factory lockFactory() { public MetastoreClient.Factory metastoreClientFactory() { return metastoreClientFactory; } - - @Nullable - public LineageMetaFactory lineageMetaFactory() { - return lineageMetaFactory; - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java deleted file mode 100644 index 71efce070471..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SinkTableLineageTable.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.table.system; - -import org.apache.paimon.lineage.LineageMeta; -import org.apache.paimon.lineage.LineageMetaFactory; -import org.apache.paimon.options.Options; -import org.apache.paimon.table.Table; -import org.apache.paimon.table.source.InnerTableRead; - -import java.util.Map; - -/** - * This is a system table to display all the sink table lineages. - * - *

- *  For example:
- *     If we select * from sys.sink_table_lineage, we will get
- *     database_name       table_name       job_name      create_time
- *        default            test0            job1    2023-10-22 20:35:12
- *       database1           test1            job1    2023-10-28 21:35:52
- *          ...               ...             ...             ...
- *     We can write sql to fetch the information we need.
- * 
- */ -public class SinkTableLineageTable extends TableLineageTable { - - public static final String SINK_TABLE_LINEAGE = "sink_table_lineage"; - - public SinkTableLineageTable(LineageMetaFactory lineageMetaFactory, Options options) { - super(lineageMetaFactory, options); - } - - @Override - public InnerTableRead newRead() { - return new TableLineageRead(lineageMetaFactory, options, LineageMeta::sinkTableLineages); - } - - @Override - public String name() { - return SINK_TABLE_LINEAGE; - } - - @Override - public Table copy(Map dynamicOptions) { - return new SinkTableLineageTable(lineageMetaFactory, options); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java deleted file mode 100644 index 5d9904fa6675..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SourceTableLineageTable.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.table.system; - -import org.apache.paimon.lineage.LineageMeta; -import org.apache.paimon.lineage.LineageMetaFactory; -import org.apache.paimon.options.Options; -import org.apache.paimon.table.Table; -import org.apache.paimon.table.source.InnerTableRead; - -import java.util.Map; - -/** - * This is a system table to display all the source table lineages. - * - *
- *  For example:
- *     If we select * from sys.source_table_lineage, we will get
- *     database_name       table_name       job_name      create_time
- *        default            test0            job1    2023-10-22 20:35:12
- *       database1           test1            job1    2023-10-28 21:35:52
- *          ...               ...             ...             ...
- *     We can write sql to fetch the information we need.
- * 
- */ -public class SourceTableLineageTable extends TableLineageTable { - - public static final String SOURCE_TABLE_LINEAGE = "source_table_lineage"; - - public SourceTableLineageTable(LineageMetaFactory lineageMetaFactory, Options options) { - super(lineageMetaFactory, options); - } - - @Override - public InnerTableRead newRead() { - return new TableLineageRead(lineageMetaFactory, options, LineageMeta::sourceTableLineages); - } - - @Override - public String name() { - return SOURCE_TABLE_LINEAGE; - } - - @Override - public Table copy(Map dynamicOptions) { - return new SourceTableLineageTable(lineageMetaFactory, options); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java index 3d5b211316ec..763e4d121673 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java @@ -20,7 +20,6 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.lineage.LineageMetaFactory; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -37,7 +36,6 @@ import java.util.function.Function; import java.util.function.Supplier; -import static org.apache.paimon.options.CatalogOptions.LINEAGE_META; import static org.apache.paimon.table.system.AggregationFieldsTable.AGGREGATION_FIELDS; import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS; import static org.apache.paimon.table.system.AuditLogTable.AUDIT_LOG; @@ -52,12 +50,9 @@ import static org.apache.paimon.table.system.PartitionsTable.PARTITIONS; import static org.apache.paimon.table.system.ReadOptimizedTable.READ_OPTIMIZED; import static org.apache.paimon.table.system.SchemasTable.SCHEMAS; -import static org.apache.paimon.table.system.SinkTableLineageTable.SINK_TABLE_LINEAGE; import static org.apache.paimon.table.system.SnapshotsTable.SNAPSHOTS; -import static org.apache.paimon.table.system.SourceTableLineageTable.SOURCE_TABLE_LINEAGE; import static org.apache.paimon.table.system.StatisticTable.STATISTICS; import static org.apache.paimon.table.system.TagsTable.TAGS; -import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Loader to load system {@link Table}s. */ public class SystemTableLoader { @@ -95,38 +90,18 @@ public static Table loadGlobal( String tableName, FileIO fileIO, Supplier>> allTablePaths, - Options catalogOptions, - @Nullable LineageMetaFactory lineageMetaFactory) { + Options catalogOptions) { switch (tableName.toLowerCase()) { case ALL_TABLE_OPTIONS: return new AllTableOptionsTable(fileIO, allTablePaths.get()); case CATALOG_OPTIONS: return new CatalogOptionsTable(catalogOptions); - case SOURCE_TABLE_LINEAGE: - { - checkNotNull( - lineageMetaFactory, - String.format( - "Lineage meta should be configured for catalog with %s", - LINEAGE_META.key())); - return new SourceTableLineageTable(lineageMetaFactory, catalogOptions); - } - case SINK_TABLE_LINEAGE: - { - checkNotNull( - lineageMetaFactory, - String.format( - "Lineage meta should be configured for catalog with %s", - LINEAGE_META.key())); - return new SinkTableLineageTable(lineageMetaFactory, catalogOptions); - } default: return null; } } public static List loadGlobalTableNames() { - return Arrays.asList( - ALL_TABLE_OPTIONS, CATALOG_OPTIONS, SOURCE_TABLE_LINEAGE, SINK_TABLE_LINEAGE); + return Arrays.asList(ALL_TABLE_OPTIONS, CATALOG_OPTIONS); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java deleted file mode 100644 index aeaf3ca3b133..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TableLineageTable.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.table.system; - -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.disk.IOManager; -import org.apache.paimon.lineage.LineageMeta; -import org.apache.paimon.lineage.LineageMetaFactory; -import org.apache.paimon.lineage.TableLineageEntity; -import org.apache.paimon.options.Options; -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.table.ReadonlyTable; -import org.apache.paimon.table.source.InnerTableRead; -import org.apache.paimon.table.source.InnerTableScan; -import org.apache.paimon.table.source.ReadOnceTableScan; -import org.apache.paimon.table.source.Split; -import org.apache.paimon.table.source.TableRead; -import org.apache.paimon.types.DataField; -import org.apache.paimon.types.RowType; -import org.apache.paimon.types.TimestampType; -import org.apache.paimon.types.VarCharType; -import org.apache.paimon.utils.IteratorRecordReader; -import org.apache.paimon.utils.ProjectedRow; - -import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.function.BiFunction; - -import static org.apache.paimon.utils.Preconditions.checkNotNull; - -/** Base lineage table for source and sink table lineage. */ -public abstract class TableLineageTable implements ReadonlyTable { - protected final LineageMetaFactory lineageMetaFactory; - protected final Options options; - - public static final RowType TABLE_TYPE = - new RowType( - Arrays.asList( - new DataField( - 0, "database_name", new VarCharType(VarCharType.MAX_LENGTH)), - new DataField(1, "table_name", new VarCharType(VarCharType.MAX_LENGTH)), - new DataField(2, "job_name", new VarCharType(VarCharType.MAX_LENGTH)), - new DataField(3, "create_time", new TimestampType()))); - - protected TableLineageTable(LineageMetaFactory lineageMetaFactory, Options options) { - this.lineageMetaFactory = lineageMetaFactory; - this.options = options; - } - - @Override - public InnerTableScan newScan() { - return new ReadOnceTableScan() { - @Override - public InnerTableScan withFilter(Predicate predicate) { - return this; - } - - @Override - protected Plan innerPlan() { - /// TODO get the real row count for plan. - return () -> Collections.singletonList((Split) () -> 1L); - } - }; - } - - @Override - public RowType rowType() { - return TABLE_TYPE; - } - - @Override - public List primaryKeys() { - return Arrays.asList("database_name", "table_name", "job_name"); - } - - /** Table lineage read with lineage meta query. */ - protected static class TableLineageRead implements InnerTableRead { - private final LineageMetaFactory lineageMetaFactory; - private final Options options; - private final BiFunction> - tableLineageQuery; - @Nullable private Predicate predicate; - private RowType readType; - - protected TableLineageRead( - LineageMetaFactory lineageMetaFactory, - Options options, - BiFunction> - tableLineageQuery) { - this.lineageMetaFactory = lineageMetaFactory; - this.options = options; - this.tableLineageQuery = tableLineageQuery; - this.predicate = null; - } - - @Override - public InnerTableRead withFilter(Predicate predicate) { - this.predicate = predicate; - return this; - } - - @Override - public InnerTableRead withReadType(RowType readType) { - this.readType = readType; - return this; - } - - @Override - public TableRead withIOManager(IOManager ioManager) { - return this; - } - - @Override - public RecordReader createReader(Split split) throws IOException { - try (LineageMeta lineageMeta = lineageMetaFactory.create(() -> options)) { - Iterator sourceTableLineages = - tableLineageQuery.apply(lineageMeta, predicate); - return new IteratorRecordReader<>( - Iterators.transform( - sourceTableLineages, - entity -> { - checkNotNull(entity); - GenericRow row = - GenericRow.of( - BinaryString.fromString(entity.getDatabase()), - BinaryString.fromString(entity.getTable()), - BinaryString.fromString(entity.getJob()), - entity.getCreateTime()); - if (readType != null) { - return ProjectedRow.from( - readType, TableLineageTable.TABLE_TYPE) - .replaceRow(row); - } else { - return row; - } - })); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java index 9f90a2cd0130..6b10dbb84bf4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java @@ -25,15 +25,10 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.data.Timestamp; import org.apache.paimon.flink.log.LogStoreTableFactory; import org.apache.paimon.flink.sink.FlinkTableSink; import org.apache.paimon.flink.source.DataTableSource; import org.apache.paimon.flink.source.SystemTableSource; -import org.apache.paimon.lineage.LineageMeta; -import org.apache.paimon.lineage.LineageMetaFactory; -import org.apache.paimon.lineage.TableLineageEntity; -import org.apache.paimon.lineage.TableLineageEntityImpl; import org.apache.paimon.options.Options; import org.apache.paimon.options.OptionsUtils; import org.apache.paimon.schema.Schema; @@ -47,7 +42,6 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.ValidationException; @@ -71,7 +65,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.function.BiConsumer; import java.util.regex.Pattern; import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE; @@ -109,23 +102,9 @@ public DynamicTableSource createDynamicTableSource(Context context) { isStreamingMode, context.getObjectIdentifier()); } else { - Table table = buildPaimonTable(context); - if (table instanceof FileStoreTable) { - storeTableLineage( - ((FileStoreTable) table).catalogEnvironment().lineageMetaFactory(), - context, - (entity, lineageFactory) -> { - try (LineageMeta lineage = - lineageFactory.create(() -> Options.fromMap(table.options()))) { - lineage.saveSourceTableLineage(entity); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } return new DataTableSource( context.getObjectIdentifier(), - table, + buildPaimonTable(context), isStreamingMode, context, createOptionalLogStoreFactory(context).orElse(null)); @@ -134,46 +113,13 @@ public DynamicTableSource createDynamicTableSource(Context context) { @Override public DynamicTableSink createDynamicTableSink(Context context) { - Table table = buildPaimonTable(context); - if (table instanceof FileStoreTable) { - storeTableLineage( - ((FileStoreTable) table).catalogEnvironment().lineageMetaFactory(), - context, - (entity, lineageFactory) -> { - try (LineageMeta lineage = - lineageFactory.create(() -> Options.fromMap(table.options()))) { - lineage.saveSinkTableLineage(entity); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } return new FlinkTableSink( context.getObjectIdentifier(), - table, + buildPaimonTable(context), context, createOptionalLogStoreFactory(context).orElse(null)); } - private void storeTableLineage( - @Nullable LineageMetaFactory lineageMetaFactory, - Context context, - BiConsumer tableLineage) { - if (lineageMetaFactory != null) { - String pipelineName = context.getConfiguration().get(PipelineOptions.NAME); - if (pipelineName == null) { - throw new ValidationException("Cannot get pipeline name for lineage meta."); - } - tableLineage.accept( - new TableLineageEntityImpl( - context.getObjectIdentifier().getDatabaseName(), - context.getObjectIdentifier().getObjectName(), - pipelineName, - Timestamp.fromEpochMillis(System.currentTimeMillis())), - lineageMetaFactory); - } - } - @Override public Set> requiredOptions() { return Collections.emptySet(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index 96334de3f87b..10b03b7139ae 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -21,8 +21,6 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.table.system.AllTableOptionsTable; import org.apache.paimon.table.system.CatalogOptionsTable; -import org.apache.paimon.table.system.SinkTableLineageTable; -import org.apache.paimon.table.system.SourceTableLineageTable; import org.apache.paimon.utils.BlockingIterator; import org.apache.commons.lang3.StringUtils; @@ -200,9 +198,7 @@ public void testSystemDatabase() { assertThat(sql("SHOW TABLES")) .containsExactlyInAnyOrder( Row.of(AllTableOptionsTable.ALL_TABLE_OPTIONS), - Row.of(CatalogOptionsTable.CATALOG_OPTIONS), - Row.of(SourceTableLineageTable.SOURCE_TABLE_LINEAGE), - Row.of(SinkTableLineageTable.SINK_TABLE_LINEAGE)); + Row.of(CatalogOptionsTable.CATALOG_OPTIONS)); } @Test diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java deleted file mode 100644 index 5b61d5272f80..000000000000 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkLineageITCase.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink; - -import org.apache.paimon.lineage.DataLineageEntity; -import org.apache.paimon.lineage.LineageMeta; -import org.apache.paimon.lineage.LineageMetaFactory; -import org.apache.paimon.lineage.TableLineageEntity; -import org.apache.paimon.predicate.Predicate; - -import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; -import org.junit.jupiter.api.Test; - -import javax.annotation.Nullable; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import static org.apache.paimon.options.CatalogOptions.LINEAGE_META; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** ITCase for flink table and data lineage. */ -public class FlinkLineageITCase extends CatalogITCaseBase { - private static final String THROWING_META = "throwing-meta"; - private static final Map> jobSourceTableLineages = - new HashMap<>(); - private static final Map> jobSinkTableLineages = - new HashMap<>(); - - @Override - protected List ddl() { - return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, b INT, c INT)"); - } - - @Override - protected Map catalogOptions() { - return Collections.singletonMap(LINEAGE_META.key(), THROWING_META); - } - - @Test - public void testTableLineage() throws Exception { - // Validate for source and sink lineage when pipeline name is null - assertThatThrownBy( - () -> tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await()) - .hasCauseExactlyInstanceOf(ValidationException.class) - .hasRootCauseMessage("Cannot get pipeline name for lineage meta."); - assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM T").collect().close()) - .hasCauseExactlyInstanceOf(ValidationException.class) - .hasRootCauseMessage("Cannot get pipeline name for lineage meta."); - - // Call storeSinkTableLineage and storeSourceTableLineage methods - tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "insert_t_job"); - tEnv.executeSql("INSERT INTO T VALUES (1, 2, 3),(4, 5, 6);").await(); - assertThat(jobSinkTableLineages).isNotEmpty(); - TableLineageEntity sinkTableLineage = - jobSinkTableLineages.get("insert_t_job").get("default.T.insert_t_job"); - assertThat(sinkTableLineage.getTable()).isEqualTo("T"); - - List sinkTableRows = new ArrayList<>(); - try (CloseableIterator iterator = - tEnv.executeSql("SELECT * FROM sys.sink_table_lineage").collect()) { - while (iterator.hasNext()) { - sinkTableRows.add(iterator.next()); - } - } - assertThat(sinkTableRows.size()).isEqualTo(1); - Row sinkTableRow = sinkTableRows.get(0); - assertThat(sinkTableRow.getField("database_name")).isEqualTo("default"); - assertThat(sinkTableRow.getField("table_name")).isEqualTo("T"); - assertThat(sinkTableRow.getField("job_name")).isEqualTo("insert_t_job"); - - tEnv.getConfig().getConfiguration().set(PipelineOptions.NAME, "select_t_job"); - tEnv.executeSql("SELECT * FROM T").collect().close(); - assertThat(jobSourceTableLineages).isNotEmpty(); - TableLineageEntity sourceTableLineage = - jobSourceTableLineages.get("select_t_job").get("default.T.select_t_job"); - assertThat(sourceTableLineage.getTable()).isEqualTo("T"); - - List sourceTableRows = new ArrayList<>(); - try (CloseableIterator iterator = - tEnv.executeSql("SELECT * FROM sys.source_table_lineage").collect()) { - while (iterator.hasNext()) { - sourceTableRows.add(iterator.next()); - } - } - assertThat(sourceTableRows.size()).isEqualTo(1); - Row sourceTableRow = sourceTableRows.get(0); - assertThat(sourceTableRow.getField("database_name")).isEqualTo("default"); - assertThat(sourceTableRow.getField("table_name")).isEqualTo("T"); - assertThat(sourceTableRow.getField("job_name")).isEqualTo("select_t_job"); - } - - private static String getTableLineageKey(TableLineageEntity entity) { - return String.format("%s.%s.%s", entity.getDatabase(), entity.getTable(), entity.getJob()); - } - - /** Factory to create throwing lineage meta. */ - public static class TestingMemoryLineageMetaFactory implements LineageMetaFactory { - private static final long serialVersionUID = 1L; - - @Override - public String identifier() { - return THROWING_META; - } - - @Override - public LineageMeta create(LineageMetaContext context) { - return new TestingMemoryLineageMeta(); - } - } - - /** Throwing specific exception in each method. */ - private static class TestingMemoryLineageMeta implements LineageMeta { - - @Override - public void saveSourceTableLineage(TableLineageEntity entity) { - jobSourceTableLineages - .computeIfAbsent(entity.getJob(), key -> new HashMap<>()) - .put(getTableLineageKey(entity), entity); - } - - @Override - public void deleteSourceTableLineage(String job) { - jobSourceTableLineages.remove(job); - } - - @Override - public Iterator sourceTableLineages(@Nullable Predicate predicate) { - return jobSourceTableLineages.values().stream() - .flatMap(v -> v.values().stream()) - .iterator(); - } - - @Override - public void saveSinkTableLineage(TableLineageEntity entity) { - assertThat(entity.getJob()).isEqualTo("insert_t_job"); - assertThat(entity.getTable()).isEqualTo("T"); - assertThat(entity.getDatabase()).isEqualTo("default"); - jobSinkTableLineages - .computeIfAbsent(entity.getJob(), key -> new HashMap<>()) - .put(getTableLineageKey(entity), entity); - } - - @Override - public Iterator sinkTableLineages(@Nullable Predicate predicate) { - return jobSinkTableLineages.values().stream() - .flatMap(v -> v.values().stream()) - .iterator(); - } - - @Override - public void deleteSinkTableLineage(String job) { - jobSinkTableLineages.remove(job); - } - - @Override - public void saveSourceDataLineage(DataLineageEntity entity) { - assertThat(entity.getJob()).isEqualTo("select_t_job"); - assertThat(entity.getTable()).isEqualTo("T"); - assertThat(entity.getDatabase()).isEqualTo("default"); - throw new UnsupportedOperationException("Method saveSinkTableLineage is not supported"); - } - - @Override - public Iterator sourceDataLineages(@Nullable Predicate predicate) { - throw new UnsupportedOperationException(); - } - - @Override - public void saveSinkDataLineage(DataLineageEntity entity) { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator sinkDataLineages(@Nullable Predicate predicate) { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws Exception {} - } -} diff --git a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory index fcb6fe982943..3c05b5fba3ec 100644 --- a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -15,8 +15,5 @@ org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory -# Lineage meta factory -org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory - # Catalog lock factory org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory \ No newline at end of file diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 9a90995f282d..5157e606006c 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -610,8 +610,7 @@ public org.apache.paimon.table.Table getDataOrFormatTable(Identifier identifier) lockFactory().orElse(null), lockContext().orElse(null), identifier), - metastoreClientFactory(identifier, tableMeta.schema()).orElse(null), - lineageMetaFactory)); + metastoreClientFactory(identifier, tableMeta.schema()).orElse(null))); } catch (TableNotExistException ignore) { }