Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Manifest and Statistic support query by timestamp #4304

Merged
merged 3 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/content/maintenance/system-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,17 @@ SELECT * FROM my_table$manifests /*+ OPTIONS('scan.tag-name'='tag1') */;
+--------------------------------+-------------+------------------+-------------------+---------------+
1 rows in set
*/

- You can also query the manifest with specified timestamp in unix milliseconds
SELECT * FROM my_table$manifests /*+ OPTIONS('scan.timestamp-millis'='1678883047356') */;
/*
+--------------------------------+-------------+------------------+-------------------+---------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id |
+--------------------------------+-------------+------------------+-------------------+---------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 |
+--------------------------------+-------------+------------------+-------------------+---------------+
1 rows in set
*/
```

### Aggregation fields Table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,22 +169,19 @@ public Identifier identifier() {

@Override
public Optional<Statistics> statistics() {
Snapshot latestSnapshot;
Snapshot latestSnapshot = null;
Long snapshotId = coreOptions().scanSnapshotId();
String tagName = coreOptions().scanTagName();

if (snapshotId == null) {
if (!StringUtils.isEmpty(tagName) && tagManager().tagExists(tagName)) {
return store().newStatsFileHandler()
.readStats(tagManager().tag(tagName).trimToSnapshot());
} else {
snapshotId = snapshotManager().latestSnapshotId();
}
}
Long timestampMills = coreOptions().scanTimestampMills();

if (snapshotId != null && snapshotManager().snapshotExists(snapshotId)) {
latestSnapshot = snapshotManager().snapshot(snapshotId);
} else {
} else if (!StringUtils.isEmpty(tagName) && tagManager().tagExists(tagName)) {
return store().newStatsFileHandler()
.readStats(tagManager().tag(tagName).trimToSnapshot());
} else if (timestampMills != null) {
latestSnapshot = snapshotManager().earlierOrEqualTimeMills(timestampMills);
} else if (snapshotId == null && StringUtils.isEmpty(tagName)) {
latestSnapshot = snapshotManager().latestSnapshot();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,24 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

/** A {@link Table} for showing committing snapshots of table. */
public class ManifestsTable implements ReadonlyTable {

private static final Logger LOG = LoggerFactory.getLogger(ManifestsTable.class);

private static final long serialVersionUID = 1L;

public static final String MANIFESTS = "manifests";
Expand Down Expand Up @@ -200,27 +208,39 @@ private static List<ManifestFileMeta> allManifests(FileStoreTable dataTable) {
SnapshotManager snapshotManager = dataTable.snapshotManager();
Long snapshotId = coreOptions.scanSnapshotId();
String tagName = coreOptions.scanTagName();
Snapshot snapshot = null;
Long timestampMills = coreOptions.scanTimestampMills();

Snapshot snapshot;
if (snapshotId != null) {
// reminder user with snapshot id range
if (!snapshotManager.snapshotExists(snapshotId)) {
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
throw new SnapshotNotExistException(
String.format(
"Specified scan.snapshot-id %s is not exist, you can set it in range from %s to %s",
snapshotId, earliestSnapshotId, latestSnapshotId));
"Specified parameter %s = %s is not exist, you can set it in range from %s to %s.",
SCAN_SNAPSHOT_ID.key(),
snapshotId,
earliestSnapshotId,
latestSnapshotId));
}
snapshot = snapshotManager.snapshot(snapshotId);
} else {
if (!StringUtils.isEmpty(tagName) && dataTable.tagManager().tagExists(tagName)) {
snapshot = dataTable.tagManager().tag(tagName).trimToSnapshot();
} else {
snapshot = snapshotManager.latestSnapshot();
} else if (!StringUtils.isEmpty(tagName)) {
if (!dataTable.tagManager().tagExists(tagName)) {
throw new RuntimeException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new RuntimeException(
String.format("Specified parameter %s = %s is not exist.", SCAN_TAG_NAME.key(), tagName));

String.format(
"Specified parameter %s = %s is not exist.",
SCAN_TAG_NAME.key(), tagName));
}
snapshot = dataTable.tagManager().tag(tagName).trimToSnapshot();
} else if (timestampMills != null) {
snapshot = snapshotManager.earlierOrEqualTimeMills(timestampMills);
} else {
snapshot = snapshotManager.latestSnapshot();
}

if (snapshot == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a warn log?

LOG.warn("Check if your snapshot is empty.");
return Collections.emptyList();
}
FileStorePathFactory fileStorePathFactory = dataTable.store().pathFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,28 @@ public void testReadManifestsFromSpecifiedTagName() throws Exception {
assertThat(result).containsExactlyElementsOf(expectedRow);
}

@Test
public void testReadManifestsFromSpecifiedTimestampMillis() throws Exception {
write(table, GenericRow.of(3, 1, 1), GenericRow.of(3, 2, 1));
List<InternalRow> expectedRow = getExpectedResult(3L);
manifestsTable =
(ManifestsTable)
manifestsTable.copy(
Collections.singletonMap(
CoreOptions.SCAN_TIMESTAMP_MILLIS.key(),
String.valueOf(System.currentTimeMillis())));
List<InternalRow> result = read(manifestsTable);
assertThat(result).containsExactlyElementsOf(expectedRow);
}

@Test
public void testReadManifestsFromNotExistSnapshot() throws Exception {
manifestsTable =
(ManifestsTable)
manifestsTable.copy(
Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "3"));
assertThrows(
"Specified scan.snapshot-id 3 is not exist, you can set it in range from 1 to 2",
"Specified parameter scan.snapshot-id = 3 is not exist, you can set it in range from 1 to 2",
SnapshotNotExistException.class,
() -> read(manifestsTable));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,22 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {

spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS")

withSQLConf("spark.paimon.scan.timestamp-millis" -> System.currentTimeMillis.toString) {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(2, 0, 2, "{ }"))
}

spark.sql(s"INSERT INTO T VALUES ('3', 'b', 2, 1)")
spark.sql(s"INSERT INTO T VALUES ('4', 'bbb', 3, 2)")

spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS")

withSQLConf("spark.paimon.scan.timestamp-millis" -> System.currentTimeMillis.toString) {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(5, 0, 4, "{ }"))
}
// create tag
checkAnswer(
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag5', snapshot => 5)"),
Expand Down Expand Up @@ -131,9 +142,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
}

withSQLConf("spark.paimon.scan.snapshot-id" -> "100") {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(5, 0, 4, "{ }"))
Assertions.assertEquals(0, sql("select * from `T$statistics`").count())
}

}
Expand Down
Loading