Skip to content
This repository was archived by the owner on Feb 14, 2024. It is now read-only.

Commit cd82651

Browse files
jacobhericJacob Heric
andauthored
construct proper kv step response and send it back from host function (#74)
* construct proper kv step response and send it back from host function * remove commented code * add sandbox kv example for reference * flesh out kv example --------- Co-authored-by: Jacob Heric <jacob@jacobheric.com:q!>
1 parent 3931efe commit cd82651

File tree

5 files changed

+117
-26
lines changed

5 files changed

+117
-26
lines changed

src/internal/kv.ts

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
import { KVAction } from "@streamdal/protos/protos/shared/sp_shared";
22
import { KVCommand } from "@streamdal/protos/protos/sp_command";
33
import { KVInstruction } from "@streamdal/protos/protos/sp_kv";
4+
import {
5+
KVStatus,
6+
KVStep,
7+
KVStepResponse,
8+
} from "@streamdal/protos/protos/steps/sp_steps_kv";
49

510
import { internal } from "./register.js";
11+
import { writeResponse } from "./wasm.js";
612

713
export const kvInstruction = (instruction: KVInstruction) => {
814
switch (instruction.action) {
@@ -36,11 +42,29 @@ export const kvCommand = (command: KVCommand) => {
3642
};
3743

3844
export const kvExists = (
39-
memory: WebAssembly.Memory,
40-
pointer: number,
41-
length: number
42-
): boolean => {
43-
const bytes = new Uint8Array(memory.buffer, pointer, length);
44-
const key = new TextDecoder().decode(bytes);
45-
return internal.kv.has(key);
45+
exports: any,
46+
keyPointer: number,
47+
keyLength: number
48+
): bigint => {
49+
const kvStep = KVStep.fromBinary(
50+
new Uint8Array(exports.memory.buffer, keyPointer, keyLength)
51+
);
52+
const resultBytes = KVStepResponse.toBinary(
53+
KVStepResponse.create({
54+
status: internal.kv.has(kvStep.key)
55+
? KVStatus.KV_STATUS_SUCCESS
56+
: KVStatus.KV_STATUS_FAILURE,
57+
})
58+
);
59+
60+
const resultPointer = exports.alloc(resultBytes.length);
61+
const mem = new Uint8Array(
62+
exports.memory.buffer,
63+
resultPointer,
64+
resultBytes.length
65+
);
66+
mem.set(resultBytes);
67+
const response = writeResponse(resultPointer, resultBytes.length);
68+
69+
return response;
4670
};

src/internal/wasm.ts

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,8 @@ export const instantiateWasm = async (
3838
const instantiated = await WebAssembly.instantiate(wasm, {
3939
wasi_snapshot_preview1: wasi.wasiImport,
4040
env: {
41-
kvExists: (pointer: number, length: number): number => {
42-
const result = instantiated.exports.memory
43-
? kvExists(
44-
instantiated.exports.memory as WebAssembly.Memory,
45-
pointer,
46-
length
47-
)
48-
: false;
49-
//
50-
// Wasm function expects a bigint but our runtime type expects a number
51-
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
52-
// @ts-expect-error
53-
return BigInt(result ? 1 : 0);
54-
},
41+
kvExists: (pointer: number, length: number): bigint =>
42+
kvExists(instantiated.exports, pointer, length),
5543
},
5644
});
5745
internal.wasmModules.set(wasmId, instantiated);
@@ -68,6 +56,16 @@ export const readResponse = (pointer: bigint, buffer: Uint8Array): any => {
6856
return buffer.slice(start, start + length);
6957
};
7058

59+
export const writeResponse = (pointer: number, length: number): bigint => {
60+
//
61+
// Left shift the pointer value by 32 bits
62+
const start = BigInt(pointer) << BigInt(32);
63+
64+
//
65+
// Combine the shifted start and length using bitwise OR
66+
return start | BigInt(length);
67+
};
68+
7169
export const runWasm = ({
7270
step,
7371
originalData,

src/sandbox/billing.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ const parseJson = (d: string, i: number): any => {
8383
return null;
8484
};
8585

86-
const loadData = (path: string): any[] => {
86+
export const loadData = (path: string): any[] => {
8787
try {
8888
const data = readFileSync(path).toString();
8989

src/sandbox/index.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,12 @@ const logPipeline = async (streamdal: any, audience: Audience, input: any) => {
112112
audience.operationType
113113
].toLowerCase()}`
114114
);
115-
const { error, errorMessage, data } = await streamdal.process({
116-
audience: audience,
117-
data: new TextEncoder().encode(JSON.stringify(input)),
118-
});
115+
const { error, errorMessage, data, pipelineStatus } = await streamdal.process(
116+
{
117+
audience: audience,
118+
data: new TextEncoder().encode(JSON.stringify(input)),
119+
}
120+
);
119121
//
120122
// no active pipeline messages are technically errors
121123
// but more informational
@@ -129,6 +131,8 @@ const logPipeline = async (streamdal: any, audience: Audience, input: any) => {
129131
} catch (e) {
130132
console.error("could not parse data", e);
131133
}
134+
console.debug("pipeline status:");
135+
console.dir(pipelineStatus, { depth: 20 });
132136
console.debug("pipeline request done", new Date());
133137
console.debug("--------------------------------");
134138
console.debug("\n");

src/sandbox/kv.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import { Audience } from "@streamdal/protos/protos/sp_common";
2+
3+
import { OperationType, Streamdal, StreamdalConfigs } from "../streamdal.js";
4+
import { loadData } from "./billing.js";
5+
6+
const serviceKVConfig: StreamdalConfigs = {
7+
streamdalUrl: "localhost:8082",
8+
streamdalToken: "1234",
9+
serviceName: "kv-service",
10+
pipelineTimeout: "100",
11+
stepTimeout: "10",
12+
dryRun: false,
13+
};
14+
15+
const KVProducer: Audience = {
16+
serviceName: "kv-service",
17+
componentName: "postgresql",
18+
operationType: OperationType.PRODUCER,
19+
operationName: "import",
20+
};
21+
22+
/**
23+
* 1. run this
24+
* 2. go to the console, create a pipeline with a "Key/Value" step type,
25+
* choose type "Exists", mode "Dynamic", and use "event_id" for key.
26+
* 3. Add the following key via the server rest api:
27+
*
28+
* curl --header "Content-Type: application/json" \
29+
* -H "Authorization: Bearer 1234" \
30+
* --request PUT \
31+
* --data '{"kvs": [{"key": "eaab67a7-f8af-48b9-b65f-1f0f15de9956","value": "eaab67a7-f8af-48b9-b65f-1f0f15de9956"}]}' \
32+
* "http://localhost:8081/api/v1/kv"
33+
*/
34+
export const kv = () => {
35+
const kvService = new Streamdal(serviceKVConfig);
36+
const kvData = loadData("./src/sandbox/assets/sample-welcome-producer.json");
37+
38+
//
39+
// eslint-disable-next-line @typescript-eslint/no-misused-promises
40+
setInterval(async () => {
41+
const result = await kvService.process({
42+
audience: KVProducer,
43+
data: new TextEncoder().encode(JSON.stringify(kvData[0])),
44+
});
45+
46+
//
47+
// Key exists, this will result in a pipeline step running without error
48+
// if this is part of multi-step or multi-pipeline you will need to inspect pipelineStatus
49+
console.debug(result.error);
50+
}, 1000);
51+
52+
//
53+
// eslint-disable-next-line @typescript-eslint/no-misused-promises
54+
setInterval(async () => {
55+
const result = await kvService.process({
56+
audience: KVProducer,
57+
data: new TextEncoder().encode(JSON.stringify(kvData[1])),
58+
});
59+
60+
//
61+
// Key does not exist, this will result in an error
62+
// if this is part of multi-step or multi-pipeline you will need to inspect pipelineStatus
63+
console.debug(result.error);
64+
}, 1000);
65+
};

0 commit comments

Comments
 (0)