Skip to content

Move RPC functions to room #434

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/rpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async function main() {
}

const registerReceiverMethods = (greetersRoom: Room, mathGeniusRoom: Room) => {
greetersRoom.localParticipant?.registerRpcMethod(
greetersRoom.registerRpcMethod(
'arrival',
async (data: RpcInvocationData) => {
console.log(`[Greeter] Oh ${data.callerIdentity} arrived and said "${data.payload}"`);
Expand All @@ -70,7 +70,7 @@ const registerReceiverMethods = (greetersRoom: Room, mathGeniusRoom: Room) => {
},
);

mathGeniusRoom.localParticipant?.registerRpcMethod(
mathGeniusRoom.registerRpcMethod(
'square-root',
async (data: RpcInvocationData) => {
const jsonData = JSON.parse(data.payload);
Expand All @@ -88,7 +88,7 @@ const registerReceiverMethods = (greetersRoom: Room, mathGeniusRoom: Room) => {
},
);

mathGeniusRoom.localParticipant?.registerRpcMethod(
mathGeniusRoom.registerRpcMethod(
'divide',
async (data: RpcInvocationData) => {
const jsonData = JSON.parse(data.payload);
Expand Down
2 changes: 1 addition & 1 deletion packages/livekit-rtc/rust-sdks
Submodule rust-sdks updated 48 files
+2 −2 .github/workflows/webrtc-builds.yml
+1 −0 .nanpa/fixed-libwebrtc-jar-build.kdl
+7 −6 Cargo.lock
+3 −3 Cargo.toml
+138 −55 examples/Cargo.lock
+95 −11 examples/play_from_disk/Cargo.lock
+7 −6 examples/rpc/src/main.rs
+1 −1 libwebrtc/src/lib.rs
+102 −0 libwebrtc/src/native/apm.rs
+1 −0 libwebrtc/src/native/mod.rs
+1 −1 livekit-api/Cargo.toml
+1 −0 livekit-api/src/services/egress.rs
+1 −0 livekit-api/src/services/sip.rs
+1 −1 livekit-ffi/.nanparc
+35 −0 livekit-ffi/CHANGELOG.md
+3 −3 livekit-ffi/Cargo.toml
+58 −0 livekit-ffi/protocol/audio_frame.proto
+16 −2 livekit-ffi/protocol/ffi.proto
+2 −2 livekit-ffi/protocol/rpc.proto
+125 −4 livekit-ffi/src/livekit.proto.rs
+30 −0 livekit-ffi/src/server/audio_plugin.rs
+169 −5 livekit-ffi/src/server/audio_stream.rs
+5 −1 livekit-ffi/src/server/mod.rs
+0 −85 livekit-ffi/src/server/participant.rs
+127 −10 livekit-ffi/src/server/requests.rs
+100 −4 livekit-ffi/src/server/room.rs
+1 −1 livekit-protocol/.nanparc
+6 −0 livekit-protocol/CHANGELOG.md
+1 −1 livekit-protocol/Cargo.toml
+1 −1 livekit-protocol/protocol
+305 −7 livekit-protocol/src/livekit.rs
+1,183 −249 livekit-protocol/src/livekit.serde.rs
+16 −6 livekit-protocol/src/promise.rs
+1 −1 livekit/.nanparc
+6 −0 livekit/CHANGELOG.md
+2 −1 livekit/Cargo.toml
+3 −0 livekit/src/lib.rs
+309 −0 livekit/src/plugin.rs
+234 −7 livekit/src/room/mod.rs
+26 −226 livekit/src/room/participant/local_participant.rs
+3 −0 livekit/src/room/participant/mod.rs
+3 −0 webrtc-sys/build.rs
+1 −0 webrtc-sys/compile_flags.txt
+73 −0 webrtc-sys/include/livekit/apm.h
+8 −8 webrtc-sys/libwebrtc/patches/android_use_libunwind.patch
+49 −0 webrtc-sys/src/apm.cpp
+53 −0 webrtc-sys/src/apm.rs
+1 −0 webrtc-sys/src/lib.rs
82 changes: 13 additions & 69 deletions packages/livekit-rtc/src/participant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ import { LocalTrackPublication } from './track_publication.js';
import type { Transcription } from './transcription.js';
import type { ChatMessage } from './types.js';
import { numberToBigInt, splitUtf8 } from './utils.js';
import type { Room } from './room.js';

const STREAM_CHUNK_SIZE = 15_000;

Expand Down Expand Up @@ -155,10 +156,15 @@ export type DataPublishOptions = {
};

export class LocalParticipant extends Participant {
private rpcHandlers: Map<string, (data: RpcInvocationData) => Promise<string>> = new Map();
private room?: Room;

trackPublications: Map<string, LocalTrackPublication> = new Map();

constructor(owned_info: OwnedParticipant, room: Room) {
super(owned_info);
this.room = room;
}

async publishData(data: Uint8Array, options: DataPublishOptions) {
const req = new PublishDataRequest({
localParticipantHandle: this.ffi_handle.handle,
Expand Down Expand Up @@ -739,6 +745,8 @@ export class LocalParticipant extends Participant {
}

/**
* @deprecated Use `room.registerRpcMethod` instead
*
* Establishes the participant as a receiver for calls of the specified RPC method.
* Will overwrite any existing callback for the same method.
*
Expand Down Expand Up @@ -767,82 +775,18 @@ export class LocalParticipant extends Participant {
* Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error").
*/
registerRpcMethod(method: string, handler: (data: RpcInvocationData) => Promise<string>) {
this.rpcHandlers.set(method, handler);

const req = new RegisterRpcMethodRequest({
localParticipantHandle: this.ffi_handle.handle,
method,
});

FfiClient.instance.request<RegisterRpcMethodResponse>({
message: { case: 'registerRpcMethod', value: req },
});
this.room?.registerRpcMethod(method, handler);
}

/**
* @deprecated Use `room.unregisterRpcMethod` instead
*
* Unregisters a previously registered RPC method.
*
* @param method - The name of the RPC method to unregister
*/
unregisterRpcMethod(method: string) {
this.rpcHandlers.delete(method);

const req = new UnregisterRpcMethodRequest({
localParticipantHandle: this.ffi_handle.handle,
method,
});

FfiClient.instance.request<UnregisterRpcMethodResponse>({
message: { case: 'unregisterRpcMethod', value: req },
});
}

/** @internal */
async handleRpcMethodInvocation(
invocationId: bigint,
method: string,
requestId: string,
callerIdentity: string,
payload: string,
responseTimeout: number,
) {
let responseError: RpcError | null = null;
let responsePayload: string | null = null;

const handler = this.rpcHandlers.get(method);

if (!handler) {
responseError = RpcError.builtIn('UNSUPPORTED_METHOD');
} else {
try {
responsePayload = await handler({ requestId, callerIdentity, payload, responseTimeout });
} catch (error) {
if (error instanceof RpcError) {
responseError = error;
} else {
console.warn(
`Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`,
error,
);
responseError = RpcError.builtIn('APPLICATION_ERROR');
}
}
}

const req = new RpcMethodInvocationResponseRequest({
localParticipantHandle: this.ffi_handle.handle,
invocationId,
error: responseError ? responseError.toProto() : undefined,
payload: responsePayload ?? undefined,
});

const res = FfiClient.instance.request<RpcMethodInvocationResponseResponse>({
message: { case: 'rpcMethodInvocationResponse', value: req },
});

if (res.error) {
console.warn(`error sending rpc method invocation response: ${res.error}`);
}
this.room?.unregisterRpcMethod(method);
}
}

Expand Down
Loading
Loading