From 67bf6d895490910e282201e61a17ea8cfc03f821 Mon Sep 17 00:00:00 2001 From: ppedziwiatr Date: Fri, 8 Dec 2023 13:41:35 +0100 Subject: [PATCH] feat: read state in batches --- src/contract/Contract.ts | 30 +++- src/contract/EvaluationOptionsEvaluator.ts | 6 +- src/contract/HandlerBasedContract.ts | 143 +++++++++++++++--- src/core/ExecutionContext.ts | 1 + src/core/modules/InteractionsLoader.ts | 3 +- src/core/modules/StateEvaluator.ts | 4 + src/core/modules/impl/ArweaveGQLTxsFetcher.ts | 22 ++- .../ArweaveGatewayBundledInteractionLoader.ts | 20 ++- .../impl/ArweaveGatewayInteractionsLoader.ts | 36 ++++- .../modules/impl/DefaultStateEvaluator.ts | 12 +- .../impl/WarpGatewayInteractionsLoader.ts | 9 +- .../impl/handler/AbstractContractHandler.ts | 8 +- 12 files changed, 237 insertions(+), 57 deletions(-) diff --git a/src/contract/Contract.ts b/src/contract/Contract.ts index bf698589..206aa2a7 100644 --- a/src/contract/Contract.ts +++ b/src/contract/Contract.ts @@ -113,10 +113,22 @@ export interface Contract { readState( sortKeyOrBlockHeight?: string | number, caller?: string, - interactions?: GQLNodeInterface[] + interactions?: GQLNodeInterface[], + signal?: AbortSignal ): Promise>>; - readStateFor(sortKey: string, interactions: GQLNodeInterface[]): Promise>>; + /** + * Reads state in batches - i.e. it first loads max. 5k interactions, evaluates them, then reads another 5k..and so on. + * + * Consider this as an experimental feature + */ + readStateBatch(batchSize: number, signal: AbortSignal): Promise>>; + + readStateFor( + sortKey: string, + interactions: GQLNodeInterface[], + signal?: AbortSignal + ): Promise>>; /** * Returns the "view" of the state, computed by the SWC - @@ -138,7 +150,8 @@ export interface Contract { input: Input, tags?: Tags, transfer?: ArTransfer, - caller?: string + caller?: string, + signal?: AbortSignal ): Promise>; /** @@ -155,7 +168,8 @@ export interface Contract { */ viewStateForTx( input: Input, - transaction: GQLNodeInterface + transaction: GQLNodeInterface, + signal?: AbortSignal ): Promise>; /** @@ -177,7 +191,11 @@ export interface Contract { vrf?: boolean ): Promise>; - applyInput(input: Input, transaction: GQLNodeInterface): Promise>; + applyInput( + input: Input, + transaction: GQLNodeInterface, + signal?: AbortSignal + ): Promise>; /** * Writes a new "interaction" transaction - i.e. such transaction that stores input for the contract. @@ -189,7 +207,7 @@ export interface Contract { /** * Returns the full call tree report the last - * interaction with contract (eg. after reading state) + * interaction with contract (e.g. after reading state) */ getCallStack(): ContractCallRecord; diff --git a/src/contract/EvaluationOptionsEvaluator.ts b/src/contract/EvaluationOptionsEvaluator.ts index 477a2ce3..47d69184 100644 --- a/src/contract/EvaluationOptionsEvaluator.ts +++ b/src/contract/EvaluationOptionsEvaluator.ts @@ -107,13 +107,15 @@ export class EvaluationOptionsEvaluator { remoteStateSyncSource: () => this.rootOptions['remoteStateSyncSource'], useKVStorage: (foreignOptions) => foreignOptions['useKVStorage'], useConstructor: (foreignOptions) => foreignOptions['useConstructor'], - whitelistSources: () => this.rootOptions['whitelistSources'] + whitelistSources: () => this.rootOptions['whitelistSources'], + transactionsPagesPerBatch: () => this.rootOptions['transactionsPagesPerBatch'] }; private readonly notConflictingEvaluationOptions: (keyof EvaluationOptions)[] = [ 'useKVStorage', 'sourceType', - 'useConstructor' + 'useConstructor', + 'transactionsPagesPerBatch' ]; /** diff --git a/src/contract/HandlerBasedContract.ts b/src/contract/HandlerBasedContract.ts index 95c79e29..754e42c9 100644 --- a/src/contract/HandlerBasedContract.ts +++ b/src/contract/HandlerBasedContract.ts @@ -9,7 +9,10 @@ import { InteractionResult, InteractionType } from '../core/modules/impl/HandlerExecutorFactory'; -import { LexicographicalInteractionsSorter } from '../core/modules/impl/LexicographicalInteractionsSorter'; +import { + genesisSortKey, + LexicographicalInteractionsSorter +} from '../core/modules/impl/LexicographicalInteractionsSorter'; import { InteractionsSorter } from '../core/modules/InteractionsSorter'; import { DefaultEvaluationOptions, EvalStateResult, EvaluationOptions } from '../core/modules/StateEvaluator'; import { WARP_TAGS } from '../core/KnownTags'; @@ -38,9 +41,9 @@ import { Mutex } from 'async-mutex'; import { Tag, Transaction, TransactionStatusResponse } from '../utils/types/arweave-types'; import { InteractionState } from './states/InteractionState'; import { ContractInteractionState } from './states/ContractInteractionState'; -import { Crypto } from 'warp-isomorphic'; +import { Buffer, Crypto } from 'warp-isomorphic'; import { VrfPluginFunctions } from '../core/WarpPlugin'; -import { createData, tagsExceedLimit, DataItem, Signer } from 'warp-arbundles'; +import { createData, DataItem, Signer, tagsExceedLimit } from 'warp-arbundles'; /** * An implementation of {@link Contract} that is backwards compatible with current style @@ -135,7 +138,8 @@ export class HandlerBasedContract implements Contract { async readState( sortKeyOrBlockHeight?: string | number, caller?: string, - interactions?: GQLNodeInterface[] + interactions?: GQLNodeInterface[], + signal?: AbortSignal ): Promise>> { this.logger.info('Read state for', { contractTxId: this._contractTxId, @@ -162,7 +166,13 @@ export class HandlerBasedContract implements Contract { const initBenchmark = Benchmark.measure(); this.maybeResetRootContract(); - const executionContext = await this.createExecutionContext(this._contractTxId, sortKey, false, interactions); + const executionContext = await this.createExecutionContext( + this._contractTxId, + sortKey, + false, + interactions, + signal + ); this.logger.info('Execution Context', { srcTxId: executionContext.contractDefinition?.srcTxId, missingInteractions: executionContext.sortedInteractions?.length, @@ -200,27 +210,99 @@ export class HandlerBasedContract implements Contract { async readStateFor( sortKey: string, - interactions: GQLNodeInterface[] + interactions: GQLNodeInterface[], + signal?: AbortSignal ): Promise>> { - return this.readState(sortKey, undefined, interactions); + return this.readState(sortKey, undefined, interactions, signal); + } + + async readStateBatch(pagesPerBatch = 1, signal: AbortSignal): Promise>> { + if (!this.isRoot()) { + throw new Error('readStateBatch is only allowed for root contract calls'); + } + if (pagesPerBatch < 1) { + throw new Error('At least one page per batch is required'); + } + if (signal.aborted) { + throw new Error('readStateBatch aborted'); + } + + const taskDone = new AbortController(); + signal.addEventListener( + 'abort', + () => { + this.logger.warn('Aborting readStateBatch'); + }, + { + once: true, + signal: taskDone.signal + } + ); + + try { + const contractTxId = this._contractTxId; + const { interactionsLoader, stateEvaluator } = this.warp; + let cachedState = await stateEvaluator.latestAvailableState(contractTxId); + + const evaluationOptions = { + ...this._evaluationOptions, + transactionsPagesInBatch: pagesPerBatch + }; + + let interactions: GQLNodeInterface[]; + let batchesLoaded = 0; + do { + const batchBenchmark = Benchmark.measure(); + this.logger.debug(`Loading ${++batchesLoaded}`); + interactions = await interactionsLoader.load(contractTxId, cachedState?.sortKey, undefined, evaluationOptions); + if (signal.aborted) { + throw new Error('readStateBatch aborted'); + } + this.logger.debug(`Evaluating ${interactions.length} in ${batchesLoaded}`); + cachedState = await this.readStateFor(cachedState?.sortKey || genesisSortKey, interactions); + if (signal.aborted) { + throw new Error('readStateBatch aborted'); + } + this.logger.debug( + `Batch ${batchesLoaded} evaluated in ${batchBenchmark.elapsed()} at sortKey ${cachedState.sortKey}` + ); + } while (interactions.length > 0); + + return cachedState; + } finally { + taskDone.abort(); + } } async viewState( input: Input, tags: Tags = [], transfer: ArTransfer = emptyTransfer, - caller?: string + caller?: string, + signal?: AbortSignal ): Promise> { this.logger.info('View state for', this._contractTxId); - return await this.callContract(input, 'view', caller, undefined, tags, transfer); + return await this.callContract( + input, + 'view', + caller, + undefined, + tags, + transfer, + false, + false, + true, + signal + ); } async viewStateForTx( input: Input, - interactionTx: GQLNodeInterface + interactionTx: GQLNodeInterface, + signal?: AbortSignal ): Promise> { this.logger.info(`View state for ${this._contractTxId}`); - return await this.doApplyInputOnTx(input, interactionTx, 'view'); + return await this.doApplyInputOnTx(input, interactionTx, 'view', signal); } async dryWrite( @@ -234,9 +316,13 @@ export class HandlerBasedContract implements Contract { return await this.callContract(input, 'write', caller, undefined, tags, transfer, undefined, vrf); } - async applyInput(input: Input, transaction: GQLNodeInterface): Promise> { + async applyInput( + input: Input, + transaction: GQLNodeInterface, + signal?: AbortSignal + ): Promise> { this.logger.info(`Apply-input from transaction ${transaction.id} for ${this._contractTxId}`); - return await this.doApplyInputOnTx(input, transaction, 'write'); + return await this.doApplyInputOnTx(input, transaction, 'write', signal); } async writeInteraction( @@ -509,7 +595,8 @@ export class HandlerBasedContract implements Contract { contractTxId: string, upToSortKey?: string, forceDefinitionLoad = false, - interactions?: GQLNodeInterface[] + interactions?: GQLNodeInterface[], + signal?: AbortSignal ): Promise>> { const { definitionLoader, interactionsLoader, stateEvaluator } = this.warp; let cachedState: SortKeyCacheResult>; @@ -535,10 +622,10 @@ export class HandlerBasedContract implements Contract { this.logger.debug('State fully cached, not loading interactions.'); if (forceDefinitionLoad || evolvedSrcTxId || interactions?.length) { contractDefinition = await definitionLoader.load(contractTxId, evolvedSrcTxId); + contractEvaluationOptions = this.resolveEvaluationOptions(contractDefinition.manifest?.evaluationOptions); + this.warp.executorFactory.checkWhiteListContractSources(contractDefinition, contractEvaluationOptions); if (interactions?.length) { - sortedInteractions = (await this._sorter.sort(interactions.map((i) => ({ node: i, cursor: null })))).map( - (i) => i.node - ); + sortedInteractions = await this.getSortedInteractions(interactions); } } } else { @@ -616,10 +703,15 @@ export class HandlerBasedContract implements Contract { evaluationOptions: contractEvaluationOptions || this.evaluationOptions(), handler, cachedState, - requestedSortKey: upToSortKey + requestedSortKey: upToSortKey, + signal }; } + private async getSortedInteractions(interactions: GQLNodeInterface[]) { + return (await this._sorter.sort(interactions.map((i) => ({ node: i, cursor: null })))).map((i) => i.node); + } + private resolveEvaluationOptions(rootManifestEvalOptions: EvaluationOptions) { if (this.isRoot()) { this._eoEvaluator = new EvaluationOptionsEvaluator(this.evaluationOptions(), rootManifestEvalOptions); @@ -661,12 +753,13 @@ export class HandlerBasedContract implements Contract { private async createExecutionContextFromTx( contractTxId: string, - transaction: GQLNodeInterface + transaction: GQLNodeInterface, + signal?: AbortSignal ): Promise>> { const caller = transaction.owner.address; const sortKey = transaction.sortKey; - const baseContext = await this.createExecutionContext(contractTxId, sortKey, true); + const baseContext = await this.createExecutionContext(contractTxId, sortKey, true, undefined, signal); return { ...baseContext, @@ -695,7 +788,8 @@ export class HandlerBasedContract implements Contract { transfer: ArTransfer = emptyTransfer, strict = false, vrf = false, - sign = true + sign = true, + signal?: AbortSignal ): Promise> { this.logger.info('Call contract input', input); this.maybeResetRootContract(); @@ -704,7 +798,7 @@ export class HandlerBasedContract implements Contract { } const { arweave, stateEvaluator } = this.warp; // create execution context - let executionContext = await this.createExecutionContext(this._contractTxId, sortKey, true); + let executionContext = await this.createExecutionContext(this._contractTxId, sortKey, true, undefined, signal); const currentBlockData = this.warp.environment == 'mainnet' && !(this.warp.interactionsLoader.type() === 'arweave') @@ -791,12 +885,13 @@ export class HandlerBasedContract implements Contract { private async doApplyInputOnTx( input: Input, interactionTx: GQLNodeInterface, - interactionType: InteractionType + interactionType: InteractionType, + signal?: AbortSignal ): Promise> { this.maybeResetRootContract(); let evalStateResult: SortKeyCacheResult>; - const executionContext = await this.createExecutionContextFromTx(this._contractTxId, interactionTx); + const executionContext = await this.createExecutionContextFromTx(this._contractTxId, interactionTx, signal); if (!this.isRoot() && this.interactionState().has(this.txId(), interactionTx.sortKey)) { evalStateResult = new SortKeyCacheResult>( diff --git a/src/core/ExecutionContext.ts b/src/core/ExecutionContext.ts index 99d7c64a..d1fcf0b6 100644 --- a/src/core/ExecutionContext.ts +++ b/src/core/ExecutionContext.ts @@ -43,4 +43,5 @@ export type ExecutionContext = { caller?: string; // note: this is only set for "viewState" and "write" operations cachedState?: SortKeyCacheResult>; requestedSortKey?: string; + signal?: AbortSignal; }; diff --git a/src/core/modules/InteractionsLoader.ts b/src/core/modules/InteractionsLoader.ts index 24590592..c7906365 100644 --- a/src/core/modules/InteractionsLoader.ts +++ b/src/core/modules/InteractionsLoader.ts @@ -27,7 +27,8 @@ export interface InteractionsLoader extends GwTypeAware, WarpAware { contractTxId: string, fromSortKey?: string, toSortKey?: string, - evaluationOptions?: EvaluationOptions + evaluationOptions?: EvaluationOptions, + signal?: AbortSignal ): Promise; clearCache(): void; diff --git a/src/core/modules/StateEvaluator.ts b/src/core/modules/StateEvaluator.ts index 0785a810..98fe638d 100644 --- a/src/core/modules/StateEvaluator.ts +++ b/src/core/modules/StateEvaluator.ts @@ -152,6 +152,8 @@ export class DefaultEvaluationOptions implements EvaluationOptions { useConstructor = false; whitelistSources = []; + + transactionsPagesPerBatch = null; } // an interface for the contract EvaluationOptions - can be used to change the behaviour of some features. @@ -242,6 +244,8 @@ export interface EvaluationOptions { remoteStateSyncSource: string; whitelistSources: string[]; + + transactionsPagesPerBatch: number; } // https://github.com/nodejs/node/issues/40678 duh... diff --git a/src/core/modules/impl/ArweaveGQLTxsFetcher.ts b/src/core/modules/impl/ArweaveGQLTxsFetcher.ts index ff5dcc5d..a935b5b9 100644 --- a/src/core/modules/impl/ArweaveGQLTxsFetcher.ts +++ b/src/core/modules/impl/ArweaveGQLTxsFetcher.ts @@ -117,7 +117,6 @@ export class ArweaveGQLTxsFetcher { async transaction(transactionId: string): Promise { const response = await this.fetch(TRANSACTION_QUERY, { id: transactionId }); - return response.transaction; } @@ -138,10 +137,26 @@ export class ArweaveGQLTxsFetcher { return response.edges[0].node; } - async transactions(variables: ArweaveTransactionQuery): Promise { + async transactions( + variables: ArweaveTransactionQuery, + pagesPerBatch: number, + signal?: AbortSignal + ): Promise { let pageResult = (await this.fetch(TRANSACTIONS_QUERY, variables)).transactions; const edges: GQLEdgeInterface[] = [...pageResult.edges]; - while (pageResult.pageInfo.hasNextPage) { + let pagesLoaded = 1; + if (pagesLoaded >= pagesPerBatch) { + return edges; + } + if (signal?.aborted) { + throw new Error(`Abort signal in ${ArweaveGQLTxsFetcher.name}`); + } + + while (pageResult.pageInfo.hasNextPage && pagesLoaded < pagesPerBatch) { + if (signal?.aborted) { + throw new Error(`Abort signal in ${ArweaveGQLTxsFetcher.name}`); + } + const cursor = pageResult.edges[MAX_REQUEST - 1].cursor; const newVariables = { @@ -151,6 +166,7 @@ export class ArweaveGQLTxsFetcher { pageResult = (await this.fetch(TRANSACTIONS_QUERY, newVariables)).transactions; edges.push(...pageResult.edges); + pagesLoaded++; } return edges; } diff --git a/src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts b/src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts index e60ba210..cc4f0328 100644 --- a/src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts +++ b/src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts @@ -59,9 +59,13 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade contractId: string, fromSortKey?: string, toSortKey?: string, - evaluationOptions?: EvaluationOptions + evaluationOptions?: EvaluationOptions, + signal?: AbortSignal ): Promise { this.logger.debug('Loading interactions for', { contractId, fromSortKey, toSortKey }); + if (evaluationOptions?.transactionsPagesPerBatch) { + throw new Error(`Loading in batches not yet implemented for ${ArweaveGatewayBundledInteractionLoader.name}`); + } const fromBlockHeight = this.sorter.extractBlockHeight(fromSortKey) || 0; const toBlockHeight = this.sorter.extractBlockHeight(toSortKey) || (await this.currentBlockHeight()); @@ -89,14 +93,15 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade }; const loadingBenchmark = Benchmark.measure(); - let interactions = await this.arweaveFetcher.transactions(mainTransactionsQuery); + let interactions = await this.arweaveFetcher.transactions(mainTransactionsQuery, Number.MAX_SAFE_INTEGER, signal); if (evaluationOptions.internalWrites) { interactions = await this.appendInternalWriteInteractions( contractId, fromBlockHeight, toBlockHeight, - interactions + interactions, + signal ); } loadingBenchmark.stop(); @@ -227,7 +232,8 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade contractId: string, fromBlockHeight: number, toBlockHeight: number, - interactions: GQLEdgeInterface[] + interactions: GQLEdgeInterface[], + signal: AbortSignal ) { const innerWritesVariables: GqlReqVariables = { tags: [ @@ -242,7 +248,11 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade }, first: MAX_REQUEST }; - const innerWritesInteractions = await this.arweaveFetcher.transactions(innerWritesVariables); + const innerWritesInteractions = await this.arweaveFetcher.transactions( + innerWritesVariables, + Number.MAX_SAFE_INTEGER, + signal + ); this.logger.debug('Inner writes interactions length:', innerWritesInteractions.length); interactions = interactions.concat(innerWritesInteractions); return interactions; diff --git a/src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts b/src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts index 73defff6..4b28331c 100644 --- a/src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts +++ b/src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts @@ -34,12 +34,15 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader { contractId: string, fromSortKey?: string, toSortKey?: string, - evaluationOptions?: EvaluationOptions + evaluationOptions?: EvaluationOptions, + signal?: AbortSignal ): Promise { this.logger.debug('Loading interactions for', { contractId, fromSortKey, toSortKey }); const fromBlockHeight = this.sorter.extractBlockHeight(fromSortKey); - const toBlockHeight = this.sorter.extractBlockHeight(toSortKey); + let toBlockHeight = this.sorter.extractBlockHeight(toSortKey); + const pagesPerBatch = evaluationOptions?.transactionsPagesPerBatch || Number.MAX_SAFE_INTEGER; + this.logger.debug('Pages per batch', pagesPerBatch); const mainTransactionsQuery: ArweaveTransactionQuery = { tags: [ @@ -60,12 +63,29 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader { }; const loadingBenchmark = Benchmark.measure(); - let interactions = (await this.arweaveTransactionQuery.transactions(mainTransactionsQuery)).filter( - bundledTxsFilter - ); + let interactions = ( + await this.arweaveTransactionQuery.transactions(mainTransactionsQuery, pagesPerBatch, signal) + ).filter(bundledTxsFilter); loadingBenchmark.stop(); + if (evaluationOptions?.transactionsPagesPerBatch && interactions.length > 0) { + interactions = await this.sorter.sort(interactions); + toBlockHeight = interactions[interactions.length - 1].node.block.height; + } if (evaluationOptions.internalWrites) { + const pagesPerBatchIw = (function () { + if (evaluationOptions?.transactionsPagesPerBatch) { + if (interactions.length > 0) { + // note: the limit in this case is the block height of the last 'direct' interaction + return Number.MAX_SAFE_INTEGER; + } else { + return evaluationOptions?.transactionsPagesPerBatch; + } + } else { + return Number.MAX_SAFE_INTEGER; + } + })(); + const innerWritesVariables: ArweaveTransactionQuery = { tags: [ { @@ -79,9 +99,9 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader { }, first: MAX_REQUEST }; - const innerWritesInteractions = (await this.arweaveTransactionQuery.transactions(innerWritesVariables)).filter( - bundledTxsFilter - ); + const innerWritesInteractions = ( + await this.arweaveTransactionQuery.transactions(innerWritesVariables, pagesPerBatchIw, signal) + ).filter(bundledTxsFilter); this.logger.debug('Inner writes interactions length:', innerWritesInteractions.length); interactions = interactions.concat(innerWritesInteractions); diff --git a/src/core/modules/impl/DefaultStateEvaluator.ts b/src/core/modules/impl/DefaultStateEvaluator.ts index 5217eb33..a3638aa0 100644 --- a/src/core/modules/impl/DefaultStateEvaluator.ts +++ b/src/core/modules/impl/DefaultStateEvaluator.ts @@ -55,7 +55,7 @@ export abstract class DefaultStateEvaluator implements StateEvaluator { executionContext: ExecutionContext> ): Promise>> { const { ignoreExceptions, stackTrace, internalWrites } = executionContext.evaluationOptions; - const { contract, contractDefinition, sortedInteractions, warp } = executionContext; + const { contract, contractDefinition, sortedInteractions, warp, signal } = executionContext; let currentState = baseState.state; let currentSortKey = null; @@ -89,6 +89,9 @@ export abstract class DefaultStateEvaluator implements StateEvaluator { if (shouldBreakAfterEvolve) { break; } + if (signal?.aborted) { + throw new Error(`Abort signal in ${DefaultStateEvaluator.name}`); + } const missingInteraction = missingInteractions[i]; currentSortKey = missingInteraction.sortKey; @@ -151,7 +154,12 @@ export abstract class DefaultStateEvaluator implements StateEvaluator { let newState: EvalStateResult = null; let writingContractState: SortKeyCacheResult> = null; try { - writingContractState = await writingContract.readState(missingInteraction.sortKey); + writingContractState = await writingContract.readState( + missingInteraction.sortKey, + undefined, + undefined, + signal + ); newState = contract.interactionState().get(contract.txId(), missingInteraction.sortKey); } catch (e) { // ppe: not sure why we're not handling all ContractErrors here... diff --git a/src/core/modules/impl/WarpGatewayInteractionsLoader.ts b/src/core/modules/impl/WarpGatewayInteractionsLoader.ts index b7aa0e8d..b3b3cbcf 100644 --- a/src/core/modules/impl/WarpGatewayInteractionsLoader.ts +++ b/src/core/modules/impl/WarpGatewayInteractionsLoader.ts @@ -65,7 +65,8 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader { contractId: string, fromSortKey?: string, toSortKey?: string, - evaluationOptions?: EvaluationOptions + evaluationOptions?: EvaluationOptions, + signal?: AbortSignal ): Promise { this.logger.debug('Loading interactions: for ', { contractId, fromSortKey, toSortKey }); @@ -73,6 +74,7 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader { let page = 0; let limit = 0; let items = 0; + const pagesPerBatch = evaluationOptions?.transactionsPagesPerBatch || Number.MAX_SAFE_INTEGER; const effectiveSourceType = evaluationOptions ? evaluationOptions.sourceType : this.source; const benchmarkTotalTime = Benchmark.measure(); @@ -80,6 +82,9 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader { do { const benchmarkRequestTime = Benchmark.measure(); + if (signal?.aborted) { + throw new Error(`Abort signal in ${WarpGatewayInteractionsLoader.name}`); + } const url = `${baseUrl}/gateway/v2/interactions-sort-key`; @@ -109,7 +114,7 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader { items = response.paging.items; this.logger.debug(`Loaded interactions length: ${interactions.length}, from: ${fromSortKey}, to: ${toSortKey}`); - } while (items == limit); // note: items < limit means that we're on the last page + } while (items == limit && page < pagesPerBatch); // note: items < limit means that we're on the last page this.logger.debug('All loaded interactions:', { from: fromSortKey, diff --git a/src/core/modules/impl/handler/AbstractContractHandler.ts b/src/core/modules/impl/handler/AbstractContractHandler.ts index 2867a127..75d4b493 100644 --- a/src/core/modules/impl/handler/AbstractContractHandler.ts +++ b/src/core/modules/impl/handler/AbstractContractHandler.ts @@ -63,7 +63,7 @@ export abstract class AbstractContractHandler implements HandlerApi(input, this.swGlobal._activeTx); + const result = await calleeContract.applyInput(input, this.swGlobal._activeTx, executionContext.signal); this.logger.debug('Cache result?:', !this.swGlobal._activeTx.dry); const shouldAutoThrow = @@ -117,7 +117,7 @@ export abstract class AbstractContractHandler implements HandlerApi(input, this.swGlobal._activeTx); + return await childContract.viewStateForTx(input, this.swGlobal._activeTx, executionContext.signal); }; } @@ -130,14 +130,14 @@ export abstract class AbstractContractHandler implements HandlerApi