From Polling to PubSub: Building an Asynchronous OPC UA Stack in Python

The architectural design and engineering required to build a native, asynchronous OPC UA Pub/Sub (IEC 62541-14) stack in Python for the open-source opcua-asyncio library.

By Harshith Narasimhan Srivatsa · Jul. 03, 26 · Analysis

Industrial control systems generate more data than ever, but the Python tooling used to process this telemetry often runs into severe performance limits. Traditional OPC UA libraries are built around synchronous, polling-based Client and Server architectures. When industrial networks scale to thousands of sensors broadcasting high-frequency data, these synchronous implementations become a bottleneck because each blocking call stalls the thread until its reply arrives. To handle this many-to-many topology, developers need a native Publisher and Subscriber solution that does not block the execution thread while waiting for network packets.

For Python developers unfamiliar with industrial protocols, OPC UA PubSub (IEC 62541-14) is a standard that decouples data producers from consumers by allowing devices to broadcast telemetry via stateless middleware like UDP Multicast. For industrial engineers new to Python concurrency, asyncio is a standard-library module that uses an event loop to interleave thousands of network operations on a single thread, without the overhead of dedicating a thread to each one.

Bridging these two paradigms requires a completely non-blocking architecture. To address this gap, a complete asyncio-driven OPC UA PubSub implementation was designed and integrated into the open-source opcua-asyncio library (merged in commit 2b6f3e5). Implementing this standard from scratch in an asynchronous Python environment presented unique challenges. This article breaks down the engineering decisions and technical design patterns used to build this extension. Because opcua-asyncio serves thousands of developers in the Python IIoT ecosystem, the contribution lets engineers build highly scalable publisher and subscriber sensor networks without migrating away from Python.

The Shift to Publisher and Subscriber in IIoT

In traditional OPC UA, a client polls a server or sets up monitored items. This creates a tightly coupled, connection-oriented topology. The PubSub extension decouples this by allowing publishers to broadcast telemetry data via stateless middleware like UDP Multicast or MQTT, which subscribers can passively ingest.

To bring this to the opcua-asyncio ecosystem, the architecture needed to bridge the gap between Python's asynchronous event loop and the highly deterministic, byte-packed UADP (OPC UA Datagram Protocol) structures. The design was broken down into four core pillars.

  1. Asynchronous transport layer: Managing non-blocking UDP and IP multicast.
  2. UADP binary protocol engine: Bit-level packing and unpacking of network messages.
  3. Data abstraction and node mapping: Linking arbitrary network payloads to the OPC UA Address Space.
  4. Concurrency and connection management: Orchestrating readers, writers, and tasks via asyncio.

[Figure: Asynchronous OPC UA PubSub stack]

Pillar 1: The Asynchronous UDP Transport Layer

OPC UA UADP relies on UDP for low-latency transmission. In Python, synchronous socket operations block the main thread, which is fatal to an asyncio application.

To solve this, the networking layer was built directly on top of asyncio.DatagramProtocol. The OpcUdp class overrides the standard protocol callbacks to bridge the network socket with the PubSub receiver logic.
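The general pattern can be tried in isolation before looking at the library code. The following is a minimal sketch, not the opcua-asyncio implementation: QueueingProtocol, handle, and demo are hypothetical names, and the loopback address is an assumption made so the example runs on one machine. It shows a DatagramProtocol callback that schedules a coroutine for each datagram instead of processing it inline.

```python
import asyncio
from typing import List, Tuple


class QueueingProtocol(asyncio.DatagramProtocol):
    """Hypothetical protocol: schedules a coroutine per datagram."""

    def __init__(self, handled: List[bytes]) -> None:
        self.handled = handled
        self.tasks = set()  # strong references so pending tasks are not garbage collected

    def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
        # Schedule the work and return at once; the event loop stays responsive.
        task = asyncio.ensure_future(self.handle(data))
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

    async def handle(self, data: bytes) -> None:
        await asyncio.sleep(0)  # stand-in for asynchronous processing
        self.handled.append(data)


async def demo() -> List[bytes]:
    loop = asyncio.get_running_loop()
    handled: List[bytes] = []
    # Receiver bound to an ephemeral loopback port.
    receiver, _ = await loop.create_datagram_endpoint(
        lambda: QueueingProtocol(handled), local_addr=("127.0.0.1", 0)
    )
    port = receiver.get_extra_info("sockname")[1]
    # Sender connected to the receiver's port.
    sender, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, remote_addr=("127.0.0.1", port)
    )
    sender.sendto(b"hello")
    for _ in range(100):  # poll briefly until the datagram has been handled
        if handled:
            break
        await asyncio.sleep(0.01)
    sender.close()
    receiver.close()
    return handled
```

Calling asyncio.run(demo()) sends one datagram over loopback and returns the list of handled payloads. Keeping a reference to each scheduled task, as the sketch does, follows the asyncio documentation's advice that the event loop only holds weak references to tasks.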

Here is a look at how the protocol was extended and hooked into the event loop to ensure incoming datagrams never block the main thread.

Python
 
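import asyncio
import logging
from typing import Optional, Tuple

# UdpSettings, PubSubReceiver, Variant, Buffer, and UadpNetworkMessage are
# provided by the asyncua package; their imports are omitted from this excerpt.


class OpcUdp(asyncio.DatagramProtocol):
    def __init__(self, cfg: UdpSettings, receiver: Optional[PubSubReceiver], publisher_id: Variant) -> None:
        super().__init__()
        self.cfg = cfg
        self.receiver = receiver
        self.publisher_id = publisher_id.Value

    def datagram_received(self, data: bytes, source: Tuple[str, int]) -> None:
        try:
            # Decode the raw datagram synchronously; malformed packets raise here.
            buffer = Buffer(data)
            msg = UadpNetworkMessage.from_binary(buffer)
            if self.receiver is not None:
                # Hand the parsed message to a task so this callback returns quickly.
                asyncio.ensure_future(self.receiver.got_uadp(msg))
        except Exception:
            logging.exception("Received Invalid UadpPacket")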


  • Socket lifecycle: The UdpSettings class manages socket creation, applying SO_REUSEADDR and handling both IPv4 (AF_INET) and IPv6 (AF_INET6) multicasting.
  • Multicast configuration: Depending on the IP family, IP_ADD_MEMBERSHIP or IPV6_JOIN_GROUP is set as a socket option, with the membership request packed via the struct module, so that the host joins the multicast group through IGMP (IPv4) or MLD (IPv6).
  • Non-blocking reception: When a datagram arrives, datagram_received decodes the raw bytes with the UADP engine and dispatches the parsed message to a background task using asyncio.ensure_future(). The callback therefore returns quickly, and the event loop is free to handle the next packet.
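The multicast configuration described above can be sketched with the standard socket and struct modules. This is an illustration under stated assumptions, not the UdpSettings code: the function names are hypothetical, and the IPv4 helper joins on the default interface unless told otherwise.

```python
import socket
import struct


def ipv4_membership_request(group: str, interface: str = "0.0.0.0") -> bytes:
    """Pack an ip_mreq: multicast group address followed by the local interface address."""
    return struct.pack("4s4s", socket.inet_aton(group), socket.inet_aton(interface))


def ipv6_membership_request(group: str, interface_index: int = 0) -> bytes:
    """Pack an ipv6_mreq: group address followed by the interface index (0 = default)."""
    return struct.pack("16sI", socket.inet_pton(socket.AF_INET6, group), interface_index)


def open_multicast_socket(group: str, port: int) -> socket.socket:
    """Create a non-blocking IPv4 UDP socket that has joined the given multicast group."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    # Allow several subscribers on one host to bind the same port.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("", port))
    # Joining the group is what triggers the IGMP membership report.
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, ipv4_membership_request(group))
    sock.setblocking(False)
    return sock
```

A socket prepared this way can be handed to loop.create_datagram_endpoint() through its sock argument, which is the same hand-off the connection's start() method performs later in this article.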

Pillar 2: The UADP Binary Protocol Engine

The UADP specification defines an extremely dense, highly variable network packet. Headers can dynamically expand or contract based on a series of bit flags. Processing this in Python requires rigorous byte manipulation to maintain both memory efficiency and processing speed.

The uadp.py implementation utilizes Python's enum.IntFlag to map the exact bitwise schemas defined in OPC UA Part 14.

Python
 
from enum import IntFlag


class MessageHeaderFlags(IntFlag):
    NONE = 0
    UADP_VERSION_BIT0 = 0b1 
    PUBLISHER_ID = 0b00010000 
    GROUP_HEADER = 0b00100000
    PAYLOAD_HEADER = 0b01000000
    EXTENDED_FLAGS_1 = 0b10000000
    
    # FlagsExtend1
    PUBLISHER_ID_UINT16 = 0b0000000100000000
    PUBLISHER_ID_UINT32 = 0b0000001000000000
    PUBLISHER_ID_UINT64 = 0b0000011000000000
    PUBLISHER_ID_STRING = 0b0000010000000000


  • Flag-driven serialization: The UadpHeader and UadpDataSetMessageHeader are deeply nested and conditional. For example, the Extended Flags dictate whether a PublisherId is encoded as a Byte, UInt16, UInt32, UInt64, or String.
  • Bitwise extensibility: The implementation cascades flags using EXTENDED_FLAGS_1 and EXTENDED_FLAGS_2 bits. If the integer value of the required flags exceeds 0xFF, the engine dynamically shifts the bytes and appends the extension flags.
  • Binary packing: A standardized Primitives unpacking utility translates the raw buffer directly into strictly typed Python objects like UInt32, Guid, or DateTime. This avoids the overhead of intermediate object instantiation when parsing high-frequency sensor data.
  • Delta Frames vs. raw data: The parser dynamically routes payload deserialization based on MessageDataSetFlags. It distinguishes between Key Frames, Delta Frames, and Raw Data while packing the resulting generic DataValue structs into a unified UadpNetworkMessage.
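The flag cascade can be illustrated with a reduced enum. The sketch below is an assumption-laden simplification rather than the uadp.py code: HeaderFlags, encode_flags, and decode_flags are hypothetical names, only four flag values from the excerpt above are reproduced, and the second extension byte (EXTENDED_FLAGS_2) is left out for brevity.

```python
from enum import IntFlag
from typing import Tuple


class HeaderFlags(IntFlag):
    # Subset of the values shown in the MessageHeaderFlags excerpt.
    UADP_VERSION_BIT0 = 0b1
    PUBLISHER_ID = 0b00010000
    EXTENDED_FLAGS_1 = 0b10000000
    PUBLISHER_ID_UINT16 = 0b0000000100000000


def encode_flags(flags: HeaderFlags) -> bytes:
    """Emit one byte, or two when any flag lives above bit 7."""
    value = int(flags)
    low = value & 0x7F          # bits 0-6 of the first byte
    ext1 = (value >> 8) & 0xFF  # flags belonging to the first extension byte
    if ext1:
        # Announce the extension byte by setting bit 7 of the first byte.
        return bytes([low | int(HeaderFlags.EXTENDED_FLAGS_1), ext1])
    return bytes([low])


def decode_flags(data: bytes) -> Tuple[HeaderFlags, int]:
    """Return the decoded flags and the number of bytes consumed."""
    value = data[0]
    consumed = 1
    if value & HeaderFlags.EXTENDED_FLAGS_1:
        value |= data[1] << 8   # shift the extension byte above the first byte
        consumed = 2
    return HeaderFlags(value), consumed
```

With these definitions, a header that only carries a one-byte PublisherId encodes to a single byte, while adding PUBLISHER_ID_UINT16 pushes the value past 0xFF and produces two bytes. The decoder reports how many bytes it consumed so the caller knows where the next header field starts.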

Pillar 3: Data Abstraction and Address Space Integration

Receiving data is only half the battle because that data must meaningfully map to the server's Address Space. The architecture introduces PubSubInformationModel to handle this synchronization.

  • Datasets and metadata: A PublishedDataSet defines the structure of the data being transmitted. This includes tracking FieldMetaData, built-in types, and value ranks.
  • Dynamic variable substitution: The PubSubDataSourceServer class abstracts the retrieval of data from the server tree. It reads attributes and falls back to a SubstituteValue if a node's status code is bad, so the telemetry stream continues uninterrupted.
  • Subscribed mirrors: When an OPC UA client acts as a subscriber, it needs to see the incoming data reflected in its own node tree. The SubscribedDataSetMirror creates new variable nodes at runtime to match the incoming DataSetMetaData.

This dynamic node mapping was engineered by injecting new variables straight into the server tree based on the metadata specification.

Python
 
async def _create_and_set_node(self, f: FieldMetaData):
    # The mirror needs a parent node under which to create variables.
    if self._node is None:
        raise RuntimeError("SubscribedDataSetMirror._node is not initialized.")

    # Create a variable named after the field, typed from its metadata.
    n = await self._node.add_variable(
        NodeId(NamespaceIndex=Int16(1)), "1:" + str(f.Name), Variant(), datatype=f.DataType
    )
    # Copy the remaining field metadata onto the new node's attributes.
    await n.write_attribute(AttributeIds.Description, f.Description)
    await n.write_attribute(AttributeIds.ValueRank, f.ValueRank)
    await n.write_attribute(AttributeIds.ArrayDimensions, f.ArrayDimensions)
    return n


  • Target variables: Alternatively, SubScribedTargetVariables maps incoming dataset fields directly to existing NodeId references in the server. These references update in real time as UDP packets are decoded.
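The two ends of this data path can be sketched without the library. The example below is a simplified illustration, not the PubSubDataSourceServer or target-variable code: Reading, collect_fields, and apply_to_targets are hypothetical names, plain dictionaries stand in for the Address Space, and node identifiers are written as strings purely for readability.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class Reading:
    """Hypothetical stand-in for a DataValue: a value plus a good/bad status."""
    value: Any
    good: bool = True


def collect_fields(source: Dict[str, Reading], substitutes: Dict[str, Any]) -> Dict[str, Any]:
    """Publisher side: read each source node, substituting when its status is bad."""
    fields: Dict[str, Any] = {}
    for name, reading in source.items():
        fields[name] = reading.value if reading.good else substitutes.get(name)
    return fields


def apply_to_targets(fields: Dict[str, Any], targets: Dict[str, str], address_space: Dict[str, Any]) -> None:
    """Subscriber side: copy each received field into its mapped target node."""
    for name, value in fields.items():
        node_id = targets.get(name)
        if node_id is not None:  # fields without a configured target are ignored
            address_space[node_id] = value
```

On the publishing side, a bad pressure reading with a configured substitute of 0.0 would be published as 0.0 rather than dropped. On the subscribing side, only fields that appear in the targets mapping reach the node tree.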

Pillar 4: Concurrency and Connection Management

The top-level orchestration is handled by the PubSubConnection and PubSub classes. These act as the asynchronous lifecycle managers.

  • Task gathering: When start() is invoked on a connection, the lifecycle manager uses asyncio.gather() to start all associated DataSetReader and DataSetWriter tasks concurrently, without blocking the main OPC UA server loop.

Python
 
async def start(self) -> None:
    logging.info("Starting Connection %s", await self.get_name())
    loop = asyncio.get_event_loop()
    # Build the (possibly multicast) socket from the connection's network settings.
    sock, _, _ = self._network_settings.create_socket()

    # Attach the socket to the event loop; the factory creates the protocol instance.
    self._transport, self._protocol = await loop.create_datagram_endpoint(
        lambda: self._network_factory(self._network_settings, self._receiver, self._cfg.PublisherId),
        sock=sock,
    )

    # Writers are gathered but not awaited, so they keep publishing in the background.
    self._writer_tasks = asyncio.gather(*[writer.run(self._protocol, self._app) for writer in self._writer_groups])
    # Readers are awaited, so all of them have started before the state changes.
    reader_tasks = asyncio.gather(*[reader.start() for reader in self._reader_groups])
    await reader_tasks

    if self._protocol is not None:
        self._protocol.set_receiver(self._receiver)
    await self._set_state(PubSubState.Operational)


  • Protocol decoupling: To prevent circular dependencies between the network transport and the information model, strict interfaces defined in protocols.py are used. The UDP protocol layer communicates with the logical layer strictly through these abstract protocols.
  • Wildcard routing and readers: The ReaderGroup acts as an intelligent multiplexer. When a multi-payload UADP packet arrives, it analyzes the GroupHeader and DataSetPayloadHeader. It then routes individual DataSetMessages to the correct DataSetReader instances by matching wildcard filters.
  • Timeouts and state machines: Robust industrial systems must handle connection drops. The DataSetReader wraps its operation in a dedicated timeout task. Using asyncio.wait_for(), it actively monitors for MessageReceiveTimeout events. If a heartbeat or payload is missed, it transitions the internal PubSubState to Error. This allows higher-level application logic to gracefully degrade.
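The timeout behaviour in the last bullet can be sketched with asyncio.wait_for() and an asyncio.Event. This is a minimal illustration, not the DataSetReader code: ReaderState, TimeoutWatchdog, and demo are hypothetical names, and the two-state enum is a reduced stand-in for PubSubState.

```python
import asyncio
from enum import Enum


class ReaderState(Enum):
    """Hypothetical, reduced stand-in for PubSubState."""
    OPERATIONAL = "Operational"
    ERROR = "Error"


class TimeoutWatchdog:
    def __init__(self, receive_timeout: float) -> None:
        self.receive_timeout = receive_timeout
        self.state = ReaderState.OPERATIONAL
        self._seen = asyncio.Event()

    def message_received(self) -> None:
        """Called by the receive path whenever a message for this reader arrives."""
        self._seen.set()

    async def run(self) -> None:
        while True:
            try:
                # Wait for the next message, but no longer than the receive timeout.
                await asyncio.wait_for(self._seen.wait(), self.receive_timeout)
            except asyncio.TimeoutError:
                self.state = ReaderState.ERROR
                return
            self._seen.clear()  # re-arm for the next message
            self.state = ReaderState.OPERATIONAL


async def demo() -> ReaderState:
    watchdog = TimeoutWatchdog(receive_timeout=0.05)
    task = asyncio.ensure_future(watchdog.run())
    for _ in range(3):  # three timely messages keep the reader operational
        await asyncio.sleep(0.01)
        watchdog.message_received()
    await task  # no further messages arrive, so the watchdog times out
    return watchdog.state
```

Running asyncio.run(demo()) delivers three messages inside the timeout window and then stops sending, at which point the watchdog moves to the error state. A production reader would typically allow recovery from that state when messages resume; the sketch stops at the first timeout to stay short.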

Conclusion

Building a production-ready OPC UA PubSub stack in Python requires reconciling the strict bit-packed format of the IEC 62541-14 specification with the asynchronous model of asyncio. By building on asyncio.DatagramProtocol for non-blocking networking, abstracting the UADP bit flags into structured classes, and integrating with the OPC UA Address Space through dataset mirrors and target variables, this implementation provides a scalable foundation for modern IIoT architectures.

Code and Open Source Contributions

The architecture and implementation details discussed in this article were merged into the core FreeOpcUa/opcua-asyncio repository. You can explore the complete implementation, including the raw protocol parsing and asyncio abstractions, in the commit listed below.

  • Primary commit: 2b6f3e5 (Initial implementation of OPC UA PubSub UDP and UADP).

Key files to explore in the commit:

  • asyncua/pubsub/udp.py: Contains the OpcUdp transport layer and multicast socket configuration.
  • asyncua/pubsub/uadp.py: Houses the flag-driven serialization and binary protocol engine.
  • asyncua/pubsub/connection.py: Demonstrates the asyncio task management and lifecycle orchestration.