DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports
Events Video Library
Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
View Events Video Library
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Modern Digital Website Security: Prepare to face any form of malicious web activity and enable your sites to optimally serve your customers.

Low-Code Development: Learn the concepts of low code, features + use cases for professional devs, and the low-code implementation process.

E-Commerce Development Essentials: Considering starting or working on an e-commerce business? Learn how to create a backend that scales.

Getting Started With Jenkins: Learn fundamentals that underpin CI/CD, how to create a pipeline, and when and where to use Jenkins.

Related

  • Building Robust Real-Time Data Pipelines With Python, Apache Kafka, and the Cloud
  • How to Detect VPN Proxies With Python and IP2Location.io API
  • Building a Dynamic Chat Application: Setting up ChatGPT in FastAPI and Displaying Conversations in ReactJS
  • Optimizing Coroutine Execution

Trending

  • Modern Application Performance
  • JPA Criteria With Pagination
  • How to Write API Documentation: Best Practices and Examples
  • Optimizing API Lifecycles: A Comprehensive Guide for Product Managers
  1. DZone
  2. Coding
  3. Languages
  4. Pulsar in Python on Pi for Sensors

Pulsar in Python on Pi for Sensors

Utilizing Apache Pulsar's Python Client on Raspberry Pi - FLiP-Py Stack

Tim Spann user avatar by
Tim Spann
DZone Core CORE ·
Timothy Spann user avatar by
Timothy Spann
·
Feb. 27, 22 · Tutorial
Like (4)
Save
Tweet
Share
14.1K Views

Join the DZone community and get the full member experience.

Join For Free

I have a new Raspberry Pi with a Breakout Garden with a thermal camera, 1.12" OLED screen, and a CO2+ sensor.

We first need to install the Pulsar Python Client, if you are running on certain architectures you will need to compile the Apache Pulsar C++ Client first.

Python
 
pip3 install fastavro
pip3 install pytz
pip3 install pulsar-client[avro]


Our Python 3 App reads our sensor values and publishes the results as JSON Schema Records over Pulsar utilizing the Pulsar Client of Python.

Plain Text
 
