From Polling to PubSub: Building an Asynchronous OPC UA Stack in Python
The architectural design and engineering required to build a native, asynchronous OPC UA Pub/Sub (IEC 62541-14) stack in Python for the open-source opcua-asyncio library.
Industrial control systems are generating more data than ever before, but the Python tooling used to process this telemetry often hits severe performance limits. Traditional OPC UA libraries are built around synchronous, polling-based client and server architectures. When industrial networks scale to thousands of sensors broadcasting high-frequency data, these synchronous implementations stall, because every blocking call holds up the work queued behind it. To handle this many-to-many topology, developers need a native publisher and subscriber solution that does not block the execution thread while waiting for network packets.
For Python developers unfamiliar with industrial protocols, OPC UA PubSub (IEC 62541-14) is a standard that decouples data producers from consumers by allowing devices to broadcast telemetry via stateless middleware like UDP Multicast. For industrial engineers new to Python concurrency, asyncio is a standard-library module that uses a single event loop to service thousands of network operations concurrently, without the overhead of one thread per connection.
Bridging these two paradigms requires a completely non-blocking architecture. To address this gap, a complete asyncio-driven OPC UA PubSub implementation was architected and integrated into the open-source opcua-asyncio library (merged in commit 2b6f3e5). Implementing this standard from scratch in an asynchronous Python environment presented unique challenges. This article breaks down the engineering decisions and technical design patterns used to build this extension. By contributing this capability to a library that serves thousands of developers in the Python IIoT ecosystem, the goal is to let engineers build highly scalable publisher and subscriber sensor networks without migrating away from Python.
The Shift to Publisher and Subscriber in IIoT
In traditional OPC UA, a client polls a server or sets up monitored items. This creates a tightly coupled, connection-oriented topology. The PubSub extension decouples this by allowing publishers to broadcast telemetry data via stateless middleware like UDP Multicast or MQTT, which subscribers can passively ingest.
To bring this to the opcua-asyncio ecosystem, the architecture needed to bridge the gap between Python's asynchronous event loop and the highly deterministic, byte-packed UADP (OPC UA Datagram Protocol) structures. The design was broken down into four core pillars.
- Asynchronous transport layer: Managing non-blocking UDP and IP multicast.
- UADP binary protocol engine: Bit-level packing and unpacking of network messages.
- Data abstraction and node mapping: Linking arbitrary network payloads to the OPC UA Address Space.
- Concurrency and connection management: Orchestrating readers, writers, and tasks via asyncio.

Pillar 1: The Asynchronous UDP Transport Layer
OPC UA UADP relies on UDP for low-latency transmission. In Python, synchronous socket operations block the main thread, which is fatal to an asyncio application.
To solve this, the networking layer was built directly on top of asyncio.DatagramProtocol. The OpcUdp class overrides the standard protocol callbacks to bridge the network socket with the PubSub receiver logic.
Here is a look at how the protocol was extended and hooked into the event loop to ensure incoming datagrams never block the main thread.
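import asyncio
import logging
from typing import Optional, Tuple

# UdpSettings, PubSubReceiver, Variant, Buffer and UadpNetworkMessage
# are provided by the asyncua package.

class OpcUdp(asyncio.DatagramProtocol):
    def __init__(self, cfg: UdpSettings, receiver: Optional[PubSubReceiver], publisher_id: Variant) -> None:
        super().__init__()
        self.cfg = cfg
        self.receiver = receiver
        self.publisher_id = publisher_id.Value

    def datagram_received(self, data: bytes, source: Tuple[str, int]) -> None:
        try:
            # Decode the raw datagram into a UADP network message.
            buffer = Buffer(data)
            msg = UadpNetworkMessage.from_binary(buffer)
            if self.receiver is not None:
                # Schedule the receiver coroutine; do not wait for it here.
                asyncio.ensure_future(self.receiver.got_uadp(msg))
        except Exception:
            logging.exception("Received Invalid UadpPacket")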
- Socket lifecycle: The UdpSettings class manages socket creation, applying SO_REUSEADDR and handling both IPv4 (AF_INET) and IPv6 (AF_INET6) multicast.
- Multicast configuration: Depending on the IP family, an IP_ADD_MEMBERSHIP or IPV6_JOIN_GROUP request is packed with the struct module and set as a socket option, so that the kernel joins the multicast group via IGMP or MLD.
- Non-blocking reception: When a datagram arrives, datagram_received decodes the raw bytes with the UADP engine and dispatches the parsed message to a background task using asyncio.ensure_future(). The callback returns without waiting for the receiver, so the event loop is free to handle the next packet.
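The multicast join described above can be sketched with the standard library alone. This is a minimal, IPv4-only illustration and not the library's UdpSettings code: the helper names build_ipv4_mreq and open_multicast_socket, and any group address or port passed to them, are assumptions made for the example.

```python
import socket
import struct


def build_ipv4_mreq(group: str, interface: str = "0.0.0.0") -> bytes:
    """Pack an ip_mreq structure: group address, then local interface address."""
    return struct.pack("4s4s", socket.inet_aton(group), socket.inet_aton(interface))


def open_multicast_socket(group: str, port: int) -> socket.socket:
    """Hypothetical helper: UDP socket bound to `port` and joined to `group`."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    # Allow several subscribers on one host to bind the same port.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("", port))
    # Ask the kernel to join the group; the kernel handles the IGMP signalling.
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, build_ipv4_mreq(group))
    # Non-blocking mode, so the event loop is never stalled by this socket.
    sock.setblocking(False)
    return sock
```

A socket prepared this way can be handed to loop.create_datagram_endpoint(..., sock=sock) together with a DatagramProtocol factory. An IPv6 variant would use IPV6_JOIN_GROUP with a differently packed request.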
Pillar 2: The UADP Binary Protocol Engine
The UADP specification defines an extremely dense, highly variable network packet. Headers can dynamically expand or contract based on a series of bit flags. Processing this in Python requires rigorous byte manipulation to maintain both memory efficiency and processing speed.
The uadp.py implementation utilizes Python's enum.IntFlag to map the exact bitwise schemas defined in OPC UA Part 14.
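from enum import IntFlag

class MessageHeaderFlags(IntFlag):
    NONE = 0
    UADP_VERSION_BIT0 = 0b1
    PUBLISHER_ID = 0b00010000
    GROUP_HEADER = 0b00100000
    PAYLOAD_HEADER = 0b01000000
    EXTENDED_FLAGS_1 = 0b10000000
    # FlagsExtend1 (second flags byte, shifted left by 8 bits)
    PUBLISHER_ID_UINT16 = 0b0000000100000000
    PUBLISHER_ID_UINT32 = 0b0000001000000000
    # Value as published; Part 14 lists the UInt64 type code as 011
    # (0b0000001100000000), so check this against the specification.
    PUBLISHER_ID_UINT64 = 0b0000011000000000
    PUBLISHER_ID_STRING = 0b0000010000000000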
- Flag-driven serialization: The UadpHeader and UadpDataSetMessageHeader are deeply nested and conditional. For example, the Extended Flags dictate whether a PublisherId is encoded as a Byte, UInt16, UInt32, UInt64, or String.
- Bitwise extensibility: The implementation cascades flags using the EXTENDED_FLAGS_1 and EXTENDED_FLAGS_2 bits. If the integer value of the required flags exceeds 0xFF, the engine sets the extension bit and appends the upper bits as an additional flags byte.
- Binary packing: A standardized Primitives unpacking utility translates the raw buffer directly into strictly typed Python objects like UInt32, Guid, or DateTime. This avoids the overhead of intermediate object instantiation when parsing high-frequency sensor data.
- Delta Frames vs. raw data: The parser routes payload deserialization based on MessageDataSetFlags. It distinguishes between Key Frames, Delta Frames, and Raw Data, and packs the resulting generic DataValue structs into a unified UadpNetworkMessage.
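The flag cascade and the type-dependent PublisherId encoding can be illustrated with a heavily simplified header codec. This is a sketch under stated assumptions, not the code in uadp.py: it covers only the version bits, the PublisherId bit, and one extension byte, it supports integer PublisherIds only, and the names encode_flags, decode_flags, encode_header, decode_header, and ID_FORMATS are invented for the example.

```python
import struct
from typing import Tuple

UADP_VERSION = 0x01
PUBLISHER_ID_ENABLED = 0x10  # first flags byte, bit 4
EXTENDED_FLAGS_1 = 0x80      # first flags byte, bit 7: an extension byte follows

# PublisherId type code (low 3 bits of the extension byte) -> little-endian format.
ID_FORMATS = {0b000: "<B", 0b001: "<H", 0b010: "<I", 0b011: "<Q"}


def encode_flags(flags: int) -> bytes:
    """Emit one byte, or two when any bit above 0xFF is required."""
    low, high = flags & 0xFF, (flags >> 8) & 0xFF
    if high:
        return bytes([low | EXTENDED_FLAGS_1, high])
    return bytes([low])


def decode_flags(data: bytes) -> Tuple[int, int]:
    """Return (combined flags, number of bytes consumed)."""
    low = data[0]
    if low & EXTENDED_FLAGS_1:
        return low | (data[1] << 8), 2
    return low, 1


def encode_header(publisher_id: int, id_type: int) -> bytes:
    """Flags followed by the PublisherId in the width selected by id_type."""
    flags = UADP_VERSION | PUBLISHER_ID_ENABLED | (id_type << 8)
    return encode_flags(flags) + struct.pack(ID_FORMATS[id_type], publisher_id)


def decode_header(data: bytes) -> Tuple[int, int]:
    """Return (flags, publisher_id) parsed from the start of a datagram."""
    flags, offset = decode_flags(data)
    id_type = (flags >> 8) & 0b111
    (publisher_id,) = struct.unpack_from(ID_FORMATS[id_type], data, offset)
    return flags, publisher_id
```

A Byte PublisherId needs no extension byte, so encode_header(7, 0b000) produces two bytes, while a UInt16 PublisherId sets the extension bit and grows the header to four bytes. The same branching, repeated for every optional field, is what makes the real parser so conditional.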
Pillar 3: Data Abstraction and Address Space Integration
Receiving data is only half the battle because that data must meaningfully map to the server's Address Space. The architecture introduces PubSubInformationModel to handle this synchronization.
- Datasets and metadata: A PublishedDataSet defines the structure of the data being transmitted. This includes tracking FieldMetaData, built-in types, and value ranks.
- Dynamic variable substitution: The PubSubDataSourceServer class abstracts the retrieval of data from the server tree. It reads attributes and falls back to a SubstituteValue if a node's status code is bad, which keeps the telemetry stream unbroken.
- Subscribed mirrors: When an OPC UA client acts as a subscriber, it needs to see the incoming data reflected in its own node tree. The SubscribedDataSetMirror creates new variable nodes on the fly to match the incoming DataSetMetaData.
This dynamic node mapping was engineered by injecting new variables straight into the server tree based on the metadata specification.
# Excerpt: method of SubscribedDataSetMirror
async def _create_and_set_node(self, f: FieldMetaData):
    if self._node is None:
        raise RuntimeError("SubscribedDataSetMirror._node is not initialized.")
    # Create one variable per metadata field under the mirror's parent node.
    n = await self._node.add_variable(
        NodeId(NamespaceIndex=Int16(1)), "1:" + str(f.Name), Variant(), datatype=f.DataType
    )
    # Copy the descriptive attributes from the field metadata.
    await n.write_attribute(AttributeIds.Description, f.Description)
    await n.write_attribute(AttributeIds.ValueRank, f.ValueRank)
    await n.write_attribute(AttributeIds.ArrayDimensions, f.ArrayDimensions)
    return n
- Target variables: Alternatively, SubScribedTargetVariables maps incoming dataset fields directly to existing NodeId references in the server. These references update in real time as UDP packets are decoded.
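The substitute-value fallback and the target-variable mapping are both simple lookups at heart. The sketch below shows the idea with plain Python data structures. It does not use the asyncua API, and the names SourceReading, FieldConfig, value_to_publish, and apply_to_targets are hypothetical stand-ins for the library's classes.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class SourceReading:
    value: Any
    status_good: bool  # stand-in for an OPC UA status code check


@dataclass
class FieldConfig:
    name: str
    substitute_value: Optional[Any] = None


def value_to_publish(field: FieldConfig, reading: SourceReading) -> Any:
    """Publisher side: use the live value, or the substitute when status is bad."""
    if reading.status_good:
        return reading.value
    return field.substitute_value


def apply_to_targets(fields: Dict[str, Any], targets: Dict[str, str], store: Dict[str, Any]) -> None:
    """Subscriber side: write each received field to its mapped node key.

    `targets` maps field names to node identifiers; unmapped fields are skipped.
    `store` stands in for the server's address space.
    """
    for name, value in fields.items():
        node_key = targets.get(name)
        if node_key is not None:
            store[node_key] = value
```

In the real stack the reads and writes are awaited node operations rather than dictionary accesses, but the control flow has the same shape: one decision per field on the publishing side, one lookup per field on the subscribing side.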
Pillar 4: Concurrency and Connection Management
The top-level orchestration is handled by the PubSubConnection and PubSub classes. These act as the asynchronous lifecycle managers.
- Task gathering: When start() is invoked on a connection, the lifecycle manager uses asyncio.gather() to start all associated reader and writer groups concurrently. The writers keep running in the background, so they do not block the main OPC UA server loop.
# Excerpt: start() method of the connection class
async def start(self) -> None:
    logging.info("Starting Connection %s", await self.get_name())
    loop = asyncio.get_event_loop()
    sock, _, _ = self._network_settings.create_socket()
    # Attach the pre-configured socket to the event loop.
    self._transport, self._protocol = await loop.create_datagram_endpoint(
        lambda: self._network_factory(self._network_settings, self._receiver, self._cfg.PublisherId),
        sock=sock,
    )
    # Writers are scheduled and left running; readers are awaited until started.
    self._writer_tasks = asyncio.gather(*[writer.run(self._protocol, self._app) for writer in self._writer_groups])
    reader_tasks = asyncio.gather(*[reader.start() for reader in self._reader_groups])
    await reader_tasks
    if self._protocol is not None:
        self._protocol.set_receiver(self._receiver)
    await self._set_state(PubSubState.Operational)
- Protocol decoupling: To prevent circular dependencies between the network transport and the information model, strict interfaces defined in protocols.py are used. The UDP protocol layer communicates with the logical layer strictly through these abstract protocols.
- Wildcard routing and readers: The ReaderGroup acts as a multiplexer. When a multi-payload UADP packet arrives, it analyzes the GroupHeader and DataSetPayloadHeader. It then routes individual DataSetMessages to the correct DataSetReader instances by matching wildcard filters.
- Timeouts and state machines: Robust industrial systems must handle connection drops. The DataSetReader wraps its operation in a dedicated timeout task. Using asyncio.wait_for(), it monitors for MessageReceiveTimeout events. If a heartbeat or payload is missed, it transitions the internal PubSubState to Error. This allows higher-level application logic to degrade gracefully.
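The timeout pattern in the last bullet can be sketched as a small watchdog built on asyncio.wait_for() and an asyncio.Event. This is an illustration of the technique, not the DataSetReader implementation: the ReaderState enum, the TimeoutMonitor class, and the demo timings are all assumptions made for the example.

```python
import asyncio
from enum import Enum
from typing import List


class ReaderState(Enum):
    OPERATIONAL = "Operational"
    ERROR = "Error"


class TimeoutMonitor:
    """Hypothetical watchdog: flips to ERROR when no message arrives in time."""

    def __init__(self, receive_timeout: float) -> None:
        self.receive_timeout = receive_timeout
        self.state = ReaderState.OPERATIONAL
        self._message_seen = asyncio.Event()

    def message_received(self) -> None:
        """Called by the receive path for every accepted message."""
        self._message_seen.set()

    async def run(self) -> None:
        while True:
            try:
                await asyncio.wait_for(self._message_seen.wait(), self.receive_timeout)
                # A message arrived in time: re-arm and (re)enter OPERATIONAL.
                self._message_seen.clear()
                self.state = ReaderState.OPERATIONAL
            except asyncio.TimeoutError:
                # Nothing arrived within the window; stay in ERROR until one does.
                self.state = ReaderState.ERROR


async def demo() -> List[ReaderState]:
    monitor = TimeoutMonitor(receive_timeout=0.1)
    task = asyncio.create_task(monitor.run())
    seen = []
    monitor.message_received()
    await asyncio.sleep(0.02)
    seen.append(monitor.state)   # message just arrived
    await asyncio.sleep(0.3)
    seen.append(monitor.state)   # several timeout windows with no message
    monitor.message_received()
    await asyncio.sleep(0.02)
    seen.append(monitor.state)   # recovered after a new message
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return seen
```

Running asyncio.run(demo()) walks the monitor through a receive, a silent period, and a recovery. Keeping the state change inside run() means a message that arrives at the same moment as a timeout still brings the monitor back to OPERATIONAL on the next loop iteration.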
Conclusion
Building a production-ready OPC UA PubSub stack in Python requires harmonizing the stringent bit-packed demands of the IEC 62541-14 specification with the asynchronous paradigms of asyncio. By building on asyncio.DatagramProtocol for non-blocking networking, abstracting the UADP bit flags into structured classes, and integrating with the OPC UA Address Space through mirrored nodes and target variables, this implementation provides a scalable foundation for modern IIoT architectures.
Code and Open Source Contributions
The architecture and implementation details discussed in this article were merged into the core FreeOpcUa/opcua-asyncio repository. You can explore the complete implementation, including the raw protocol parsing and asyncio abstractions, via the links below.
- Primary commit: 2b6f3e5 (Initial implementation of OPC UA PubSub UDP and UADP).

Key files to explore in the commit:
- asyncua/pubsub/udp.py: Contains the OpcUdp transport layer and multicast socket configuration.
- asyncua/pubsub/uadp.py: Houses the flag-driven serialization and binary protocol engine.
- asyncua/pubsub/connection.py: Demonstrates the asyncio task management and lifecycle orchestration.