
Tracking Aircraft in Real-Time With Apache Pulsar and Apache Flink

Ingest, enrich, parse, transform, route, and query live plane data with open source software.

By Timothy Spann · Sep. 19, 22 · Tutorial

Use Case: Automatic Dependent Surveillance-Broadcast (ADS-B) Analytics

Software Stack:

  • Python 3.10
  • Apache Pulsar 2.10.1
  • Apache Pulsar Python Library
  • Apache Spark
  • Apache Flink
  • Java JDK 17
  • Apache Maven
  • SDKMan
  • Raspbian Linux

Hardware Stack:

  • FlightAware Pro Stick Plus (Blue)
  • 1090 MHz Antenna and Cable
  • Raspberry Pi 4 with 2GB RAM
  • USB-C Power Supply

References:

  • What is ADS-B?
  • Function Source
  • Analytics Source

Main Source Code

Utilizing the open source FLiP Stack, we can track aircraft overhead with ease! It requires a little bit of hardware and some Python magic; you can build your own tracker at home with a few parts and some easy open-source software.

Not only are we capturing, transmitting, enriching, and storing this data, but we are also contributing it back to the world. You can check out my feed at the FlightAware website.

If you think this looks like data you want in your lakehouse, let's start ingesting it now. Once you have your hardware configured, follow the directions from PiAware. After you've rebooted and everything is up, you should be able to check the status of each running service.
(Screenshots: the Raspberry Pi 4 console, the online dashboards of published data, and the local flight feed running on the Raspberry Pi 4.)

If you want to skip to a video demoing what I did, you can do that.
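Before diving in, it is worth a quick sanity check that the local feed is actually serving JSON. Here is a minimal sketch (my own check, not part of the PiAware setup) that polls the same aircraft.json endpoint the producer will use later:

Python
 
import requests

# Hit the local dump1090/PiAware JSON endpoint (same URL the producer polls later)
data = requests.get("http://localhost:8080/data/aircraft.json", timeout=5).json()
print(f"{len(data.get('aircraft', []))} aircraft in view, "
      f"{data.get('messages', 0)} messages decoded so far")
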

Step 1

Let's build out the topics we are going to need for all this raw and processed flight data.

Shell
 
bin/pulsar-admin topics create persistent://public/default/adsbraw
bin/pulsar-admin topics create persistent://public/default/aircraft
bin/pulsar-admin topics create persistent://public/default/adsblog
bin/pulsar-admin topics create persistent://public/default/adsbdead


The first topic is our raw JSON ADS-B data. We may want to use that later, so we can let this topic store the data forever; perhaps at some point we will turn on tiered storage and have it automagically stored to object storage on AWS, Google Cloud, or Azure. We also have a topic for clean data, aircraft. Finally, we have a topic for logs output from our processing and one for messages that could not be processed. They aren't dead, since we can raise them like zombies and process them again. Never give up on your data!

The first thing I did was examine the data displayed on that pretty local map. By running Chrome with Developer Tools open, I could see all the REST calls being made and the JSON data they returned. This is where I grabbed what I needed and put it into that handy Python script.

Since we are going to be transmitting, storing, analyzing, querying, and generally sharing this data with different developers, analysts, data scientists, and other people, we should make sure we know what this data is.

Plain Text
 
 hex (String optional)
  flight (String optional - name of plane)
  alt_baro (int optional - altitude)
  alt_geom (int optional) 
  track (int optional)
  baro_rate (int optional)
  category (string optional)
  nav_qnh (float optional)
  nav_altitude_mcp (int optional)
  nav_heading (float optional)
  nic (int optional)
  rc (int optional)
  seen_pos (float optional)
  version (int optional)
  nic_baro (int optional)
  nac_p (int optional)
  nac_v (int optional)
  sil (int optional) 
  sil_type (string optional)
  mlat (array optional)
  tisb (array optional)
  messages (int optional)
  seen (float optional)
  rssi (float optional)  
  squawk (optional - look at # conversion: 7600, 7700, 4000, 5000, 7777, 6100, 5400, 4399, 4478, ...)
  speed (optional)
  mach (optional speed, mach to mph *767)
  emergency (optional string)
  lat (long optional)
  lon (long optional)


We need to define a schema with names, types, and optionality. Once we have done this we can build a JSON or Python schema for it and utilize that in Apache Pulsar, Pulsar SQL (Presto/Trino), Apache Spark SQL, Apache Flink SQL, and any consumer that can read a schema from the Pulsar Schema Registry. Data without a contract is just bytes.
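As a rough sketch, here is what that contract could look like using the Python client's schema support. The field list is abridged and uses the raw snake_case names; the Java bean used later for the aircraft topic uses camelCase names such as altBaro.

Python
 
import pulsar
from pulsar.schema import Record, String, Integer, Double, JsonSchema

# Abridged aircraft schema; fields are optional unless marked required
class Aircraft(Record):
    hex = String()        # ICAO identifier
    flight = String()     # IDENT identifier
    alt_baro = Integer()  # barometric altitude in feet
    alt_geom = Integer()  # geometric altitude in feet
    gs = Double()         # ground speed in knots
    lat = Double()
    lon = Double()
    squawk = Integer()
    emergency = String()

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('persistent://public/default/aircraft',
                                  schema=JsonSchema(Aircraft))
producer.send(Aircraft(hex='abcd45', flight='N86HZ', alt_baro=25000))
client.close()
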

  • field: hex is the ICAO identifier
  • field: flight is the IDENT identifier
  • field: altBaro is the altitude in feet (barometric)
  • field: lat, lon is latitude and longitude
  • field: gs is the ground speed in knots
  • field: altGeom is altitude (geometric)

I looked up what the fields were and made some notes. Squawk values in particular may be of interest to people running SQL later.
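For SQL users downstream, a tiny lookup of the well-known codes is handy to keep next to the schema notes. These are standard transponder codes, nothing specific to this app:

Python
 
# Well-known transponder squawk codes worth flagging downstream
NOTABLE_SQUAWKS = {
    7500: "hijacking",
    7600: "radio failure / lost communications",
    7700: "general emergency",
    1200: "VFR flight (US default)",
}

def squawk_note(squawk: int) -> str:
    """Return a human-readable note for notable squawk codes, else an empty string."""
    return NOTABLE_SQUAWKS.get(squawk, "")
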

Once you get data into a Java class and start sending messages to your aircraft topic, you can pull out the autogenerated schema.

Shell
 
bin/pulsar-admin schemas get persistent://public/default/aircraft


Step 2

Let's build our Python application to acquire data and publish it to Pulsar! We could choose from many different libraries, as Pulsar supports lots of protocols like WebSockets, Kafka, MQTT, AMQP, and RocketMQ. To keep things simple and vanilla, I am going to use the tried and true Pulsar protocol and the standard Python Pulsar library. I installed the latest version with pip3.10 install 'pulsar-client[all]'. I installed everything since I wanted the fastavro library, gRPC, schema support, and other fancy stuff. You can install only what you need.

The full Python code is here. I will show you the important bits.

Python
 
import json
import uuid
import requests
import pulsar
from time import gmtime, strftime
from pulsar import AuthenticationOauth2

# service_url, auth_params, topic, and randomword() come from the full script linked above
client = pulsar.Client(service_url, authentication=AuthenticationOauth2(auth_params))
producer = client.create_producer(topic=topic,
                                  properties={"producer-name": "adsb-rest",
                                              "producer-id": "adsb-py"})

# Build a unique partition key and a unique URL to defeat caching
uniqueid = 'thrml_{0}_{1}'.format(randomword(3), strftime("%Y%m%d%H%M%S", gmtime()))
uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S", gmtime()), uuid.uuid4())
url_data = "http://localhost:8080/data/aircraft.json?_=" + str(uuid.uuid4())

# Grab one snapshot of aircraft.json and publish it to the raw topic
response = json.dumps(requests.get(url_data).json())
producer.send(response.encode('utf-8'), partition_key=uniqueid)


Then we can run it and start producing raw ADS-B JSON data to our Pulsar topic.
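The snippet above publishes a single snapshot per run. If you want a continuous feed, a simple polling loop will do; this is my own sketch, so check the full script linked above for the author's complete version:

Python
 
import time

try:
    while True:
        # Poll the local feed and publish one snapshot every 30 seconds
        url_data = "http://localhost:8080/data/aircraft.json?_=" + str(uuid.uuid4())
        payload = json.dumps(requests.get(url_data).json())
        producer.send(payload.encode('utf-8'), partition_key=uniqueid)
        time.sleep(30)
finally:
    client.close()
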

Step 3

Let’s do a simple check to see if data is coming in.

Shell
 
bin/pulsar-client consume "persistent://public/default/adsbraw" -s adsbrawreader -n 0


What does this raw data look like?

JSON
 
{'now': 1659471117.0, 'messages': 7381380, 'aircraft': [{'hex': 'ae6d7a', 'alt_baro': 25000, 'mlat': [], 'tisb': [], 'messages': 177, 'seen': 0.1, 'rssi': -22.7}, {'hex': 'a66174', 'alt_baro': 23000, 'mlat': [], 'tisb': [], 'messages': 5, 'seen': 23.6, 'rssi': -27.8}, .. }


Well, that is a lot of data; I abbreviated it for the sake of scrolling.

Thanks for staying tuned so far, here’s a cat.

This cat is mine, this cat is fat. She is okay with that.

Raw data is nice and all, but I can surely have Apache Spark, Flink, Python, or other tools clean it up. If you wish, I recommend setting up a Delta Lake, Apache Hudi, or Apache Iceberg sink to store this raw data in your lakehouse.

Architect Note: You could just keep it in Apache Pulsar forever or in Apache Pulsar-controlled tiered storage.
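If you do want to keep the raw topic around indefinitely, infinite retention on the namespace is the switch to flip. Here is a hedged sketch using the admin REST API; the endpoint and payload are my recollection of the v2 namespaces API, so verify against your Pulsar version (bin/pulsar-admin namespaces set-retention public/default --size -1 --time -1 is the CLI equivalent):

Python
 
import requests

# Assumed endpoint: set infinite retention (-1) on the public/default namespace
resp = requests.post(
    "http://pulsar1:8080/admin/v2/namespaces/public/default/retention",
    json={"retentionTimeInMinutes": -1, "retentionSizeInMB": -1},
)
resp.raise_for_status()
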

Step 4

I wanted to do this quickly and automagically inside the Pulsar environment, so I wrote a quick Java Pulsar Function to split, parse, enrich, clean up, and route the data to a new topic. This will be the cleaned data topic. We could have built this function in Python or Golang as well; I chose Java this time.
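For the curious, a Python take on the same idea might look roughly like this. It is a minimal sketch against the Pulsar Functions Python API, not the author's code, and it skips the enrichment and routing logic of the Java version:

Python
 
import json
from pulsar import Function

class ADSBFunctionPy(Function):
    def process(self, input, context):
        # Split one raw ADS-B snapshot into individual aircraft events
        data = json.loads(input)
        for plane in data.get('aircraft', []):
            context.publish('persistent://public/default/aircraft', json.dumps(plane))
        return None
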

Yes, we had to build our own function before we could deploy it in Step 5. Let's take a quick look at our Java function:

Java
 
public class ADSBFunction implements Function<byte[], Void> {


This means our function takes raw bytes but does not return anything. We don’t have an output specified here since I will dynamically decide where to send the output. We could send to any number of topics on the fly.

Java
 
context.newOutputMessage(PERSISTENT_PUBLIC_DEFAULT, JSONSchema.of(Aircraft.class))
       .key(UUID.randomUUID().toString())
       .property(LANGUAGE, JAVA)
       .value(aircraft)
       .send();


At the end of my function, I send the data to a topic (we could create topics on the fly if we wanted to route to different ones). We might want to send all of Elon Musk's flights to a special topic. We could do those lookups with something like Scylla; I did that for my Air Quality application. I add a key, add a property, set the schema, and send the data. What is nice here is that I don't have to use a formal schema language; I can just build a plain old Java bean. Keeping it simple and old school works for me.

In between that code, I have a helper service that parses that jumble of JSON and pulls out the good bits one aircraft event at a time. It's pretty simple, but it's nice to keep that code inside a small function that runs on every event or message that enters the adsbraw topic. You will need the Java JDK (11 or 17) and Maven. I recommend you utilize SDKMan so you can run multiple JDKs and tools.

To build the function we just need to type:

Shell
 
mvn package


Step 5

Pulsar makes it easy to add event processing on each topic, so I created a simple function in Java. It's very easy to deploy, monitor, start, stop, and delete these functions.

Let’s deploy our function:

Shell
 
bin/pulsar-admin functions create \
  --auto-ack true \
  --jar /opt/demo/java/pulsar-adsb-function/target/adsb-1.0.jar \
  --classname "dev.pulsarfunction.adsb.ADSBFunction" \
  --dead-letter-topic "persistent://public/default/adsbdead" \
  --inputs "persistent://public/default/adsbraw" \
  --log-topic "persistent://public/default/adsblog" \
  --name ADSB \
  --namespace default \
  --tenant public \
  --max-message-retries 5


For people new to Apache Pulsar, please note that we have to specify a namespace and tenant for where this will live. This is for discoverability, multitenancy, and just plain cleanliness. We can have as many input topics as we want. Log and Dead letter topics are for special outputs. In this instance, for Java, we have our application stored in a JAR file.
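As an aside, the zombie raising mentioned earlier is just a consumer on the dead-letter topic. A minimal sketch (my own, assuming a standalone broker on localhost) that replays failed messages back onto the raw topic so the function gets another shot:

Python
 
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('persistent://public/default/adsbdead',
                            subscription_name='zombie-raiser')
producer = client.create_producer('persistent://public/default/adsbraw')

while True:
    msg = consumer.receive()  # blocks until a dead-lettered message arrives
    producer.send(msg.data(), partition_key=msg.partition_key() or None)
    consumer.acknowledge(msg)
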

Once deployed, let’s check the status:

Shell
 
bin/pulsar-admin functions status --name ADSB


The results are JSON, which lends itself to DevOps automation. If we wanted, we could administer this with REST or a DevOps tool.

JSON
 
{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReceived" : 28,
      "numSuccessfullyProcessed" : 28,
      "numUserExceptions" : 0,
      "latestUserExceptions" : [ ],
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "averageLatency" : 144.23374035714286,
      "lastInvocationTime" : 1659725881406,
      "workerId" : "c-standalone-fw-127.0.0.1-8080"
    }
  } ]
}
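Since that same status is exposed over the Pulsar admin REST API, a scripted health check is only a few lines. A hedged sketch; the /admin/v3/functions path is my recollection of the Functions admin API, so adjust host, port, and path to your broker:

Python
 
import requests

# Assumed endpoint for function status on the broker's web service port
status = requests.get(
    "http://pulsar1:8080/admin/v3/functions/public/default/ADSB/status"
).json()
print(status["numRunning"], "of", status["numInstances"], "instances running")
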


If we wanted to stop it:

Shell
 
bin/pulsar-admin functions stop --name ADSB --namespace default --tenant public


If we needed to delete it:

Shell
 
bin/pulsar-admin functions delete --name ADSB --namespace default --tenant public


Step 6

Now that the function has processed the data, let’s do a quick check of that clean data.

Shell
 
bin/pulsar-client consume "persistent://public/default/aircraft" -s "aircraftconsumer" -n 0


An example of a JSON row returned:

JSON
 
----- got message -----
key:[c480cd8e-a803-47fe-81b4-aafdec0f6b68], properties:[language=Java], content:{"flight":"N86HZ","category":"A7","emergency":"none","squawk":1200,"hex":"abcd45","gs":52.2,"track":106.7,"lat":40.219757,"lon":-74.580566,"nic":9,"rc":75,"version":2,"sil":3,"gva":2,"sda":2,"mlat":[],"tisb":[],"messages":2259,"seen":1.1,"rssi":-19.9}


Architect Note: Always set a key when you produce messages.

Step 7

We now have clean data!

Let’s check that data stream with Apache Spark Structured Streaming!

Scala
 
val dfPulsar = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://pulsar1:6650")
  .option("admin.url", "http://pulsar1:8080")
  .option("topic", "persistent://public/default/aircraft")
  .load()

dfPulsar.printSchema()
root
 |-- altBaro: integer (nullable = true)
 |-- altGeom: integer (nullable = true)
 |-- baroRate: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- emergency: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- gs: double (nullable = true)
 |-- gva: integer (nullable = true)
 |-- hex: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- mach: double (nullable = true)
 |-- messages: integer (nullable = true)
 |-- mlat: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |-- nacP: integer (nullable = true)
 |-- nacV: integer (nullable = true)
 |-- navAltitudeMcp: integer (nullable = true)
 |-- navHeading: double (nullable = true)
 |-- navQnh: double (nullable = true)
 |-- nic: integer (nullable = true)
 |-- nicBaro: integer (nullable = true)
 |-- rc: integer (nullable = true)
 |-- rssi: double (nullable = true)
 |-- sda: integer (nullable = true)
 |-- seen: double (nullable = true)
 |-- seenPos: double (nullable = true)
 |-- sil: integer (nullable = true)
 |-- silType: string (nullable = true)
 |-- speed: double (nullable = true)
 |-- squawk: integer (nullable = true)
 |-- tisb: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |-- track: double (nullable = true)
 |-- version: integer (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)


val pQuery = dfPulsar.selectExpr("*").writeStream
  .format("console")
  .option("truncate", false)
  .start()


The Spark code above connects to the Pulsar cluster, reads from our Pulsar topic, and builds a streaming DataFrame with the schema already attached. As you can see, setting that schema was a great idea. We can then easily query the stream in micro-batches, in this case writing to the console for debugging. We could also send that stream somewhere else, such as S3.
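If the console sink is not enough, the same stream can land in object storage. Here is a rough PySpark equivalent; the bucket name is hypothetical, and it assumes the pulsar-spark connector and S3 credentials are already configured:

Python
 
# Read the aircraft topic as a stream, then write Parquet files to S3 instead of the console
df = (spark.readStream.format("pulsar")
      .option("service.url", "pulsar://pulsar1:6650")
      .option("admin.url", "http://pulsar1:8080")
      .option("topic", "persistent://public/default/aircraft")
      .load())

query = (df.writeStream
         .format("parquet")
         .option("path", "s3a://my-flight-lakehouse/aircraft/")  # hypothetical bucket
         .option("checkpointLocation", "s3a://my-flight-lakehouse/checkpoints/aircraft/")
         .start())
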

Step 8

Let’s run a continuous SQL query with Apache Flink.

SQL
 
CREATE CATALOG pulsar WITH (
   'type' = 'pulsar',
   'service-url' = 'pulsar://pulsar1:6650',
   'admin-url' = 'http://pulsar1:8080',
   'format' = 'json'
);

USE CATALOG pulsar;


We create a catalog to connect from Flink to Pulsar.

Let’s take a look at our table.

Plain Text
 
describe aircraft;
+------------------+-----------------------+------+-----+--------+-----------+
|             name |                  type | null | key | extras | watermark |
+------------------+-----------------------+------+-----+--------+-----------+
|         alt_baro |                   INT | true |     |        |           |
|         alt_geom |                   INT | true |     |        |           |
|        baro_rate |                   INT | true |     |        |           |
|         category |                STRING | true |     |        |           |
|        emergency |                STRING | true |     |        |           |
|           flight |                STRING | true |     |        |           |
|               gs |                DOUBLE | true |     |        |           |
|              gva |                   INT | true |     |        |           |
|              hex |                STRING | true |     |        |           |
|              lat |                DOUBLE | true |     |        |           |
|              lon |                DOUBLE | true |     |        |           |
|             mach |                DOUBLE | true |     |        |           |
|         messages |                   INT | true |     |        |           |
|             mlat | ARRAY<ROW<> NOT NULL> | true |     |        |           |
|            nac_p |                   INT | true |     |        |           |
|            nac_v |                   INT | true |     |        |           |
| nav_altitude_mcp |                   INT | true |     |        |           |
|      nav_heading |                DOUBLE | true |     |        |           |
|          nav_qnh |                DOUBLE | true |     |        |           |
|              nic |                   INT | true |     |        |           |
|         nic_baro |                   INT | true |     |        |           |
|               rc |                   INT | true |     |        |           |
|             rssi |                DOUBLE | true |     |        |           |
|              sda |                   INT | true |     |        |           |
|             seen |                DOUBLE | true |     |        |           |
|         seen_pos |                DOUBLE | true |     |        |           |
|              sil |                   INT | true |     |        |           |
|         sil_type |                STRING | true |     |        |           |
|            speed |                DOUBLE | true |     |        |           |
|           squawk |                   INT | true |     |        |           |
|             tisb | ARRAY<ROW<> NOT NULL> | true |     |        |           |
|            track |                DOUBLE | true |     |        |           |
|          version |                   INT | true |     |        |           |
+------------------+-----------------------+------+-----+--------+-----------+
33 rows in set


Let’s run some simple queries.

SQL
 
select alt_baro,
       gs,
       alt_geom,
       baro_rate,
       mach, 
       hex, flight, lat, lon
from aircraft;

select max(alt_baro) as MaxAltitudeFeet, min(alt_baro) as MinAltitudeFeet, avg(alt_baro) as AvgAltitudeFeet,
       max(alt_geom) as MaxGAltitudeFeet, min(alt_geom) as MinGAltitudeFeet, avg(alt_geom) as AvgGAltitudeFeet,
       max(gs) as MaxGroundSpeed, min(gs) as MinGroundSpeed, avg(gs) as AvgGroundSpeed, 
       count(alt_baro) as RowCount, 
       hex as ICAO, flight as IDENT
from aircraft 
group by flight, hex;



We can do basic queries that return every row as it arrives, or aggregate rows as they stream in.

Step 9

We did it. Let’s start streaming our little dream stream.

Thanks for staying for the entire app build. I hope to see you soon at a meetup or event. Contact me if you have any questions or are looking for other apps powered by FLiPN+.

Aircraft Apache Flink Apache Spark Data science Data structure HTTPS JSON Data Types

Published at DZone with permission of Timothy Spann, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
