Skip to content

Commit

Permalink
Update examples and example docs (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
huntharo authored Jun 29, 2023
1 parent 038e4d0 commit 2706cda
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 20 deletions.
22 changes: 12 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,23 @@ After installing the package, you might want to look at our [API Documentation](
### kinesis-retrier

1. Create Kinesis Data Stream using AWS Console or any other method
1. Default name is `kinesis-helpers-test-stream`
2. 1 shard is sufficient
3. 1 day retention is sufficient
4. No encryption is sufficient
5. On-demand throughput is sufficient
1. Example: `aws kinesis create-stream --stream-name kinesis-helpers-test-stream --shard-count 1`
2. Default name is `kinesis-helpers-test-stream`
3. 1 shard is sufficient
4. 1 day retention is sufficient
5. No encryption is sufficient
6. On-demand throughput is sufficient
2. `npm run example:kinesis-retrier`
1. If the stream name was changed: `KINESIS_STREAM_NAME=my-stream-name npm run example:kinesis-retrier`

### kinesis-background-writer

1. Create Kinesis Data Stream using AWS Console or any other method
1. Default name is `kinesis-helpers-test-stream`
2. 1 shard is sufficient
3. 1 day retention is sufficient
4. No encryption is sufficient
5. On-demand throughput is sufficient
1. Example: `aws kinesis create-stream --stream-name kinesis-helpers-test-stream --shard-count 1`
2. Default name is `kinesis-helpers-test-stream`
3. 1 shard is sufficient
4. 1 day retention is sufficient
5. No encryption is sufficient
6. On-demand throughput is sufficient
2. `npm run example:kinesis-background-writer`
1. If the stream name was changed: `KINESIS_STREAM_NAME=my-stream-name npm run example:kinesis-background-writer`
17 changes: 15 additions & 2 deletions examples/kinesis-background-writer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/* eslint-disable no-console */
import * as kinesis from '@aws-sdk/client-kinesis';
import { KinesisBackgroundWriter, KinesisRetrier } from '@shutterstock/kinesis-helpers';

const kinesisClient = new kinesis.KinesisClient({});
const { KINESIS_STREAM_NAME = 'kinesis-helpers-test-stream', RECORDS_TO_WRITE = '1000000' } =
const { KINESIS_STREAM_NAME = 'kinesis-helpers-test-stream', RECORDS_TO_WRITE = '40000' } =
process.env;
const RECORDS_TO_WRITE_NUM = parseInt(RECORDS_TO_WRITE, 10);
const RECORDS_PER_BATCH = 500;
Expand Down Expand Up @@ -35,15 +36,27 @@ async function main() {
});
}

// Log how many records we will be writing
console.log(`Writing ${RECORDS_TO_WRITE_NUM} records to ${KINESIS_STREAM_NAME}`);

// Send a whole lot of records so we start getting throttled within the batches
for (let i = 0; i < RECORDS_TO_WRITE_NUM; i += RECORDS_PER_BATCH) {
// Note how many records we are adding and how long it took
console.time(`Adding ${i} to ${i + RECORDS_PER_BATCH} records took`);
await backgroundWriter.send(new kinesis.PutRecordsCommand(records));
console.timeEnd(`Adding ${i} to ${i + RECORDS_PER_BATCH} records took`);
}

// Need to wait until the backgroundWriter is idle (has finished any pending requests)
console.time('Waiting for backgroundWriter to be idle');
await backgroundWriter.onIdle();
console.timeEnd('Waiting for backgroundWriter to be idle');

// TODO: If there were any errors, log them
// If there were any errors, log them
console.log('Number of errors:', backgroundWriter.errors.length);
backgroundWriter.errors.forEach((error) => {
console.error(error);
});
}

void main();
9 changes: 8 additions & 1 deletion examples/kinesis-retrier.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/* eslint-disable no-console */
import * as kinesis from '@aws-sdk/client-kinesis';
import { KinesisRetrierStatic } from '@shutterstock/kinesis-helpers';

const kinesisClient = new kinesis.KinesisClient({});
const { KINESIS_STREAM_NAME = 'kinesis-helpers-test-stream', RECORDS_TO_WRITE = '1000000' } =
const { KINESIS_STREAM_NAME = 'kinesis-helpers-test-stream', RECORDS_TO_WRITE = '10000' } =
process.env;
const RECORDS_TO_WRITE_NUM = parseInt(RECORDS_TO_WRITE, 10);
const RECORDS_PER_BATCH = 500;
Expand All @@ -24,12 +25,18 @@ async function main() {
});
}

// Log how many records we will be writing
console.log(`Writing ${RECORDS_TO_WRITE_NUM} records to ${KINESIS_STREAM_NAME}`);

// Send a whole lot of records so we start getting throttled within the batches
for (let i = 0; i < RECORDS_TO_WRITE_NUM; i += RECORDS_PER_BATCH) {
// Note how many records we are adding and how long it took
console.time(`Adding ${i} to ${i + RECORDS_PER_BATCH} records took`);
const result = await KinesisRetrierStatic.putRecords(
kinesisClient,
new kinesis.PutRecordsCommand(records),
);
console.timeEnd(`Adding ${i} to ${i + RECORDS_PER_BATCH} records took`);

if (result.FailedRecordCount ?? 0 > 0) {
throw new Error('this should not happen - we should get backoff retries on the batch puts');
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"scripts": {
"build": "tsc --build tsconfig.json && echo 'examples/\n*.tsbuildinfo' > dist/.npmignore",
"build:docs": "typedoc src/index.ts",
"example:aws-kinesis-writer": "KINESIS_STREAM_NAME=${KINESIS_STREAM_NAME:-kinesis-helpers-test-stream} ts-node -r tsconfig-paths/register examples/aws-kinesis-writer.ts",
"example:kinesis-background-writer": "KINESIS_STREAM_NAME=${KINESIS_STREAM_NAME:-kinesis-helpers-test-stream} ts-node -r tsconfig-paths/register examples/kinesis-background-writer.ts",
"example:kinesis-retrier": "KINESIS_STREAM_NAME=${KINESIS_STREAM_NAME:-kinesis-helpers-test-stream} ts-node -r tsconfig-paths/register examples/kinesis-retrier.ts",
"test": "AWS_EMF_ENVIRONMENT=Local jest",
"lint": "eslint ./ --ext .ts --ext .tsx",
"lint-and-fix": "eslint ./ --ext .ts --ext .tsx --fix"
Expand Down
6 changes: 0 additions & 6 deletions src/kinesis-retrier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ export class KinesisRetrier implements KinesisPutRecordsSend {

/**
* Send a PutRecordsCommand and retry any failures with exponential backoff
*
* @param command
* @returns
*/
public async send(command: PutRecordsCommand): Promise<PutRecordsCommandOutput> {
if (command.input.Records === undefined) {
Expand Down Expand Up @@ -130,9 +127,6 @@ export class KinesisRetrierStatic {

/**
* Send a PutRecordsCommand and retry any failures with exponential backoff
*
* @param command
* @returns
*/
public static async putRecords(
client: KinesisClient,
Expand Down

0 comments on commit 2706cda

Please sign in to comment.