root@thermal:/opt/demo# python3 thermal.py                                                                                                                                   
2022-02-22 19:41:44.380 INFO  [3069204864] ClientConnection:182 | [<none> -> pulsar://pulsar1:6650] Create ClientConnection, timeout=10000                                   
2022-02-22 19:41:44.380 INFO  [3069204864] ConnectionPool:96 | Created connection for pulsar://pulsar1:6650                                                                  
2022-02-22 19:41:44.386 INFO  [3036730432] ClientConnection:368 | [192.168.1.204:34966 -> 192.168.1.230:6650] Connected to broker                                            
2022-02-22 19:41:44.390 INFO  [3036730432] HandlerBase:64 | [persistent://public/default/pi-thermal, ] Getting connection from pool                                          
2022-02-22 19:41:44.392 INFO  [3036730432] ClientConnection:182 | [<none> -> pulsar://pulsar1:6650] Create ClientConnection, timeout=10000                                   
2022-02-22 19:41:44.392 INFO  [3036730432] ConnectionPool:96 | Created connection for pulsar://127.0.0.1:6650                                                                
2022-02-22 19:41:44.396 INFO  [3036730432] ClientConnection:370 | [192.168.1.204:34968 -> 192.168.1.230:6650] Connected to broker through proxy. Logical broker: pulsar://127.0.0.1:6650                                                                                                                                                                  
2022-02-22 19:41:44.430 INFO  [3036730432] ProducerImpl:189 | [persistent://public/default/pi-thermal, ] Created producer on broker [192.168.1.204:34968 -> 192.168.1.230:6650] 
SCD4X, Serial: d3efd3efd3ef
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'wthr_uri_20220223004144', 'ipaddress': '192.168.1.204', 'cputempf': 108, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645576904.9345362', 'te': '0.0006937980651855469', 'cpu': 6.5, 'diskusage': '106326.3 MB', 'memory': 8.7, 'rowid': '20220223004144_b9de27fa-fc0b-46a0-8d1f-04664360f3b0', 'systemtime': '02/22/2022 19:41:50', 'ts': 1645576910, 'starttime': '02/22/2022 19:41:44', 'datetimestamp': '2022-02-23 00:41:49.613734+00:00', 'temperature': 28.4543, 'humidity': 28.6, 'co2': 670.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'wthr_qle_20220223004150', 'ipaddress': '192.168.1.204', 'cputempf': 108, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645576910.6230323', 'te': '0.0004811286926269531', 'cpu': 0.0, 'diskusage': '106326.3 MB', 'memory': 8.7, 'rowid': '20220223004150_040bc286-7778-4b87-8f20-abbe3028fe29', 'systemtime': '02/22/2022 19:41:55', 'ts': 1645576915, 'starttime': '02/22/2022 19:41:50', 'datetimestamp': '2022-02-23 00:41:54.384983+00:00', 'temperature': 27.9977, 'humidity': 29.2, 'co2': 683.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'wthr_sgh_20220223004155', 'ipaddress': '192.168.1.204', 'cputempf': 107, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645576915.400389', 'te': '0.0007336139678955078', 'cpu': 0.0, 'diskusage': '106326.3 MB', 'memory': 8.7, 'rowid': '20220223004155_1a7c21b3-3e0b-48c9-8c3c-73e11b538602', 'systemtime': '02/22/2022 19:42:00', 'ts': 1645576920, 'starttime': '02/22/2022 19:41:55', 'datetimestamp': '2022-02-23 00:41:59.164362+00:00', 'temperature': 27.7413, 'humidity': 29.66, 'co2': 682.0}
^C2022-02-22 19:42:02.086 INFO  [3069204864] ClientImpl:495 | Closing Pulsar client with 1 producers and 0 consumers
2022-02-22 19:42:02.086 INFO  [3069204864] ProducerImpl:686 | [persistent://public/default/pi-thermal, standalone-1-2217] Closing producer for topic persistent://public/default/pi-thermal
2022-02-22 19:42:02.092 INFO  [3036730432] ProducerImpl:729 | [persistent://public/default/pi-thermal, standalone-1-2217] Closed producer
2022-02-22 19:42:02.092 INFO  [3036730432] ClientConnection:1548 | [192.168.1.204:34968 -> 192.168.1.230:6650] Connection closed
2022-02-22 19:42:02.092 INFO  [3036730432] ClientConnection:1548 | [192.168.1.204:34966 -> 192.168.1.230:6650] Connection closed
2022-02-22 19:42:02.121 INFO  [3069204864] ProducerImpl:655 | Producer - [persistent://public/default/pi-thermal, standalone-1-2217] , [batching  = off]
2022-02-22 19:42:02.122 INFO  [3069204864] ClientConnection:256 | [192.168.1.204:34966 -> 192.168.1.230:6650] Destroyed connection
2022-02-22 19:42:02.122 INFO  [3069204864] ClientConnection:256 | [192.168.1.204:34968 -> 192.168.1.230:6650] Destroyed connection


We can also use an Avro schema instead of a JSON schema if that is to your liking.

Plain Text
 
root@thermal:/opt/demo# python3 thermalavro.py 
Schema info is: {
 "type": "record",
 "name": "thermal",
 "fields": [
  {
   "name": "uuid",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "ipaddress",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "cputempf",
   "type": [
    "null",
    "int"
   ]
  },
  {
   "name": "runtime",
   "type": [
    "null",
    "int"
   ]
  },
  {
   "name": "host",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "hostname",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "macaddress",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "endtime",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "te",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "cpu",
   "type": [
    "null",
    "float"
   ]
  },
  {
   "name": "diskusage",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "memory",
   "type": [
    "null",
    "float"
   ]
  },
  {
   "name": "rowid",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "systemtime",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "ts",
   "type": [
    "null",
    "int"
   ]
  },
  {
   "name": "starttime",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "datetimestamp",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "temperature",
   "type": [
    "null",
    "float"
   ]
  },
  {
   "name": "humidity",
   "type": [
    "null",
    "float"
   ]
  },
  {
   "name": "co2",
   "type": [
    "null",
    "float"
   ]
  }
 ]
}
2022-02-23 10:11:51.953 INFO  [3069213056] ClientConnection:182 | [<none> -> pulsar://pulsar1:6650] Create ClientConnection, timeout=10000
2022-02-23 10:11:51.953 INFO  [3069213056] ConnectionPool:96 | Created connection for pulsar://pulsar1:6650
2022-02-23 10:11:51.960 INFO  [3034862656] ClientConnection:368 | [192.168.1.204:34984 -> 192.168.1.230:6650] Connected to broker
2022-02-23 10:11:51.966 INFO  [3034862656] HandlerBase:64 | [persistent://public/default/pi-thermal-avro, ] Getting connection from pool
2022-02-23 10:11:51.970 INFO  [3034862656] ClientConnection:182 | [<none> -> pulsar://pulsar1:6650] Create ClientConnection, timeout=10000
2022-02-23 10:11:51.970 INFO  [3034862656] ConnectionPool:96 | Created connection for pulsar://127.0.0.1:6650
2022-02-23 10:11:51.974 INFO  [3034862656] ClientConnection:370 | [192.168.1.204:34986 -> 192.168.1.230:6650] Connected to broker through proxy. Logical broker: pulsar://127.0.0.1:6650
2022-02-23 10:11:52.008 INFO  [3034862656] ProducerImpl:189 | [persistent://public/default/pi-thermal-avro, ] Created producer on broker [192.168.1.204:34986 -> 192.168.1.230:6650] 
SCD4X, Serial: d3efd3efd3ef
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_xtq_20220223151152', 'ipaddress': '192.168.1.204', 'cputempf': 110, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629112.5121799', 'te': '0.0006127357482910156', 'cpu': 0.0, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151152_46867a0c-dcf1-4919-b4e6-47bd8ca87dc3', 'systemtime': '02/23/2022 10:11:58', 'ts': 1645629118, 'starttime': '02/23/2022 10:11:52', 'datetimestamp': '2022-02-23 15:11:57.192564+00:00', 'temperature': 30.0725, 'humidity': 28.01, 'co2': 1082.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_kzt_20220223151158', 'ipaddress': '192.168.1.204', 'cputempf': 109, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629118.2040725', 'te': '0.0007352828979492188', 'cpu': 6.5, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151158_fece78a9-4c51-4ad6-8ffa-87ae6bc970d6', 'systemtime': '02/23/2022 10:12:02', 'ts': 1645629122, 'starttime': '02/23/2022 10:11:58', 'datetimestamp': '2022-02-23 15:12:01.967448+00:00', 'temperature': 29.7414, 'humidity': 28.58, 'co2': 1076.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_udz_20220223151202', 'ipaddress': '192.168.1.204', 'cputempf': 109, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629122.9787736', 'te': '0.0005147457122802734', 'cpu': 0.0, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151202_8fe911ba-5f02-47e4-a5bc-5535f3c789b9', 'systemtime': '02/23/2022 10:12:07', 'ts': 1645629127, 'starttime': '02/23/2022 10:12:02', 'datetimestamp': '2022-02-23 15:12:06.742775+00:00', 'temperature': 29.4877, 'humidity': 28.99, 'co2': 1071.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_kqr_20220223151207', 'ipaddress': '192.168.1.204', 'cputempf': 112, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629127.7585478', 'te': '0.0007393360137939453', 'cpu': 0.0, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151207_c244eaa0-8b36-4040-8421-8bdafa0b46c0', 'systemtime': '02/23/2022 10:12:12', 'ts': 1645629132, 'starttime': '02/23/2022 10:12:07', 'datetimestamp': '2022-02-23 15:12:11.523104+00:00', 'temperature': 29.25, 'humidity': 29.35, 'co2': 1071.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_dfn_20220223151212', 'ipaddress': '192.168.1.204', 'cputempf': 110, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629132.5379777', 'te': '0.0008678436279296875', 'cpu': 0.0, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151212_253d220a-34cd-4f1c-a435-1cb62834e3a2', 'systemtime': '02/23/2022 10:12:17', 'ts': 1645629137, 'starttime': '02/23/2022 10:12:12', 'datetimestamp': '2022-02-23 15:12:16.298915+00:00', 'temperature': 29.0097, 'humidity': 29.78, 'co2': 1064.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_flx_20220223151217', 'ipaddress': '192.168.1.204', 'cputempf': 111, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629137.3143985', 'te': '0.0007383823394775391', 'cpu': 0.0, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151217_5d1cedb9-5fbb-4abe-bcf0-f836a6bec093', 'systemtime': '02/23/2022 10:12:22', 'ts': 1645629142, 'starttime': '02/23/2022 10:12:17', 'datetimestamp': '2022-02-23 15:12:21.179689+00:00', 'temperature': 28.8068, 'humidity': 30.11, 'co2': 1069.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_fjc_20220223151222', 'ipaddress': '192.168.1.204', 'cputempf': 109, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629142.1899443', 'te': '0.0007307529449462891', 'cpu': 5.1, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151222_0f31c89a-96b5-4d8c-9f69-db3ff721d362', 'systemtime': '02/23/2022 10:12:26', 'ts': 1645629146, 'starttime': '02/23/2022 10:12:22', 'datetimestamp': '2022-02-23 15:12:25.953985+00:00', 'temperature': 28.5931, 'humidity': 30.52, 'co2': 1063.0}
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'thrm_twe_20220223151226', 'ipaddress': '192.168.1.204', 'cputempf': 111, 'runtime': 0, 'host': 'thermal', 'hostname': 'thermal', 'macaddress': 'e4:5f:01:7c:3f:34', 'endtime': '1645629146.967186', 'te': '0.0004837512969970703', 'cpu': 0.0, 'diskusage': '106314.1 MB', 'memory': 9.3, 'rowid': '20220223151226_e85a8384-7f5b-4cc5-9bc7-ca5babd2fb54', 'systemtime': '02/23/2022 10:12:31', 'ts': 1645629151, 'starttime': '02/23/2022 10:12:26', 'datetimestamp': '2022-02-23 15:12:30.729362+00:00', 'temperature': 28.4009, 'humidity': 30.85, 'co2': 1061.0}
^C2022-02-23 10:12:31.743 INFO  [3069213056] ClientImpl:495 | Closing Pulsar client with 1 producers and 0 consumers
2022-02-23 10:12:31.744 INFO  [3069213056] ProducerImpl:686 | [persistent://public/default/pi-thermal-avro, standalone-1-2228] Closing producer for topic persistent://public/default/pi-thermal-avro
2022-02-23 10:12:31.746 INFO  [3034862656] ProducerImpl:729 | [persistent://public/default/pi-thermal-avro, standalone-1-2228] Closed producer
2022-02-23 10:12:31.746 INFO  [3034862656] ClientConnection:1548 | [192.168.1.204:34986 -> 192.168.1.230:6650] Connection closed
2022-02-23 10:12:31.747 INFO  [3034862656] ClientConnection:1548 | [192.168.1.204:34984 -> 192.168.1.230:6650] Connection closed
2022-02-23 10:12:31.782 INFO  [3069213056] ProducerImpl:655 | Producer - [persistent://public/default/pi-thermal-avro, standalone-1-2228] , [batching  = off]
2022-02-23 10:12:31.782 INFO  [3069213056] ClientConnection:256 | [192.168.1.204:34984 -> 192.168.1.230:6650] Destroyed connection
2022-02-23 10:12:31.783 INFO  [3069213056] ClientConnection:256 | [192.168.1.204:34986 -> 192.168.1.230:6650] Destroyed connection

bin/pulsar-client consume "persistent://public/default/pi-thermal-avro" -s "thermalpiavro" -n 0

----- got message -----
key:[thrm_twe_20220223151226], properties:[], content:�.thrm_twe_20220223151226�?192.168.1.204�����thermal�thermal�"e4:5f:01:7c:3f:34�"1645629146.967186�*0.0004837512969970703���106314.1 MB����A�f20220223151226_e85a8384-7f5b-4cc5-9bc7-ca5babd2fb54�&02/23/2022 10:12:31�����
                                                                                                            �&02/23/2022 10:12:26�@2022-02-23 15:12:30.729362+00:00�
              5�A����A���D
              


We can connect Apache Flink to Apache Pulsar and create a catalog of topics (tables) to query.

SQL
 
CREATE CATALOG pulsar WITH (
   'type' = 'pulsar',
   'service-url' = 'pulsar://pulsar1:6650',
   'admin-url' = 'http://pulsar1:8080',
   'format' = 'json'
);

USE CATALOG pulsar;

SHOW TABLES;

select * from `pi-thermal`;


We can query the table (topic) from Trino/PrestoSQL as well.

Schema object in Python:

Python
 
### Schema Object
# https://pulsar.apache.org/docs/en/client-libraries-python/

class thermal(Record):
    uuid = String()
    ipaddress = String()
    cputempf = Integer()
    runtime = Integer()
    host = String()
    hostname = String()
    macaddress = String()
    endtime = String()
    te = String()
    cpu = Float()
    diskusage = String()
    memory = Float()
    rowid = String()
    systemtime = String()
    ts = Integer()
    starttime = String()
    datetimestamp = String()
    temperature = Float()
    humidity = Float()
    co2 =  Float()


Avro schema code:

Python
 
client = pulsar.Client('pulsar://pulsar1:6650')

thermalschema = AvroSchema(thermal)
print("Schema info is: " + thermalschema.schema_info().schema())

producer = client.create_producer(topic='persistent://public/default/pi-thermal-avro' ,schema=thermalschema,properties={"producer-name": "thermal-pyavro-sensor","producer-id": "thermal-avro-sensor" })


JSON Schema code difference:

schema=JsonSchema(thermal)


Source:

https://github.com/tspannhw/FLiP-Pi-Thermal

References:

  • https://github.com/tspannhw/minifi-gasthermal.git
  • https://github.com/tspannhw/PulsarOnRaspberryPi
  • https://shop.pimoroni.com/products/mlx90640-thermal-camera-breakout?variant=12536948654163
Python (language)

Opinions expressed by DZone contributors are their own.

Related

  • Building Robust Real-Time Data Pipelines With Python, Apache Kafka, and the Cloud
  • How to Detect VPN Proxies With Python and IP2Location.io API
  • Building a Dynamic Chat Application: Setting up ChatGPT in FastAPI and Displaying Conversations in ReactJS
  • Optimizing Coroutine Execution

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends: