Skip to content

Commit 02fa79a

Browse files
committed
improve
1 parent 29907f0 commit 02fa79a

File tree

4 files changed

+81
-44
lines changed

4 files changed

+81
-44
lines changed

packages/shared/src/iterator.test.ts

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -297,9 +297,7 @@ describe('replicateAsyncIterator', async () => {
297297

298298
const gen = async function* () {
299299
yield 1
300-
await new Promise(resolve => setTimeout(resolve, 1))
301-
yield 2
302-
yield 3
300+
await new Promise(resolve => setTimeout(resolve, 10))
303301
throw error
304302
}
305303

@@ -310,30 +308,18 @@ describe('replicateAsyncIterator', async () => {
310308
expect(await iterators[0]!.next()).toEqual({ done: false, value: 1 })
311309
expect(await iterators[1]!.next()).toEqual({ done: false, value: 1 })
312310

313-
expect(await iterators[0]!.next()).toEqual({ done: false, value: 2 })
314-
expect(await iterators[1]!.next()).toEqual({ done: false, value: 2 })
315-
316-
expect(await iterators[0]!.next()).toEqual({ done: false, value: 3 })
317-
expect(await iterators[1]!.next()).toEqual({ done: false, value: 3 })
318-
expect(await iterators[2]!.next()).toEqual({ done: false, value: 1 })
319-
320311
await Promise.all([
321312
expect(iterators[0]!.next()).rejects.toThrow(error),
322313
expect(iterators[1]!.next()).rejects.toThrow(error),
323314
])
324-
expect(await iterators[2]!.next()).toEqual({ done: false, value: 2 })
325315

326316
expect(await iterators[0]!.next()).toEqual({ done: true, value: undefined })
327317
expect(await iterators[1]!.next()).toEqual({ done: true, value: undefined })
328-
expect(await iterators[2]!.next()).toEqual({ done: false, value: 3 })
318+
expect(await iterators[2]!.next()).toEqual({ done: false, value: 1 })
329319

330320
expect(await iterators[0]!.next()).toEqual({ done: true, value: undefined })
331321
expect(await iterators[1]!.next()).toEqual({ done: true, value: undefined })
332322
await expect(iterators[2]!.next()).rejects.toThrow(error)
333-
334-
expect(await iterators[0]!.next()).toEqual({ done: true, value: undefined })
335-
expect(await iterators[1]!.next()).toEqual({ done: true, value: undefined })
336-
expect(await iterators[2]!.next()).toEqual({ done: true, value: undefined })
337323
})
338324

339325
it('on manual close', async () => {

packages/shared/src/iterator.ts

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { SetSpanErrorOptions } from './otel'
2-
import { defer, once, sequential } from './function'
2+
import { once, sequential } from './function'
33
import { runInSpanContext, setSpanError, startSpan } from './otel'
44
import { AsyncIdQueue } from './queue'
55

@@ -125,8 +125,12 @@ export function replicateAsyncIterator<T, TReturn, TNext>(
125125
}
126126
}
127127
}
128-
catch (e) {
129-
error = { value: e }
128+
catch (reason) {
129+
error = { value: reason }
130+
131+
queue.waiterIds.forEach((id) => {
132+
queue.close({ id, reason })
133+
})
130134
}
131135
})
132136

@@ -139,15 +143,14 @@ export function replicateAsyncIterator<T, TReturn, TNext>(
139143
start()
140144

141145
return new Promise((resolve, reject) => {
142-
queue.pull(id)
143-
.then(resolve)
144-
.catch(reject)
145-
146-
defer(() => {
147-
if (error) {
148-
reject(error.value)
149-
}
150-
})
146+
if (!error || queue.hasBufferedItems(id)) {
147+
queue.pull(id)
148+
.then(resolve)
149+
.catch(reject)
150+
}
151+
else {
152+
reject(error.value)
153+
}
151154
})
152155
},
153156
async (reason) => {

packages/shared/src/queue.test.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,44 @@ describe('asyncIdQueue', () => {
132132
expect(queue.isOpen('2')).toBe(false)
133133
expect(queue.isOpen('3')).toBe(false)
134134
})
135+
136+
it('waiterIds', async () => {
137+
queue.open('1')
138+
queue.open('2')
139+
140+
const p1 = queue.pull('1')
141+
const p2 = queue.pull('2')
142+
143+
expect(queue.waiterIds).toEqual(['1', '2'])
144+
145+
queue.push('1', 'item1')
146+
queue.push('2', 'item2')
147+
148+
await expect(p1).resolves.toBe('item1')
149+
await expect(p2).resolves.toBe('item2')
150+
151+
expect(queue.waiterIds).toEqual([])
152+
})
153+
154+
it('hasBufferedItems', async () => {
155+
queue.open('1')
156+
queue.open('2')
157+
158+
expect(queue.hasBufferedItems('1')).toBe(false)
159+
expect(queue.hasBufferedItems('2')).toBe(false)
160+
161+
queue.push('1', 'item1')
162+
queue.push('2', 'item2')
163+
164+
expect(queue.hasBufferedItems('1')).toBe(true)
165+
expect(queue.hasBufferedItems('2')).toBe(true)
166+
167+
await queue.pull('1')
168+
expect(queue.hasBufferedItems('1')).toBe(false)
169+
expect(queue.hasBufferedItems('2')).toBe(true)
170+
171+
await queue.pull('2')
172+
expect(queue.hasBufferedItems('1')).toBe(false)
173+
expect(queue.hasBufferedItems('2')).toBe(false)
174+
})
135175
})

