Fast Data Processing Pipeline for Predicting Flight Delays Using Apache APIs (Part 1)
This is the first in a series of blogs that discuss the architecture of an end-to-end application that combines streaming data with machine learning and fast storage.
According to Thomas Davenport in the HBR, analytical technology has changed dramatically over the last decade, with more powerful and less expensive distributed computing across commodity servers, streaming analytics, and improved machine learning technologies enabling companies to store and analyze both far more data and many different types of it. Werner Vogels stated in his recent keynote at AWS re:Invent that key technology drivers of today are data, the Internet of Things (IoT), and machine learning.
Leveraging the huge amounts of data coming from the Internet of Things requires processing events in real-time, applying machine learning to add value, and having scalable fast storage. This is the first in a series of blogs that discuss the architecture of an end-to-end application that combines streaming data with machine learning and fast storage. In this first post, I’ll help you get started using Apache Spark’s ML pipelines with a decision tree classifier to predict flight delays. This first post is also an update to a previous Apache Spark machine learning tutorial that used the Spark RDD-based API.
Architectures for these types of applications are discussed in more detail in the e-books Machine Learning Logistics and Streaming Architecture.
What Is Machine Learning?
Machine learning uses algorithms to find patterns in data and then uses a model that recognizes those patterns to make predictions on new data.
There are typically two phases of machine learning with real-time data:
- Data discovery: The first phase involves analysis of historical data to build the machine learning model.
- Analytics using the model: The second phase uses the model in production on live events. (Note that Spark does provide some streaming machine learning algorithms, but you still often need to do an analysis of historical data.)
In general, machine learning may be broken down into three types: supervised, unsupervised, and semi-supervised, which falls between the two.
Supervised algorithms use labeled data in which both the input and target outcome, or label, are provided to the algorithm.
Supervised learning is also called predictive modeling or predictive analytics because you build a model that is capable of making predictions.
Unsupervised learning algorithms find patterns in unlabeled data. Semi-supervised learning uses a mixture of labeled and unlabeled data. Reinforcement learning trains algorithms to maximize rewards based on feedback.
Three Categories of Techniques for Machine Learning
Three common categories of machine learning techniques are classification, clustering, and collaborative filtering.
- Classification: Gmail uses a machine learning technique called classification to designate if an email is spam based on the data of an email: the sender, recipients, subject, and message body. Classification takes a set of data with known labels and learns how to label new records based on that information.
- Clustering: Google News uses a technique called clustering to group news articles into different categories, based on title and content. Clustering algorithms discover groupings that occur in collections of data.
- Collaborative filtering: Amazon uses a machine learning technique called collaborative filtering (commonly referred to as recommendation) to determine which products users will like based on their history and similarity to other users.
In this example, we will be using a supervised machine learning algorithm for classification of flight delays.
Classification
Classification is a family of supervised machine learning algorithms that identify which category an item belongs to (e.g., whether a transaction is fraudulent) based on labeled examples of known items (e.g., transactions known to be fraudulent or not). Classification takes a set of data with known labels and pre-determined features and learns how to label new records based on that information. Features are the “if questions” that you ask; the label is the answer to those questions. In the example below, if it walks, swims, and quacks like a duck, then the label is "duck."
Let’s go through an example for flight delays:
- What are we trying to predict?
- Whether a flight will be delayed.
- Delayed is the label: True or false.
- What are the “if questions” or properties that you can use to make predictions?
- What is the originating airport?
- What is the destination airport?
- What is the scheduled time of departure?
- What is the scheduled time of arrival?
- What is the day of the week?
- What is the airline carrier?
Decision Trees
Decision trees create a model that predicts the label (or class) by evaluating a set of rules that follow an if-then-else pattern. The feature questions are the nodes, and the true/false answers are the branches leading to the child nodes. A decision tree model estimates the minimum number of true/false questions needed to assess the probability of making a correct decision. Below is an example of a simplified decision tree for flight delays:
- Q1: If the scheduled departure time < 10:15 AM:
  - T: Q2: If the originating airport is in the set {ORD, ATL, SFO}:
    - T: Q3: If the day of the week is in {Monday, Sunday}:
      - T: Delayed = 1
      - F: Delayed = 0
    - F: Q3: If the destination airport is in the set {SFO, ORD, EWR}:
      - T: Delayed = 1
      - F: Delayed = 0
  - F: Q2: If the originating airport is not in the set {BOS, MIA}:
    - T: Q3: If the day of the week is in the set {Monday, Sunday}:
      - T: Delayed = 1
      - F: Delayed = 0
    - F: Q3: If the destination airport is not in the set {BOS, MIA}:
      - T: Delayed = 1
      - F: Delayed = 0
Example Use Case Dataset
Our data is from here. We are using flight information for January, February, March, April, and May 2017. For each flight, we have the following information:
Field | Description | Example Value |
---|---|---|
_id (String) | Unique identifier: composed of carrier code, date, origin, destination, flight number | AA_2017-02-22_SFO_ORD_150 |
dofW (Integer) | Day of week (1 = Monday, 7 = Sunday) | 1 |
carrier (String) | Carrier code | AA |
origin (String) | Origin airport code | JFK |
dest (String) | Destination airport code | LAX |
crsdephour (Integer) | Scheduled departure hour | 9 |
crsdeptime (Double) | Scheduled departure time | 900.0 |
depdelay (Double) | Departure delay in minutes | 40.0 |
crsarrtime (Double) | Scheduled arrival time | 1230.0 |
arrdelay (Double) | Arrival delay in minutes | 40.0 |
crselapsedtime (Double) | Scheduled elapsed time in minutes | 390.0 |
dist (Double) | Distance in miles | 2475.0 |
I have already cleaned up, limited the number of airports and carriers, and transformed the data into two JSON files: one for training and one for testing. (You can see the code for the cleanup in the GitHub repository.) The JSON file has the following format:
```json
{
  "_id": "AA_2017-01-01_ATL_LGA_1678",
  "dofW": 7,
  "carrier": "AA",
  "origin": "ATL",
  "dest": "LGA",
  "crsdephour": 17,
  "crsdeptime": 1700,
  "depdelay": 0.0,
  "crsarrtime": 1912,
  "arrdelay": 0.0,
  "crselapsedtime": 132.0,
  "dist": 762.0
}
```
You can run the code for this example with MapR 5.2.1 or MapR 6.0 (which includes Spark 2.1), and you can view the notebook code with the Zeppelin viewer.
Load the Data From a File Into a Dataframe
We use a Scala case class and StructType to define the schema, corresponding to a line in the JSON data file.
Below, we specify the data source, schema, and class to load into a dataset. We load the data from January and February, which we will use for training the model. (Note that specifying the schema when loading data into a DataFrame will give better performance than schema inference.)
The DataFrame show method displays the first 20 rows:
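A sketch of this loading step under those assumptions (the file name is illustrative, and the column names follow the table above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("flightdelay").getOrCreate()
import spark.implicits._

// Case class matching one line of the JSON data file
case class Flight(_id: String, dofW: Int, carrier: String, origin: String,
  dest: String, crsdephour: Int, crsdeptime: Double, depdelay: Double,
  crsarrtime: Double, arrdelay: Double, crselapsedtime: Double, dist: Double)

// Explicit schema: faster than letting Spark infer it from the JSON
val schema = StructType(Array(
  StructField("_id", StringType, true),
  StructField("dofW", IntegerType, true),
  StructField("carrier", StringType, true),
  StructField("origin", StringType, true),
  StructField("dest", StringType, true),
  StructField("crsdephour", IntegerType, true),
  StructField("crsdeptime", DoubleType, true),
  StructField("depdelay", DoubleType, true),
  StructField("crsarrtime", DoubleType, true),
  StructField("arrdelay", DoubleType, true),
  StructField("crselapsedtime", DoubleType, true),
  StructField("dist", DoubleType, true)))

// Load the January/February training data (file name is illustrative)
val df = spark.read.schema(schema).json("flights20170102.json").as[Flight]
df.show()  // displays the first 20 rows
```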
Here, we load data from March and April, which we will use for testing the model:
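A sketch of the test-set load, reusing the same schema (file name again illustrative):

```scala
// March/April data held out for testing the fitted model
val testdf = spark.read.schema(schema).json("flights20170304.json").as[Flight]
testdf.show()
```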
Summary Statistics
Spark DataFrames include some built-in functions for statistical processing. The describe() function performs summary statistics calculations on all numeric columns and returns them as a DataFrame.
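For example, assuming the df loaded above:

```scala
// Compute count, mean, stddev, min, and max for selected numeric columns
df.describe("depdelay", "arrdelay", "crselapsedtime", "dist").show()
```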
Data Exploration
We can use Spark SQL to explore the dataset. Here are some example queries using Spark SQL:
Below, we display information for the longest departure delays:
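A sketch of this query with the DataFrame API (assuming the df and implicits from the loading step):

```scala
import org.apache.spark.sql.functions._

// Five flights with the longest departure delays
df.select($"carrier", $"origin", $"dest", $"depdelay", $"crsdephour")
  .orderBy(desc("depdelay"))
  .show(5)
```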
Below, we display the average departure delay by Carrier:
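A sketch of the aggregation (the alias name is illustrative):

```scala
// Average departure delay per carrier, highest first
df.groupBy("carrier")
  .agg(avg("depdelay").alias("avgdelay"))
  .orderBy(desc("avgdelay"))
  .show()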
We want to predict flight delays where depdelay > 40 minutes, so let’s explore this data. Below, we see that United Airlines and Delta have the highest count of flight delays for January and February 2017 (the training set).
In the query below, we see that Monday and Sunday have the highest count of flight delays.
In the query below, we see that the hours between 13:00-19:00 have the highest count of flight delays.
In the query below, we see that the origin airports Chicago and Atlanta have the highest count of flight delays.
In the query below, we see that the destination airports San Francisco and Newark have the highest count of flight delays.
In the query below, we see the count of departure delays by origin and destination. The routes ORD -> SFO and DEN -> SFO have the highest delays, possibly because of winter weather in January and February. Adding weather to this dataset would give better results, but that is left as an exercise for the reader.
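A sketch of these delay-count queries (assuming the df loaded above); each of the counts discussed follows the same filter/groupBy pattern, just with a different grouping column:

```scala
// Flights delayed more than 40 minutes, counted by carrier; swapping "carrier"
// for dofW, crsdephour, origin, or dest reproduces the other queries above
df.filter($"depdelay" > 40)
  .groupBy("carrier")
  .count()
  .orderBy(desc("count"))
  .show()

// Departure delays counted by origin -> destination route
df.filter($"depdelay" > 40)
  .groupBy("origin", "dest")
  .count()
  .orderBy(desc("count"))
  .show(10)
```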
In the code below, a Spark Bucketizer is used to split the dataset into delayed and not delayed flights with a delayed 0/1 column. Then, the resulting total counts are displayed. Grouping the data by the delayed field and counting the number of instances in each group shows that there are roughly eight times as many not delayed samples as delayed samples.
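A sketch of the bucketizing step (the split at 40 follows the delay threshold discussed above):

```scala
import org.apache.spark.ml.feature.Bucketizer

// Bucketize depdelay into delayed = 0.0 (under 40 minutes) and delayed = 1.0
// (40 minutes or more); the negative-infinity lower bound covers early departures
val bucketizer = new Bucketizer()
  .setInputCol("depdelay")
  .setOutputCol("delayed")
  .setSplits(Array(Double.NegativeInfinity, 40.0, Double.PositiveInfinity))

val df2 = bucketizer.transform(df)
df2.groupBy("delayed").count().show()
```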
In the query below, we see the count of not delayed (0=dark blue) and delayed (1=light blue) flights by origin.
Stratified Sampling
In order to ensure that our model is sensitive to the delayed samples, we can put the two sample types on the same footing using stratified sampling. The DataFrame sampleBy() function does this when provided with fractions of each sample type to be returned. Here, we keep all instances of delayed flights but downsample the not-delayed instances to 29%, then display the results.
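A sketch of the stratified sampling (assuming the bucketized df2 from the previous step; the seed value is arbitrary):

```scala
// Keep every delayed flight (1.0), sample ~29% of the not-delayed ones (0.0)
val strainData = df2.stat.sampleBy("delayed", Map(0.0 -> 0.29, 1.0 -> 1.0), 36L)
strainData.groupBy("delayed").count().show()
```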
Features Array
To build a classifier model, you extract the features that most contribute to the classification. In this scenario, we will build a tree to predict the label of delayed or not based on the following features:
- Label -> delayed = 1 if departure delay > 40 minutes, otherwise delayed = 0
- Features -> {day of the week, scheduled departure time, scheduled arrival time, carrier, scheduled elapsed time, origin, destination, distance}
delayed | dofW | crsdepTime | crsArrTime | carrier | elapTime | origin | dest | dist |
---|---|---|---|---|---|---|---|---|
1.0/0.0 | 1 | 1015 | 1230 | AA | 385.0 | JFK | LAX | 2475.0 |
In order for the features to be used by a machine learning algorithm, they must be transformed and put into feature vectors, which are vectors of numbers representing the value for each feature.
Using the Spark ML Package
The ML package is the newer library of machine learning routines, replacing the older RDD-based MLlib API. Spark ML provides a uniform set of high-level APIs built on top of DataFrames.
We will use an ML Pipeline to pass the data through transformers in order to extract the features and an estimator to produce the model.
- Transformer: A Transformer is an algorithm that transforms one DataFrame into another DataFrame. We will use a transformer to get a DataFrame with a features vector column.
- Estimator: An Estimator is an algorithm that can be fit on a DataFrame to produce a Transformer. We will use an estimator to train a model that can transform data to get predictions.
- Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
Feature Extraction and Pipelining
The ML package needs the label and feature vector to be added as columns to the input DataFrame. We set up a pipeline to pass the data through transformers in order to extract the features and label. We use a StringIndexer to encode a column of strings as a column of number indices. We use a OneHotEncoder to map a column of number indices to a column of binary vectors with at most a single one-value. Encoding categorical features allows decision trees to treat categorical features appropriately, improving performance. An example of StringIndexing and OneHotEncoding for the carrier is shown below:
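A sketch for the carrier column (the output column names are illustrative; origin and dest get analogous indexer/encoder pairs):

```scala
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

// Map carrier strings to numeric indices ...
val carrierIndexer = new StringIndexer()
  .setInputCol("carrier")
  .setOutputCol("carrierIndexed")

// ... then map each index to a sparse binary vector with a single one-value
val carrierEncoder = new OneHotEncoder()
  .setInputCol("carrierIndexed")
  .setOutputCol("carrierVec")
```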
Below, a Bucketizer is used to add a label of delayed 0/1. The VectorAssembler combines a given list of columns into a single feature vector column.
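The Bucketizer was sketched in the data exploration section above; a sketch of the VectorAssembler under the same assumptions (the input names must match the indexer/encoder outputs, and originVec/destVec assume pairs like the carrier one):

```scala
import org.apache.spark.ml.feature.VectorAssembler

// Combine the encoded categorical columns and the numeric columns
// into a single features vector
val assembler = new VectorAssembler()
  .setInputCols(Array("carrierVec", "destVec", "originVec", "dofW",
    "crsdephour", "crselapsedtime", "crsarrtime", "crsdeptime", "dist"))
  .setOutputCol("features")
```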
The result of running these transformers in a pipeline will be to add a label and features column to the dataset as shown below.
The final element in our pipeline is an estimator (a decision tree classifier), training on the vector of labels and features.
Below, we chain the indexers and tree in a Pipeline.
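A sketch of the estimator and pipeline (stage names assume the indexers, encoders, and assembler defined above):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier

// The estimator: a decision tree trained on the delayed label and features vector
val dTree = new DecisionTreeClassifier()
  .setLabelCol("delayed")
  .setFeaturesCol("features")

// Chain the feature transformers and the tree estimator; the delayed label
// column was already added by the Bucketizer during stratified sampling
val pipeline = new Pipeline().setStages(Array(
  carrierIndexer, carrierEncoder, originIndexer, originEncoder,
  destIndexer, destEncoder, assembler, dTree))
```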
Train the Model
We would like to determine which parameter values of the decision tree produce the best model. A common technique for model selection is k-fold cross-validation, where the data is randomly split into k partitions. Each partition is used once as the testing dataset, while the rest are used for training. Models are then generated using the training sets and evaluated with the testing sets, resulting in k model performance measurements. The model parameters leading to the highest performance metric produce the best model.
Spark ML supports k-fold cross-validation with a transformation/estimation pipeline, using a process called grid search to try out different combinations of parameters: you set up the parameters to test and a cross-validation evaluator to construct a model selection workflow.
Below, we use a ParamGridBuilder to construct the parameter grid. We define an Evaluator, which will evaluate the model by comparing the test label column with the test prediction column. We use a CrossValidator for model selection.
The CrossValidator uses the estimator pipeline, the parameter grid, and the classification evaluator to fit the training dataset and return a model.
The CrossValidator uses the ParamGridBuilder to iterate through the maxDepth parameter of the decision tree and evaluate the models, repeating three times per parameter value for reliable results.
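A sketch of the model selection workflow (the candidate depth values are illustrative; strainData is the stratified training set from earlier):

```scala
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Candidate tree depths to search over
val paramGrid = new ParamGridBuilder()
  .addGrid(dTree.maxDepth, Array(4, 5, 6, 7))
  .build()

// Compares the label column with the prediction column; the default metric
// for binary classification is area under the ROC curve
val evaluator = new BinaryClassificationEvaluator().setLabelCol("delayed")

// 3-fold cross-validation over the pipeline and the parameter grid
val crossValidator = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

// Fit on the stratified training data; returns the best model found
val cvModel = crossValidator.fit(strainData)
```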
Next, we can get the best decision tree model in order to print out the decision tree and feature importance. (Note that the OneHotEncoders increase the number of features. In order to understand this printout better, I built a tree without the encoders, which gave a slightly lower accuracy.)
We find that the best tree model produced using the cross-validation process is one with a depth of 6. The toDebugString() function prints the tree's decision nodes and the final prediction outcomes at the leaves.
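A sketch of retrieving and printing the best model (this assumes the tree is the last stage of the best pipeline, matching the pipeline sketch above):

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.DecisionTreeClassificationModel

// The best pipeline found by cross-validation; the tree is its last stage
val bestPipeline = cvModel.bestModel.asInstanceOf[PipelineModel]
val treeModel = bestPipeline.stages.last
  .asInstanceOf[DecisionTreeClassificationModel]

println(treeModel.toDebugString)      // if/else nodes and leaf predictions
println(treeModel.featureImportances) // relative importance of each feature
```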
The feature numbers correspond to the following:
(0=carrierIndexed, 1=destIndexed, 2=originIndexed, 3=dofW, 4=crsdephour, 5=crselapsedtime, 6=crsarrtime, 7=crsdeptime, 8=dist)
Below, we can see that the feature importance in order is:
- Scheduled departure time (feature 7)
- Destination (feature 1)
- Origin (feature 2)
- Scheduled arrival time (feature 6)
- Day of the week (feature 3)
Decision trees are often used for feature selection because they provide an automated mechanism for determining the most important features (those closest to the tree root).
Predictions and Model Evaluation
The actual performance of the model can be determined using the test dataset that has not been used for any training or cross-validation activities.
We transform the test DataFrame with the model pipeline, which transforms the features according to the pipeline stages and returns the label predictions in a column of a new DataFrame.
The evaluator will provide us with the score of the predictions. Here, accuracy is measured by the area under the ROC curve, which reflects the model's ability to distinguish true positives from false positives. A random predictor would score 0.5; the closer the value is to 1, the better its predictions are. In this case, the evaluation returns 81%.
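A sketch of scoring the held-out data (testdf and bucketizer come from the earlier steps; the test set needs the delayed label added before evaluation):

```scala
// Add the delayed label to the test set, then score it with the fitted pipeline
val testData = bucketizer.transform(testdf)
val predictions = cvModel.transform(testData)

// Area under the ROC curve on data never seen during training or tuning
val areaUnderROC = evaluator.evaluate(predictions)
```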
Below, we calculate some more metrics. The number of false/true positive and negative predictions is also useful:
- True positives are how often the model correctly predicted delayed flights.
- False positives are how often the model incorrectly predicted delayed flights.
- True negatives indicate how often the model correctly predicted not delayed flights.
- False negatives indicate how often the model incorrectly predicted not delayed flights.
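A sketch of these counts, comparing the prediction column against the actual delayed label:

```scala
// Confusion-matrix counts from the predictions DataFrame
val tp = predictions.filter($"prediction" === 1.0 && $"delayed" === 1.0).count()
val fp = predictions.filter($"prediction" === 1.0 && $"delayed" === 0.0).count()
val tn = predictions.filter($"prediction" === 0.0 && $"delayed" === 0.0).count()
val fn = predictions.filter($"prediction" === 0.0 && $"delayed" === 1.0).count()
```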
Save the Model
We can now save our fitted Pipeline for later use with streaming events. This saves both the feature extraction stage and the decision tree model chosen by model tuning.
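A minimal sketch of the save call (assuming the cvModel from the tuning step; the path mirrors the load call below):

```scala
// Persist the fitted CrossValidatorModel, including the feature-extraction
// stages and the decision tree chosen by model tuning
cvModel.write.overwrite().save("../cfModel")
```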
The result of saving the pipeline model is a JSON file for metadata and Parquet files for the model data. We can re-load the model with the load command; the original and re-loaded models are the same:
```scala
val sameCVModel = CrossValidatorModel.load("../cfModel")
```
Code
- You can download the code and data to run these examples from here
- Zeppelin Notebook viewer of the code
- Zeppelin Notebook for the code
Running the Code
All of the components of the use case architecture we just discussed can run on the same cluster with the MapR Converged Data Platform.
Published at DZone with permission of Carol McDonald, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.