Incomplete handling of simultaneous birth messages #68

@Cedesc

Description

When multiple NBIRTH messages are received simultaneously, only a subset of them is stored and made available via the certificates topic.

Steps to reproduce

  • run the broker with the Sparkplug-aware extension
  • have a client subscribed to spBv1.0/groupId/NBIRTH/+ (for verification purposes)
  • send birth messages from multiple edge nodes in parallel
  • afterwards, have a client subscribe to $sparkplug/certificates/spBv1.0/groupId/NBIRTH/+
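The two topic filters in the steps above differ only by the `$sparkplug/certificates/` prefix. A minimal sketch of that mapping follows, assuming the certificates topic is always the original birth topic with this prefix prepended, as the filters in this issue suggest. The helper names `certificate_topic` and `edge_node_id` are hypothetical and not part of the extension.

```python
# Prefix taken from the topic filters used in this issue.
CERT_PREFIX = "$sparkplug/certificates/"


def certificate_topic(birth_topic: str) -> str:
    """Return the certificates topic assumed to mirror a birth topic."""
    return CERT_PREFIX + birth_topic


def edge_node_id(topic: str) -> str:
    """Extract the edge node ID (last level) from an NBIRTH or certificates topic."""
    if topic.startswith(CERT_PREFIX):
        topic = topic[len(CERT_PREFIX):]
    levels = topic.split("/")
    # Expected layout: namespace/group_id/NBIRTH/edge_node_id
    if len(levels) != 4 or levels[2] != "NBIRTH":
        raise ValueError(f"not an NBIRTH topic: {topic!r}")
    return levels[3]


print(certificate_topic("spBv1.0/groupId/NBIRTH/node0"))
print(edge_node_id("$sparkplug/certificates/spBv1.0/groupId/NBIRTH/node0"))
```

Reducing both kinds of topics to the edge node ID makes the messages seen on the two subscriptions directly comparable.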

Expected behavior

The messages retrieved by subscribing to the certificates topic are the same as the messages received by the verification client (subscribed to spBv1.0/groupId/NBIRTH/+).

Actual behavior

The messages retrieved by subscribing to the certificates topic are only a subset of the messages received by the verification client.

Further Information

If you use this reproduction setup but subscribe to the certificates topic first and then send the birth messages, you get the expected number of messages, but some of them are duplicates and not all sent messages are included.
Example:

  • subscribe to spBv1.0/groupId/NBIRTH/+ (verification client)
  • subscribe to $sparkplug/certificates/spBv1.0/groupId/NBIRTH/+
  • send birth messages from edge nodes named node0, node1, node2 and node3 in parallel
    A possible result:
    The verification client receives the messages of node0, node1, node2 and node3.
    The certificates subscription receives four messages as well, but they are from node1, node1, node1 and node2.
    Subscribing to the certificates topic again afterwards shows only the birth messages of node1 and node2.
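To make the mismatch in the example above easy to check, the edge node IDs seen on each subscription can be compared programmatically. This is only a sketch: `compare_births` is a hypothetical helper, and the two lists are typed in by hand from the example result rather than captured from a broker.

```python
from collections import Counter


def compare_births(verification_nodes, certificate_nodes):
    """Return (missing, duplicated) for the certificates subscription.

    missing:    node IDs seen by the verification client but never on the
                certificates topic
    duplicated: node IDs delivered more than once on the certificates topic,
                mapped to their delivery count
    """
    counts = Counter(certificate_nodes)
    missing = sorted(set(verification_nodes) - set(counts))
    duplicated = {node: n for node, n in sorted(counts.items()) if n > 1}
    return missing, duplicated


# Node IDs from the example result described above.
verification = ["node0", "node1", "node2", "node3"]
certificates = ["node1", "node1", "node1", "node2"]

missing, duplicated = compare_births(verification, certificates)
print(missing)     # ['node0', 'node3']
print(duplicated)  # {'node1': 3}
```

With the expected behavior, `missing` would be an empty list and `duplicated` an empty dict.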

Reproducer

Here is a Python script that triggers 10 edge node birth messages (with a minimal payload to keep the script small). For me, this script causes duplicated messages in most cases.

import paho.mqtt.client as mqtt
import resources.sparkplug_b_pb2 as spb  # Generated from sparkplug_b.proto
import time
import threading

BROKER = "localhost"
PORT = 1883
TOPIC_TEMPLATE = "spBv1.0/ReadWrite/NBIRTH/EdgeNode{node_id}"

def create_nbirth_payload(node_id):
    payload = spb.Payload()
    payload.timestamp = int(time.time() * 1000)
    payload.seq = 1  # Minimal sequence number

    # Add a simple metric
    metric = payload.metrics.add()
    metric.name = "Node Control/Rebirth"
    metric.timestamp = int(time.time() * 1000)
    metric.datatype = 11  # Sparkplug B DataType 11 = Boolean
    metric.boolean_value = False

    return payload.SerializeToString()

def publish_nbirth(node_id):
    client = mqtt.Client()
    client.connect(BROKER, PORT, 60)
    topic = TOPIC_TEMPLATE.format(node_id=node_id)
    payload = create_nbirth_payload(node_id)
    client.publish(topic, payload, qos=0, retain=True)
    client.disconnect()

def main():
    threads = []
    node_count = 10  # Number of NBirth messages to publish

    # Launch multiple threads to simulate simultaneous publishes
    for i in range(node_count):
        t = threading.Thread(target=publish_nbirth, args=(i,))
        threads.append(t)
        t.start()

    # Wait for all threads to complete
    for t in threads:
        t.join()

    print(f"Published {node_count} NBIRTH messages.")

if __name__ == "__main__":
    main()
