Skip to content

Commit

Permalink
feat: read state in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
ppedziwiatr committed Dec 11, 2023
1 parent eb044fe commit 2766094
Show file tree
Hide file tree
Showing 13 changed files with 273 additions and 58 deletions.
30 changes: 24 additions & 6 deletions src/contract/Contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,22 @@ export interface Contract<State = unknown> {
readState(
sortKeyOrBlockHeight?: string | number,
caller?: string,
interactions?: GQLNodeInterface[]
interactions?: GQLNodeInterface[],
signal?: AbortSignal
): Promise<SortKeyCacheResult<EvalStateResult<State>>>;

readStateFor(sortKey: string, interactions: GQLNodeInterface[]): Promise<SortKeyCacheResult<EvalStateResult<State>>>;
/**
* Reads state in batches - i.e. it first loads max. 5k interactions, evaluates them, then reads another 5k..and so on.
*
* Consider this as an experimental feature
*/
readStateBatch(pagesPerBatch: number, signal: AbortSignal): Promise<SortKeyCacheResult<EvalStateResult<State>>>;

readStateFor(
sortKey: string,
interactions: GQLNodeInterface[],
signal?: AbortSignal
): Promise<SortKeyCacheResult<EvalStateResult<State>>>;

/**
* Returns the "view" of the state, computed by the SWC -
Expand All @@ -138,7 +150,8 @@ export interface Contract<State = unknown> {
input: Input,
tags?: Tags,
transfer?: ArTransfer,
caller?: string
caller?: string,
signal?: AbortSignal
): Promise<InteractionResult<State, View>>;

/**
Expand All @@ -155,7 +168,8 @@ export interface Contract<State = unknown> {
*/
viewStateForTx<Input = unknown, View = unknown>(
input: Input,
transaction: GQLNodeInterface
transaction: GQLNodeInterface,
signal?: AbortSignal
): Promise<InteractionResult<State, View>>;

/**
Expand All @@ -177,7 +191,11 @@ export interface Contract<State = unknown> {
vrf?: boolean
): Promise<InteractionResult<State, unknown>>;

applyInput<Input>(input: Input, transaction: GQLNodeInterface): Promise<InteractionResult<State, unknown>>;
applyInput<Input>(
input: Input,
transaction: GQLNodeInterface,
signal?: AbortSignal
): Promise<InteractionResult<State, unknown>>;

/**
* Writes a new "interaction" transaction - i.e. such transaction that stores input for the contract.
Expand All @@ -189,7 +207,7 @@ export interface Contract<State = unknown> {

/**
* Returns the full call tree report the last
* interaction with contract (eg. after reading state)
* interaction with contract (e.g. after reading state)
*/
getCallStack(): ContractCallRecord;

Expand Down
6 changes: 4 additions & 2 deletions src/contract/EvaluationOptionsEvaluator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,15 @@ export class EvaluationOptionsEvaluator {
remoteStateSyncSource: () => this.rootOptions['remoteStateSyncSource'],
useKVStorage: (foreignOptions) => foreignOptions['useKVStorage'],
useConstructor: (foreignOptions) => foreignOptions['useConstructor'],
whitelistSources: () => this.rootOptions['whitelistSources']
whitelistSources: () => this.rootOptions['whitelistSources'],
transactionsPagesPerBatch: () => this.rootOptions['transactionsPagesPerBatch']
};

private readonly notConflictingEvaluationOptions: (keyof EvaluationOptions)[] = [
'useKVStorage',
'sourceType',
'useConstructor'
'useConstructor',
'transactionsPagesPerBatch'
];

/**
Expand Down
129 changes: 104 additions & 25 deletions src/contract/HandlerBasedContract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import {
InteractionResult,
InteractionType
} from '../core/modules/impl/HandlerExecutorFactory';
import { LexicographicalInteractionsSorter } from '../core/modules/impl/LexicographicalInteractionsSorter';
import {
genesisSortKey,
LexicographicalInteractionsSorter
} from '../core/modules/impl/LexicographicalInteractionsSorter';
import { InteractionsSorter } from '../core/modules/InteractionsSorter';
import { DefaultEvaluationOptions, EvalStateResult, EvaluationOptions } from '../core/modules/StateEvaluator';
import { WARP_TAGS } from '../core/KnownTags';
Expand Down Expand Up @@ -38,9 +41,9 @@ import { Mutex } from 'async-mutex';
import { Tag, Transaction, TransactionStatusResponse } from '../utils/types/arweave-types';
import { InteractionState } from './states/InteractionState';
import { ContractInteractionState } from './states/ContractInteractionState';
import { Crypto } from 'warp-isomorphic';
import { Buffer, Crypto } from 'warp-isomorphic';
import { VrfPluginFunctions } from '../core/WarpPlugin';
import { createData, tagsExceedLimit, DataItem, Signer } from 'warp-arbundles';
import { createData, DataItem, Signer, tagsExceedLimit } from 'warp-arbundles';

/**
* An implementation of {@link Contract} that is backwards compatible with current style
Expand Down Expand Up @@ -135,7 +138,8 @@ export class HandlerBasedContract<State> implements Contract<State> {
async readState(
sortKeyOrBlockHeight?: string | number,
caller?: string,
interactions?: GQLNodeInterface[]
interactions?: GQLNodeInterface[],
signal?: AbortSignal
): Promise<SortKeyCacheResult<EvalStateResult<State>>> {
this.logger.info('Read state for', {
contractTxId: this._contractTxId,
Expand All @@ -162,7 +166,13 @@ export class HandlerBasedContract<State> implements Contract<State> {
const initBenchmark = Benchmark.measure();
this.maybeResetRootContract();

const executionContext = await this.createExecutionContext(this._contractTxId, sortKey, false, interactions);
const executionContext = await this.createExecutionContext(
this._contractTxId,
sortKey,
false,
interactions,
signal
);
this.logger.info('Execution Context', {
srcTxId: executionContext.contractDefinition?.srcTxId,
missingInteractions: executionContext.sortedInteractions?.length,
Expand Down Expand Up @@ -200,27 +210,83 @@ export class HandlerBasedContract<State> implements Contract<State> {

async readStateFor(
sortKey: string,
interactions: GQLNodeInterface[]
interactions: GQLNodeInterface[],
signal?: AbortSignal
): Promise<SortKeyCacheResult<EvalStateResult<State>>> {
return this.readState(sortKey, undefined, interactions);
return this.readState(sortKey, undefined, interactions, signal);
}

async readStateBatch(pagesPerBatch = 1, signal: AbortSignal): Promise<SortKeyCacheResult<EvalStateResult<State>>> {
if (!this.isRoot()) {
throw new Error('readStateBatch is only allowed for root contract calls');
}
if (pagesPerBatch < 1) {
throw new Error('At least one page per batch is required');
}
if (signal.aborted) {
throw new Error('readStateBatch aborted');
}

const contractTxId = this._contractTxId;
const { interactionsLoader, stateEvaluator } = this.warp;
let cachedState = await stateEvaluator.latestAvailableState<State>(contractTxId);

const evaluationOptions = {
...this._evaluationOptions,
transactionsPagesPerBatch: pagesPerBatch
};

let interactions: GQLNodeInterface[];
let batchesLoaded = 0;
do {
const batchBenchmark = Benchmark.measure();
this.logger.debug(`Loading ${++batchesLoaded}`, evaluationOptions);
interactions = await interactionsLoader.load(contractTxId, cachedState?.sortKey, undefined, evaluationOptions);
if (signal.aborted) {
throw new Error('readStateBatch aborted');
}
this.logger.debug(`Evaluating ${interactions.length} in ${batchesLoaded}`);
cachedState = await this.readStateFor(cachedState?.sortKey || genesisSortKey, interactions);
if (signal.aborted) {
throw new Error('readStateBatch aborted');
}
this.logger.debug(
`Batch ${batchesLoaded} evaluated in ${batchBenchmark.elapsed()} at sortKey ${cachedState.sortKey}`
);
} while (interactions.length > 0);

return cachedState;
}

async viewState<Input, View>(
input: Input,
tags: Tags = [],
transfer: ArTransfer = emptyTransfer,
caller?: string
caller?: string,
signal?: AbortSignal
): Promise<InteractionResult<State, View>> {
this.logger.info('View state for', this._contractTxId);
return await this.callContract<Input, View>(input, 'view', caller, undefined, tags, transfer);
return await this.callContract<Input, View>(
input,
'view',
caller,
undefined,
tags,
transfer,
false,
false,
true,
signal
);
}

async viewStateForTx<Input, View>(
input: Input,
interactionTx: GQLNodeInterface
interactionTx: GQLNodeInterface,
signal?: AbortSignal
): Promise<InteractionResult<State, View>> {
this.logger.info(`View state for ${this._contractTxId}`);
return await this.doApplyInputOnTx<Input, View>(input, interactionTx, 'view');
return await this.doApplyInputOnTx<Input, View>(input, interactionTx, 'view', signal);
}

async dryWrite<Input>(
Expand All @@ -234,9 +300,13 @@ export class HandlerBasedContract<State> implements Contract<State> {
return await this.callContract<Input>(input, 'write', caller, undefined, tags, transfer, undefined, vrf);
}

async applyInput<Input>(input: Input, transaction: GQLNodeInterface): Promise<InteractionResult<State, unknown>> {
async applyInput<Input>(
input: Input,
transaction: GQLNodeInterface,
signal?: AbortSignal
): Promise<InteractionResult<State, unknown>> {
this.logger.info(`Apply-input from transaction ${transaction.id} for ${this._contractTxId}`);
return await this.doApplyInputOnTx<Input>(input, transaction, 'write');
return await this.doApplyInputOnTx<Input>(input, transaction, 'write', signal);
}

async writeInteraction<Input>(
Expand Down Expand Up @@ -509,7 +579,8 @@ export class HandlerBasedContract<State> implements Contract<State> {
contractTxId: string,
upToSortKey?: string,
forceDefinitionLoad = false,
interactions?: GQLNodeInterface[]
interactions?: GQLNodeInterface[],
signal?: AbortSignal
): Promise<ExecutionContext<State, HandlerApi<State>>> {
const { definitionLoader, interactionsLoader, stateEvaluator } = this.warp;
let cachedState: SortKeyCacheResult<EvalStateResult<State>>;
Expand All @@ -531,14 +602,14 @@ export class HandlerBasedContract<State> implements Contract<State> {

this.logger.debug('Cached state', cachedState, upToSortKey);

if (cachedState && cachedState.sortKey == upToSortKey) {
if ((cachedState && cachedState.sortKey == upToSortKey) || upToSortKey === genesisSortKey) {
this.logger.debug('State fully cached, not loading interactions.');
if (forceDefinitionLoad || evolvedSrcTxId || interactions?.length) {
contractDefinition = await definitionLoader.load<State>(contractTxId, evolvedSrcTxId);
contractEvaluationOptions = this.resolveEvaluationOptions(contractDefinition.manifest?.evaluationOptions);
this.warp.executorFactory.checkWhiteListContractSources(contractDefinition, contractEvaluationOptions);
if (interactions?.length) {
sortedInteractions = (await this._sorter.sort(interactions.map((i) => ({ node: i, cursor: null })))).map(
(i) => i.node
);
sortedInteractions = await this.getSortedInteractions(interactions);
}
}
} else {
Expand Down Expand Up @@ -616,10 +687,15 @@ export class HandlerBasedContract<State> implements Contract<State> {
evaluationOptions: contractEvaluationOptions || this.evaluationOptions(),
handler,
cachedState,
requestedSortKey: upToSortKey
requestedSortKey: upToSortKey,
signal
};
}

private async getSortedInteractions(interactions: GQLNodeInterface[]) {
return (await this._sorter.sort(interactions.map((i) => ({ node: i, cursor: null })))).map((i) => i.node);
}

private resolveEvaluationOptions(rootManifestEvalOptions: EvaluationOptions) {
if (this.isRoot()) {
this._eoEvaluator = new EvaluationOptionsEvaluator(this.evaluationOptions(), rootManifestEvalOptions);
Expand Down Expand Up @@ -661,12 +737,13 @@ export class HandlerBasedContract<State> implements Contract<State> {

private async createExecutionContextFromTx(
contractTxId: string,
transaction: GQLNodeInterface
transaction: GQLNodeInterface,
signal?: AbortSignal
): Promise<ExecutionContext<State, HandlerApi<State>>> {
const caller = transaction.owner.address;
const sortKey = transaction.sortKey;

const baseContext = await this.createExecutionContext(contractTxId, sortKey, true);
const baseContext = await this.createExecutionContext(contractTxId, sortKey, true, undefined, signal);

return {
...baseContext,
Expand Down Expand Up @@ -695,7 +772,8 @@ export class HandlerBasedContract<State> implements Contract<State> {
transfer: ArTransfer = emptyTransfer,
strict = false,
vrf = false,
sign = true
sign = true,
signal?: AbortSignal
): Promise<InteractionResult<State, View>> {
this.logger.info('Call contract input', input);
this.maybeResetRootContract();
Expand All @@ -704,7 +782,7 @@ export class HandlerBasedContract<State> implements Contract<State> {
}
const { arweave, stateEvaluator } = this.warp;
// create execution context
let executionContext = await this.createExecutionContext(this._contractTxId, sortKey, true);
let executionContext = await this.createExecutionContext(this._contractTxId, sortKey, true, undefined, signal);

const currentBlockData =
this.warp.environment == 'mainnet' && !(this.warp.interactionsLoader.type() === 'arweave')
Expand Down Expand Up @@ -791,12 +869,13 @@ export class HandlerBasedContract<State> implements Contract<State> {
private async doApplyInputOnTx<Input, View = unknown>(
input: Input,
interactionTx: GQLNodeInterface,
interactionType: InteractionType
interactionType: InteractionType,
signal?: AbortSignal
): Promise<InteractionResult<State, View>> {
this.maybeResetRootContract();
let evalStateResult: SortKeyCacheResult<EvalStateResult<State>>;

const executionContext = await this.createExecutionContextFromTx(this._contractTxId, interactionTx);
const executionContext = await this.createExecutionContextFromTx(this._contractTxId, interactionTx, signal);

if (!this.isRoot() && this.interactionState().has(this.txId(), interactionTx.sortKey)) {
evalStateResult = new SortKeyCacheResult<EvalStateResult<State>>(
Expand Down
1 change: 1 addition & 0 deletions src/core/ExecutionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ export type ExecutionContext<State, Api = unknown> = {
caller?: string; // note: this is only set for "viewState" and "write" operations
cachedState?: SortKeyCacheResult<EvalStateResult<State>>;
requestedSortKey?: string;
signal?: AbortSignal;
};
3 changes: 2 additions & 1 deletion src/core/modules/InteractionsLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export interface InteractionsLoader extends GwTypeAware, WarpAware {
contractTxId: string,
fromSortKey?: string,
toSortKey?: string,
evaluationOptions?: EvaluationOptions
evaluationOptions?: EvaluationOptions,
signal?: AbortSignal
): Promise<GQLNodeInterface[]>;

clearCache(): void;
Expand Down
4 changes: 4 additions & 0 deletions src/core/modules/StateEvaluator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ export class DefaultEvaluationOptions implements EvaluationOptions {
useConstructor = false;

whitelistSources = [];

transactionsPagesPerBatch = null;
}

// an interface for the contract EvaluationOptions - can be used to change the behaviour of some features.
Expand Down Expand Up @@ -242,6 +244,8 @@ export interface EvaluationOptions {
remoteStateSyncSource: string;

whitelistSources: string[];

transactionsPagesPerBatch: number;
}

// https://github.com/nodejs/node/issues/40678 duh...
Expand Down
Loading

0 comments on commit 2766094

Please sign in to comment.