From 6b12c003b61ab2dfe18271d2ec5fbed0bf9fc26c Mon Sep 17 00:00:00 2001 From: ppedziwiatr Date: Fri, 8 Dec 2023 13:41:35 +0100 Subject: [PATCH] feat: read state in batches --- src/contract/Contract.ts | 7 + src/contract/EvaluationOptionsEvaluator.ts | 6 +- src/contract/HandlerBasedContract.ts | 269 +++++++++++------- src/core/modules/StateEvaluator.ts | 4 + src/core/modules/impl/ArweaveGQLTxsFetcher.ts | 11 +- .../ArweaveGatewayBundledInteractionLoader.ts | 7 +- .../impl/ArweaveGatewayInteractionsLoader.ts | 29 +- .../impl/WarpGatewayInteractionsLoader.ts | 3 +- 8 files changed, 218 insertions(+), 118 deletions(-) diff --git a/src/contract/Contract.ts b/src/contract/Contract.ts index bf698589..797b9d23 100644 --- a/src/contract/Contract.ts +++ b/src/contract/Contract.ts @@ -116,6 +116,13 @@ export interface Contract { interactions?: GQLNodeInterface[] ): Promise>>; + /** + * Reads state in batches - i.e. it first loads max. 5k interactions, evaluates them, then reads another 5k..and so on. + * + * Consider this as an experimental feature + */ + readStateBatch(batchSize: number, signal: AbortSignal): Promise>>; + readStateFor(sortKey: string, interactions: GQLNodeInterface[]): Promise>>; /** diff --git a/src/contract/EvaluationOptionsEvaluator.ts b/src/contract/EvaluationOptionsEvaluator.ts index 477a2ce3..47d69184 100644 --- a/src/contract/EvaluationOptionsEvaluator.ts +++ b/src/contract/EvaluationOptionsEvaluator.ts @@ -107,13 +107,15 @@ export class EvaluationOptionsEvaluator { remoteStateSyncSource: () => this.rootOptions['remoteStateSyncSource'], useKVStorage: (foreignOptions) => foreignOptions['useKVStorage'], useConstructor: (foreignOptions) => foreignOptions['useConstructor'], - whitelistSources: () => this.rootOptions['whitelistSources'] + whitelistSources: () => this.rootOptions['whitelistSources'], + transactionsPagesPerBatch: () => this.rootOptions['transactionsPagesPerBatch'] }; private readonly notConflictingEvaluationOptions: (keyof EvaluationOptions)[] = [ 'useKVStorage', 'sourceType', - 'useConstructor' + 'useConstructor', + 'transactionsPagesPerBatch' ]; /** diff --git a/src/contract/HandlerBasedContract.ts b/src/contract/HandlerBasedContract.ts index 95c79e29..c2c931c8 100644 --- a/src/contract/HandlerBasedContract.ts +++ b/src/contract/HandlerBasedContract.ts @@ -1,26 +1,29 @@ -import stringify from 'safe-stable-stringify'; -import { SortKeyCacheResult } from '../cache/SortKeyCache'; -import { ContractCallRecord, InteractionCall } from '../core/ContractCallRecord'; -import { ExecutionContext } from '../core/ExecutionContext'; +import stringify from "safe-stable-stringify"; +import { SortKeyCacheResult } from "../cache/SortKeyCache"; +import { ContractCallRecord, InteractionCall } from "../core/ContractCallRecord"; +import { ExecutionContext } from "../core/ExecutionContext"; import { ContractInteraction, HandlerApi, InteractionData, InteractionResult, InteractionType -} from '../core/modules/impl/HandlerExecutorFactory'; -import { LexicographicalInteractionsSorter } from '../core/modules/impl/LexicographicalInteractionsSorter'; -import { InteractionsSorter } from '../core/modules/InteractionsSorter'; -import { DefaultEvaluationOptions, EvalStateResult, EvaluationOptions } from '../core/modules/StateEvaluator'; -import { WARP_TAGS } from '../core/KnownTags'; -import { Warp } from '../core/Warp'; -import { createDummyTx, createInteractionTagsList, createInteractionTx } from '../legacy/create-interaction-tx'; -import { GQLNodeInterface } from '../legacy/gqlResult'; -import { Benchmark } from '../logging/Benchmark'; -import { LoggerFactory } from '../logging/LoggerFactory'; -import { Evolve } from '../plugins/Evolve'; -import { ArweaveWrapper } from '../utils/ArweaveWrapper'; -import { getJsonResponse, isBrowser, sleep, stripTrailingSlash } from '../utils/utils'; +} from "../core/modules/impl/HandlerExecutorFactory"; +import { + genesisSortKey, + LexicographicalInteractionsSorter +} from "../core/modules/impl/LexicographicalInteractionsSorter"; +import { InteractionsSorter } from "../core/modules/InteractionsSorter"; +import { DefaultEvaluationOptions, EvalStateResult, EvaluationOptions } from "../core/modules/StateEvaluator"; +import { WARP_TAGS } from "../core/KnownTags"; +import { Warp } from "../core/Warp"; +import { createDummyTx, createInteractionTagsList, createInteractionTx } from "../legacy/create-interaction-tx"; +import { GQLNodeInterface } from "../legacy/gqlResult"; +import { Benchmark } from "../logging/Benchmark"; +import { LoggerFactory } from "../logging/LoggerFactory"; +import { Evolve } from "../plugins/Evolve"; +import { ArweaveWrapper } from "../utils/ArweaveWrapper"; +import { getJsonResponse, isBrowser, sleep, stripTrailingSlash } from "../utils/utils"; import { BenchmarkStats, Contract, @@ -28,19 +31,19 @@ import { InnerCallData, WriteInteractionOptions, WriteInteractionResponse -} from './Contract'; -import { ArTransfer, ArWallet, emptyTransfer, Tags } from './deploy/CreateContract'; -import { InnerWritesEvaluator } from './InnerWritesEvaluator'; -import { CustomSignature, Signature } from './Signature'; -import { EvaluationOptionsEvaluator } from './EvaluationOptionsEvaluator'; -import { WarpFetchWrapper } from '../core/WarpFetchWrapper'; -import { Mutex } from 'async-mutex'; -import { Tag, Transaction, TransactionStatusResponse } from '../utils/types/arweave-types'; -import { InteractionState } from './states/InteractionState'; -import { ContractInteractionState } from './states/ContractInteractionState'; -import { Crypto } from 'warp-isomorphic'; -import { VrfPluginFunctions } from '../core/WarpPlugin'; -import { createData, tagsExceedLimit, DataItem, Signer } from 'warp-arbundles'; +} from "./Contract"; +import { ArTransfer, ArWallet, emptyTransfer, Tags } from "./deploy/CreateContract"; +import { InnerWritesEvaluator } from "./InnerWritesEvaluator"; +import { CustomSignature, Signature } from "./Signature"; +import { EvaluationOptionsEvaluator } from "./EvaluationOptionsEvaluator"; +import { WarpFetchWrapper } from "../core/WarpFetchWrapper"; +import { Mutex } from "async-mutex"; +import { Tag, Transaction, TransactionStatusResponse } from "../utils/types/arweave-types"; +import { InteractionState } from "./states/InteractionState"; +import { ContractInteractionState } from "./states/ContractInteractionState"; +import { Buffer, Crypto } from "warp-isomorphic"; +import { VrfPluginFunctions } from "../core/WarpPlugin"; +import { createData, DataItem, Signer, tagsExceedLimit } from "warp-arbundles"; /** * An implementation of {@link Contract} that is backwards compatible with current style @@ -50,10 +53,10 @@ import { createData, tagsExceedLimit, DataItem, Signer } from 'warp-arbundles'; */ export class HandlerBasedContract implements Contract { - private readonly logger = LoggerFactory.INST.create('HandlerBasedContract'); + private readonly logger = LoggerFactory.INST.create("HandlerBasedContract"); // TODO: refactor: extract execution context logic to a separate class - private readonly ecLogger = LoggerFactory.INST.create('ExecutionContext'); + private readonly ecLogger = LoggerFactory.INST.create("ExecutionContext"); private readonly _innerWritesEvaluator = new InnerWritesEvaluator(); private readonly _callDepth: number; @@ -96,7 +99,7 @@ export class HandlerBasedContract implements Contract { )}` ); } - this.logger.debug('Calling interaction', { + this.logger.debug("Calling interaction", { id: _innerCallData.callingInteraction.id, sortKey: _innerCallData.callingInteraction.sortKey, type: _innerCallData.callType @@ -106,11 +109,11 @@ export class HandlerBasedContract implements Contract { // the current state of the callee contract is always in the result of an internal write. // following is a protection against naughty developers who might be doing such crazy things ;-) if ( - callingInteraction.interactionInput?.foreignContractCalls[_contractTxId]?.innerCallType === 'write' && - _innerCallData.callType === 'read' + callingInteraction.interactionInput?.foreignContractCalls[_contractTxId]?.innerCallType === "write" && + _innerCallData.callType === "read" ) { throw new Error( - 'Calling a readContractState after performing an inner write is wrong - instead use a state from the result of an internal write.' + "Calling a readContractState after performing an inner write is wrong - instead use a state from the result of an internal write." ); } @@ -137,16 +140,16 @@ export class HandlerBasedContract implements Contract { caller?: string, interactions?: GQLNodeInterface[] ): Promise>> { - this.logger.info('Read state for', { + this.logger.info("Read state for", { contractTxId: this._contractTxId, sortKeyOrBlockHeight }); if (!this.isRoot() && sortKeyOrBlockHeight == null) { - throw new Error('SortKey MUST be always set for non-root contract calls'); + throw new Error("SortKey MUST be always set for non-root contract calls"); } const { stateEvaluator } = this.warp; const sortKey = - typeof sortKeyOrBlockHeight == 'number' + typeof sortKeyOrBlockHeight == "number" ? this._sorter.generateLastSortKey(sortKeyOrBlockHeight) : sortKeyOrBlockHeight; @@ -163,7 +166,7 @@ export class HandlerBasedContract implements Contract { this.maybeResetRootContract(); const executionContext = await this.createExecutionContext(this._contractTxId, sortKey, false, interactions); - this.logger.info('Execution Context', { + this.logger.info("Execution Context", { srcTxId: executionContext.contractDefinition?.srcTxId, missingInteractions: executionContext.sortedInteractions?.length, cachedSortKey: executionContext.cachedState?.sortKey @@ -182,10 +185,10 @@ export class HandlerBasedContract implements Contract { total }; - this.logger.info('Benchmark', { - 'Gateway communication ': initBenchmark.elapsed(), - 'Contract evaluation ': stateBenchmark.elapsed(), - 'Total: ': `${total.toFixed(0)}ms` + this.logger.info("Benchmark", { + "Gateway communication ": initBenchmark.elapsed(), + "Contract evaluation ": stateBenchmark.elapsed(), + "Total: ": `${total.toFixed(0)}ms` }); if (sortKey && !this.isRoot()) { @@ -205,14 +208,66 @@ export class HandlerBasedContract implements Contract { return this.readState(sortKey, undefined, interactions); } + async readStateBatch(pagesPerBatch = 1, signal: AbortSignal): Promise>> { + if (!this.isRoot()) { + throw new Error("readStateBatch is only allowed for root contract calls"); + } + if (pagesPerBatch < 1) { + throw new Error("At least one page per batch is required"); + } + if (signal.aborted) { + throw new Error("readStateBatch aborted"); + } + + const taskDone = new AbortController(); + signal.addEventListener("abort", () => { + this.logger.warn("Aborting readStateBatch"); + }, { + once: true, + signal: taskDone.signal + }); + + try { + const contractTxId = this._contractTxId; + const { interactionsLoader, stateEvaluator } = this.warp; + let cachedState = await stateEvaluator.latestAvailableState(contractTxId); + + const evaluationOptions = { + ...this._evaluationOptions, + transactionsPagesInBatch: pagesPerBatch + }; + + let interactions: GQLNodeInterface[]; + let batchesLoaded = 0; + do { + const batchBenchmark = Benchmark.measure(); + this.logger.debug(`Loading ${++batchesLoaded}`); + interactions = await interactionsLoader.load(contractTxId, cachedState?.sortKey, undefined, evaluationOptions); + if (signal.aborted) { + throw new Error("readStateBatch aborted"); + } + this.logger.debug(`Evaluating ${interactions.length} in ${batchesLoaded}`); + cachedState = await this.readStateFor(cachedState?.sortKey || genesisSortKey, interactions); + if (signal.aborted) { + throw new Error("readStateBatch aborted"); + } + this.logger.debug(`Batch ${batchesLoaded} evaluated in ${batchBenchmark.elapsed()} at sortKey ${cachedState.sortKey}`); + } while (interactions.length > 0); + + return cachedState; + } finally { + taskDone.abort(); + } + } + async viewState( input: Input, tags: Tags = [], transfer: ArTransfer = emptyTransfer, caller?: string ): Promise> { - this.logger.info('View state for', this._contractTxId); - return await this.callContract(input, 'view', caller, undefined, tags, transfer); + this.logger.info("View state for", this._contractTxId); + return await this.callContract(input, "view", caller, undefined, tags, transfer); } async viewStateForTx( @@ -220,7 +275,7 @@ export class HandlerBasedContract implements Contract { interactionTx: GQLNodeInterface ): Promise> { this.logger.info(`View state for ${this._contractTxId}`); - return await this.doApplyInputOnTx(input, interactionTx, 'view'); + return await this.doApplyInputOnTx(input, interactionTx, "view"); } async dryWrite( @@ -230,20 +285,20 @@ export class HandlerBasedContract implements Contract { transfer?: ArTransfer, vrf?: boolean ): Promise> { - this.logger.info('Dry-write for', this._contractTxId); - return await this.callContract(input, 'write', caller, undefined, tags, transfer, undefined, vrf); + this.logger.info("Dry-write for", this._contractTxId); + return await this.callContract(input, "write", caller, undefined, tags, transfer, undefined, vrf); } async applyInput(input: Input, transaction: GQLNodeInterface): Promise> { this.logger.info(`Apply-input from transaction ${transaction.id} for ${this._contractTxId}`); - return await this.doApplyInputOnTx(input, transaction, 'write'); + return await this.doApplyInputOnTx(input, transaction, "write"); } async writeInteraction( input: Input, options?: WriteInteractionOptions ): Promise { - this.logger.info('Write interaction', { input, options }); + this.logger.info("Write interaction", { input, options }); if (!this._signature) { throw new Error("Wallet not connected. Use 'connect' method first."); } @@ -260,7 +315,7 @@ export class HandlerBasedContract implements Contract { const effectiveDisableBundling = options?.disableBundling === true; const effectiveReward = options?.reward; - const bundleInteraction = interactionsLoader.type() == 'warp' && !effectiveDisableBundling; + const bundleInteraction = interactionsLoader.type() == "warp" && !effectiveDisableBundling; this._signature.checkNonArweaveSigningAvailability(bundleInteraction); this._signature.checkBundlerSignerAvailability(bundleInteraction); @@ -270,11 +325,11 @@ export class HandlerBasedContract implements Contract { effectiveTransfer.target != emptyTransfer.target && effectiveTransfer.winstonQty != emptyTransfer.winstonQty ) { - throw new Error('Ar Transfers are not allowed for bundled interactions'); + throw new Error("Ar Transfers are not allowed for bundled interactions"); } - if (effectiveVrf && !bundleInteraction && environment === 'mainnet') { - throw new Error('Vrf generation is only available for bundle interaction'); + if (effectiveVrf && !bundleInteraction && environment === "mainnet") { + throw new Error("Vrf generation is only available for bundle interaction"); } if (!input) { @@ -294,24 +349,24 @@ export class HandlerBasedContract implements Contract { effectiveTransfer, effectiveStrict, false, - effectiveVrf && environment !== 'mainnet', + effectiveVrf && environment !== "mainnet", effectiveReward ); const response = await arweave.transactions.post(interactionTx); if (response.status !== 200) { - this.logger.error('Error while posting transaction', response); + this.logger.error("Error while posting transaction", response); return null; } if (this._evaluationOptions.waitForConfirmation) { - this.logger.info('Waiting for confirmation of', interactionTx.id); + this.logger.info("Waiting for confirmation of", interactionTx.id); const benchmark = Benchmark.measure(); await this.waitForConfirmation(interactionTx.id); - this.logger.info('Transaction confirmed after', benchmark.elapsed()); + this.logger.info("Transaction confirmed after", benchmark.elapsed()); } - if (this.warp.environment == 'local' && this._evaluationOptions.mineArLocalBlocks) { + if (this.warp.environment == "local" && this._evaluationOptions.mineArLocalBlocks) { await this.warp.testing.mineBlock(); } @@ -327,7 +382,7 @@ export class HandlerBasedContract implements Contract { vrf: boolean; } ): Promise { - this.logger.info('Bundle interaction input', input); + this.logger.info("Bundle interaction input", input); const interactionDataItem = await this.createInteractionDataItem( input, @@ -340,10 +395,10 @@ export class HandlerBasedContract implements Contract { const response = this._warpFetchWrapper.fetch( `${stripTrailingSlash(this._evaluationOptions.sequencerUrl)}/gateway/v2/sequencer/register`, { - method: 'POST', + method: "POST", headers: { - 'Content-Type': 'application/octet-stream', - Accept: 'application/json' + "Content-Type": "application/octet-stream", + Accept: "application/json" }, body: interactionDataItem.getRaw() } @@ -371,13 +426,13 @@ export class HandlerBasedContract implements Contract { } if (vrf) { - tags.push(new Tag(WARP_TAGS.REQUEST_VRF, 'true')); + tags.push(new Tag(WARP_TAGS.REQUEST_VRF, "true")); } const interactionTags = createInteractionTagsList( this._contractTxId, input, - this.warp.environment === 'testnet', + this.warp.environment === "testnet", tags ); @@ -424,7 +479,7 @@ export class HandlerBasedContract implements Contract { } if (vrf) { - tags.push(new Tag(WARP_TAGS.REQUEST_VRF, 'true')); + tags.push(new Tag(WARP_TAGS.REQUEST_VRF, "true")); } const interactionTx = await createInteractionTx( @@ -436,7 +491,7 @@ export class HandlerBasedContract implements Contract { transfer.target, transfer.winstonQty, bundle, - this.warp.environment === 'testnet', + this.warp.environment === "testnet", reward ); @@ -457,12 +512,12 @@ export class HandlerBasedContract implements Contract { ) { const { arweave } = this.warp; const caller = - this._signature.type == 'arweave' + this._signature.type == "arweave" ? await arweave.wallets.ownerToAddress(owner) : await this._signature.getAddress(); - const handlerResult = await this.callContract(input, 'write', caller, undefined, tags, transfer, strict, vrf); - if (handlerResult.type !== 'ok') { - throw Error('Cannot create interaction: ' + JSON.stringify(handlerResult.error || handlerResult.errorMessage)); + const handlerResult = await this.callContract(input, "write", caller, undefined, tags, transfer, strict, vrf); + if (handlerResult.type !== "ok") { + throw Error("Cannot create interaction: " + JSON.stringify(handlerResult.error || handlerResult.errorMessage)); } } @@ -481,7 +536,7 @@ export class HandlerBasedContract implements Contract { setEvaluationOptions(options: Partial): Contract { if (!this.isRoot()) { - throw new Error('Evaluation options can be set only for the root contract'); + throw new Error("Evaluation options can be set only for the root contract"); } this._evaluationOptions = { ...this._evaluationOptions, @@ -522,23 +577,23 @@ export class HandlerBasedContract implements Contract { } cachedState = cachedState || (await stateEvaluator.latestAvailableState(contractTxId, upToSortKey)); - this.logger.debug('cache lookup', benchmark.elapsed()); + this.logger.debug("cache lookup", benchmark.elapsed()); benchmark.reset(); const evolvedSrcTxId = Evolve.evolvedSrcTxId(cachedState?.cachedValue?.state); let handler, contractDefinition, contractEvaluationOptions, remoteState; let sortedInteractions = interactions || []; - this.logger.debug('Cached state', cachedState, upToSortKey); + this.logger.debug("Cached state", cachedState, upToSortKey); if (cachedState && cachedState.sortKey == upToSortKey) { - this.logger.debug('State fully cached, not loading interactions.'); + this.logger.debug("State fully cached, not loading interactions."); if (forceDefinitionLoad || evolvedSrcTxId || interactions?.length) { contractDefinition = await definitionLoader.load(contractTxId, evolvedSrcTxId); + contractEvaluationOptions = this.resolveEvaluationOptions(contractDefinition.manifest?.evaluationOptions); + this.warp.executorFactory.checkWhiteListContractSources(contractDefinition, contractEvaluationOptions); if (interactions?.length) { - sortedInteractions = (await this._sorter.sort(interactions.map((i) => ({ node: i, cursor: null })))).map( - (i) => i.node - ); + sortedInteractions = await this.getSortedInteractions(interactions); } } } else { @@ -585,7 +640,7 @@ export class HandlerBasedContract implements Contract { sortedInteractions = sortedInteractions.filter((i) => i.sortKey.localeCompare(upToSortKey) <= 0); } - this.logger.debug('contract and interactions load', benchmark.elapsed()); + this.logger.debug("contract and interactions load", benchmark.elapsed()); if (this.isRoot() && sortedInteractions.length) { // note: if the root contract has zero interactions, it still should be safe // - as no other contracts will be called. @@ -620,6 +675,10 @@ export class HandlerBasedContract implements Contract { }; } + private async getSortedInteractions(interactions: GQLNodeInterface[]) { + return (await this._sorter.sort(interactions.map((i) => ({ node: i, cursor: null })))).map((i) => i.node); + } + private resolveEvaluationOptions(rootManifestEvalOptions: EvaluationOptions) { if (this.isRoot()) { this._eoEvaluator = new EvaluationOptionsEvaluator(this.evaluationOptions(), rootManifestEvalOptions); @@ -676,7 +735,7 @@ export class HandlerBasedContract implements Contract { private maybeResetRootContract() { if (this.isRoot()) { - this.logger.debug('Clearing call stack for the root contract'); + this.logger.debug("Clearing call stack for the root contract"); this._callStack = new ContractCallRecord(this.txId(), 0); this._rootSortKey = null; this.warp.interactionsLoader.clearCache(); @@ -697,17 +756,17 @@ export class HandlerBasedContract implements Contract { vrf = false, sign = true ): Promise> { - this.logger.info('Call contract input', input); + this.logger.info("Call contract input", input); this.maybeResetRootContract(); if (!this._signature) { - this.logger.warn('Wallet not set.'); + this.logger.warn("Wallet not set."); } const { arweave, stateEvaluator } = this.warp; // create execution context let executionContext = await this.createExecutionContext(this._contractTxId, sortKey, true); const currentBlockData = - this.warp.environment == 'mainnet' && !(this.warp.interactionsLoader.type() === 'arweave') + this.warp.environment == "mainnet" && !(this.warp.interactionsLoader.type() === "arweave") ? await this._arweaveWrapper.warpGwBlock() : await arweave.blocks.getCurrent(); @@ -718,10 +777,10 @@ export class HandlerBasedContract implements Contract { } else if (this._signature) { effectiveCaller = await this._signature.getAddress(); } else { - effectiveCaller = ''; + effectiveCaller = ""; } - this.logger.info('effectiveCaller', effectiveCaller); + this.logger.info("effectiveCaller", effectiveCaller); executionContext = { ...executionContext, caller: effectiveCaller @@ -729,7 +788,7 @@ export class HandlerBasedContract implements Contract { // eval current state const evalStateResult = await stateEvaluator.eval(executionContext); - this.logger.info('Current state', evalStateResult.cachedValue.state); + this.logger.info("Current state", evalStateResult.cachedValue.state); // create interaction transaction const interaction: ContractInteraction = { @@ -738,7 +797,7 @@ export class HandlerBasedContract implements Contract { interactionType }; - this.logger.debug('interaction', interaction); + this.logger.debug("interaction", interaction); const tx = await createInteractionTx( arweave, sign ? this._signature?.signer : undefined, @@ -748,11 +807,11 @@ export class HandlerBasedContract implements Contract { transfer.target, transfer.winstonQty, true, - this.warp.environment === 'testnet' + this.warp.environment === "testnet" ); const dummyTx = createDummyTx(tx, executionContext.caller, currentBlockData); - this.logger.debug('Creating sortKey for', { + this.logger.debug("Creating sortKey for", { blockId: dummyTx.block.id, id: dummyTx.id, height: dummyTx.block.height @@ -761,11 +820,11 @@ export class HandlerBasedContract implements Contract { dummyTx.sortKey = await this._sorter.createSortKey(dummyTx.block.id, dummyTx.id, dummyTx.block.height, true); dummyTx.strict = strict; if (vrf) { - const vrfPlugin = this.warp.maybeLoadPlugin('vrf'); + const vrfPlugin = this.warp.maybeLoadPlugin("vrf"); if (vrfPlugin) { dummyTx.vrf = vrfPlugin.process().generateMockVrf(dummyTx.sortKey); } else { - this.logger.warn('Cannot generate mock vrf for interaction - no "warp-contracts-plugin-vrf" attached!'); + this.logger.warn("Cannot generate mock vrf for interaction - no \"warp-contracts-plugin-vrf\" attached!"); } } @@ -778,8 +837,8 @@ export class HandlerBasedContract implements Contract { evalStateResult.cachedValue ); - if (handleResult.type !== 'ok') { - this.logger.fatal('Error while interacting with contract', { + if (handleResult.type !== "ok") { + this.logger.fatal("Error while interacting with contract", { type: handleResult.type, error: handleResult.errorMessage }); @@ -808,7 +867,7 @@ export class HandlerBasedContract implements Contract { this.interactionState().update(this.txId(), evalStateResult.cachedValue, interactionTx.sortKey); } - this.logger.debug('callContractForTx - evalStateResult', { + this.logger.debug("callContractForTx - evalStateResult", { result: evalStateResult.cachedValue.state, txId: this._contractTxId }); @@ -855,7 +914,7 @@ export class HandlerBasedContract implements Contract { cacheHit: false, outputState: this._evaluationOptions.stackTrace.saveState ? result.state : undefined, executionTime: benchmark.elapsed(true) as number, - valid: result.type === 'ok', + valid: result.type === "ok", errorMessage: result.errorMessage, gasUsed: result.gasUsed }); @@ -882,9 +941,9 @@ export class HandlerBasedContract implements Contract { async stateHash(state: State): Promise { const jsonState = stringify(state); - const hash = await Crypto.subtle.digest('SHA-256', Buffer.from(jsonState, 'utf-8')); + const hash = await Crypto.subtle.digest("SHA-256", Buffer.from(jsonState, "utf-8")); - return Buffer.from(hash).toString('hex'); + return Buffer.from(hash).toString("hex"); } // eslint-disable-next-line @typescript-eslint/no-explicit-any -- params can be anything @@ -913,7 +972,7 @@ export class HandlerBasedContract implements Contract { } async evolve(newSrcTxId: string, options?: WriteInteractionOptions): Promise { - return await this.writeInteraction({ function: 'evolve', value: newSrcTxId }, options); + return await this.writeInteraction({ function: "evolve", value: newSrcTxId }, options); } get rootSortKey(): string { @@ -1026,7 +1085,7 @@ export class HandlerBasedContract implements Contract { ) { const handlerResult = await this.callContract( input, - 'write', + "write", undefined, undefined, tags, @@ -1036,19 +1095,19 @@ export class HandlerBasedContract implements Contract { false ); - if (strict && handlerResult.type !== 'ok') { - throw Error('Cannot create interaction: ' + JSON.stringify(handlerResult.error || handlerResult.errorMessage)); + if (strict && handlerResult.type !== "ok") { + throw Error("Cannot create interaction: " + JSON.stringify(handlerResult.error || handlerResult.errorMessage)); } const callStack: ContractCallRecord = this.getCallStack(); const innerWrites = this._innerWritesEvaluator.eval(callStack); - this.logger.debug('Input', input); - this.logger.debug('Callstack', callStack.print()); + this.logger.debug("Input", input); + this.logger.debug("Callstack", callStack.print()); innerWrites.forEach((contractTxId) => { tags.push(new Tag(WARP_TAGS.INTERACT_WRITE, contractTxId)); }); - this.logger.debug('Tags with inner calls', tags); + this.logger.debug("Tags with inner calls", tags); } clearChildren(): void { diff --git a/src/core/modules/StateEvaluator.ts b/src/core/modules/StateEvaluator.ts index 0785a810..98fe638d 100644 --- a/src/core/modules/StateEvaluator.ts +++ b/src/core/modules/StateEvaluator.ts @@ -152,6 +152,8 @@ export class DefaultEvaluationOptions implements EvaluationOptions { useConstructor = false; whitelistSources = []; + + transactionsPagesPerBatch = null; } // an interface for the contract EvaluationOptions - can be used to change the behaviour of some features. @@ -242,6 +244,8 @@ export interface EvaluationOptions { remoteStateSyncSource: string; whitelistSources: string[]; + + transactionsPagesPerBatch: number; } // https://github.com/nodejs/node/issues/40678 duh... diff --git a/src/core/modules/impl/ArweaveGQLTxsFetcher.ts b/src/core/modules/impl/ArweaveGQLTxsFetcher.ts index ff5dcc5d..f38fb586 100644 --- a/src/core/modules/impl/ArweaveGQLTxsFetcher.ts +++ b/src/core/modules/impl/ArweaveGQLTxsFetcher.ts @@ -117,7 +117,6 @@ export class ArweaveGQLTxsFetcher { async transaction(transactionId: string): Promise { const response = await this.fetch(TRANSACTION_QUERY, { id: transactionId }); - return response.transaction; } @@ -138,10 +137,15 @@ export class ArweaveGQLTxsFetcher { return response.edges[0].node; } - async transactions(variables: ArweaveTransactionQuery): Promise { + async transactions(variables: ArweaveTransactionQuery, pagesPerBatch: number): Promise { let pageResult = (await this.fetch(TRANSACTIONS_QUERY, variables)).transactions; const edges: GQLEdgeInterface[] = [...pageResult.edges]; - while (pageResult.pageInfo.hasNextPage) { + let pagesLoaded = 1; + if (pagesLoaded >= pagesPerBatch) { + return edges; + } + + while (pageResult.pageInfo.hasNextPage && pagesLoaded < pagesPerBatch) { const cursor = pageResult.edges[MAX_REQUEST - 1].cursor; const newVariables = { @@ -151,6 +155,7 @@ export class ArweaveGQLTxsFetcher { pageResult = (await this.fetch(TRANSACTIONS_QUERY, newVariables)).transactions; edges.push(...pageResult.edges); + pagesLoaded++; } return edges; } diff --git a/src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts b/src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts index e60ba210..fe21db4b 100644 --- a/src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts +++ b/src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts @@ -62,6 +62,9 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade evaluationOptions?: EvaluationOptions ): Promise { this.logger.debug('Loading interactions for', { contractId, fromSortKey, toSortKey }); + if (evaluationOptions?.transactionsPagesPerBatch) { + throw new Error(`Loading in batches not yet implemented for ${ArweaveGatewayBundledInteractionLoader.name}`); + } const fromBlockHeight = this.sorter.extractBlockHeight(fromSortKey) || 0; const toBlockHeight = this.sorter.extractBlockHeight(toSortKey) || (await this.currentBlockHeight()); @@ -89,7 +92,7 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade }; const loadingBenchmark = Benchmark.measure(); - let interactions = await this.arweaveFetcher.transactions(mainTransactionsQuery); + let interactions = await this.arweaveFetcher.transactions(mainTransactionsQuery, Number.MAX_SAFE_INTEGER); if (evaluationOptions.internalWrites) { interactions = await this.appendInternalWriteInteractions( @@ -242,7 +245,7 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade }, first: MAX_REQUEST }; - const innerWritesInteractions = await this.arweaveFetcher.transactions(innerWritesVariables); + const innerWritesInteractions = await this.arweaveFetcher.transactions(innerWritesVariables, Number.MAX_SAFE_INTEGER); this.logger.debug('Inner writes interactions length:', innerWritesInteractions.length); interactions = interactions.concat(innerWritesInteractions); return interactions; diff --git a/src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts b/src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts index 73defff6..c8538919 100644 --- a/src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts +++ b/src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts @@ -39,7 +39,9 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader { this.logger.debug('Loading interactions for', { contractId, fromSortKey, toSortKey }); const fromBlockHeight = this.sorter.extractBlockHeight(fromSortKey); - const toBlockHeight = this.sorter.extractBlockHeight(toSortKey); + let toBlockHeight = this.sorter.extractBlockHeight(toSortKey); + const pagesPerBatch = evaluationOptions?.transactionsPagesPerBatch || Number.MAX_SAFE_INTEGER; + this.logger.debug('Pages per batch', pagesPerBatch); const mainTransactionsQuery: ArweaveTransactionQuery = { tags: [ @@ -60,12 +62,29 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader { }; const loadingBenchmark = Benchmark.measure(); - let interactions = (await this.arweaveTransactionQuery.transactions(mainTransactionsQuery)).filter( + let interactions = (await this.arweaveTransactionQuery.transactions(mainTransactionsQuery, pagesPerBatch)).filter( bundledTxsFilter ); loadingBenchmark.stop(); + if (evaluationOptions?.transactionsPagesPerBatch && interactions.length > 0) { + interactions = await this.sorter.sort(interactions); + toBlockHeight = interactions[interactions.length - 1].node.block.height; + } if (evaluationOptions.internalWrites) { + const pagesPerBatchIw = (function () { + if (evaluationOptions?.transactionsPagesPerBatch) { + if (interactions.length > 0) { + // note: the limit in this case is the block height of the last 'direct' interaction + return Number.MAX_SAFE_INTEGER; + } else { + return evaluationOptions?.transactionsPagesPerBatch; + } + } else { + return Number.MAX_SAFE_INTEGER; + } + })(); + const innerWritesVariables: ArweaveTransactionQuery = { tags: [ { @@ -79,9 +98,9 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader { }, first: MAX_REQUEST }; - const innerWritesInteractions = (await this.arweaveTransactionQuery.transactions(innerWritesVariables)).filter( - bundledTxsFilter - ); + const innerWritesInteractions = ( + await this.arweaveTransactionQuery.transactions(innerWritesVariables, pagesPerBatchIw) + ).filter(bundledTxsFilter); this.logger.debug('Inner writes interactions length:', innerWritesInteractions.length); interactions = interactions.concat(innerWritesInteractions); diff --git a/src/core/modules/impl/WarpGatewayInteractionsLoader.ts b/src/core/modules/impl/WarpGatewayInteractionsLoader.ts index b7aa0e8d..a1731755 100644 --- a/src/core/modules/impl/WarpGatewayInteractionsLoader.ts +++ b/src/core/modules/impl/WarpGatewayInteractionsLoader.ts @@ -73,6 +73,7 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader { let page = 0; let limit = 0; let items = 0; + const pagesPerBatch = evaluationOptions?.transactionsPagesPerBatch || Number.MAX_SAFE_INTEGER; const effectiveSourceType = evaluationOptions ? evaluationOptions.sourceType : this.source; const benchmarkTotalTime = Benchmark.measure(); @@ -109,7 +110,7 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader { items = response.paging.items; this.logger.debug(`Loaded interactions length: ${interactions.length}, from: ${fromSortKey}, to: ${toSortKey}`); - } while (items == limit); // note: items < limit means that we're on the last page + } while (items == limit && page < pagesPerBatch); // note: items < limit means that we're on the last page this.logger.debug('All loaded interactions:', { from: fromSortKey,