Skip to content

Commit

Permalink
ObservableStream use ReadableStream instead of iterator (#12112)
Browse files Browse the repository at this point in the history
  • Loading branch information
phryneas authored Nov 7, 2024
1 parent a3f95c6 commit 33a1dae
Showing 1 changed file with 15 additions and 41 deletions.
56 changes: 15 additions & 41 deletions src/testing/internal/ObservableStream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Observable } from "../../utilities/index.js";
import { ReadableStream } from "node:stream/web";

interface TakeOptions {
timeout?: number;
Expand All @@ -8,40 +9,23 @@ type ObservableEvent<T> =
| { type: "error"; error: any }
| { type: "complete" };

async function* observableToAsyncEventIterator<T>(observable: Observable<T>) {
let resolveNext: (value: ObservableEvent<T>) => void;
const promises: Promise<ObservableEvent<T>>[] = [];
queuePromise();

function queuePromise() {
promises.push(
new Promise<ObservableEvent<T>>((resolve) => {
resolveNext = (event: ObservableEvent<T>) => {
resolve(event);
queuePromise();
};
})
);
}

observable.subscribe(
(value) => resolveNext({ type: "next", value }),
(error) => resolveNext({ type: "error", error }),
() => resolveNext({ type: "complete" })
);
yield "initialization value" as unknown as Promise<ObservableEvent<T>>;

while (true) {
yield promises.shift()!;
export class ObservableStream<T> {
private reader: ReadableStreamDefaultReader<ObservableEvent<T>>;
constructor(observable: Observable<T>) {
this.reader = new ReadableStream<ObservableEvent<T>>({
start(controller) {
observable.subscribe(
(value) => controller.enqueue({ type: "next", value }),
(error) => controller.enqueue({ type: "error", error }),
() => controller.enqueue({ type: "complete" })
);
},
}).getReader();
}
}

class IteratorStream<T> {
constructor(private iterator: AsyncGenerator<T, void, unknown>) {}

async take({ timeout = 100 }: TakeOptions = {}): Promise<T> {
take({ timeout = 100 }: TakeOptions = {}) {
return Promise.race([
this.iterator.next().then((result) => result.value!),
this.reader.read().then((result) => result.value!),
new Promise<T>((_, reject) => {
setTimeout(
reject,
Expand All @@ -51,16 +35,6 @@ class IteratorStream<T> {
}),
]);
}
}

export class ObservableStream<T> extends IteratorStream<ObservableEvent<T>> {
constructor(observable: Observable<T>) {
const iterator = observableToAsyncEventIterator(observable);
// we need to call next() once to start the generator so we immediately subscribe.
// the first value is always "initialization value" which we don't care about
iterator.next();
super(iterator);
}

async takeNext(options?: TakeOptions): Promise<T> {
const event = await this.take(options);
Expand Down

0 comments on commit 33a1dae

Please sign in to comment.