Skip to content

Commit

Permalink
feat: read state in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
ppedziwiatr committed Dec 8, 2023
1 parent eb044fe commit c9a9baf
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 18 deletions.
7 changes: 7 additions & 0 deletions src/contract/Contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ export interface Contract<State = unknown> {
interactions?: GQLNodeInterface[]
): Promise<SortKeyCacheResult<EvalStateResult<State>>>;

/**
* Reads state in batches - i.e. it first loads max. 5k interactions, evaluates them, then reads another 5k..and so on.
*
* Consider this as an experimental feature
*/
readStateBatch(batchSize: number): Promise<SortKeyCacheResult<EvalStateResult<State>>>;

readStateFor(sortKey: string, interactions: GQLNodeInterface[]): Promise<SortKeyCacheResult<EvalStateResult<State>>>;

/**
Expand Down
6 changes: 4 additions & 2 deletions src/contract/EvaluationOptionsEvaluator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,15 @@ export class EvaluationOptionsEvaluator {
remoteStateSyncSource: () => this.rootOptions['remoteStateSyncSource'],
useKVStorage: (foreignOptions) => foreignOptions['useKVStorage'],
useConstructor: (foreignOptions) => foreignOptions['useConstructor'],
whitelistSources: () => this.rootOptions['whitelistSources']
whitelistSources: () => this.rootOptions['whitelistSources'],
transactionsPagesPerBatch: () => this.rootOptions['transactionsPagesPerBatch']
};

private readonly notConflictingEvaluationOptions: (keyof EvaluationOptions)[] = [
'useKVStorage',
'sourceType',
'useConstructor'
'useConstructor',
'transactionsPagesPerBatch'
];

/**
Expand Down
40 changes: 35 additions & 5 deletions src/contract/HandlerBasedContract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import {
InteractionResult,
InteractionType
} from '../core/modules/impl/HandlerExecutorFactory';
import { LexicographicalInteractionsSorter } from '../core/modules/impl/LexicographicalInteractionsSorter';
import {
genesisSortKey,
LexicographicalInteractionsSorter
} from '../core/modules/impl/LexicographicalInteractionsSorter';
import { InteractionsSorter } from '../core/modules/InteractionsSorter';
import { DefaultEvaluationOptions, EvalStateResult, EvaluationOptions } from '../core/modules/StateEvaluator';
import { WARP_TAGS } from '../core/KnownTags';
Expand Down Expand Up @@ -38,7 +41,7 @@ import { Mutex } from 'async-mutex';
import { Tag, Transaction, TransactionStatusResponse } from '../utils/types/arweave-types';
import { InteractionState } from './states/InteractionState';
import { ContractInteractionState } from './states/ContractInteractionState';
import { Crypto } from 'warp-isomorphic';
import { Buffer, Crypto } from 'warp-isomorphic';
import { VrfPluginFunctions } from '../core/WarpPlugin';
import { createData, tagsExceedLimit, DataItem, Signer } from 'warp-arbundles';

Expand Down Expand Up @@ -205,6 +208,29 @@ export class HandlerBasedContract<State> implements Contract<State> {
return this.readState(sortKey, undefined, interactions);
}

async readStateBatch(pagesPerBatch = 1): Promise<SortKeyCacheResult<EvalStateResult<State>>> {
if (!this.isRoot()) {
throw new Error('readStateBatch is only allowed for root contract calls');
}

const contractTxId = this._contractTxId;
const { interactionsLoader, stateEvaluator } = this.warp;
let cachedState = await stateEvaluator.latestAvailableState<State>(contractTxId);

const evaluationOptions = {
...this._evaluationOptions,
transactionsPagesInBatch: pagesPerBatch
};

let interactions: GQLNodeInterface[];
do {
interactions = await interactionsLoader.load(contractTxId, cachedState?.sortKey, undefined, evaluationOptions);
cachedState = await this.readStateFor(cachedState?.sortKey || genesisSortKey, interactions);
} while (interactions.length > 0);

return cachedState;
}

async viewState<Input, View>(
input: Input,
tags: Tags = [],
Expand Down Expand Up @@ -535,10 +561,10 @@ export class HandlerBasedContract<State> implements Contract<State> {
this.logger.debug('State fully cached, not loading interactions.');
if (forceDefinitionLoad || evolvedSrcTxId || interactions?.length) {
contractDefinition = await definitionLoader.load<State>(contractTxId, evolvedSrcTxId);
contractEvaluationOptions = this.resolveEvaluationOptions(contractDefinition.manifest?.evaluationOptions);
this.warp.executorFactory.checkWhiteListContractSources(contractDefinition, contractEvaluationOptions);
if (interactions?.length) {
sortedInteractions = (await this._sorter.sort(interactions.map((i) => ({ node: i, cursor: null })))).map(
(i) => i.node
);
sortedInteractions = await this.getSortedInteractions(interactions);
}
}
} else {
Expand Down Expand Up @@ -620,6 +646,10 @@ export class HandlerBasedContract<State> implements Contract<State> {
};
}

private async getSortedInteractions(interactions: GQLNodeInterface[]) {
return (await this._sorter.sort(interactions.map((i) => ({ node: i, cursor: null })))).map((i) => i.node);
}

private resolveEvaluationOptions(rootManifestEvalOptions: EvaluationOptions) {
if (this.isRoot()) {
this._eoEvaluator = new EvaluationOptionsEvaluator(this.evaluationOptions(), rootManifestEvalOptions);
Expand Down
4 changes: 4 additions & 0 deletions src/core/modules/StateEvaluator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ export class DefaultEvaluationOptions implements EvaluationOptions {
useConstructor = false;

whitelistSources = [];

transactionsPagesPerBatch = null;
}

// an interface for the contract EvaluationOptions - can be used to change the behaviour of some features.
Expand Down Expand Up @@ -242,6 +244,8 @@ export interface EvaluationOptions {
remoteStateSyncSource: string;

whitelistSources: string[];

transactionsPagesPerBatch: number;
}

// https://github.com/nodejs/node/issues/40678 duh...
Expand Down
11 changes: 8 additions & 3 deletions src/core/modules/impl/ArweaveGQLTxsFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ export class ArweaveGQLTxsFetcher {

async transaction(transactionId: string): Promise<GQLTransaction> {
const response = await this.fetch<GQLTransactionResponse>(TRANSACTION_QUERY, { id: transactionId });

return response.transaction;
}

Expand All @@ -138,10 +137,15 @@ export class ArweaveGQLTxsFetcher {
return response.edges[0].node;
}

async transactions(variables: ArweaveTransactionQuery): Promise<GQLEdgeInterface[]> {
async transactions(variables: ArweaveTransactionQuery, pagesPerBatch: number): Promise<GQLEdgeInterface[]> {
let pageResult = (await this.fetch<GQLResultInterface['data']>(TRANSACTIONS_QUERY, variables)).transactions;
const edges: GQLEdgeInterface[] = [...pageResult.edges];
while (pageResult.pageInfo.hasNextPage) {
let pagesLoaded = 1;
if (pagesLoaded >= pagesPerBatch) {
return edges;
}

while (pageResult.pageInfo.hasNextPage && pagesLoaded < pagesPerBatch) {
const cursor = pageResult.edges[MAX_REQUEST - 1].cursor;

const newVariables = {
Expand All @@ -151,6 +155,7 @@ export class ArweaveGQLTxsFetcher {

pageResult = (await this.fetch<GQLResultInterface['data']>(TRANSACTIONS_QUERY, newVariables)).transactions;
edges.push(...pageResult.edges);
pagesLoaded++;
}
return edges;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade
evaluationOptions?: EvaluationOptions
): Promise<GQLNodeInterface[]> {
this.logger.debug('Loading interactions for', { contractId, fromSortKey, toSortKey });
if (evaluationOptions?.transactionsPagesPerBatch) {
throw new Error(`Loading in batches not yet implemented for ${ArweaveGatewayBundledInteractionLoader.name}`);
}

const fromBlockHeight = this.sorter.extractBlockHeight(fromSortKey) || 0;
const toBlockHeight = this.sorter.extractBlockHeight(toSortKey) || (await this.currentBlockHeight());
Expand Down Expand Up @@ -89,7 +92,7 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade
};

const loadingBenchmark = Benchmark.measure();
let interactions = await this.arweaveFetcher.transactions(mainTransactionsQuery);
let interactions = await this.arweaveFetcher.transactions(mainTransactionsQuery, Number.MAX_SAFE_INTEGER);

if (evaluationOptions.internalWrites) {
interactions = await this.appendInternalWriteInteractions(
Expand Down Expand Up @@ -242,7 +245,7 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade
},
first: MAX_REQUEST
};
const innerWritesInteractions = await this.arweaveFetcher.transactions(innerWritesVariables);
const innerWritesInteractions = await this.arweaveFetcher.transactions(innerWritesVariables, Number.MAX_SAFE_INTEGER);

Check failure on line 248 in src/core/modules/impl/ArweaveGatewayBundledInteractionLoader.ts

View workflow job for this annotation

GitHub Actions / build

Replace `innerWritesVariables,·Number.MAX_SAFE_INTEGER` with `⏎······innerWritesVariables,⏎······Number.MAX_SAFE_INTEGER⏎····`
this.logger.debug('Inner writes interactions length:', innerWritesInteractions.length);
interactions = interactions.concat(innerWritesInteractions);
return interactions;
Expand Down
29 changes: 24 additions & 5 deletions src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader {
this.logger.debug('Loading interactions for', { contractId, fromSortKey, toSortKey });

const fromBlockHeight = this.sorter.extractBlockHeight(fromSortKey);
const toBlockHeight = this.sorter.extractBlockHeight(toSortKey);
let toBlockHeight = this.sorter.extractBlockHeight(toSortKey);
const pagesPerBatch = evaluationOptions?.transactionsPagesPerBatch || Number.MAX_SAFE_INTEGER;
this.logger.debug('Pages per batch', pagesPerBatch);

const mainTransactionsQuery: ArweaveTransactionQuery = {
tags: [
Expand All @@ -60,12 +62,29 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader {
};

const loadingBenchmark = Benchmark.measure();
let interactions = (await this.arweaveTransactionQuery.transactions(mainTransactionsQuery)).filter(
let interactions = (await this.arweaveTransactionQuery.transactions(mainTransactionsQuery, pagesPerBatch)).filter(
bundledTxsFilter
);
loadingBenchmark.stop();
if (evaluationOptions?.transactionsPagesPerBatch && interactions.length > 0) {
interactions = await this.sorter.sort(interactions);
toBlockHeight = interactions[interactions.length - 1].node.block.height;
}

if (evaluationOptions.internalWrites) {
const pagesPerBatchIw = (function () {
if (evaluationOptions?.transactionsPagesPerBatch) {
if (interactions.length > 0) {
// note: the limit in this case is the block height of the last 'direct' interaction
return Number.MAX_SAFE_INTEGER;
} else {
return evaluationOptions?.transactionsPagesPerBatch;
}
} else {
return Number.MAX_SAFE_INTEGER;
}
})();

const innerWritesVariables: ArweaveTransactionQuery = {
tags: [
{
Expand All @@ -79,9 +98,9 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader {
},
first: MAX_REQUEST
};
const innerWritesInteractions = (await this.arweaveTransactionQuery.transactions(innerWritesVariables)).filter(
bundledTxsFilter
);
const innerWritesInteractions = (
await this.arweaveTransactionQuery.transactions(innerWritesVariables, pagesPerBatchIw)
).filter(bundledTxsFilter);

this.logger.debug('Inner writes interactions length:', innerWritesInteractions.length);
interactions = interactions.concat(innerWritesInteractions);
Expand Down
3 changes: 2 additions & 1 deletion src/core/modules/impl/WarpGatewayInteractionsLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader {
let page = 0;
let limit = 0;
let items = 0;
const pagesPerBatch = evaluationOptions?.transactionsPagesPerBatch || Number.MAX_SAFE_INTEGER;

const effectiveSourceType = evaluationOptions ? evaluationOptions.sourceType : this.source;
const benchmarkTotalTime = Benchmark.measure();
Expand Down Expand Up @@ -109,7 +110,7 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader {
items = response.paging.items;

this.logger.debug(`Loaded interactions length: ${interactions.length}, from: ${fromSortKey}, to: ${toSortKey}`);
} while (items == limit); // note: items < limit means that we're on the last page
} while (items == limit && page < pagesPerBatch); // note: items < limit means that we're on the last page

this.logger.debug('All loaded interactions:', {
from: fromSortKey,
Expand Down

0 comments on commit c9a9baf

Please sign in to comment.