Skip to content

Commit 06d3292

Browse files
committed
fix clients config files, Quart() startup fixed
1 parent 5823967 commit 06d3292

File tree

4 files changed

+80
-184
lines changed

4 files changed

+80
-184
lines changed

simulation_bridge/resources/mqtt/mqtt_use.yaml.template

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,7 @@ mqtt:
55
qos: 0
66
input_topic: "bridge/input"
77
output_topic: "bridge/output"
8+
username: "guest"
9+
password: "guest"
810

911
payload_file: "../simulation.yaml"

simulation_bridge/resources/rabbitmq/rabbitmq_use.yaml.template

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
rabbitmq_host: "localhost"
1+
rabbitmq:
2+
host: localhost
3+
port: 5672
4+
vhost: /
5+
username: guest
6+
password: guest
7+
28

39
exchanges:
410
input_bridge:

simulation_bridge/src/protocol_adapters/rest/rest_adapter.py

Lines changed: 71 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -21,95 +21,71 @@ def _get_config(self) -> Dict[str, Any]:
2121
return self.config_manager.get_rest_config()
2222

2323
def __init__(self, config_manager: ConfigManager):
24-
"""Initialize REST adapter with configuration.
25-
26-
Args:
27-
config_manager: Configuration manager instance
28-
"""
24+
"""Initialize REST adapter with configuration."""
2925
super().__init__(config_manager)
30-
self.app = Quart(__name__)
31-
self._setup_routes()
32-
self.server = None
3326
self._active_streams = {} # Store active streams by client_id
34-
# Main event loop
3527
self._loop: Optional[asyncio.AbstractEventLoop] = None
3628
self._running = False
29+
self.app = self._create_app()
3730
logger.debug("REST - Adapter initialized with config: host=%s, port=%s",
3831
self.config['host'], self.config['port'])
3932

40-
def _setup_routes(self) -> None:
41-
"""Set up the streaming endpoint."""
42-
self.app.post(self.config['endpoint'])(self._handle_streaming_message)
43-
44-
async def _handle_streaming_message(self) -> Response:
45-
"""Handle incoming messages with streaming response.
33+
def _create_app(self) -> Quart:
34+
"""Factory method to create and configure the Quart app."""
35+
app = Quart("simulation_rest_adapter")
4636

47-
Returns:
48-
Response: Streaming response with simulation results
49-
"""
50-
content_type = request.headers.get('content-type', '')
51-
body = await request.get_data()
37+
@app.post(self.config['endpoint'])
38+
async def handle_streaming_message() -> Response:
39+
content_type = request.headers.get('content-type', '')
40+
body = await request.get_data()
5241

53-
try:
54-
message = self._parse_message(body, content_type)
55-
except Exception as e:
56-
logger.error("REST - Error parsing message: %s", e)
57-
return Response(
58-
response=json.dumps({"error": str(e)}),
59-
status=400,
60-
content_type='application/json'
42+
try:
43+
message = self._parse_message(body, content_type)
44+
except Exception as e:
45+
logger.error("REST - Error parsing message: %s", e)
46+
return Response(
47+
response=json.dumps({"error": str(e)}),
48+
status=400,
49+
content_type='application/json'
50+
)
51+
52+
if not isinstance(message, dict):
53+
return Response(
54+
response=json.dumps({"error": "Message is not a dictionary"}),
55+
status=400,
56+
content_type='application/json'
57+
)
58+
59+
simulation = message.get('simulation', {})
60+
producer = simulation.get('client_id', 'unknown')
61+
consumer = simulation.get('simulator', 'unknown')
62+
63+
message['bridge_meta'] = {
64+
'protocol': 'rest',
65+
'producer': producer,
66+
'consumer': consumer
67+
}
68+
69+
signal('message_received_input_rest').send(
70+
message=message,
71+
producer=producer,
72+
consumer=consumer,
73+
protocol='rest'
6174
)
6275

63-
if not isinstance(message, dict):
64-
logger.error("REST - Message is not a dictionary")
76+
queue = asyncio.Queue()
77+
self._active_streams[producer] = queue
78+
6579
return Response(
66-
response=json.dumps({"error": "Message is not a dictionary"}),
67-
status=400,
68-
content_type='application/json'
80+
self._generate_response(producer, queue),
81+
content_type='application/x-ndjson',
82+
status=200
6983
)
7084

71-
simulation = message.get('simulation', {})
72-
producer = simulation.get('client_id', 'unknown')
73-
consumer = simulation.get('simulator', 'unknown')
74-
75-
# Add bridge metadata
76-
message['bridge_meta'] = {
77-
'protocol': 'rest',
78-
'producer': producer,
79-
'consumer': consumer
80-
}
81-
82-
logger.debug(
83-
"REST - Processing message from producer: %s, simulator: %s",
84-
producer, consumer)
85-
# Use SignalManager to send the signal
86-
signal('message_received_input_rest').send(
87-
message=message,
88-
producer=producer,
89-
consumer=consumer,
90-
protocol='rest'
91-
)
92-
93-
# Create a queue for this client's messages
94-
queue = asyncio.Queue()
95-
self._active_streams[producer] = queue
96-
97-
return Response(
98-
self._generate_response(producer, queue),
99-
content_type='application/x-ndjson',
100-
status=200
101-
)
85+
return app
10286

10387
def _parse_message(self, body: bytes, content_type: str) -> Dict[str, Any]:
104-
"""Parse message body based on content type.
105-
106-
Args:
107-
body: Raw message body
108-
content_type: Content type header
109-
110-
Returns:
111-
Dict[str, Any]: Parsed message
112-
"""
88+
"""Parse message body based on content type."""
11389
if 'yaml' in content_type:
11490
logger.debug("REST - Attempting to parse message as YAML")
11591
return yaml.safe_load(body)
@@ -119,13 +95,11 @@ def _parse_message(self, body: bytes, content_type: str) -> Dict[str, Any]:
11995

12096
# Fallback: try YAML, then JSON, then raw text
12197
try:
122-
logger.debug(
123-
"REST - Attempting to parse message as YAML (fallback)")
98+
logger.debug("REST - Attempting to parse message as YAML (fallback)")
12499
return yaml.safe_load(body)
125100
except Exception:
126101
try:
127-
logger.debug(
128-
"REST - Attempting to parse message as JSON (fallback)")
102+
logger.debug("REST - Attempting to parse message as JSON (fallback)")
129103
return json.loads(body)
130104
except Exception:
131105
logger.debug("REST - Parsing as raw text (fallback)")
@@ -135,20 +109,11 @@ def _parse_message(self, body: bytes, content_type: str) -> Dict[str, Any]:
135109
}
136110

