Description
When multiple NBIRTH messages are received simultaneously, only a subset of them is stored and made available via the certificates topic.
Steps to reproduce
- Run the broker with the Sparkplug-aware extension.
- Have a client subscribed to spBv1.0/groupId/NBIRTH/+ (for verification purposes).
- Send birth messages from multiple edge nodes in parallel.
- Have a client subscribed to $sparkplug/certificates/spBv1.0/groupId/NBIRTH/+
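The two subscriptions in the steps above differ only by the $sparkplug/certificates/ prefix. A minimal sketch of that mapping, assuming the certificate topic is always the prefix followed by the unchanged birth topic (the helper names are hypothetical and not part of the extension):

```python
# Prefix taken from the subscription filter used in the steps above.
CERTIFICATE_PREFIX = "$sparkplug/certificates/"


def certificate_topic(birth_topic: str) -> str:
    """Return the certificate topic assumed to mirror a birth topic."""
    return CERTIFICATE_PREFIX + birth_topic


def birth_topic(certificate_topic_name: str) -> str:
    """Strip the certificate prefix to recover the original birth topic."""
    if not certificate_topic_name.startswith(CERTIFICATE_PREFIX):
        raise ValueError(f"not a certificate topic: {certificate_topic_name}")
    return certificate_topic_name[len(CERTIFICATE_PREFIX):]


print(certificate_topic("spBv1.0/groupId/NBIRTH/+"))
```

With this mapping, topics received on both subscriptions can be normalised to the same form before they are compared.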
Expected behavior
The messages retrieved by subscribing to the certificates topic are the same as the messages received by the verification client (subscribed to spBv1.0/groupId/NBIRTH/+).
Actual behavior
The messages retrieved by subscribing to the certificates topic are only a subset of the messages received by the verification client.
Further Information
If you use this reproduction setup but subscribe to the certificates topic first and then send the birth messages, you receive the expected number of messages, but some of them are duplicates and not all sent messages are included.
Example:
- Subscribe to spBv1.0/groupId/NBIRTH/+ (verification client).
- Subscribe to $sparkplug/certificates/spBv1.0/groupId/NBIRTH/+
- Send birth messages from edge nodes named node0, node1, node2 and node3 in parallel.
A possible result:
On the verification client, you get the messages of node0, node1, node2 and node3.
The certificates topic received four messages as well, but the messages are from node1, node1, node1 and node2.
Subscribing to the certificates topic again afterwards will only show you the birth messages of node1 and node2.
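The comparison described in this example can be automated. The following is a minimal sketch, assuming the topics seen by each client have been collected into lists (for instance from the topic of every received message) and that the edge node ID is the last topic segment. The function names are hypothetical. It reports which nodes are missing from the certificates topic and which appear more than once:

```python
from collections import Counter


def edge_node_id(topic: str) -> str:
    """Assume the edge node ID is the last segment of the topic."""
    return topic.rsplit("/", 1)[-1]


def compare_births(verification_topics, certificate_topics):
    """Return (missing node IDs, {node ID: count} for duplicated nodes)."""
    expected = {edge_node_id(t) for t in verification_topics}
    seen = Counter(edge_node_id(t) for t in certificate_topics)
    missing = sorted(expected - set(seen))
    duplicates = {node: count for node, count in seen.items() if count > 1}
    return missing, duplicates


# Data from the example above: four births sent, four certificates received.
verification = [f"spBv1.0/groupId/NBIRTH/node{i}" for i in range(4)]
certificates = [
    "$sparkplug/certificates/spBv1.0/groupId/NBIRTH/" + node
    for node in ("node1", "node1", "node1", "node2")
]

missing, duplicates = compare_births(verification, certificates)
print(missing)     # ['node0', 'node3']
print(duplicates)  # {'node1': 3}
```

The sketch identifies messages by topic only. If the broker delivered the wrong payload under the right topic, the payloads would have to be compared as well.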
Reproducer
Here is a Python script that triggers 10 edge node birth messages (with a minimal payload to keep the script small). For me, this script causes duplicated messages in most cases.
import paho.mqtt.client as mqtt
import resources.sparkplug_b_pb2 as spb  # Generated from sparkplug_b.proto
import time
import threading

BROKER = "localhost"
PORT = 1883
TOPIC_TEMPLATE = "spBv1.0/ReadWrite/NBIRTH/EdgeNode{node_id}"


def create_nbirth_payload(node_id):
    payload = spb.Payload()
    payload.timestamp = int(time.time() * 1000)
    payload.seq = 1  # Minimal sequence number
    # Add a simple metric
    metric = payload.metrics.add()
    metric.name = "Node Control/Rebirth"
    metric.timestamp = int(time.time() * 1000)
    metric.datatype = 11  # 11 = Boolean in the Sparkplug B datatype enumeration
    metric.boolean_value = False
    return payload.SerializeToString()


def publish_nbirth(node_id):
    # paho-mqtt 1.x constructor; paho-mqtt 2.x additionally requires a callback
    # API version, e.g. mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client = mqtt.Client()
    client.connect(BROKER, PORT, 60)
    topic = TOPIC_TEMPLATE.format(node_id=node_id)
    payload = create_nbirth_payload(node_id)
    client.publish(topic, payload, qos=0, retain=True)
    client.disconnect()


def main():
    threads = []
    node_count = 10  # Number of NBirth messages to publish
    # Launch multiple threads to simulate simultaneous publishes
    for i in range(node_count):
        t = threading.Thread(target=publish_nbirth, args=(i,))
        threads.append(t)
        t.start()
    # Wait for all threads to complete
    for t in threads:
        t.join()
    print(f"Published {node_count} NBIRTH messages.")


if __name__ == "__main__":
    main()
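In the script above, each thread connects before it publishes, so the publishes are spread over the time the connections take. A possible variation, sketched here with only the standard library, is to let every thread finish its setup first and release all publishes together with a threading.Barrier. The prepare and publish callables are hypothetical placeholders (in the real script they would connect a client and call its publish method), and whether this raises the reproduction rate has not been verified:

```python
import threading


def publish_in_parallel(prepare, publish, node_count, timeout=10):
    """Run prepare() per node, then release all publish() calls together."""
    barrier = threading.Barrier(node_count)

    def worker(node_id):
        handle = prepare(node_id)      # e.g. connect a client (assumption)
        barrier.wait(timeout=timeout)  # block until every worker is ready
        publish(handle, node_id)       # e.g. publish the NBIRTH (assumption)

    threads = [
        threading.Thread(target=worker, args=(i,)) for i in range(node_count)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


# Stand-in callables so the sketch runs without a broker.
published = []
published_lock = threading.Lock()


def fake_prepare(node_id):
    return f"client-{node_id}"


def fake_publish(handle, node_id):
    with published_lock:
        published.append(f"spBv1.0/ReadWrite/NBIRTH/EdgeNode{node_id}")


publish_in_parallel(fake_prepare, fake_publish, node_count=10)
print(f"Published {len(published)} NBIRTH messages.")
```

The timeout on the barrier keeps the remaining threads from waiting forever if one of them fails during setup.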