-
Notifications
You must be signed in to change notification settings - Fork 969
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Manifest and Statistic support query by timestamp #4304
Conversation
@@ -200,6 +200,8 @@ private static List<ManifestFileMeta> allManifests(FileStoreTable dataTable) { | |||
SnapshotManager snapshotManager = dataTable.snapshotManager(); | |||
Long snapshotId = coreOptions.scanSnapshotId(); | |||
String tagName = coreOptions.scanTagName(); | |||
Long timestampMills = coreOptions.scanTimestampMills(); | |||
|
|||
Snapshot snapshot = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Variable 'snapshot' initializer 'null' is redundant.
Change "Snapshot snapshot = null;" to "Snapshot snapshot;".
} else if (timestampMills != null) { | ||
snapshot = snapshotManager.earlierOrEqualTimeMills(timestampMills); | ||
} else { | ||
snapshot = snapshotManager.latestSnapshot(); | ||
} | ||
|
||
if (snapshot == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a warning log here?
snapshot = snapshotManager.latestSnapshot(); | ||
} else if (!StringUtils.isEmpty(tagName)) { | ||
if (!dataTable.tagManager().tagExists(tagName)) { | ||
throw new RuntimeException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw new RuntimeException(
String.format("Specified parameter %s = %s is not exist.", SCAN_TAG_NAME.key(), tagName));
@@ -212,12 +214,16 @@ private static List<ManifestFileMeta> allManifests(FileStoreTable dataTable) { | |||
snapshotId, earliestSnapshotId, latestSnapshotId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
String.format(
"Specified parameter %s = %s is not exist, you can set it in range from %s to %s.",
SCAN_SNAPSHOT_ID.key(), snapshotId, earliestSnapshotId, latestSnapshotId));
@@ -135,6 +135,24 @@ public void testReadManifestsFromSpecifiedTagName() throws Exception { | |||
assertThat(result).containsExactlyElementsOf(expectedRow); | |||
} | |||
|
|||
@Test | |||
public void testReadManifestsFromSpecifiedTimestampMillis() throws Exception { | |||
Thread.sleep(10); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why sleep?
public void testReadManifestsFromSpecifiedTimestampMillis() throws Exception { | ||
Thread.sleep(10); | ||
write(table, GenericRow.of(3, 1, 1), GenericRow.of(3, 2, 1)); | ||
long time = System.currentTimeMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inline "long time = System.currentTimeMillis();"
spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS") | ||
val time1 = System.currentTimeMillis |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inline "val time1 = System.currentTimeMillis"
spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS") | ||
val time2 = System.currentTimeMillis |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inline "val time2 = System.currentTimeMillis"
@@ -118,6 +121,12 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { | |||
Row(2, 0, 2, "{ }")) | |||
} | |||
|
|||
withSQLConf("spark.paimon.scan.timestamp-millis" -> time1.toString) { | |||
checkAnswer( | |||
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can rename "colstat" to "colStat" in StatisticTable, and then update the other relevant code.
@wwj6591812 Thanks for your review; I have completed the modification. |
+1 |
@@ -72,7 +72,7 @@ public class StatisticTable implements ReadonlyTable { | |||
new DataField(1, "schema_id", new BigIntType(false)), | |||
new DataField(2, "mergedRecordCount", new BigIntType(true)), | |||
new DataField(3, "mergedRecordSize", new BigIntType(true)), | |||
new DataField(4, "colstat", SerializationUtils.newStringType(true)))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't modify this; it is better to keep compatibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed
+1 |
Purpose
Follow up #4291
Linked issue: close #xxx
Tests
API and Format
Documentation