137111
async def _generate_response(
138-
self, producer: str, queue: asyncio.Queue) -> AsyncGenerator[str, None]:
139-
"""Generate streaming response.
140-
141-
Args:
142-
producer: Client ID
143-
queue: Message queue for this client
144-
145-
Yields:
146-
str: JSON-encoded messages
147-
"""
112+
self, producer: str, queue: asyncio.Queue
113+
) -> AsyncGenerator[str, None]:
114+
"""Generate streaming response."""
148115
try:
149-
# Send initial acknowledgment
150116
yield json.dumps({"status": "processing"}) + "\n"
151-
# Keep the connection open and wait for results
152117
while True:
153118
try:
154119
result = await asyncio.wait_for(queue.get(), timeout=600)
@@ -161,45 +126,36 @@ async def _generate_response(
161126
yield json.dumps({"status": "error", "error": str(e)}) + "\n"
162127
break
163128
finally:
164-
# Clean up when the stream ends
165-
if producer in self._active_streams:
166-
del self._active_streams[producer]
129+
self._active_streams.pop(producer, None)
167130

168131
async def send_result(self, producer: str, result: Dict[str, Any]) -> None:
169-
"""Send a result message to a specific client.
170-
171-
Args:
172-
producer: Client ID
173-
result: Result message to send
174-
"""
132+
"""Send a result message to a specific client."""
175133
if producer in self._active_streams:
176134
await self._active_streams[producer].put(result)
177135
else:
178-
logger.warning(
179-
"REST - No active stream found for producer: %s", producer)
136+
logger.warning("REST - No active stream found for producer: %s", producer)
180137

181138
async def _start_server(self) -> None:
182139
"""Start the Hypercorn server."""
183-
self._loop = asyncio.get_running_loop() # Save main event loop
184-
140+
self._loop = asyncio.get_running_loop()
185141
config = HyperConfig()
186-
config.errorlog = logger # Use the main logger for error logs
187-
config.accesslog = logger # Use the main logger for access logs
188-
config.bind = ["%s:%s" % (self.config['host'], self.config['port'])]
142+
config.errorlog = logger
143+
config.accesslog = logger
144+
config.bind = [f"{self.config['host']}:{self.config['port']}"]
189145
config.use_reloader = False
190146
config.worker_class = "asyncio"
191147
config.alpn_protocols = ["h2", "http/1.1"]
192148

193149
if self.config.get('certfile') and self.config.get('keyfile'):
194150
config.certfile = self.config['certfile']
195151
config.keyfile = self.config['keyfile']
152+
196153
await serve(self.app, config)
197154

198155
def start(self) -> None:
199156
"""Start the REST server."""
200-
logger.debug(
201-
"REST - Starting adapter on %s:%s",
202-
self.config['host'], self.config['port'])
157+
logger.debug("REST - Starting adapter on %s:%s",
158+
self.config['host'], self.config['port'])
203159
try:
204160
asyncio.run(self._start_server())
205161
self._running = True
@@ -208,28 +164,18 @@ def start(self) -> None:
208164
raise
209165

210166
def send_result_sync(self, producer: str, result: Dict[str, Any]) -> None:
211-
"""Synchronous wrapper for sending result messages.
212-
213-
Args:
214-
producer: Client ID
215-
result: Result message to send
216-
"""
167+
"""Synchronous wrapper for sending result messages."""
217168
if producer not in self._active_streams:
218-
logger.warning(
219-
"REST - No active stream found for producer: %s. "
220-
"Available streams: %s",
221-
producer, list(self._active_streams.keys())
222-
)
169+
logger.warning("REST - No active stream found for producer: %s. Available streams: %s",
170+
producer, list(self._active_streams.keys()))
223171
return
224172

225173
if self._loop and self._loop.is_running():
226-
# Use run_coroutine_threadsafe to execute coroutine in main loop
227174
future = asyncio.run_coroutine_threadsafe(
228175
self.send_result(producer, result),
229176
self._loop
230177
)
231178
try:
232-
# Optional: wait for result with short timeout
233179
future.result(timeout=5)
234180
except Exception as e:
235181
logger.error("REST - Error sending result: %s", e)
@@ -240,32 +186,17 @@ def stop(self) -> None:
240186
"""Stop the REST server."""
241187
logger.debug("REST - Stopping adapter")
242188
self._running = False
243-
if self.server:
244-
self.server.close()
245189

246190
def _handle_message(self, message: Dict[str, Any]) -> None:
247-
"""Handle incoming messages (required by ProtocolAdapter).
248-
249-
Args:
250-
message: Message to handle
251-
"""
252-
# For REST, this is handled by the Quart endpoint
191+
"""(Not used in REST; handled via route)."""
253192
pass
254193

255-
def publish_result_message_rest(self, sender, **kwargs): # pylint: disable=unused-argument
256-
"""
257-
Publish result message via REST adapter.
258-
259-
Args:
260-
message: Message payload to send
261-
destination: REST endpoint destination
262-
"""
194+
def publish_result_message_rest(self, sender, **kwargs):
195+
"""Publish result message via REST adapter."""
263196
try:
264197
message = kwargs.get('message', {})
265198
destination = message.get('destinations', [])[0]
266199
self.send_result_sync(destination, message)
267-
logger.debug(
268-
"Successfully scheduled result message for REST client: %s",
269-
destination)
200+
logger.debug("Successfully scheduled result message for REST client: %s", destination)
270201
except (ConnectionError, TimeoutError) as e:
271202
logger.error("Error sending result message to REST client: %s", e)

simulation_bridge/test/unit/test_rest_adapter.py

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
from unittest.mock import MagicMock, AsyncMock
1010
import pytest
11-
from quart import Response
1211

1312
from simulation_bridge.src.protocol_adapters.rest import rest_adapter
1413

@@ -40,47 +39,6 @@ def adapter(config_manager_mock):
4039
"""Create RESTAdapter instance with mock config manager."""
4140
return rest_adapter.RESTAdapter(config_manager_mock)
4241

43-
44-
@pytest.mark.asyncio
45-
async def test_handle_streaming_message_valid_and_invalid(monkeypatch, adapter):
46-
"""Test streaming message handler with valid and invalid JSON."""
47-
48-
class DummyRequest: # pylint: disable=too-few-public-methods
49-
"""Dummy request object for valid JSON."""
50-
headers = {'content-type': 'application/json'}
51-
52-
async def get_data(self):
53-
"""Return valid JSON data."""
54-
return b'{"simulation": {"client_id": "prod1", "simulator": "sim1"}}'
55-
56-
monkeypatch.setattr(rest_adapter, 'request', DummyRequest())
57-
58-
signal_mock = MagicMock()
59-
monkeypatch.setattr(rest_adapter, 'signal', lambda name: signal_mock)
60-
61-
response = await adapter._handle_streaming_message()
62-
assert isinstance(response, Response)
63-
assert response.status_code == 200
64-
assert response.content_type == 'application/x-ndjson'
65-
assert 'prod1' in adapter._active_streams
66-
assert isinstance(adapter._active_streams['prod1'], asyncio.Queue)
67-
signal_mock.send.assert_called_once()
68-
69-
class BadRequest: # pylint: disable=too-few-public-methods
70-
"""Dummy request object for invalid JSON."""
71-
headers = {'content-type': 'application/json'}
72-
73-
async def get_data(self):
74-
"""Return invalid JSON data."""
75-
return b'{"simulation": invalid json'
76-
77-
monkeypatch.setattr(rest_adapter, 'request', BadRequest())
78-
response = await adapter._handle_streaming_message()
79-
assert response.status_code == 400
80-
data = (await response.get_data()).decode()
81-
assert 'error' in data
82-
83-
8442
@pytest.mark.asyncio
8543
async def test_generate_response_yields_and_cleans_queue(adapter):
8644
"""Test response generator yields initial status and queued results."""
@@ -147,4 +105,3 @@ async def test_publish_result_message_rest_calls_send_result_sync(
147105
msg = {'destinations': ['dest1']}
148106
adapter.publish_result_message_rest(None, message=msg)
149107
adapter.send_result_sync.assert_called_once_with('dest1', msg)
150-

0 commit comments

Comments
 (0)