Skip to content

Commit ead8c5d

Browse files
committed
add tuple 4 elements in performancemonitor
1 parent a31382e commit ead8c5d

File tree

8 files changed

+201
-93
lines changed

8 files changed

+201
-93
lines changed

performance/performance_analysis.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import pandas as pd
77

88

9-
DEFAULT_INPUT = Path("../performance_log/performance_metrics.csv")
9+
DEFAULT_INPUT = Path("../performance_log/performance_metrics_test.csv")
1010
DEFAULT_OUTPUT = Path("overhead_summary.csv")
1111
AVG_INPUT_OVERHEAD = "Avg Input Overhead"
1212
AVG_OUTPUT_OVERHEAD = "Avg Output Overhead"

simulation_bridge/src/core/bridge_core.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,15 @@ def handle_input_message(self, sender, **kwargs): # pylint: disable=unused-argu
142142
# Initialize performance monitor
143143
performance_monitor = PerformanceMonitor()
144144
message_dict = kwargs.get('message', {})
145+
producer = kwargs.get('producer', 'unknown')
146+
simulation_type = message_dict.get(
147+
'simulation', {}).get('type', 'unknown')
145148
protocol = kwargs.get('protocol', 'unknown')
146149
operation_id = message_dict.get(
147150
'simulation', {}).get(
148151
'request_id', 'unknown')
149-
performance_monitor.record_core_received_input(operation_id, protocol)
152+
performance_monitor.record_core_received_input(
153+
operation_id, protocol, producer, simulation_type)
150154
try:
151155
message = MessageModel.model_validate(message_dict)
152156
except Exception as e: # pylint: disable=broad-exception-caught
@@ -177,6 +181,9 @@ def handle_result_rabbitmq_message(self, sender, **kwargs): # pylint: disable=u
177181
# Initialize performance monitor
178182
performance_monitor = PerformanceMonitor()
179183
message = kwargs.get('message', {})
184+
destinations = message.get('destinations', [])
185+
destination = destinations[0] if destinations else 'unknown'
186+
simulation_type = message.get('simulation', {}).get('type', 'unknown')
180187
producer = message.get('source', 'unknown')
181188
consumer = "result"
182189
operation_id = message.get('request_id', 'unknown')
@@ -188,9 +195,11 @@ def handle_result_rabbitmq_message(self, sender, **kwargs): # pylint: disable=u
188195
protocol='rabbitmq',
189196
operation_id=operation_id)
190197
status = message.get('status', 'unknown')
191-
performance_monitor.record_result_sent(operation_id, 'rabbitmq')
198+
performance_monitor.record_result_sent(
199+
operation_id, 'rabbitmq', destination, simulation_type)
192200
if status == 'completed':
193-
performance_monitor.finalize_operation(operation_id, 'rabbitmq')
201+
performance_monitor.finalize_operation(
202+
operation_id, 'rabbitmq', destination, simulation_type)
194203

