Skip to content
This repository was archived by the owner on Feb 14, 2024. It is now read-only.

Commit 87c48e8

Browse files
jacobhericJacob Heric
andauthored
suppor multiple pipelines properly and use new response (#71)
Co-authored-by: Jacob Heric <jacob@jacobheric.com:q!>
1 parent 6df1b48 commit 87c48e8

File tree

16 files changed

+308
-278
lines changed

16 files changed

+308
-278
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ const audience: Audience = {
3131

3232
export const example = async () => {
3333
const streamdal = new Streamdal(config);
34-
const result = await streamdal.processPipeline({
34+
const result = await streamdal.process({
3535
audience,
3636
data: new TextEncoder().encode(JSON.stringify(exampleData)),
3737
});

examples/commonjs/src/index.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { SDKResponse } from "@streamdal/node-sdk";
2+
13
const {
24
OperationType,
35
Streamdal,
@@ -48,16 +50,16 @@ const audience = {
4850

4951
export const example = async () => {
5052
const streamdal = new Streamdal(config);
51-
const result = await streamdal.processPipeline({
53+
const result: SDKResponse = await streamdal.process({
5254
audience,
5355
data: new TextEncoder().encode(JSON.stringify(exampleData)),
5456
});
5557

5658
if (result.error) {
57-
console.error("Pipeline error", result.message);
59+
console.error("Pipeline error", result.errorMessage);
5860
//
5961
// Optionally explore more detailed step status information
60-
console.dir(result.stepStatuses);
62+
console.dir(result.pipelineStatus);
6163
} else {
6264
console.info("Pipeline success!");
6365
//

examples/esm/src/index.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import {
22
Audience,
33
OperationType,
4+
SDKResponse,
45
Streamdal,
56
StreamdalConfigs,
6-
StreamdalResponse,
77
} from "@streamdal/node-sdk";
88

99
const exampleData = {
@@ -50,16 +50,16 @@ const audience: Audience = {
5050

5151
export const example = async () => {
5252
const streamdal = new Streamdal(config);
53-
const result: StreamdalResponse = await streamdal.processPipeline({
53+
const result: SDKResponse = await streamdal.process({
5454
audience,
5555
data: new TextEncoder().encode(JSON.stringify(exampleData)),
5656
});
5757

5858
if (result.error) {
59-
console.error("Pipeline error", result.message);
59+
console.error("Pipeline error", result.errorMessage);
6060
//
6161
// Optionally explore more detailed step status information
62-
console.dir(result.stepStatuses);
62+
console.dir(result.pipelineStatus);
6363
} else {
6464
console.info("Pipeline success!");
6565
//

package-lock.json

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"@grpc/proto-loader": "0.7.7",
4444
"@protobuf-ts/grpc-transport": "2.9.0",
4545
"@protobuf-ts/runtime-rpc": "2.9.0",
46-
"@streamdal/protos": "^0.0.123",
46+
"@streamdal/protos": "^0.0.125",
4747
"@types/rwlock": "^5.0.3",
4848
"@types/sinon": "^10.0.16",
4949
"dotenv": "^16.3.1",

src/__tests__/pipeline.test.ts

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -97,54 +97,55 @@ const testAttachCommandByServiceResponse = {
9797
};
9898

9999
describe("pipeline tests", () => {
100+
const key = audienceKey(testAudience);
100101
it("initPipelines should add a given pipeline to internal store and set pipelineInitialized ", async () => {
101102
sinon
102103
.stub(testConfigs.grpcClient, "getAttachCommandsByService")
103104
.resolves(testAttachCommandByServiceResponse as any);
104105

105106
await initPipelines(testConfigs);
106107

107-
expect(internal.pipelines.has(audienceKey(testAudience))).toEqual(true);
108+
expect(internal.pipelines.has(key)).toEqual(true);
108109
expect(internal.pipelineInitialized).toEqual(true);
109110
});
110111

111112
describe("processResponse", () => {
112-
it("attach command should add pipeline to internal store", () => {
113-
processResponse(testAttachCommand);
113+
it("attach command should add pipeline to internal store", async () => {
114+
await processResponse(testAttachCommand);
115+
expect(internal.pipelines.has(key)).toEqual(true);
114116

115-
expect(internal.pipelines.has(audienceKey(testAudience))).toEqual(true);
117+
const pipelines = internal.pipelines.get(key);
118+
expect(pipelines?.has(testPipeline.id)).toEqual(true);
116119
});
117120

118-
it("detach command should remove pipeline from internal store", () => {
119-
internal.pipelines.set(audienceKey(testAudience), {
120-
...testPipeline,
121-
paused: false,
122-
});
123-
processResponse(testDetachCommand);
121+
it("detach command should remove pipeline from internal store", async () => {
122+
internal.pipelines.set(key, new Map([[testPipeline.id, testPipeline]]));
123+
await processResponse(testDetachCommand);
124124

125-
expect(internal.pipelines.has(audienceKey(testAudience))).toEqual(false);
125+
expect(internal.pipelines.get(key)?.has(testPipeline.id)).toEqual(false);
126126
});
127127

128-
it("pause command should flag pipeline as paused in internal store", () => {
129-
internal.pipelines.set(audienceKey(testAudience), {
130-
...testPipeline,
131-
paused: false,
132-
});
133-
processResponse(testPauseCommand);
128+
it("pause command should flag pipeline as paused in internal store", async () => {
129+
internal.pipelines.set(
130+
key,
131+
new Map([[testPipeline.id, { ...testPipeline, paused: false }]])
132+
);
133+
134+
await processResponse(testPauseCommand);
134135

135-
expect(internal.pipelines.get(audienceKey(testAudience))?.paused).toEqual(
136+
expect(internal.pipelines.get(key)?.get(testPipeline.id)?.paused).toEqual(
136137
true
137138
);
138139
});
139140

140-
it("resume command should flag pipeline as not paused in internal store", () => {
141-
internal.pipelines.set(audienceKey(testAudience), {
142-
...testPipeline,
143-
paused: false,
144-
});
145-
processResponse(testResumeCommand);
141+
it("resume command should flag pipeline as not paused in internal store", async () => {
142+
internal.pipelines.set(
143+
key,
144+
new Map([[testPipeline.id, { ...testPipeline, paused: true }]])
145+
);
146+
await processResponse(testResumeCommand);
146147

147-
expect(internal.pipelines.get(audienceKey(testAudience))?.paused).toEqual(
148+
expect(internal.pipelines.get(key)?.get(testPipeline.id)?.paused).toEqual(
148149
false
149150
);
150151
});

src/internal/metrics.ts

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import {
44
OperationType,
55
} from "@streamdal/protos/protos/sp_common";
66
import { IInternalClient } from "@streamdal/protos/protos/sp_internal.client";
7+
import { StepStatus } from "@streamdal/protos/protos/sp_sdk";
78
import ReadWriteLock from "rwlock";
89

9-
import { StepStatus } from "./process.js";
10+
import { InternalPipeline } from "./pipeline.js";
1011

1112
export const METRIC_INTERVAL = 1000;
1213

@@ -18,25 +19,34 @@ export interface MetricsConfigs {
1819
streamdalToken: string;
1920
}
2021

21-
export const getStepLabels = (audience: Audience, stepStatus: StepStatus) => ({
22+
export const getStepLabels = (
23+
audience: Audience,
24+
pipeline: InternalPipeline
25+
) => ({
2226
service: audience.serviceName,
2327
component: audience.componentName,
2428
operation: audience.operationName,
25-
pipeline_id: stepStatus.pipelineId,
26-
pipeline_name: stepStatus.pipelineName,
29+
pipeline_id: pipeline.id,
30+
pipeline_name: pipeline.name,
2731
});
2832

29-
export const stepMetrics = async (
30-
audience: Audience,
31-
stepStatus: StepStatus,
32-
payloadSize: number
33+
export const stepMetrics = async ({
34+
audience,
35+
stepStatus,
36+
pipeline,
37+
payloadSize,
38+
}: {
39+
audience: Audience;
40+
stepStatus: StepStatus;
41+
pipeline: InternalPipeline;
42+
payloadSize: number;
3343
// eslint-disable-next-line @typescript-eslint/require-await
34-
) => {
44+
}) => {
3545
lock.writeLock((release) => {
3646
const opName =
3747
audience.operationType === OperationType.CONSUMER ? "consume" : "produce";
3848

39-
const labels = getStepLabels(audience, stepStatus);
49+
const labels = getStepLabels(audience, pipeline);
4050
const stepErrorKey = `counter_${opName}_errors`;
4151
const stepProcessedKey = `counter_${opName}_processed`;
4252
const stepBytesKey = `counter_${opName}_bytes`;

src/internal/pipeline.ts

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,12 @@ export type InternalPipeline = Pipeline & {
1111
paused?: boolean;
1212
};
1313

14-
export type EnhancedStep = PipelineStep & {
15-
pipelineId: string;
16-
pipelineName: string;
17-
};
18-
1914
export const initPipelines = async (configs: Configs) => {
2015
try {
16+
if (internal.pipelineInitialized) {
17+
return;
18+
}
19+
2120
console.debug("initializing pipelines");
2221
const { response }: { response: GetAttachCommandsByServiceResponse } =
2322
await configs.grpcClient.getAttachCommandsByService(
@@ -27,20 +26,20 @@ export const initPipelines = async (configs: Configs) => {
2726
{ meta: { "auth-token": configs.streamdalToken } }
2827
);
2928

30-
for (const [k, v] of Object.entries(response.wasmModules)) {
31-
void instantiateWasm(k, v.bytes);
29+
for await (const [k, v] of Object.entries(response.wasmModules)) {
30+
await instantiateWasm(k, v.bytes);
3231
}
3332

34-
for (const command of response.active) {
35-
processResponse(command);
33+
for await (const command of response.active) {
34+
await processResponse(command);
3635
}
3736
internal.pipelineInitialized = true;
3837
} catch (e) {
3938
console.error("Error initializing pipelines", e);
4039
}
4140
};
4241

43-
export const processResponse = (response: Command) => {
42+
export const processResponse = async (response: Command) => {
4443
if (!response.audience) {
4544
response.command.oneofKind !== "keepAlive" &&
4645
console.debug("command response has no audience, ignoring");
@@ -50,10 +49,10 @@ export const processResponse = (response: Command) => {
5049
switch (response.command.oneofKind) {
5150
case "attachPipeline":
5251
response.command.attachPipeline.pipeline &&
53-
attachPipeline(
52+
(await attachPipeline(
5453
response.audience,
5554
response.command.attachPipeline.pipeline
56-
);
55+
));
5756
break;
5857
case "detachPipeline":
5958
detachPipeline(
@@ -81,40 +80,50 @@ export const processResponse = (response: Command) => {
8180
}
8281
};
8382

84-
export const buildPipeline = (pipeline: Pipeline): Pipeline => {
83+
export const buildPipeline = async (pipeline: Pipeline): Promise<Pipeline> => {
8584
return {
8685
...pipeline,
87-
steps: pipeline.steps.map((step: PipelineStep) => {
88-
void instantiateWasm(step.WasmId, step.WasmBytes);
89-
return {
90-
...step,
91-
WasmBytes: undefined,
92-
};
93-
}),
86+
steps: await Promise.all(
87+
pipeline.steps.map(async (step: PipelineStep) => {
88+
await instantiateWasm(step.WasmId, step.WasmBytes);
89+
return {
90+
...step,
91+
WasmBytes: undefined,
92+
};
93+
})
94+
),
9495
};
9596
};
9697

97-
export const attachPipeline = (audience: Audience, pipeline: Pipeline) => {
98-
internal.pipelines.set(audienceKey(audience), {
99-
...buildPipeline(pipeline),
100-
paused: false,
101-
});
102-
};
103-
104-
export const detachPipeline = (audience: Audience, pipelineId: string) => {
98+
export const attachPipeline = async (
99+
audience: Audience,
100+
pipeline: Pipeline
101+
) => {
105102
const key = audienceKey(audience);
106-
const p = internal.pipelines.get(key);
107-
pipelineId === p?.id && internal.pipelines.delete(key);
103+
const existing = internal.pipelines.get(key);
104+
const built = await buildPipeline(pipeline);
105+
internal.pipelines.set(
106+
key,
107+
existing
108+
? existing.set(pipeline.id, built)
109+
: new Map([[pipeline.id, built]])
110+
);
108111
};
109112

113+
export const detachPipeline = (audience: Audience, pipelineId: string) =>
114+
internal.pipelines.get(audienceKey(audience))?.delete(pipelineId);
115+
110116
export const togglePausePipeline = (
111117
audience: Audience,
112118
pipelineId: string,
113119
paused: boolean
114120
) => {
115121
const key = audienceKey(audience);
116-
const p = internal.pipelines.get(key);
117-
pipelineId === p?.id && internal.pipelines.set(key, { ...p, paused });
122+
const existing = internal.pipelines.get(key);
123+
const p = existing?.get(pipelineId);
124+
existing &&
125+
p &&
126+
internal.pipelines.set(key, existing.set(p.id, { ...p, paused }));
118127
};
119128

120129
export const tailPipeline = (audience: Audience, { request }: TailCommand) => {

0 commit comments

Comments
 (0)