diff --git a/spec/Section 5 -- Validation.md b/spec/Section 5 -- Validation.md index 467590876..5f0640294 100644 --- a/spec/Section 5 -- Validation.md +++ b/spec/Section 5 -- Validation.md @@ -463,7 +463,7 @@ unambiguous. Therefore any two field selections which might both be encountered for the same object are only valid if they are equivalent. During execution, the simultaneous execution of fields with the same response -name is accomplished by {MergeSelectionSets()} and {CollectFields()}. +name is accomplished by {CollectSubfields()}. For simple hand-written GraphQL, this rule is obviously a clear developer error, however nested fragments can make this difficult to detect manually. diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 7ee850dce..2d478b5fe 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -131,12 +131,8 @@ ExecuteQuery(query, schema, variableValues, initialValue): - Let {queryType} be the root Query type in {schema}. - Assert: {queryType} is an Object type. - Let {selectionSet} be the top level Selection Set in {query}. -- Let {data} be the result of running {ExecuteSelectionSet(selectionSet, - queryType, initialValue, variableValues)} _normally_ (allowing - parallelization). -- Let {errors} be the list of all _field error_ raised while executing the - selection set. -- Return an unordered map containing {data} and {errors}. +- Return {ExecuteRootSelectionSet(variableValues, initialValue, queryType, + selectionSet)}. ### Mutation @@ -153,11 +149,8 @@ ExecuteMutation(mutation, schema, variableValues, initialValue): - Let {mutationType} be the root Mutation type in {schema}. - Assert: {mutationType} is an Object type. - Let {selectionSet} be the top level Selection Set in {mutation}. -- Let {data} be the result of running {ExecuteSelectionSet(selectionSet, - mutationType, initialValue, variableValues)} _serially_. 
-- Let {errors} be the list of all _field error_ raised while executing the - selection set. -- Return an unordered map containing {data} and {errors}. +- Return {ExecuteRootSelectionSet(variableValues, initialValue, mutationType, + selectionSet, true)}. ### Subscription @@ -255,8 +248,10 @@ CreateSourceEventStream(subscription, schema, variableValues, initialValue): - Let {subscriptionType} be the root Subscription type in {schema}. - Assert: {subscriptionType} is an Object type. - Let {selectionSet} be the top level Selection Set in {subscription}. +- Let {path} be an empty list. +- Let {eventSourceDeliveryGroup} be a new delivery group with path {path}. - Let {groupedFieldSet} be the result of {CollectFields(subscriptionType, - selectionSet, variableValues)}. + selectionSet, variableValues, path, eventSourceDeliveryGroup)}. - If {groupedFieldSet} does not have exactly one entry, raise a _request error_. - Let {fields} be the value of the first entry in {groupedFieldSet}. - Let {fieldName} be the name of the first entry in {fields}. Note: This value @@ -292,7 +287,11 @@ MapSourceToResponseEvent(sourceStream, subscription, schema, variableValues): - For each {event} on {sourceStream}: - Let {response} be the result of running {ExecuteSubscriptionEvent(subscription, schema, variableValues, event)}. - - Yield an event containing {response}. + - If {response} is an event stream: + - For each {childEvent} on {response}: + - Yield {childEvent}. + - Otherwise: + - Yield an event containing {response}. - When {responseStream} completes: complete this event stream. ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue): @@ -300,12 +299,8 @@ ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue): - Let {subscriptionType} be the root Subscription type in {schema}. - Assert: {subscriptionType} is an Object type. - Let {selectionSet} be the top level Selection Set in {subscription}. 
-- Let {data} be the result of running {ExecuteSelectionSet(selectionSet, - subscriptionType, initialValue, variableValues)} _normally_ (allowing - parallelization). -- Let {errors} be the list of all _field error_ raised while executing the - selection set. -- Return an unordered map containing {data} and {errors}. +- Return {ExecuteRootSelectionSet(variableValues, initialValue, + subscriptionType, selectionSet)}. Note: The {ExecuteSubscriptionEvent()} algorithm is intentionally similar to {ExecuteQuery()} since this is how each event result is produced. @@ -321,39 +316,90 @@ Unsubscribe(responseStream): - Cancel {responseStream} -## Executing Selection Sets +## Executing the Root Selection Set -To execute a selection set, the object value being evaluated and the object type -need to be known, as well as whether it must be executed serially, or may be -executed in parallel. +To execute the root selection set, the object value being evaluated and the +object type need to be known, as well as whether it must be executed serially, +or may be executed in parallel. -First, the selection set is turned into a grouped field set; then, each -represented field in the grouped field set produces an entry into a response -map. +Executing the root selection set works similarly for queries (parallel), +mutations (serial), and subscriptions (where it is executed for each event in +the underlying Source Stream). -ExecuteSelectionSet(selectionSet, objectType, objectValue, variableValues): +First, the selection set is turned into a grouped field set; then, we execute +this grouped field set and return the resulting {data} and {errors}. +ExecuteRootSelectionSet(variableValues, initialValue, objectType, selectionSet, +serial): + +- If {serial} is not provided, initialize it to {false}. +- Let {path} be an empty list. +- Let {rootDeliveryGroup} be a new delivery group with path {path}. - Let {groupedFieldSet} be the result of {CollectFields(objectType, - selectionSet, variableValues)}. 
+ selectionSet, variableValues, path, rootDeliveryGroup)}. +- Let {currentDeliveryGroups} be a set containing {rootDeliveryGroup}. +- Let {data} and {incrementalDetailsByPath} be the result of running + {ExecuteGroupedFieldSet(groupedFieldSet, objectType, initialValue, + variableValues, path, currentDeliveryGroups)} _serially_ if {serial} is true, + _normally_ (allowing parallelization) otherwise. +- Let {errors} be the list of all _field error_ raised while executing the + selection set. +- If {incrementalDetailsByPath} is empty: + - Return an unordered map containing {data} and {errors}. +- Otherwise: + - Return {IncrementalEventStream(data, errors, incrementalDetailsByPath, + variableValues)}. + +## Executing a Grouped Field Set + +To execute a grouped field set, the object value being evaluated and the object +type need to be known, as well as whether it must be executed serially, or may +be executed in parallel. + +Each represented field in the grouped field set produces an entry into a +response map. + +ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, variableValues, +path, currentDeliveryGroups): + +- Let {incrementalDetailsByPath} be an empty map. - Initialize {resultMap} to an empty ordered map. -- For each {groupedFieldSet} as {responseKey} and {fields}: - - Let {fieldName} be the name of the first entry in {fields}. Note: This value - is unaffected if an alias is used. - - Let {fieldType} be the return type defined for the field {fieldName} of - {objectType}. - - If {fieldType} is defined: - - Let {responseValue} be {ExecuteField(objectType, objectValue, fieldType, - fields, variableValues)}. - - Set {responseValue} as the value for {responseKey} in {resultMap}. -- Return {resultMap}. +- For each {groupedFieldSet} as {responseKey} and {fieldDigests}: + - Let {deliveryGroups} be the set of delivery groups in {fieldDigests}. 
+ - If {deliveryGroups} contains every delivery group in + {currentDeliveryGroups}: + - Let {fieldName} be the name of the field of the first entry in + {fieldDigests}. Note: This value is unaffected if an alias is used. + - Let {fieldType} be the return type defined for the field {fieldName} of + {objectType}. + - If {fieldType} is defined: + - Let {childPath} be the result of appending {responseKey} to {path}. + - Let {responseValue} and {childIncrementalDetailsByPath} be + {ExecuteField(objectType, objectValue, fieldType, fieldDigests, + variableValues, childPath, currentDeliveryGroups)}. + - Set {responseValue} as the value for {responseKey} in {resultMap}. + - For each {childIncrementalDetailsByPath} as {childPath} and + {childIncrementalDetails}: + - Set {childIncrementalDetails} as the value for {childPath} in + {incrementalDetailsByPath}. + - Otherwise: + - Let {details} be the details object in {incrementalDetailsByPath} for + {path}; if no such details object exists, create it as a details object + containing {groupedFieldSet} as an empty map, {objectType} and + {objectValue}. + - Let {incrementalFieldSet} be the value for property {groupedFieldSet} in + {details}. + - Set {fieldDigests} as the value for {responseKey} in + {incrementalFieldSet}. +- Return {resultMap} and {incrementalDetailsByPath}. Note: {resultMap} is ordered by which fields appear first in the operation. This is explained in greater detail in the Field Collection section below. **Errors and Non-Null Fields** -If during {ExecuteSelectionSet()} a field with a non-null {fieldType} raises a -_field error_ then that error must propagate to this entire selection set, +If during {ExecuteGroupedFieldSet()} a field with a non-null {fieldType} raises +a _field error_ then that error must propagate to this entire selection set, either resolving to {null} if allowed or further propagated to a parent field. 
If this occurs, any sibling fields which have not yet executed or have not yet @@ -362,6 +408,8 @@ yielded a value may be cancelled to avoid unnecessary work. Note: See [Handling Field Errors](#sec-Handling-Field-Errors) for more about this behavior. +Further, if this occurs, the {incrementalDetailsByPath} must be made empty. + ### Normal and Serial Execution Normally the executor can execute the entries in a grouped field set in whatever @@ -489,7 +537,8 @@ The depth-first-search order of the field groups produced by {CollectFields()} is maintained through execution, ensuring that fields appear in the executed response in a stable and predictable order. -CollectFields(objectType, selectionSet, variableValues, visitedFragments): +CollectFields(objectType, selectionSet, variableValues, path, deliveryGroup, +visitedFragments): - If {visitedFragments} is not provided, initialize it to the empty set. - Initialize {groupedFields} to an empty ordered map of lists. @@ -509,8 +558,19 @@ CollectFields(objectType, selectionSet, variableValues, visitedFragments): defined, otherwise the field name). - Let {groupForResponseKey} be the list in {groupedFields} for {responseKey}; if no such list exists, create it as an empty list. - - Append {selection} to the {groupForResponseKey}. + - Let {fieldDigest} be a new field digest containing {selection} and + {deliveryGroup}. + - Append {fieldDigest} to the {groupForResponseKey}. - If {selection} is a {FragmentSpread}: + - Let {fragmentDeliveryGroup} be {deliveryGroup}. + - If {selection} provides the directive `@defer`, let {deferDirective} be + that directive. + - If {deferDirective}'s {if} argument is not {false} and is not a variable + in {variableValues} with the value {false}: + - Let {label} be the value of {deferDirective}'s {label} argument (or + the value of the associated variable) if any. + - Let {fragmentDeliveryGroup} be a new delivery group with path {path}, + parent {deliveryGroup} and label {label}. 
- Let {fragmentSpreadName} be the name of {selection}. - If {fragmentSpreadName} is in {visitedFragments}, continue with the next {selection} in {selectionSet}. @@ -524,8 +584,8 @@ CollectFields(objectType, selectionSet, variableValues, visitedFragments): with the next {selection} in {selectionSet}. - Let {fragmentSelectionSet} be the top-level selection set of {fragment}. - Let {fragmentGroupedFieldSet} be the result of calling - {CollectFields(objectType, fragmentSelectionSet, variableValues, - visitedFragments)}. + {CollectFields(objectType, fragmentSelectionSet, variableValues, path, + fragmentDeliveryGroup, visitedFragments)}. - For each {fragmentGroup} in {fragmentGroupedFieldSet}: - Let {responseKey} be the response key shared by all fields in {fragmentGroup}. @@ -533,14 +593,23 @@ CollectFields(objectType, selectionSet, variableValues, visitedFragments): {responseKey}; if no such list exists, create it as an empty list. - Append all items in {fragmentGroup} to {groupForResponseKey}. - If {selection} is an {InlineFragment}: + - Let {fragmentDeliveryGroup} be {deliveryGroup}. + - If {selection} provides the directive `@defer`, let {deferDirective} be + that directive. + - If {deferDirective}'s {if} argument is not {false} and is not a variable + in {variableValues} with the value {false}: + - Let {label} be the value of {deferDirective}'s {label} argument (or + the value of the associated variable) if any. + - Let {fragmentDeliveryGroup} be a new delivery group with path {path}, + parent {deliveryGroup} and label {label}. - Let {fragmentType} be the type condition on {selection}. - If {fragmentType} is not {null} and {DoesFragmentTypeApply(objectType, fragmentType)} is false, continue with the next {selection} in {selectionSet}. - Let {fragmentSelectionSet} be the top-level selection set of {selection}. - Let {fragmentGroupedFieldSet} be the result of calling - {CollectFields(objectType, fragmentSelectionSet, variableValues, - visitedFragments)}. 
+ {CollectFields(objectType, fragmentSelectionSet, variableValues, path, + fragmentDeliveryGroup, visitedFragments)}. - For each {fragmentGroup} in {fragmentGroupedFieldSet}: - Let {responseKey} be the response key shared by all fields in {fragmentGroup}. @@ -572,16 +641,17 @@ coerces any provided argument values, then resolves a value for the field, and finally completes that value either by recursively executing another selection set or coercing a scalar value. -ExecuteField(objectType, objectValue, fieldType, fields, variableValues): +ExecuteField(objectType, objectValue, fieldType, fieldDigests, variableValues, +path, currentDeliveryGroups): -- Let {field} be the first entry in {fields}. +- Let {field} be the first entry in {fieldDigests}. - Let {fieldName} be the field name of {field}. - Let {argumentValues} be the result of {CoerceArgumentValues(objectType, field, variableValues)} - Let {resolvedValue} be {ResolveFieldValue(objectType, objectValue, fieldName, argumentValues)}. -- Return the result of {CompleteValue(fieldType, fields, resolvedValue, - variableValues)}. +- Return the result of {CompleteValue(fieldType, fieldDigests, resolvedValue, + variableValues, path, currentDeliveryGroups)}. ### Coercing Field Arguments @@ -668,33 +738,49 @@ After resolving the value for a field, it is completed by ensuring it adheres to the expected return type. If the return type is another Object type, then the field execution process continues recursively. -CompleteValue(fieldType, fields, result, variableValues): +CompleteValue(fieldType, fieldDigests, result, variableValues, path, +currentDeliveryGroups): - If the {fieldType} is a Non-Null type: - Let {innerType} be the inner type of {fieldType}. - - Let {completedResult} be the result of calling {CompleteValue(innerType, - fields, result, variableValues)}. 
+ - Let {completedResult} and {incrementalDetailsByPath} be the result of + calling {CompleteValue(innerType, fieldDigests, result, variableValues, + path, currentDeliveryGroups)}. - If {completedResult} is {null}, raise a _field error_. - - Return {completedResult}. + - Return {completedResult} and {incrementalDetailsByPath}. - If {result} is {null} (or another internal value similar to {null} such as - {undefined}), return {null}. + {undefined}), return {null} and an empty map. - If {fieldType} is a List type: - If {result} is not a collection of values, raise a _field error_. - Let {innerType} be the inner type of {fieldType}. - - Return a list where each list item is the result of calling - {CompleteValue(innerType, fields, resultItem, variableValues)}, where - {resultItem} is each item in {result}. + - Let {incrementalDetailsByPath} be an empty map. + - Let {list} be an empty list. + - For each list item {resultItem} at 0-indexed index {resultIndex} in + {result}: + - Let {subpath} be the result of appending {resultIndex} to {path}. + - Let {listValue} and {itemIncrementalDetailsByPath} be the result of + calling {CompleteValue(innerType, fieldDigests, resultItem, + variableValues, subpath, currentDeliveryGroups)}. + - Append {listValue} to {list}. + - If {listValue} is not {null}: + - For each {itemIncrementalDetailsByPath} as {childPath} and + {childIncrementalDetails}: + - Set {childIncrementalDetails} as the value for {childPath} in + {incrementalDetailsByPath}. + - Return {list} and {incrementalDetailsByPath}. - If {fieldType} is a Scalar or Enum type: - - Return the result of {CoerceResult(fieldType, result)}. + - Let {completedResult} be the result of {CoerceResult(fieldType, result)}. + - Return {completedResult} and an empty map. - If {fieldType} is an Object, Interface, or Union type: - If {fieldType} is an Object type. - Let {objectType} be {fieldType}. - Otherwise if {fieldType} is an Interface or Union type. 
- Let {objectType} be {ResolveAbstractType(fieldType, result)}. - - Let {subSelectionSet} be the result of calling {MergeSelectionSets(fields)}. - - Return the result of evaluating {ExecuteSelectionSet(subSelectionSet, - objectType, result, variableValues)} _normally_ (allowing for - parallelization). + - Let {groupedFieldSet} be the result of calling {CollectSubfields(objectType, + fieldDigests, variableValues, path)}. + - Return the result of evaluating {ExecuteGroupedFieldSet(groupedFieldSet, + objectType, result, variableValues, path, currentDeliveryGroups)} _normally_ + (allowing for parallelization). **Coercing Results** @@ -739,9 +825,9 @@ ResolveAbstractType(abstractType, objectValue): **Merging Selection Sets** -When more than one field of the same name is executed in parallel, their -selection sets are merged together when completing the value in order to -continue execution of the sub-selection sets. +When more than one field of the same name is executed in parallel, during value +completion their selection sets are collected together to produce a single +grouped field set in order to continue execution of the sub-selection sets. An example operation illustrating parallel fields with the same name with sub-selections. @@ -760,14 +846,21 @@ sub-selections. After resolving the value for `me`, the selection sets are merged together so `firstName` and `lastName` can be resolved for one value. -MergeSelectionSets(fields): +CollectSubfields(objectType, fieldDigests, variableValues, path): -- Let {selectionSet} be an empty list. -- For each {field} in {fields}: +- Let {groupedFieldSet} be an empty map. +- For each {fieldDigest} in {fieldDigests}: + - Let {field} be the field of {fieldDigest}. + - Let {deliveryGroup} be the delivery group of {fieldDigest}. - Let {fieldSelectionSet} be the selection set of {field}. - If {fieldSelectionSet} is null or empty, continue to the next field. - - Append all selections in {fieldSelectionSet} to {selectionSet}. 
-- Return {selectionSet}. + - Let {subGroupedFieldSet} be the result of {CollectFields(objectType, + fieldSelectionSet, variableValues, path, deliveryGroup)}. + - For each {subGroupedFieldSet} as {responseKey} and {subfieldDigests}: + - Let {groupForResponseKey} be the list in {groupedFieldSet} for + {responseKey}; if no such list exists, create it as an empty list. + - Append all field digests in {subfieldDigests} to {groupForResponseKey}. +- Return {groupedFieldSet}. ### Handling Field Errors @@ -805,3 +898,212 @@ upwards. If all fields from the root of the request to the source of the field error return `Non-Null` types, then the {"data"} entry in the response should be {null}. + +## Incremental Delivery + +### Delivery Group + +::: A _delivery group_ represents either the root selection set or a particular +`@defer` directive at a particular {path} in the response. + +::: The _root delivery group_ is the _delivery group_ that represents the root +selection set in the operation. + +Each _delivery group_ belongs to a {parent} delivery group, except for the _root +delivery group_. During field collection, the delivery group of each field is +tracked, and this is used when determining when to execute and deliver deferred +fields and streamed list items. + +In an operation that does not utilise the `@defer` directive, there will only be +a single delivery group, the _root delivery group_, and all fields will belong +to it. + +### Incremental Event Stream + +IncrementalEventStream(data, errors, initialIncrementalDetailsByPath, +variableValues): + +- Let {incrementalDetailsByPath} be {initialIncrementalDetailsByPath}. +- Return a new event stream {responseStream} which yields events as follows: + - Let {pendingDeliveryGroups} be + {CollectDeliveryGroups(incrementalDetailsByPath)}. + - Assert: {pendingDeliveryGroups} is not empty. + - Let {pending} be {MakePending(pendingDeliveryGroups)}. 
+  - Yield an event containing {data}, {errors}, {pending}, and the value {true} +    for {hasNext}. +  - Let {streams} and {runnableDeliveryGroupsSets} be +    {IncrementalStreams(incrementalDetailsByPath)}. +  - For each {event} on each stream in {streams}: +    - Yield {event}. +  - When every stream in {streams} has completed: +    - Yield a map with {hasNext} set to {false}. + +In order to increase efficiency, any two or more consecutive payloads in the +IncrementalEventStream stream may optionally be combined by concatenating the +lists therein (maintaining order) and setting {hasNext} to {false} if any of the +payloads has {hasNext} set to {false}, otherwise {true}. + +CollectDeliveryGroups(incrementalDetailsByPath, excludingDeliveryGroups): + +- If {excludingDeliveryGroups} is not provided, initialize it to the empty set. +- Let {allDeliveryGroups} be an empty set. +- For each {incrementalDetailsByPath} as {path} and {details}: +  - Let {objectType}, {objectValue} and {groupedFieldSet} be those properties in +    {details}. +  - For each {groupedFieldSet} as {responseKey} and {fieldDigests}: +    - For each {fieldDigests} as {fieldDigest}: +      - Let {deliveryGroup} be the delivery group in {fieldDigest}. +      - If {deliveryGroup} is not in {excludingDeliveryGroups}: +        - Add {deliveryGroup} to {allDeliveryGroups}. +- Return {allDeliveryGroups}. + +MakePending(deliveryGroups): + +- Let {pending} be an empty list. +- For each {deliveryGroups} as {deliveryGroup}: +  - Let {id}, {path} and {label} be those properties in {deliveryGroup}. +  - Let {pendingPayload} be an unordered map containing {id}, {path}, {label}. +  - Append {pendingPayload} to {pending}. +- Return {pending}. + +IncrementalStreams(incrementalDetailsByPath): + +- Let {streams} be an empty list. +- Let {runnableDeliveryGroupsSets} be the result of +  {PartitionDeliveryGroupsSets(incrementalDetailsByPath)}.
+- For each {runnableDeliveryGroupsSets} as {runnableDeliveryGroupsSet}: +  - Let {stream} be {IncrementalStream(incrementalDetailsByPath, +    runnableDeliveryGroupsSet)}. +  - Append {stream} to {streams}. +- Return {streams} and {runnableDeliveryGroupsSets}. + +PartitionDeliveryGroupsSets(incrementalDetailsByPath): + +- Let {graph} be an empty graph, where the nodes are delivery groups. +- For each {incrementalDetailsByPath} as {path} and {details}: +  - Let {objectType}, {objectValue} and {groupedFieldSet} be those properties in +    {details}. +  - For each {groupedFieldSet} as {responseKey} and {fieldDigests}: +    - Let {deliveryGroupsSet} be the set containing the delivery group from each +      digest in {fieldDigests}. +    - Add each {deliveryGroup} in {deliveryGroupsSet} as a node to the {graph} +      (if it's not already present). +    - For each pair of delivery groups {deliveryGroup1} and {deliveryGroup2} in +      {deliveryGroupsSet}: +      - Ensure {deliveryGroup1} and {deliveryGroup2} are connected in {graph}. +- Let {partitionedDeliveryGroupsSets} be an empty unordered list. +- For each connected {subgraph} in {graph}: +  - Let {deliveryGroupsSet} be the delivery groups in {subgraph}. +  - Add {deliveryGroupsSet} to {partitionedDeliveryGroupsSets}. +- Assert: every {deliveryGroup} in {graph} must appear in exactly one set in +  {partitionedDeliveryGroupsSets}. +- Return {partitionedDeliveryGroupsSets}. + +IncrementalStream(incrementalDetailsByPath, deliveryGroupsSet): + +- Let {remainingIncrementalDetailsByPath} and {runnable} be +  {SplitRunnable(incrementalDetailsByPath, deliveryGroupsSet)}. +- Let {hasNext} be {true}. +- Return a new event stream {incrementalStream} which yields events as follows: +  - In the event of one or more errors, {errors}: +    - Let {completed} be an empty list. +    - For each {deliveryGroupsSet} as {deliveryGroup}: +      - Let {id} be the id of {deliveryGroup}. +      - Append an unordered map containing {id} and {errors} to {completed}.
+    - Yield an unordered map containing {hasNext} and {completed}. +    - Complete {incrementalStream}. +    - Return. +  - Let {incremental} be an empty list. +  - For each {runnable} as {path} and {incrementalDetails} (in parallel): +    - Let {objectType}, {objectValue} and {groupedFieldSet} be those properties +      in {incrementalDetails}. +    - Assert: {objectValue} exists and is not {null}. +    - Let {data} and {childIncrementalDetailsByPath} be the result of running +      {ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, +      variableValues, path, deliveryGroupsSet)} _normally_ (allowing +      parallelization). +    - Let {errors} be the list of all _field error_ raised while executing the +      grouped field set. +    - Let {remainingIncrementalDetailsByPath} be +      {MergeIncrementalDetailsByPath(remainingIncrementalDetailsByPath, +      childIncrementalDetailsByPath)}. +    - Append an unordered map containing {hasNext}, {id}, {data} and {errors} to +      {incremental}. +  - Let {pendingDeliveryGroups} be +    {CollectDeliveryGroups(remainingIncrementalDetailsByPath, +    deliveryGroupsSet)}. +  - Let {pending} be {MakePending(pendingDeliveryGroups)}. +  - Let {sentInitial} be {false}. +  - Let {streams} and {runnableDeliveryGroupsSets} be +    {IncrementalStreams(remainingIncrementalDetailsByPath)}. +  - For each {deliveryGroupsSet} as {deliveryGroup}: +    - If {deliveryGroup} is not contained in any delivery group set in +      {runnableDeliveryGroupsSets}: +      - If {sentInitial} is not {true}: +        - Let {sentInitial} be {true}. +        - Yield an unordered map containing {hasNext}, {incremental} and +          {pending}. +      - Let {id} be the id of {deliveryGroup}. +      - Let {completedItem} be an unordered map containing {id}. +      - Let {completed} be a list containing {completedItem}. +      - Yield an unordered map containing {hasNext} and {completed}. +  - For each {event} on each stream in {streams}: +    - If {sentInitial} is not {true}: +      - Let {sentInitial} be {true}.
+      - Yield an unordered map containing {hasNext}, {incremental} and +        {pending}. +    - Yield {event}. +  - When every stream in {streams} has completed: +    - Complete {incrementalStream}. + +SplitRunnable(incrementalDetailsByPath, runnableDeliveryGroupsSet): + +- Let {remainingIncrementalDetailsByPath} be an empty map. +- Let {runnable} be an empty map. +- For each {incrementalDetailsByPath} as {path} and {incrementalDetails}: +  - Let {objectType}, {objectValue} and {groupedFieldSet} be those properties in +    {incrementalDetails}. +  - For each {groupedFieldSet} as {responseKey} and {fieldDigests}: +    - Let {deliveryGroups} be the set containing the delivery group from each +      digest in {fieldDigests}. +    - If {deliveryGroups} contains the same number and set of delivery groups as +      {runnableDeliveryGroupsSet} (order unimportant): +      - Let {targetIncrementalDetails} be the incremental details object in +        {runnable} for {path}; if no such object exists, create it with +        {objectType}, {objectValue}, and an empty {groupedFieldSet} map. +    - Otherwise, if {deliveryGroups} only contains delivery groups that are also +      in {runnableDeliveryGroupsSet}: +      - Let {targetIncrementalDetails} be the incremental details object in +        {remainingIncrementalDetailsByPath} for {path}; if no such object +        exists, create it with {objectType}, {objectValue}, and an empty +        {groupedFieldSet} map. +    - Otherwise: +      - Continue with the next {responseKey} and {fieldDigests} in +        {groupedFieldSet}. +    - Let {targetGroupedFieldSet} be the {groupedFieldSet} property of +      {targetIncrementalDetails}. +    - Set {fieldDigests} as the value for {responseKey} in +      {targetGroupedFieldSet}. +- Return {remainingIncrementalDetailsByPath} and {runnable}. + +MergeIncrementalDetailsByPath(incrementalDetailsByPath1, +incrementalDetailsByPath2): + +- Let {incrementalDetailsByPath} be a copy of {incrementalDetailsByPath1}.
+- For each {incrementalDetailsByPath2} as {path} and {newIncrementalDetails}: +  - Let {originalIncrementalDetails} be the value for {path} in +    {incrementalDetailsByPath}, or {null} if no such entry exists. +  - If {originalIncrementalDetails} is {null}: +    - Set {newIncrementalDetails} as the value for {path} in +      {incrementalDetailsByPath}. +  - Otherwise: +    - Let {originalGroupedFieldSet} be the grouped field set in +      {originalIncrementalDetails}. +    - Let {newGroupedFieldSet} be the grouped field set in +      {newIncrementalDetails}. +    - For each {newGroupedFieldSet} as {responseKey} and {newFieldDigests}: +      - Let {fieldDigests} be the value for {responseKey} in +        {originalGroupedFieldSet}; or if no such entry is found, create it as +        the empty set. +      - Add every entry in {newFieldDigests} to {fieldDigests}. +- Return {incrementalDetailsByPath}.
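Reviewer note, non-normative: the two pure data-manipulation algorithms added above ({PartitionDeliveryGroupsSets} and {MergeIncrementalDetailsByPath}) can be sanity-checked with ordinary code. The following Python sketch assumes a deliberately simplified model that is not the spec's data shape: delivery groups are plain hashable ids, and each "incremental details" entry is reduced to just its grouped field set, a map from response key to the set of delivery groups of its field digests ({objectType}/{objectValue} are omitted). Function names are illustrative only.

```python
# Non-normative sketch. Simplified model: a delivery group is any hashable id,
# and incremental_details_by_path maps path -> {response_key: set_of_groups}.
from collections import defaultdict


def partition_delivery_groups_sets(incremental_details_by_path):
    """PartitionDeliveryGroupsSets: connect delivery groups that co-occur in
    any grouped-field-set entry, then return the connected components."""
    parent = {}  # union-find forest over delivery groups

    def find(g):
        while parent[g] != g:
            parent[g] = parent[parent[g]]  # path halving
            g = parent[g]
        return g

    def union(a, b):
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[rb] = ra

    for grouped_field_set in incremental_details_by_path.values():
        for delivery_groups in grouped_field_set.values():
            groups = list(delivery_groups)
            for g in groups:
                parent.setdefault(g, g)  # ensure every group is a node
            for a, b in zip(groups, groups[1:]):
                union(a, b)  # groups sharing a field end up connected

    components = defaultdict(set)
    for g in parent:
        components[find(g)].add(g)
    return list(components.values())


def merge_incremental_details_by_path(a, b):
    """MergeIncrementalDetailsByPath: union field digests per response key,
    path by path, without mutating either input."""
    merged = {path: {k: set(v) for k, v in gfs.items()} for path, gfs in a.items()}
    for path, gfs in b.items():
        target = merged.setdefault(path, {})
        for response_key, digests in gfs.items():
            target.setdefault(response_key, set()).update(digests)
    return merged
```

Under this model, two `@defer` fragments that contribute digests to the same response key land in the same partition (and so are delivered by the same incremental stream), while unrelated fragments partition into independent streams.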