
Eliminate Periodic Realtime Segment Metadata Queries: Tasks Now Publish Schema for Seamless Coordinator Updates #15475

Merged: 40 commits, Jan 10, 2024

Conversation

@findingrish (Contributor) commented Dec 4, 2023

Description

Issue: #14989

The initial step in optimizing segment metadata was to centralize the construction of datasource schema in the Coordinator (#14985). Subsequently, our goal is to eliminate the requirement for regularly executing queries to obtain segment schema information. This task encompasses addressing both realtime and finalized segments.

This modification specifically addresses the issue with realtime segments. Tasks will now routinely communicate the schema for realtime segments during the segment announcement process. The Coordinator will identify the schema alongside the segment announcement and subsequently update the schema for realtime segments in the metadata cache.

Design

Task

  • New method Sink.getSignature returns the RowSignature of the Sink by combining the signatures of each FireHydrant.
  • Periodically, the StreamAppenderator.SinkSchemaAnnouncer will compute sink schema changes and announce them to the DataSegmentAnnouncer.
  • New APIs have been introduced in DataSegmentAnnouncer to receive sink schema information and manage schema cleanup when a task is closed.
  • A new POJO named SegmentSchemas has been added to facilitate passing schema information for multiple segments.
  • A new implementation of DataSegmentChangeRequest has been introduced, named SegmentSchemasChangeRequest.
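
To illustrate the first bullet, here is a minimal, hypothetical sketch of how per-FireHydrant signatures could be combined into a Sink-level signature. It uses simplified ordered maps (column name to type) instead of Druid's actual RowSignature class; all names are illustrative, not the real API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: real Druid code combines RowSignature objects;
// here a signature is simplified to an ordered map of column name -> type.
public class SinkSignatureSketch
{
  public static Map<String, String> mergeSignatures(List<Map<String, String>> hydrantSignatures)
  {
    // Preserve the order in which columns were first observed across hydrants;
    // in this sketch, the first hydrant to report a column decides its type.
    Map<String, String> merged = new LinkedHashMap<>();
    for (Map<String, String> signature : hydrantSignatures) {
      for (Map.Entry<String, String> column : signature.entrySet()) {
        merged.putIfAbsent(column.getKey(), column.getValue());
      }
    }
    return merged;
  }

  public static void main(String[] args)
  {
    Map<String, String> hydrant1 = new LinkedHashMap<>();
    hydrant1.put("__time", "LONG");
    hydrant1.put("page", "STRING");

    Map<String, String> hydrant2 = new LinkedHashMap<>();
    hydrant2.put("__time", "LONG");
    hydrant2.put("added", "LONG");

    // prints [__time, page, added]
    System.out.println(mergeSignatures(List.of(hydrant1, hydrant2)).keySet());
  }
}
```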

Coordinator

  • Modifications have been made to the HttpServerInventoryView to handle schema information.
  • Schema Update Flow: HttpServerInventoryView -> CoordinatorServerView -> CoordinatorSegmentMetadataCache.
  • The CoordinatorSegmentMetadataCache has been updated to incorporate schema changes. The refresh logic has also been changed to eliminate the need for executing segment metadata queries for realtime segments.
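
As a rough sketch of how a cache could incorporate an incoming schema update: a "delta" update merges changed/new columns into the cached signature, while an absolute update replaces it. This is a simplified illustration under assumed semantics, not the actual CoordinatorSegmentMetadataCache API; all names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only, not Druid's actual cache implementation.
public class SchemaUpdateSketch
{
  public static Map<String, String> applyUpdate(
      Map<String, String> cached,
      Map<String, String> update,
      boolean delta
  )
  {
    if (!delta) {
      // Absolute update: the full signature was sent, so replace outright.
      return new LinkedHashMap<>(update);
    }
    // Delta update: only changed/new columns were sent, so merge them in.
    Map<String, String> merged = new LinkedHashMap<>(cached);
    merged.putAll(update);
    return merged;
  }

  public static void main(String[] args)
  {
    Map<String, String> cached = new LinkedHashMap<>();
    cached.put("__time", "LONG");
    cached.put("page", "STRING");

    Map<String, String> delta = new LinkedHashMap<>();
    delta.put("added", "LONG");

    // prints {__time=LONG, page=STRING, added=LONG}
    System.out.println(applyUpdate(cached, delta, true));
  }
}
```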

Testing

  • Added unit tests.
  • Tested locally with the Wikipedia dataset and Kafka-based ingestion.
  • Tested in a Druid cluster; verified the datasource signature and numRows for realtime segments.

Potential side effects

None

Limitations

Currently, this feature doesn't work with ZooKeeper-based segment announcement.

Upgrade considerations

The general upgrade order should be followed. The new code is behind a feature flag, so it is compatible with existing setups. Even if centralized datasource schema building (#14985) is enabled, realtime segments will still be refreshed using segment metadata queries to the Indexer/Task.

This experimental feature aims to eliminate the need to periodically issue a SegmentMetadataQuery to the Indexer/Task to retrieve the schema of realtime segments. At present it is gated behind two feature flags and should only be enabled for proof-of-concept (PoC) or testing purposes. To activate it, set druid.centralizedDatasourceSchema.enabled and druid.centralizedDatasourceSchema.announceRealtimeSegmentSchema in the common configurations. Note that druid.centralizedDatasourceSchema.announceRealtimeSegmentSchema is a temporary feature flag and will be removed in a subsequent update.
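
For reference, enabling the two flags described above could look like the following in the common runtime properties (a sketch; exact file and placement depend on your deployment):

```properties
# Enable centralized datasource schema building on the Coordinator (#14985)
druid.centralizedDatasourceSchema.enabled=true
# Temporary flag: have tasks announce realtime segment schemas
druid.centralizedDatasourceSchema.announceRealtimeSegmentSchema=true
```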

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@findingrish (Author) commented Dec 11, 2023

Please note that the test failures are because of an uncovered noop method in BrokerSegmentMetadataCache.

@cryptoe (Contributor) left a comment

Need to go through the UTs.


if ((!Objects.equals(numRows, previousNumRows)) || (updatedColumns.size() > 0) || (newColumns.size() > 0)) {
    publish = true;
    delta = true;

@cryptoe (Contributor) suggested a change on this line:

    delta = true;

new SegmentSchema(
    segmentId.getDataSource(),
    segmentId.toString(),
    delta,

@cryptoe (Contributor) suggested a change:

    delta,
    true,

@findingrish (Author): Delta can be either true or false, hence it cannot be hardcoded.

private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
// columns excluding current index, includes __time column

@cryptoe (Contributor): What is curIndex, the in-memory FireHydrant? If so, let's rename it accordingly or document it.

numRowsExcludingCurrIndex.addAndGet(segment.asQueryableIndex().getNumRows());
QueryableIndex index = segment.asQueryableIndex();
mergeIndexDimensions(new QueryableIndexStorageAdapter(index));
numRowsExcludingCurrIndex.addAndGet(index.getNumRows());

@cryptoe (Contributor): This is only for restoring tasks, right?

/**
 * Merge the column from the index with the existing columns.
 */
private void mergeIndexDimensions(StorageAdapter storageAdapter)

@cryptoe (Contributor) suggested a change:

    private void mergeIndexDimensions(StorageAdapter storageAdapter)
    private void overWriteIndexDimensions(StorageAdapter storageAdapter)

@@ -148,7 +159,9 @@ public Sink(
    maxCount = hydrant.getCount();
    ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
    try {
      numRowsExcludingCurrIndex.addAndGet(segment.asQueryableIndex().getNumRows());
      QueryableIndex index = segment.asQueryableIndex();

@cryptoe (Contributor): Sink is also used for both realtime and batch. How do your changes affect batch ingestion?

@findingrish (Author): Additionally, I am maintaining a set of columns in each sink. I am assuming this wouldn't be much of an overhead in batch ingestion.

@cryptoe (Contributor) left a comment

Changes LGTM. Nit comments only.

// Newest segments first, so they override older ones.
private static final Comparator<SegmentId> SEGMENT_ORDER = Comparator
protected static final Comparator<SegmentId> SEGMENT_ORDER = Comparator

@cryptoe (Contributor): What's the change in this class apart from the field access change?

@findingrish (Author): Visibility is changed for a couple of fields.

@@ -219,11 +220,20 @@ public void configure(Binder binder)
    taskDirPath = taskAndStatusFile.get(0);
    attemptId = taskAndStatusFile.get(1);

    if (Boolean.parseBoolean(properties.getProperty("druid.centralizedDatasourceSchema.enabled"))
        && !properties.getOrDefault("druid.serverview.type", "http").equals("http")) {
      throw new RuntimeException(

@cryptoe (Contributor): Shouldn't this be a DruidException targeted at the cluster administrator?

@@ -192,6 +194,12 @@ protected List<? extends Module> getModules()
    modules.add(JettyHttpClientModule.global());

    if (isSegmentMetadataCacheEnabled) {
      if (!properties.getOrDefault(SERVERVIEW_TYPE_PROPERTY, "http").equals("http")) {
        throw new RuntimeException(
            "CentralizedDatasourceSchema feature is incompatible with Zookeeper based segment discovery. "

@cryptoe (Contributor): I think we should not mention ZooKeeper but just the value of properties.getOrDefault(SERVERVIEW_TYPE_PROPERTY). If tomorrow we add a new way of announcing server views, we need not change this piece of code.

@@ -153,7 +154,8 @@ public class CliCoordinator extends ServerRunnable
{
  private static final Logger log = new Logger(CliCoordinator.class);
  private static final String AS_OVERLORD_PROPERTY = "druid.coordinator.asOverlord.enabled";
  private static final String CENTRALIZED_SCHEMA_MANAGEMENT_ENABLED = "druid.centralizedDatasourceSchema.enabled";
  private static final String CENTRALIZED_DATASOURCE_SCHEMA_ENABLED = "druid.centralizedDatasourceSchema.enabled";
  private static final String SERVERVIEW_TYPE_PROPERTY = "druid.serverview.type";

@cryptoe (Contributor): Please push this config to ServerViewModule and reference it from there.

@cryptoe cryptoe merged commit 71f5307 into apache:master Jan 10, 2024
83 checks passed
@cryptoe (Contributor) commented Jan 10, 2024

@findingrish Thank you for the PR.

@LakshSingla LakshSingla added this to the 29.0.0 milestone Jan 29, 2024
cryptoe pushed a commit that referenced this pull request Apr 24, 2024
…rce Schema Building (#15817)

Issue: #14989

The initial step in optimizing segment metadata was to centralize the construction of datasource schema in the Coordinator (#14985). Thereafter, we addressed the problem of publishing schema for realtime segments (#15475). Subsequently, our goal is to eliminate the requirement for regularly executing queries to obtain segment schema information.

This is the final change which involves publishing segment schema for finalized segments from task and periodically polling them in the Coordinator.