22use_matlab_agent_interactive_async.py
33
44Asynchronous RabbitMQ client for sending interactive simulation requests
5- to MATLAB and handling input/output streaming with aio-pika and asyncio.
5+ to MATLAB
66"""
77
88# pylint: disable=missing-module-docstring, missing-class-docstring,
99# missing-function-docstring, too-many-instance-attributes,
1010# attribute-defined-outside-init
1111
12-
1312import asyncio
1413import argparse
1514import uuid
1615from typing import Any , Dict
16+ import anyio
1717import yaml
18- from aio_pika import connect_robust , Message , DeliveryMode , ExchangeType
18+ from aio_pika import (
19+ connect_robust ,
20+ Message ,
21+ DeliveryMode ,
22+ ExchangeType ,
23+ )
1924
2025
2126class AsyncInteractiveMatlabClient :
22- "Asynchronous client for sending interactive simulation requests to MATLAB."
27+ """ Asynchronous client for sending interactive simulation requests to MATLAB."" "
2328
24- def __init__ (self , agent_id : str , destination_id : str ,
25- config : Dict [str , Any ]):
29+ def __init__ (self , agent_id : str , destination_id : str , config : Dict [str , Any ]):
2630 self .agent_id = agent_id
2731 self .destination_id = destination_id
2832 self .config = config
2933 self .result_queue = f"Q.{ agent_id } .matlab.result"
3034
3135 async def setup (self ):
32- "Setup RabbitMQ connection, exchanges, and queues."
36+ """ Setup RabbitMQ connection, exchanges, and queues."" "
3337 rabbit_cfg = self .config .get ("rabbitmq" , {})
3438 self .connection = await connect_robust (
3539 host = rabbit_cfg .get ("host" , "localhost" ),
@@ -41,22 +45,27 @@ async def setup(self):
4145 self .channel = await self .connection .channel ()
4246 await self .channel .set_qos (prefetch_count = 1 )
4347 self .ex_bridge = await self .channel .declare_exchange (
44- "ex.bridge.output" , ExchangeType .TOPIC , durable = True )
48+ "ex.bridge.output" , ExchangeType .TOPIC , durable = True
49+ )
4550 self .ex_result = await self .channel .declare_exchange (
46- "ex.sim.result" , ExchangeType .TOPIC , durable = True )
51+ "ex.sim.result" , ExchangeType .TOPIC , durable = True
52+ )
4753 self .ex_stream = await self .channel .declare_exchange (
48- "ex.input.stream" , ExchangeType .TOPIC , durable = True )
49- self .queue = await self .channel .declare_queue (
50- self .result_queue , durable = True )
54+ "ex.input.stream" , ExchangeType .TOPIC , durable = True
55+ )
56+
57+ self .queue = await self .channel .declare_queue (self .result_queue , durable = True )
5158 await self .queue .bind (
52- self .ex_result , routing_key = f"{ self .destination_id } .result.{ self .agent_id } " )
59+ self .ex_result , routing_key = f"{ self .destination_id } .result.{ self .agent_id } "
60+ )
5361
5462 async def send_initial_interactive_request (
55- self , payload : Dict [str , Any ], request_id : str ):
56- "Send the initial interactive simulation request to MATLAB."
63+ self , payload : Dict [str , Any ], request_id : str
64+ ):
65+ """Send the initial interactive simulation request to MATLAB."""
5766 payload ["simulation" ]["request_id" ] = request_id
58- payload ["simulation" ].setdefault ("bridge_meta" , {})[
59- "protocol" ] = "rabbitmq"
67+ payload ["simulation" ].setdefault ("bridge_meta" , {})["protocol" ] = "rabbitmq"
68+
6069 yaml_body = yaml .dump (payload , default_flow_style = False )
6170 routing_key = f"{ self .agent_id } .{ self .destination_id } "
6271 await self .ex_bridge .publish (
@@ -71,14 +80,14 @@ async def send_initial_interactive_request(
7180 print (f"[INIT] Simulation request sent with request_id: { request_id } " )
7281
7382 async def stream_inputs (self , request_id : str , stream_key : str ):
74- "Stream input frames to MATLAB for the interactive simulation."
83+ """ Stream input frames to MATLAB for the interactive simulation."" "
7584 print (f"[INPUT STREAM] Sending input frames to { stream_key } ..." )
7685 for k in range (100 ):
77- t = k * 0.1
86+ t = k * 0.1
7887 vx = 1.0
7988 vy = 0.5
80- x = vx * t
81- y = vy * t
89+ x = vx * t
90+ y = vy * t
8291 frame = {
8392 "simulation" : {
8493 "request_id" : request_id ,
@@ -96,7 +105,7 @@ async def stream_inputs(self, request_id: str, stream_key: str):
96105 await asyncio .sleep (0.1 )
97106
98107 async def handle_results (self ):
99- "Handle incoming results from MATLAB and print them."
108+ """ Handle incoming results from MATLAB and print them."" "
100109 async with self .queue .iterator () as queue_iter :
101110 async for message in queue_iter :
102111 async with message .process ():
@@ -106,16 +115,18 @@ async def handle_results(self):
106115
107116
108117async def main ():
109- "Main entry point for the asynchronous MATLAB agent client."
118+ """ Main entry point for the asynchronous MATLAB agent client."" "
110119 parser = argparse .ArgumentParser ()
111120 parser .add_argument (
112121 "--api-payload" ,
113122 type = str ,
114- default = "../api/simulation.yaml" )
123+ default = "../api/simulation.yaml" ,
124+ )
115125 args = parser .parse_args ()
116126
117- with open (args .api_payload , "r" , encoding = "utf-8" ) as f :
118- payload = yaml .safe_load (f )
127+
128+ async with await anyio .open_file (args .api_payload , "r" , encoding = "utf-8" ) as f :
129+ payload = yaml .safe_load (await f .read ())
119130
120131 config = payload .get ("config" , {})
121132 client = AsyncInteractiveMatlabClient ("dt" , "matlab" , config )
@@ -135,5 +146,4 @@ async def main():
135146
136147
137148if __name__ == "__main__" :
138- # Run the main function in an asyncio event loop
139149 asyncio .run (main ())
0 commit comments