Skip to content

Commit

Permalink
[core] Manifest and Statistic support query by timestamp (#4304)
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree authored Oct 15, 2024
1 parent 4045158 commit 5e5b2fd
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 23 deletions.
11 changes: 11 additions & 0 deletions docs/content/maintenance/system-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,17 @@ SELECT * FROM my_table$manifests /*+ OPTIONS('scan.tag-name'='tag1') */;
+--------------------------------+-------------+------------------+-------------------+---------------+
1 rows in set
*/

-- You can also query the manifests at a specified timestamp, given in Unix milliseconds
SELECT * FROM my_table$manifests /*+ OPTIONS('scan.timestamp-millis'='1678883047356') */;
/*
+--------------------------------+-------------+------------------+-------------------+---------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id |
+--------------------------------+-------------+------------------+-------------------+---------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 |
+--------------------------------+-------------+------------------+-------------------+---------------+
1 rows in set
*/
```

### Aggregation fields Table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,22 +169,19 @@ public Identifier identifier() {

@Override
public Optional<Statistics> statistics() {
Snapshot latestSnapshot;
Snapshot latestSnapshot = null;
Long snapshotId = coreOptions().scanSnapshotId();
String tagName = coreOptions().scanTagName();

if (snapshotId == null) {
if (!StringUtils.isEmpty(tagName) && tagManager().tagExists(tagName)) {
return store().newStatsFileHandler()
.readStats(tagManager().tag(tagName).trimToSnapshot());
} else {
snapshotId = snapshotManager().latestSnapshotId();
}
}
Long timestampMills = coreOptions().scanTimestampMills();

if (snapshotId != null && snapshotManager().snapshotExists(snapshotId)) {
latestSnapshot = snapshotManager().snapshot(snapshotId);
} else {
} else if (!StringUtils.isEmpty(tagName) && tagManager().tagExists(tagName)) {
return store().newStatsFileHandler()
.readStats(tagManager().tag(tagName).trimToSnapshot());
} else if (timestampMills != null) {
latestSnapshot = snapshotManager().earlierOrEqualTimeMills(timestampMills);
} else if (snapshotId == null && StringUtils.isEmpty(tagName)) {
latestSnapshot = snapshotManager().latestSnapshot();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,24 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

/** A {@link Table} for showing the manifest files of a table's snapshot. */
public class ManifestsTable implements ReadonlyTable {

private static final Logger LOG = LoggerFactory.getLogger(ManifestsTable.class);

private static final long serialVersionUID = 1L;

public static final String MANIFESTS = "manifests";
Expand Down Expand Up @@ -200,27 +208,39 @@ private static List<ManifestFileMeta> allManifests(FileStoreTable dataTable) {
SnapshotManager snapshotManager = dataTable.snapshotManager();
Long snapshotId = coreOptions.scanSnapshotId();
String tagName = coreOptions.scanTagName();
Snapshot snapshot = null;
Long timestampMills = coreOptions.scanTimestampMills();

Snapshot snapshot;
if (snapshotId != null) {
// reminder user with snapshot id range
if (!snapshotManager.snapshotExists(snapshotId)) {
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
throw new SnapshotNotExistException(
String.format(
"Specified scan.snapshot-id %s is not exist, you can set it in range from %s to %s",
snapshotId, earliestSnapshotId, latestSnapshotId));
"Specified parameter %s = %s is not exist, you can set it in range from %s to %s.",
SCAN_SNAPSHOT_ID.key(),
snapshotId,
earliestSnapshotId,
latestSnapshotId));
}
snapshot = snapshotManager.snapshot(snapshotId);
} else {
if (!StringUtils.isEmpty(tagName) && dataTable.tagManager().tagExists(tagName)) {
snapshot = dataTable.tagManager().tag(tagName).trimToSnapshot();
} else {
snapshot = snapshotManager.latestSnapshot();
} else if (!StringUtils.isEmpty(tagName)) {
if (!dataTable.tagManager().tagExists(tagName)) {
throw new RuntimeException(
String.format(
"Specified parameter %s = %s is not exist.",
SCAN_TAG_NAME.key(), tagName));
}
snapshot = dataTable.tagManager().tag(tagName).trimToSnapshot();
} else if (timestampMills != null) {
snapshot = snapshotManager.earlierOrEqualTimeMills(timestampMills);
} else {
snapshot = snapshotManager.latestSnapshot();
}

if (snapshot == null) {
LOG.warn("Check if your snapshot is empty.");
return Collections.emptyList();
}
FileStorePathFactory fileStorePathFactory = dataTable.store().pathFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,28 @@ public void testReadManifestsFromSpecifiedTagName() throws Exception {
assertThat(result).containsExactlyElementsOf(expectedRow);
}

/**
 * Reads the manifests table with {@link CoreOptions#SCAN_TIMESTAMP_MILLIS} set to the current
 * wall-clock time and checks that the rows match {@code getExpectedResult(3L)}.
 *
 * <p>NOTE(review): the argument {@code 3L} is presumably the id of the snapshot produced by the
 * write below; confirm against the test fixture setup.
 */
@Test
public void testReadManifestsFromSpecifiedTimestampMillis() throws Exception {
    // Commit one more batch so there is a fresh snapshot to resolve by timestamp.
    write(table, GenericRow.of(3, 1, 1), GenericRow.of(3, 2, 1));
    List<InternalRow> expectedRow = getExpectedResult(3L);

    // Take the timestamp only after the write has finished.
    String optionKey = CoreOptions.SCAN_TIMESTAMP_MILLIS.key();
    String scanTimestamp = String.valueOf(System.currentTimeMillis());
    manifestsTable =
            (ManifestsTable)
                    manifestsTable.copy(Collections.singletonMap(optionKey, scanTimestamp));

    List<InternalRow> actualRows = read(manifestsTable);
    assertThat(actualRows).containsExactlyElementsOf(expectedRow);
}

@Test
public void testReadManifestsFromNotExistSnapshot() throws Exception {
manifestsTable =
(ManifestsTable)
manifestsTable.copy(
Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), "3"));
assertThrows(
"Specified scan.snapshot-id 3 is not exist, you can set it in range from 1 to 2",
"Specified parameter scan.snapshot-id = 3 is not exist, you can set it in range from 1 to 2",
SnapshotNotExistException.class,
() -> read(manifestsTable));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,22 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {

spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS")

withSQLConf("spark.paimon.scan.timestamp-millis" -> System.currentTimeMillis.toString) {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(2, 0, 2, "{ }"))
}

spark.sql(s"INSERT INTO T VALUES ('3', 'b', 2, 1)")
spark.sql(s"INSERT INTO T VALUES ('4', 'bbb', 3, 2)")

spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS")

withSQLConf("spark.paimon.scan.timestamp-millis" -> System.currentTimeMillis.toString) {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(5, 0, 4, "{ }"))
}
// create tag
checkAnswer(
spark.sql("CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag5', snapshot => 5)"),
Expand Down Expand Up @@ -131,9 +142,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
}

withSQLConf("spark.paimon.scan.snapshot-id" -> "100") {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(5, 0, 4, "{ }"))
Assertions.assertEquals(0, sql("select * from `T$statistics`").count())
}

}
Expand Down

0 comments on commit 5e5b2fd

Please sign in to comment.