2
2
use_matlab_agent_interactive.py
3
3
4
4
Asynchronous RabbitMQ client for sending interactive simulation requests
5
- to MATLAB
5
+ to a MATLAB agent, with optional TLS and separate config/payload files.
6
6
"""
7
7
8
8
# pylint: disable=missing-module-docstring, missing-class-docstring,
9
9
# missing-function-docstring, too-many-instance-attributes,
10
10
# attribute-defined-outside-init
11
11
12
- import asyncio
13
12
import argparse
13
+ import asyncio
14
+ import ssl
14
15
import uuid
15
16
from typing import Any , Dict
17
+
16
18
import anyio
17
19
import yaml
18
20
from aio_pika import (
19
21
connect_robust ,
22
+ ExchangeType ,
20
23
Message ,
21
24
DeliveryMode ,
22
- ExchangeType ,
23
25
)
24
26
25
27
26
28
class InteractiveUsageMatlabAgent :
27
- """Asynchronous client for sending interactive simulation requests to MATLAB ."""
29
+ """Asynchronous client for sending interactive simulation requests."""
28
30
29
31
def __init__ (self , agent_id : str , destination_id : str ,
30
- config : Dict [str , Any ]):
32
+ rabbitmq_cfg : Dict [str , Any ]) -> None :
31
33
self .agent_id = agent_id
32
34
self .destination_id = destination_id
33
- self .config = config
35
+ self .cfg = rabbitmq_cfg
34
36
self .result_queue = f"Q.{ agent_id } .matlab.result"
35
37
36
- async def setup (self ):
37
- """Setup RabbitMQ connection, exchanges, and queues."""
38
- rabbit_cfg = self .config .get ("rabbitmq" , {})
38
+ async def setup (self ) -> None :
39
+ """Connect to RabbitMQ, declare exchanges/queues."""
40
+ tls_enabled : bool = bool (self .cfg .get ("tls" , False ))
41
+ port = self .cfg .get ("port" , 5671 if tls_enabled else 5672 )
42
+
43
+ ssl_ctx = None
44
+ if tls_enabled :
45
+ ssl_ctx = ssl .create_default_context ()
46
+ ssl_ctx .minimum_version = ssl .TLSVersion .TLSv1_2
47
+
39
48
self .connection = await connect_robust (
40
- host = rabbit_cfg .get ("host" , "localhost" ),
41
- port = rabbit_cfg .get ("port" , 5672 ),
42
- virtualhost = rabbit_cfg .get ("vhost" , "/" ),
43
- login = rabbit_cfg .get ("username" , "guest" ),
44
- password = rabbit_cfg .get ("password" , "guest" ),
49
+ host = self .cfg .get ("host" , "localhost" ),
50
+ port = port ,
51
+ virtualhost = self .cfg .get ("vhost" , "/" ),
52
+ login = self .cfg .get ("username" , "guest" ),
53
+ password = self .cfg .get ("password" , "guest" ),
54
+ heartbeat = self .cfg .get ("heartbeat" , 600 ),
55
+ ssl = tls_enabled ,
45
56
)
57
+
46
58
self .channel = await self .connection .channel ()
47
59
await self .channel .set_qos (prefetch_count = 1 )
60
+
61
+ # Exchanges
48
62
self .ex_bridge = await self .channel .declare_exchange (
49
63
"ex.bridge.output" , ExchangeType .TOPIC , durable = True
50
64
)
@@ -55,47 +69,45 @@ async def setup(self):
55
69
"ex.input.stream" , ExchangeType .TOPIC , durable = True
56
70
)
57
71
58
- self .queue = await self .channel .declare_queue (self .result_queue , durable = True )
72
+ # Result queue/binding
73
+ self .queue = await self .channel .declare_queue (
74
+ self .result_queue , durable = True
75
+ )
59
76
await self .queue .bind (
60
- self .ex_result , routing_key = f"{
61
- self .destination_id } .result.{
62
- self .agent_id } "
77
+ self .ex_result ,
78
+ routing_key = f"{ self .destination_id } .result.{ self .agent_id } " ,
63
79
)
64
80
65
81
async def send_initial_interactive_request (
66
82
self , payload : Dict [str , Any ], request_id : str
67
- ):
68
- """Send the initial interactive simulation request to MATLAB ."""
83
+ ) -> None :
84
+ """Publish the first message that kicks off the interactive sim ."""
69
85
payload ["simulation" ]["request_id" ] = request_id
70
- payload ["simulation" ].setdefault ("bridge_meta" , {})[
71
- "protocol" ] = "rabbitmq"
86
+ payload ["simulation" ].setdefault ("bridge_meta" , {})["protocol" ] = "rabbitmq"
72
87
73
- yaml_body = yaml .dump (payload , default_flow_style = False )
74
88
routing_key = f"{ self .agent_id } .{ self .destination_id } "
75
89
await self .ex_bridge .publish (
76
90
Message (
77
- body = yaml_body .encode (),
91
+ body = yaml . dump ( payload , default_flow_style = False ) .encode (),
78
92
delivery_mode = DeliveryMode .PERSISTENT ,
79
93
content_type = "application/x-yaml" ,
80
94
message_id = str (uuid .uuid4 ()),
81
95
),
82
96
routing_key = routing_key ,
83
97
)
84
- print (f"[INIT] Simulation request sent with request_id: { request_id } " )
98
+ print (f"[INIT] Sent interactive request (rk='{ routing_key } ') "
99
+ f"request_id={ request_id } " )
85
100
86
- async def stream_inputs (self , request_id : str , stream_key : str ):
87
- """Stream input frames to MATLAB for the interactive simulation ."""
88
- print (f"[INPUT STREAM] Sending input frames to { stream_key } ... " )
89
- for k in range (100 ):
101
+ async def stream_inputs (self , request_id : str , stream_key : str ) -> None :
102
+ """Continuously send input frames to MATLAB."""
103
+ print (f"[INPUT STREAM] Publishing frames on ' { stream_key } ' … " )
104
+ for k in range (10000 ):
90
105
t = k * 0.1
91
- vx = 1.0
92
- vy = 0.5
93
- x = vx * t
94
- y = vy * t
106
+ vx , vy = 1.0 , 0.5
95
107
frame = {
96
108
"simulation" : {
97
109
"request_id" : request_id ,
98
- "inputs" : {"t" : t , "x" : x , "y" : y , "vx" : vx , "vy" : vy },
110
+ "inputs" : {"t" : t , "x" : vx * t , "y" : vy * t , "vx" : vx , "vy" : vy },
99
111
}
100
112
}
101
113
await self .ex_stream .publish (
@@ -108,31 +120,40 @@ async def stream_inputs(self, request_id: str, stream_key: str):
108
120
)
109
121
await asyncio .sleep (0.1 )
110
122
111
- async def handle_results (self ):
112
- """Handle incoming results from MATLAB and print them."""
113
- async with self .queue .iterator () as queue_iter :
114
- async for message in queue_iter :
115
- async with message .process ():
116
- result = yaml .safe_load (message .body )
117
- print (f"\n [RESULT] Received: { result } " )
118
- print ("-" * 40 )
123
+ async def handle_results (self ) -> None :
124
+ """Consume results asynchronously and print them."""
125
+ async with self .queue .iterator () as q :
126
+ async for msg in q :
127
+ async with msg .process ():
128
+ result = yaml .safe_load (msg .body )
129
+ print (f"\n [RESULT] { result } \n " + "-" * 40 )
119
130
120
131
121
- async def main ():
122
- """Main entry point for the asynchronous MATLAB agent client."""
123
- parser = argparse .ArgumentParser ()
132
+ async def main () -> None :
133
+ parser = argparse .ArgumentParser (description = "MATLAB interactive client" )
134
+ parser .add_argument (
135
+ "--config" ,
136
+ "-c" ,
137
+ default = "use.yaml" ,
138
+ help = "YAML with RabbitMQ connection settings (default: use.yaml)" ,
139
+ )
124
140
parser .add_argument (
125
141
"--api-payload" ,
126
- type = str ,
127
- default = "../api/simulation.yaml" ,
142
+ "-p" ,
143
+ default = "simulation.yaml" ,
144
+ help = "YAML simulation payload to send (default: simulation.yaml)" ,
128
145
)
129
146
args = parser .parse_args ()
130
147
131
- async with await anyio .open_file (args .api_payload , "r" , encoding = "utf-8" ) as f :
132
- payload = yaml .safe_load (await f .read ())
148
+ # --- Load config & payload
149
+ async with await anyio .open_file (args .config , "r" , encoding = "utf-8" ) as f_cfg :
150
+ rabbit_cfg = yaml .safe_load (await f_cfg .read ()).get ("rabbitmq" , {})
151
+
152
+ async with await anyio .open_file (args .api_payload , "r" , encoding = "utf-8" ) as f_pl :
153
+ payload = yaml .safe_load (await f_pl .read ())
133
154
134
- config = payload . get ( "config" , {})
135
- client = InteractiveUsageMatlabAgent ("dt" , "matlab" , config )
155
+ # --- Client
156
+ client = InteractiveUsageMatlabAgent ("dt" , "matlab" , rabbit_cfg )
136
157
137
158
request_id = str (uuid .uuid4 ())
138
159
stream_source = payload ["simulation" ]["inputs" ]["stream_source" ]
@@ -141,7 +162,7 @@ async def main():
141
162
await client .setup ()
142
163
await client .send_initial_interactive_request (payload , request_id )
143
164
144
- # Start listener and streaming concurrently
165
+ # run listener & streamer concurrently
145
166
await asyncio .gather (
146
167
client .handle_results (),
147
168
client .stream_inputs (request_id , stream_key ),
0 commit comments