Skip to content

Commit

Permalink
feat: read state in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
ppedziwiatr committed Dec 8, 2023
1 parent eb044fe commit 6b12c00
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 118 deletions.
7 changes: 7 additions & 0 deletions src/contract/Contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ export interface Contract<State = unknown> {
interactions?: GQLNodeInterface[]
): Promise<SortKeyCacheResult<EvalStateResult<State>>>;

/**
* Reads state in batches - i.e. it first loads max. 5k interactions, evaluates them, then reads another 5k..and so on.
*
* Consider this as an experimental feature
*/
readStateBatch(batchSize: number, signal: AbortSignal): Promise<SortKeyCacheResult<EvalStateResult<State>>>;

readStateFor(sortKey: string, interactions: GQLNodeInterface[]): Promise<SortKeyCacheResult<EvalStateResult<State>>>;

/**
Expand Down
6 changes: 4 additions & 2 deletions src/contract/EvaluationOptionsEvaluator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,15 @@ export class EvaluationOptionsEvaluator {
remoteStateSyncSource: () => this.rootOptions['remoteStateSyncSource'],
useKVStorage: (foreignOptions) => foreignOptions['useKVStorage'],
useConstructor: (foreignOptions) => foreignOptions['useConstructor'],
whitelistSources: () => this.rootOptions['whitelistSources']
whitelistSources: () => this.rootOptions['whitelistSources'],
transactionsPagesPerBatch: () => this.rootOptions['transactionsPagesPerBatch']
};

private readonly notConflictingEvaluationOptions: (keyof EvaluationOptions)[] = [
'useKVStorage',
'sourceType',
'useConstructor'
'useConstructor',
'transactionsPagesPerBatch'
];

/**
Expand Down
269 changes: 164 additions & 105 deletions src/contract/HandlerBasedContract.ts

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions src/core/modules/StateEvaluator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ export class DefaultEvaluationOptions implements EvaluationOptions {
useConstructor = false;

whitelistSources = [];

transactionsPagesPerBatch = null;
}

// an interface for the contract EvaluationOptions - can be used to change the behaviour of some features.
Expand Down Expand Up @@ -242,6 +244,8 @@ export interface EvaluationOptions {
remoteStateSyncSource: string;

whitelistSources: string[];

transactionsPagesPerBatch: number;
}

// https://github.com/nodejs/node/issues/40678 duh...
Expand Down
11 changes: 8 additions & 3 deletions src/core/modules/impl/ArweaveGQLTxsFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ export class ArweaveGQLTxsFetcher {

async transaction(transactionId: string): Promise<GQLTransaction> {
const response = await this.fetch<GQLTransactionResponse>(TRANSACTION_QUERY, { id: transactionId });

return response.transaction;
}

Expand All @@ -138,10 +137,15 @@ export class ArweaveGQLTxsFetcher {
return response.edges[0].node;
}

async transactions(variables: ArweaveTransactionQuery): Promise<GQLEdgeInterface[]> {
async transactions(variables: ArweaveTransactionQuery, pagesPerBatch: number): Promise<GQLEdgeInterface[]> {
let pageResult = (await this.fetch<GQLResultInterface['data']>(TRANSACTIONS_QUERY, variables)).transactions;
const edges: GQLEdgeInterface[] = [...pageResult.edges];
while (pageResult.pageInfo.hasNextPage) {
let pagesLoaded = 1;
if (pagesLoaded >= pagesPerBatch) {
return edges;
}

while (pageResult.pageInfo.hasNextPage && pagesLoaded < pagesPerBatch) {
const cursor = pageResult.edges[MAX_REQUEST - 1].cursor;

const newVariables = {
Expand All @@ -151,6 +155,7 @@ export class ArweaveGQLTxsFetcher {

pageResult = (await this.fetch<GQLResultInterface['data']>(TRANSACTIONS_QUERY, newVariables)).transactions;
edges.push(...pageResult.edges);
pagesLoaded++;
}
return edges;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade
evaluationOptions?: EvaluationOptions
): Promise<GQLNodeInterface[]> {
this.logger.debug('Loading interactions for', { contractId, fromSortKey, toSortKey });
if (evaluationOptions?.transactionsPagesPerBatch) {
throw new Error(`Loading in batches not yet implemented for ${ArweaveGatewayBundledInteractionLoader.name}`);
}

const fromBlockHeight = this.sorter.extractBlockHeight(fromSortKey) || 0;
const toBlockHeight = this.sorter.extractBlockHeight(toSortKey) || (await this.currentBlockHeight());
Expand Down Expand Up @@ -89,7 +92,7 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade
};

const loadingBenchmark = Benchmark.measure();
let interactions = await this.arweaveFetcher.transactions(mainTransactionsQuery);
let interactions = await this.arweaveFetcher.transactions(mainTransactionsQuery, Number.MAX_SAFE_INTEGER);

if (evaluationOptions.internalWrites) {
interactions = await this.appendInternalWriteInteractions(
Expand Down Expand Up @@ -242,7 +245,7 @@ export class ArweaveGatewayBundledInteractionLoader implements InteractionsLoade
},
first: MAX_REQUEST
};
const innerWritesInteractions = await this.arweaveFetcher.transactions(innerWritesVariables);
const innerWritesInteractions = await this.arweaveFetcher.transactions(innerWritesVariables, Number.MAX_SAFE_INTEGER);
this.logger.debug('Inner writes interactions length:', innerWritesInteractions.length);
interactions = interactions.concat(innerWritesInteractions);
return interactions;
Expand Down
29 changes: 24 additions & 5 deletions src/core/modules/impl/ArweaveGatewayInteractionsLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader {
this.logger.debug('Loading interactions for', { contractId, fromSortKey, toSortKey });

const fromBlockHeight = this.sorter.extractBlockHeight(fromSortKey);
const toBlockHeight = this.sorter.extractBlockHeight(toSortKey);
let toBlockHeight = this.sorter.extractBlockHeight(toSortKey);
const pagesPerBatch = evaluationOptions?.transactionsPagesPerBatch || Number.MAX_SAFE_INTEGER;
this.logger.debug('Pages per batch', pagesPerBatch);

const mainTransactionsQuery: ArweaveTransactionQuery = {
tags: [
Expand All @@ -60,12 +62,29 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader {
};

const loadingBenchmark = Benchmark.measure();
let interactions = (await this.arweaveTransactionQuery.transactions(mainTransactionsQuery)).filter(
let interactions = (await this.arweaveTransactionQuery.transactions(mainTransactionsQuery, pagesPerBatch)).filter(
bundledTxsFilter
);
loadingBenchmark.stop();
if (evaluationOptions?.transactionsPagesPerBatch && interactions.length > 0) {
interactions = await this.sorter.sort(interactions);
toBlockHeight = interactions[interactions.length - 1].node.block.height;
}

if (evaluationOptions.internalWrites) {
const pagesPerBatchIw = (function () {
if (evaluationOptions?.transactionsPagesPerBatch) {
if (interactions.length > 0) {
// note: the limit in this case is the block height of the last 'direct' interaction
return Number.MAX_SAFE_INTEGER;
} else {
return evaluationOptions?.transactionsPagesPerBatch;
}
} else {
return Number.MAX_SAFE_INTEGER;
}
})();

const innerWritesVariables: ArweaveTransactionQuery = {
tags: [
{
Expand All @@ -79,9 +98,9 @@ export class ArweaveGatewayInteractionsLoader implements InteractionsLoader {
},
first: MAX_REQUEST
};
const innerWritesInteractions = (await this.arweaveTransactionQuery.transactions(innerWritesVariables)).filter(
bundledTxsFilter
);
const innerWritesInteractions = (
await this.arweaveTransactionQuery.transactions(innerWritesVariables, pagesPerBatchIw)
).filter(bundledTxsFilter);

this.logger.debug('Inner writes interactions length:', innerWritesInteractions.length);
interactions = interactions.concat(innerWritesInteractions);
Expand Down
3 changes: 2 additions & 1 deletion src/core/modules/impl/WarpGatewayInteractionsLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader {
let page = 0;
let limit = 0;
let items = 0;
const pagesPerBatch = evaluationOptions?.transactionsPagesPerBatch || Number.MAX_SAFE_INTEGER;

const effectiveSourceType = evaluationOptions ? evaluationOptions.sourceType : this.source;
const benchmarkTotalTime = Benchmark.measure();
Expand Down Expand Up @@ -109,7 +110,7 @@ export class WarpGatewayInteractionsLoader implements InteractionsLoader {
items = response.paging.items;

this.logger.debug(`Loaded interactions length: ${interactions.length}, from: ${fromSortKey}, to: ${toSortKey}`);
} while (items == limit); // note: items < limit means that we're on the last page
} while (items == limit && page < pagesPerBatch); // note: items < limit means that we're on the last page

this.logger.debug('All loaded interactions:', {
from: fromSortKey,
Expand Down

0 comments on commit 6b12c00

Please sign in to comment.