packages/shared/src/queue.ts

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,21 @@ export interface AsyncIdQueueCloseOptions {
55

66
export class AsyncIdQueue<T> {
77
private readonly openIds = new Set<string>()
8-
private readonly items = new Map<string, T[]>()
9-
private readonly pendingPulls = new Map<string, (readonly [resolve: (item: T) => void, reject: (err: unknown) => void])[]>()
8+
private readonly queues = new Map<string, T[]>()
9+
private readonly waiters = new Map<string, (readonly [resolve: (item: T) => void, reject: (err: unknown) => void])[]>()
1010

1111
get length(): number {
1212
return this.openIds.size
1313
}
1414

15+
get waiterIds(): string[] {
16+
return Array.from(this.waiters.keys())
17+
}
18+
19+
hasBufferedItems(id: string): boolean {
20+
return Boolean(this.queues.get(id)?.length)
21+
}
22+
1523
open(id: string): void {
1624
this.openIds.add(id)
1725
}
@@ -23,77 +31,77 @@ export class AsyncIdQueue<T> {
2331
push(id: string, item: T): void {
2432
this.assertOpen(id)
2533

26-
const pending = this.pendingPulls.get(id)
34+
const pending = this.waiters.get(id)
2735

2836
if (pending?.length) {
2937
pending.shift()![0](item)
3038

3139
if (pending.length === 0) {
32-
this.pendingPulls.delete(id)
40+
this.waiters.delete(id)
3341
}
3442
}
3543
else {
36-
const items = this.items.get(id)
44+
const items = this.queues.get(id)
3745

3846
if (items) {
3947
items.push(item)
4048
}
4149
else {
42-
this.items.set(id, [item])
50+
this.queues.set(id, [item])
4351
}
4452
}
4553
}
4654

4755
async pull(id: string): Promise<T> {
4856
this.assertOpen(id)
4957

50-
const items = this.items.get(id)
58+
const items = this.queues.get(id)
5159

5260
if (items?.length) {
5361
const item = items.shift()!
5462

5563
if (items.length === 0) {
56-
this.items.delete(id)
64+
this.queues.delete(id)
5765
}
5866

5967
return item
6068
}
6169

6270
return new Promise<T>((resolve, reject) => {
63-
const waitingPulls = this.pendingPulls.get(id)
71+
const waitingPulls = this.waiters.get(id)
6472

6573
const pending = [resolve, reject] as const
6674

6775
if (waitingPulls) {
6876
waitingPulls.push(pending)
6977
}
7078
else {
71-
this.pendingPulls.set(id, [pending])
79+
this.waiters.set(id, [pending])
7280
}
7381
})
7482
}
7583

7684
close({ id, reason }: AsyncIdQueueCloseOptions = {}): void {
7785
if (id === undefined) {
78-
this.pendingPulls.forEach((pendingPulls, id) => {
86+
this.waiters.forEach((pendingPulls, id) => {
7987
pendingPulls.forEach(([, reject]) => {
8088
reject(reason ?? new Error(`[AsyncIdQueue] Queue[${id}] was closed or aborted while waiting for pulling.`))
8189
})
8290
})
8391

84-
this.pendingPulls.clear()
92+
this.waiters.clear()
8593
this.openIds.clear()
86-
this.items.clear()
94+
this.queues.clear()
8795
return
8896
}
8997

90-
this.pendingPulls.get(id)?.forEach(([, reject]) => {
98+
this.waiters.get(id)?.forEach(([, reject]) => {
9199
reject(reason ?? new Error(`[AsyncIdQueue] Queue[${id}] was closed or aborted while waiting for pulling.`))
92100
})
93101

94-
this.pendingPulls.delete(id)
102+
this.waiters.delete(id)
95103
this.openIds.delete(id)
96-
this.items.delete(id)
104+
this.queues.delete(id)
97105
}
98106

99107
assertOpen(id: string): void {

0 commit comments

Comments
 (0)