Skip to content

Commit e8e8894

Browse files
committed
feat: add buffer reset functionality
1 parent 8108357 commit e8e8894

File tree

4 files changed

+65
-6
lines changed

4 files changed

+65
-6
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@invition/rxjs-sharelatest",
3-
"version": "1.0.2",
3+
"version": "1.0.3",
44
"description": "An RxJS operator designed to limit source invocation and guarantee a swift initial response",
55
"main": "./dist/index.js",
66
"module": "./dist/index.mjs",

src/reuseSubject.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export class ReuseSubject<T> extends ReplaySubject<T> {
2525
this['_buffer'] = __buffer;
2626
}
2727

28-
public getBuffer(): (T | number)[] {
29-
return (this['_buffer'] as (T | number)[]).slice();
30-
}
28+
public getBuffer = (): (T | number)[] => (this['_buffer'] as (T | number)[]).slice();
29+
30+
public resetBuffer = (): void => void (this['_buffer'].length = 0);
3131
}

src/shareLatest-bufferReset.spec.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { afterEach, beforeEach, describe, expect, it } from '@jest/globals';
2+
import { firstValueFrom, map, Observable, of, pipe, Subject, Subscription, tap, timeout, timer } from 'rxjs';
3+
import { shareLatest } from './shareLatest';
4+
5+
describe('RxJS shareLatest operator with bufferLifetime', () => {
6+
let count: number;
7+
let subject: Subject<number>;
8+
let value: number | undefined;
9+
let randomNumber: number;
10+
let observable: Observable<number>;
11+
let subscription: Subscription;
12+
let bufferReset: Subject<void>;
13+
14+
beforeEach(() => {
15+
count = 0;
16+
value = undefined;
17+
randomNumber = Math.random() * 1000;
18+
subject = new Subject();
19+
bufferReset = new Subject();
20+
observable = subject.pipe(
21+
tap(() => ++count),
22+
map(v => v * 2),
23+
shareLatest(undefined, bufferReset.asObservable()),
24+
);
25+
subscription = observable.subscribe(newValue => (value = newValue));
26+
});
27+
28+
afterEach(() => {
29+
subscription?.unsubscribe();
30+
});
31+
32+
it(`should't emit value if buffer expired`, async () => {
33+
const InvSqrtMagic = 0x5f3759df;
34+
const timeoutWithMagic = pipe(timeout({ first: 10, with: () => of(InvSqrtMagic) }));
35+
36+
subject.next(randomNumber);
37+
await firstValueFrom(timer(0));
38+
subscription.unsubscribe();
39+
expect(count).toBe(1);
40+
expect(value).toBe(randomNumber * 2);
41+
42+
let value2 = await firstValueFrom(observable.pipe(timeoutWithMagic));
43+
expect(count).toBe(1);
44+
expect(value2).toBe(randomNumber * 2);
45+
46+
// reset buffer
47+
bufferReset.next();
48+
value2 = await firstValueFrom(observable.pipe(timeoutWithMagic));
49+
expect(count).toBe(1);
50+
expect(value2).toBe(InvSqrtMagic);
51+
});
52+
});

src/shareLatest.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
import type { MonoTypeOperatorFunction, Observable } from 'rxjs';
2-
import { asapScheduler, observeOn, share, timer } from 'rxjs';
2+
import { EMPTY, asapScheduler, ignoreElements, observeOn, share, takeUntil, tap, timer } from 'rxjs';
33
import { ReuseSubject } from './reuseSubject';
44

5-
export function shareLatest<T>(bufferLifetime?: number): MonoTypeOperatorFunction<T> {
5+
export function shareLatest<T>(bufferLifetime?: number, bufferReset?: Observable<unknown>): MonoTypeOperatorFunction<T> {
66
let previousBuffer: (number | T)[]
77
let subject: ReuseSubject<T>;
88
const connector = (): ReuseSubject<T> => subject = new ReuseSubject<T>(previousBuffer, bufferLifetime);
99
return (source$: Observable<T>) =>
1010
source$.pipe(
1111
observeOn(asapScheduler),
12+
takeUntil(bufferReset?.pipe(
13+
// Reset the buffer when the lifetime observable emits.
14+
tap(() => subject.resetBuffer()),
15+
// Ignore all emissions from the lifetime observable to prevent it from completing the source.
16+
// This make takeUntil sit here as a noop but just manage subscriber for bufferLifetimeObs.
17+
ignoreElements(),
18+
) ?? EMPTY),
1219
share({
1320
connector,
1421
resetOnComplete: true,

0 commit comments

Comments
 (0)