195204
def handle_result_unknown_message(self, sender, **kwargs): # pylint: disable=unused-argument
196205
"""
@@ -222,7 +231,7 @@ def _publish_message(self, producer, consumer, message, # pylint: disable=too-m
222231

223232
# Initialize performance monitor
224233
performance_monitor = PerformanceMonitor()
225-
234+
simulation_type = message.get('simulation', {}).get('type', 'unknown')
226235
routing_key = f"{producer}.{consumer}"
227236
message['simulation']['bridge_meta'] = {
228237
'protocol': protocol
@@ -242,7 +251,7 @@ def _publish_message(self, producer, consumer, message, # pylint: disable=too-m
242251
# Record sent input time in performance monitor
243252
if exchange == 'ex.bridge.output':
244253
performance_monitor.record_core_sent_input(
245-
operation_id, protocol)
254+
operation_id, protocol, producer, simulation_type)
246255
except (pika.exceptions.AMQPConnectionError,
247256
pika.exceptions.AMQPChannelError) as e:
248257
logger.error("RabbitMQ connection error: %s", e)

simulation_bridge/src/protocol_adapters/mqtt/mqtt_adapter.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,12 @@ def on_message(self, client, userdata, msg):
138138
producer = simulation.get('client_id', 'unknown')
139139
consumer = simulation.get('simulator', 'unknown')
140140
operation_id = simulation.get('request_id', 'unknown')
141-
141+
simulation_type = simulation.get('type', 'unknown')
142142
# Process message directly - no need for queuing
143143
logger.debug(
144144
"MQTT - Processing message %s, from producer: %s, simulator: %s",
145145
message, producer, consumer)
146146

147-
simulation_type = simulation.get('type', 'unknown')
148147
performance_monitor.start_operation(
149148
operation_id,
150149
client_id=producer,
@@ -228,6 +227,11 @@ def send_result(self, message):
228227
# Initialize performance monitor
229228
performance_monitor = PerformanceMonitor()
230229
operation_id = message.get('request_id', 'unknown')
230+
destinations = message.get('destinations', [])
231+
producer = destinations[0] if destinations else 'unknown'
232+
simulation_type = message.get(
233+
'simulation', {}).get(
234+
'type', 'unknown')
231235
output_topic = self.mqtt_config['output_topic']
232236
self.mqtt_client.publish(
233237
topic=output_topic,
@@ -237,9 +241,11 @@ def send_result(self, message):
237241
logger.debug(
238242
"Message published to MQTT topic '%s': %s", output_topic, message)
239243
status = message.get('status', 'unknown')
240-
performance_monitor.record_result_sent(operation_id, 'mqtt')
244+
performance_monitor.record_result_sent(
245+
operation_id, 'mqtt', producer, simulation_type)
241246
if status == 'completed':
242-
performance_monitor.finalize_operation(operation_id, 'mqtt')
247+
performance_monitor.finalize_operation(
248+
operation_id, 'mqtt', producer, simulation_type)
243249
except (ConnectionError, TimeoutError) as e:
244250
logger.error("Error publishing MQTT message: %s", e)
245251

simulation_bridge/src/protocol_adapters/rabbitmq/rabbitmq_adapter.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,11 @@ def _process_message(self, ch, method, properties, body, queue_name):
154154
)
155155
elif queue_name == 'Q.bridge.result':
156156
operation_id = message.get('request_id', 'unknown')
157+
destinations = message.get('destinations', [])
158+
producer = destinations[0] if destinations else 'unknown'
159+
simulation_type = message.get(
160+
'simulation', {}).get(
161+
'type', 'unknown')
157162
bridge_meta = message.get('bridge_meta', {})
158163
if isinstance(bridge_meta, str):
159164
if bridge_meta.strip().startswith('{'):
@@ -169,9 +174,7 @@ def _process_message(self, ch, method, properties, body, queue_name):
169174
bridge_meta = {}
170175
protocol = bridge_meta.get('protocol', 'unknown')
171176
performance_monitor.record_core_received_result(
172-
operation_id, protocol)
173-
destinations = message.get('destinations', [])
174-
producer = destinations[0] if destinations else 'unknown'
177+
operation_id, protocol, producer, simulation_type)
175178
consumer = message.get('source', 'unknown')
176179
kwargs = {
177180
"message": message,

simulation_bridge/src/protocol_adapters/rest/rest_adapter.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,17 @@ def publish_result_message_rest(self, sender, **kwargs):
225225
performance_monitor = PerformanceMonitor()
226226
message = kwargs.get('message', {})
227227
operation_id = message.get('request_id', 'unknown')
228+
simulation_type = message.get(
229+
'simulation', {}).get(
230+
'type', 'unknown')
228231
destination = message.get('destinations', [])[0]
229232
self.send_result_sync(destination, message)
230233
status = message.get('status', 'unknown')
231-
performance_monitor.record_result_sent(operation_id, 'rest')
234+
performance_monitor.record_result_sent(
235+
operation_id, 'rest', destination, simulation_type)
232236
if status == 'completed':
233-
performance_monitor.finalize_operation(operation_id, 'rest')
237+
performance_monitor.finalize_operation(
238+
operation_id, 'rest', destination, simulation_type)
234239
logger.debug(
235240
"Successfully scheduled result message for REST client: %s",
236241
destination)

0 commit comments

Comments
 (0)