Fast Data Processing Pipeline for Predicting Flight Delays Using Apache APIs (Part 1)


This is the first in a series of blogs that discuss the architecture of an end-to-end application that combines streaming data with machine learning and fast storage.


According to Thomas Davenport in the Harvard Business Review, analytical technology has changed dramatically over the last decade: more powerful and less expensive distributed computing across commodity servers, streaming analytics, and improved machine learning technologies now enable companies to store and analyze both far more data and many different types of it. Werner Vogels stated in his recent keynote at AWS re:Invent that the key technology drivers today are data, the Internet of Things (IoT), and machine learning.

Leveraging the huge amounts of data coming from the Internet of Things requires processing events in real-time, applying machine learning to add value, and having scalable fast storage. This is the first in a series of blogs that discuss the architecture of an end-to-end application that combines streaming data with machine learning and fast storage. In this first post, I’ll help you get started using Apache Spark’s ML pipelines with a decision tree classifier to predict flight delays. This first post is also an update to a previous Apache Spark machine learning tutorial that used the Spark RDD-based API.


Architectures for these types of applications are discussed in more detail in the e-books Machine Learning Logistics and Streaming Architecture.


What Is Machine Learning?

Machine learning uses algorithms to find patterns in data and then uses a model that recognizes those patterns to make predictions on new data.


There are typically two phases of machine learning with real-time data:

  • Data discovery: The first phase involves analysis of historical data to build the machine learning model.
  • Analytics using the model: The second phase uses the model in production on live events. (Note that Spark does provide some streaming machine learning algorithms, but you still often need to do an analysis of historical data.)


In general, machine learning may be broken down into three types: supervised, unsupervised, and, in between the two, semi-supervised.


Supervised algorithms use labeled data in which both the input and target outcome, or label, are provided to the algorithm.


Supervised learning is also called predictive modeling or predictive analytics because you build a model that is capable of making predictions.

Unsupervised learning algorithms find patterns in unlabeled data. Semi-supervised learning uses a mixture of labeled and unlabeled data. Reinforcement learning trains algorithms to maximize rewards based on feedback.

Three Categories of Techniques for Machine Learning

Three common categories of machine learning techniques are classification, clustering, and collaborative filtering.


  • Classification: Gmail uses a machine learning technique called classification to decide whether an email is spam, based on email data: the sender, recipients, subject, and message body. Classification takes a set of data with known labels and learns how to label new records based on that information.
  • Clustering: Google News uses a technique called clustering to group news articles into different categories, based on title and content. Clustering algorithms discover groupings that occur in collections of data.
  • Collaborative filtering: Amazon uses a machine learning technique called collaborative filtering (commonly referred to as recommendation) to determine which products users will like based on their history and similarity to other users.

In this example, we will be using a supervised machine learning algorithm for classification of flight delays.

Classification

Classification is a family of supervised machine learning algorithms that identify which category an item belongs to (for example, whether a transaction is fraudulent) based on labeled examples of known items (for example, transactions known to be fraudulent or not). Classification takes a set of data with known labels and predetermined features and learns how to label new records based on that information. Features are the "if questions" that you ask. The label is the answer to those questions. For example, if it walks, swims, and quacks like a duck, then the label is "duck."


Let’s go through an example for flight delays:

  • What are we trying to predict?
    • Whether a flight will be delayed.
    • Delayed is the label: True or false.
  • What are the “if questions” or properties that you can use to make predictions?
    • What is the originating airport?
    • What is the destination airport?
    • What is the scheduled time of departure?
    • What is the scheduled time of arrival?
    • What is the day of the week?
    • What is the airline carrier?

Decision Trees

Decision trees create a model that predicts the label (or class) by evaluating a set of rules that follow an if-then-else pattern. The if-then-else feature questions are the nodes, and the answers true or false are the branches in the tree to the child nodes. A decision tree model estimates the minimum number of true/false questions needed to assess the probability of making a correct decision. Below is an example of a simplified decision tree for flight delays:

  • Q1: If the scheduled departure time < 10:15 AM
    • T: Q2: If the originating airport is in the set {ORD, ATL, SFO}
      • T: Q3: If the day of the week is in {Monday, Sunday}
        • T: Delayed = 1
        • F: Delayed = 0
      • F: Q3: If the destination airport is in the set {SFO, ORD, EWR}
        • T: Delayed = 1
        • F: Delayed = 0
    • F: Q2: If the originating airport is not in the set {BOS, MIA}
      • T: Q3: If the day of the week is in the set {Monday, Sunday}
        • T: Delayed = 1
        • F: Delayed = 0
      • F: Q3: If the destination airport is not in the set {BOS, MIA}
        • T: Delayed = 1
        • F: Delayed = 0


Example Use Case Dataset

Our dataset consists of flight information for January through May 2017. For each flight, we have the following information:

  • id: Unique identifier composed of carrier code, date, origin, destination, and flight number. Example: AA_2017-02-22_SFO_ORD_150
  • dofW (Integer): Day of week (1 = Monday, 7 = Sunday). Example: 1
  • carrier (String): Carrier code. Example: AA
  • origin (String): Origin airport code. Example: JFK
  • dest (String): Destination airport code. Example: LAX
  • crsdephour (Integer): Scheduled departure hour. Example: 9
  • crsdeptime (Double): Scheduled departure time. Example: 900.0
  • depdelay (Double): Departure delay in minutes. Example: 40.0
  • crsarrtime (Double): Scheduled arrival time. Example: 1230.0
  • arrdelay (Double): Arrival delay in minutes. Example: 40.0
  • crselapsedtime (Double): Elapsed time. Example: 390.0
  • dist (Double): Distance. Example: 2475.0

I have already cleaned up the data, limited the number of airports and carriers, and transformed it into two JSON files: one for training and one for testing. (You can see the code for the cleanup in the GitHub repository.) The JSON files have the following format:

{
    "_id": "AA_2017-01-01_ATL_LGA_1678",
    "dofW": 7,
    "carrier": "AA",
    "origin": "ATL",
    "dest": "LGA",
    "crsdephour": 17,
    "crsdeptime": 1700,
    "depdelay": 0.0,
    "crsarrtime": 1912,
    "arrdelay": 0.0,
    "crselapsedtime": 132.0,
    "dist": 762.0
}

You can run the code for this example with MapR 5.2.1 or MapR 6.0 (which includes Spark 2.1), and you can view the notebook code with the Zeppelin viewer.


Load the Data From a File Into a DataFrame


We use a Scala case class and StructType to define the schema, corresponding to a line in the JSON data file.

Define Schema for JSON file data
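Below is a minimal sketch of what this schema definition might look like; the case class name Flight and the use of Encoders to derive a StructType are assumptions, not the original notebook code:

import org.apache.spark.sql.Encoders

// One case class field per JSON field, matching the format shown above.
case class Flight(_id: String, dofW: Int, carrier: String, origin: String,
                  dest: String, crsdephour: Int, crsdeptime: Double, depdelay: Double,
                  crsarrtime: Double, arrdelay: Double, crselapsedtime: Double,
                  dist: Double) extends Serializable

// Derive a StructType schema from the case class so the JSON reader can use it.
val schema = Encoders.product[Flight].schema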

Below, we specify the data source, schema, and class to load into a dataset. We load the data from January and February, which we will use for training the model. (Note that specifying the schema when loading data into a DataFrame will give better performance than schema inference.)

Read the data from JSON file into a Dataset of type Flight
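A sketch of the load, assuming a SparkSession named spark and a hypothetical path for the January/February training file; passing the explicit schema avoids schema inference:

import spark.implicits._

// Hypothetical file location; point this at the January/February training JSON file.
val trainFile = "/path/to/flights_jan_feb_2017.json"

// Read with the explicit schema and convert to a Dataset of type Flight.
val df = spark.read.schema(schema).json(trainFile).as[Flight]
df.cache()

The March/April test file can be loaded the same way (for example, into a Dataset named testData), which is what the prediction step later in this post assumes.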

The DataFrame show method displays the first 20 rows:

Dataframe Show Method

Here, we load data from March and April, which we will use for testing the model:

Data from March and April

Summary Statistics

Spark DataFrames include some built-in functions for statistical processing. The describe() function performs summary statistics calculations on all numeric columns and returns them as a DataFrame.

Perform summary statistics on selected columns
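For example, summary statistics for a few of the numeric columns could be computed like this (the column selection here is an illustration, not the notebook's exact code):

// describe() computes count, mean, stddev, min, and max for the named columns.
df.describe("depdelay", "arrdelay", "crselapsedtime", "dist").show()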

Data Exploration

We can use Spark SQL to explore the dataset. Here are some example queries:

Register Dataset as a Temporary View in order to explore with SQL
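A minimal sketch of the registration, assuming the view name flights:

// Register the Dataset as a temporary SQL view; the queries below run against it.
df.createOrReplaceTempView("flights")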

Below, we display information for the longest departure delays:

Top 5 Longest departure delays
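A query along these lines produces that result (the exact columns selected are an assumption):

spark.sql("""SELECT carrier, origin, dest, dofW, crsdephour, depdelay
             FROM flights
             ORDER BY depdelay DESC
             LIMIT 5""").show()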

Below, we display the average departure delay by Carrier:

Average Departure Delay by Carrier

We want to predict flight delays where depdelay > 40 minutes, so let’s explore this data. Below, we see that United Airlines and Delta have the highest count of flight delays for January and February 2017 (the training set).

Count of Departure Delays by Carrier
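A sketch of the kind of query behind this chart; the same pattern, with a different GROUP BY column, covers the day-of-week, hour, origin, and destination charts that follow:

// Count departure delays of more than 40 minutes, grouped by carrier.
spark.sql("""SELECT carrier, COUNT(depdelay) AS delays
             FROM flights
             WHERE depdelay > 40
             GROUP BY carrier
             ORDER BY delays DESC""").show()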

In the query below, we see that Monday and Sunday have the highest count of flight delays.

Count of Departure Delays by Day of the Week

In the query below, we see that the hours between 13:00-19:00 have the highest count of flight delays.

Count of Departure Delays by Hour of Day

In the query below, we see that the origin airports Chicago and Atlanta have the highest count of flight delays.

Count of Departure Delays by Origin

In the query below, we see that the destination airports San Francisco and Newark have the highest count of flight delays.

Count of Departure Delays by Destination

In the query below, we see the count of departure delays by Origin and destination. The routes ORD -> SFO and DEN -> SFO have the highest delays, maybe because of weather in January and February. Adding weather to this dataset would give better results, but that is left as an exercise for the reader.

Count of Departure Delays by Origin, Destination

In the code below, a Spark Bucketizer is used to split the dataset into delayed and not delayed flights with a delayed 0/1 column. Then, the resulting total counts are displayed. Grouping the data by the delayed field and counting the number of instances in each group shows that there are roughly eight times as many not delayed samples as delayed samples.

Add Labels for Delayed Flights and count
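A sketch of the bucketing step, assuming a 40-minute threshold and that depdelay may contain negative values for early departures:

import org.apache.spark.ml.feature.Bucketizer

// Two buckets split at 40 minutes: 0.0 = not delayed, 1.0 = delayed.
val bucketizer = new Bucketizer()
  .setInputCol("depdelay")
  .setOutputCol("delayed")
  .setSplits(Array(Double.NegativeInfinity, 40.0, Double.PositiveInfinity))

val df2 = bucketizer.transform(df)

// Total counts per class: roughly eight times as many not-delayed flights.
df2.groupBy("delayed").count().show()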

In the query below, we see the count of not delayed (0=dark blue) and delayed (1=light blue) flights by origin.

Count of Departure Delays by Origin

Stratified Sampling

In order to ensure that our model is sensitive to the delayed samples, we can put the two sample types on the same footing using stratified sampling. The DataFrames sampleBy() function does this when provided with fractions of each sample type to be returned. Here, we're keeping all instances of delayed, but downsampling the not delayed instances to 29%, then displaying the results.

Stratify the Sampling to fewer Not Delayed
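A sketch of the stratified sampling, assuming the delayed column added above and an arbitrary seed:

// Keep every delayed flight (1.0) but only ~29% of the not-delayed flights (0.0).
val fractions = Map(0.0 -> 0.29, 1.0 -> 1.0)
val strain = df2.stat.sampleBy("delayed", fractions, 36L)

// The two classes should now be roughly the same size.
strain.groupBy("delayed").count().show()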

Features Array

To build a classifier model, you extract the features that most contribute to the classification. In this scenario, we will build a tree to predict the label of delayed or not based on the following features:

  • Label -> delayed = 1 if the departure delay is greater than 40 minutes, otherwise delayed = 0
  • Features -> {day of the week, scheduled departure time, scheduled arrival time, carrier, scheduled elapsed time, origin, destination, distance}

An example record with a label and these features looks like this:
delayed=1.0/0.0, dofW=1, crsdepTime=1015, crsArrTime=1230, carrier=AA, elapTime=385.0, origin=JFK, dest=LAX, dist=2475.0

In order for the features to be used by a machine learning algorithm, they must be transformed and put into feature vectors, which are vectors of numbers representing the value for each feature.

(Reference: Learning Spark)

Using the Spark ML Package

The ML package is the newer library of machine learning routines. Spark ML provides a uniform set of high-level APIs built on top of DataFrames.


We will use an ML Pipeline to pass the data through transformers in order to extract the features and an estimator to produce the model.

  • Transformer: A Transformer is an algorithm that transforms one DataFrame into another DataFrame. We will use a transformer to get a DataFrame with a features vector column.
  • Estimator: An Estimator is an algorithm that can be fit on a DataFrame to produce a Transformer. We will use an estimator to train a model that can transform data to get predictions.
  • Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

Feature Extraction and Pipelining

The ML package needs the label and feature vector to be added as columns to the input DataFrame. We set up a pipeline to pass the data through transformers in order to extract the features and label. We use a StringIndexer to encode a string column as a column of number indices. We use a OneHotEncoder to map a column of number indices to a column of binary vectors, with at most a single one-value. Encoding categorical features allows decision trees to treat categorical features appropriately, improving performance. An example of StringIndexing and OneHotEncoding for the carrier is shown below:

Example of StringIndexing and OneHotEncoding

Use a combination of StringIndexer and OneHotEncoder to encode categorical columns
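A sketch of one way to build those stages, assuming the categorical columns carrier, origin, and dest, and output column names ending in Indexed and Vec:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val categoricalColumns = Array("carrier", "origin", "dest")

// One StringIndexer per categorical column: string values -> numeric indices.
val stringIndexers = categoricalColumns.map { colName =>
  new StringIndexer().setInputCol(colName).setOutputCol(colName + "Indexed")
}

// One OneHotEncoder per indexed column: numeric indices -> sparse binary vectors.
val encoders = categoricalColumns.map { colName =>
  new OneHotEncoder().setInputCol(colName + "Indexed").setOutputCol(colName + "Vec")
}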

Below, a Bucketizer is used to add a label of delayed 0/1. The VectorAssembler combines a given list of columns into a single feature vector column.

Use VectorAssembler, a transformer, to put features into a feature vector column
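A sketch of the assembler; the label Bucketizer is the same kind of stage shown earlier, and the feature column names follow the encoder sketch above:

import org.apache.spark.ml.feature.VectorAssembler

// Combine the one-hot encoded categorical vectors and the numeric columns into
// a single "features" vector column for the classifier.
val featureCols = Array("carrierVec", "destVec", "originVec",
                        "dofW", "crsdephour", "crselapsedtime",
                        "crsarrtime", "crsdeptime", "dist")

val assembler = new VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features")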

The result of running these transformers in a pipeline will be to add a label and features column to the dataset as shown below.

Add a label and features column

The final element in our pipeline is an estimator (a decision tree classifier), training on the vector of labels and features.

Create Decision Tree Estimator, set Label and Feature Columns
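A minimal sketch of the estimator, using the delayed label and the assembled features column:

import org.apache.spark.ml.classification.DecisionTreeClassifier

val dTree = new DecisionTreeClassifier()
  .setLabelCol("delayed")
  .setFeaturesCol("features")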

Below, we chain the indexers and tree in a Pipeline.

Chain the indexers and tree in a Pipeline
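A sketch of the pipeline, chaining the stages sketched above; stage order matters because each stage consumes columns produced by earlier ones:

import org.apache.spark.ml.{Pipeline, PipelineStage}

// indexers -> encoders -> label bucketizer -> feature assembler -> decision tree
val steps: Array[PipelineStage] =
  stringIndexers ++ encoders ++ Array(bucketizer, assembler, dTree)

val pipeline = new Pipeline().setStages(steps)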

Train the Model

We would like to determine which parameter values of the decision tree produce the best model. A common technique for model selection is k-fold cross-validation, where the data is randomly split into k partitions. Each partition is used once as the testing dataset, while the rest are used for training. Models are then generated using the training sets and evaluated with the testing sets, resulting in k model performance measurements. The model parameters leading to the highest performance metric produce the best model.

ML Cross-Validation Process

Spark ML supports k-fold cross-validation with a transformation/estimation pipeline: you try out different combinations of parameters using a process called grid search, where you set up the parameters to test, and use a cross-validation evaluator to construct a model selection workflow.

Below, we use a ParamGridBuilder to construct the parameter grid. We define an Evaluator, which will evaluate the model by comparing the test label column with the test prediction column. We use a CrossValidator for model selection.

Set up a CrossValidator with the parameters, a tree estimator and evaluator
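A sketch of the model-selection setup; the maxDepth candidates are assumptions, and the evaluator scores models by area under the ROC curve on the delayed label:

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Grid of tree depths to try.
val paramGrid = new ParamGridBuilder()
  .addGrid(dTree.maxDepth, Array(4, 5, 6))
  .build()

// Compares the label column with the model output; the default metric is areaUnderROC.
val evaluator = new BinaryClassificationEvaluator().setLabelCol("delayed")

// 3-fold cross-validation over the whole pipeline and parameter grid.
val crossval = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)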

The CrossValidator uses the Estimator Pipeline, the Parameter Grid, and the Classification Evaluator to fit the training data set and returns a model.

Use Cross Validator Estimator to fit the training data set
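Fitting is then a single call. One subtlety in this sketch: the pipeline's Bucketizer recreates the delayed column, so the copy added during data exploration is dropped first to avoid a duplicate-column error:

val cvModel = crossval.fit(strain.drop("delayed"))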

The CrossValidator uses the ParamGridBuilder to iterate through the maxDepth parameter of the decision tree and evaluate the models, repeating three times per parameter value for reliable results.


Next, we can get the best decision tree model in order to print out the decision tree and feature importances. (Note that the OneHotEncoders increase the number of features. In order to understand this printout better, I built a tree without the encoders, which gave a slightly lower accuracy.)

Get the best decision tree Model
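A sketch of pulling the fitted tree out of the best pipeline model:

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.DecisionTreeClassificationModel

// The best model is a fitted pipeline; the decision tree is its last stage.
val bestPipeline = cvModel.bestModel.asInstanceOf[PipelineModel]
val treeModel = bestPipeline.stages.last.asInstanceOf[DecisionTreeClassificationModel]

println(treeModel.toDebugString)      // decision nodes and leaf predictions
println(treeModel.featureImportances) // relative importance of each feature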

We find that the best tree model produced using the cross-validation process is one with a depth of 6. The toDebugString method prints the tree's decision nodes and the final prediction outcomes at the leaves. Below is a partial printout of the decision tree:

treeModel.toDebugString

The feature numbers correspond to the following:

(0=carrierIndexed, 1=destIndexed, 2=originIndexed, 3=dofW, 4=crsdephour, 5=crselapsedtime, 6=crsarrtime, 7=crsdeptime, 8=dist)

Below, we can see that the feature importance in order is:

  1. Scheduled departure time (feature 7)
  2. Destination (feature 1)
  3. Origin (feature 2)
  4. Scheduled arrival time (feature 6)
  5. Day of the week (feature 3)

featureImportances


Decision trees are often used for feature selection because they provide an automated mechanism for determining the most important features (those closest to the tree root).

Predictions and Model Evaluation

The actual performance of the model can be determined using the test dataset that has not been used for any training or cross-validation activities.

We transform the test DataFrame with the model pipeline, which maps the features according to the pipeline stages and returns the label predictions in a column of a new DataFrame.

Transform Dataframe

Get Predictions from test dataset
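A sketch of the prediction step, assuming the March/April data was loaded into a Dataset named testData as described earlier:

// Every pipeline stage runs on the raw test columns, so the result includes the
// delayed label, the features vector, and the prediction column.
val predictions = cvModel.transform(testData)

predictions.select("carrier", "origin", "dest", "depdelay", "delayed", "prediction").show(5)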

The evaluator will provide us with the score of the predictions, measured here as the area under the ROC curve. The area measures the ability of the model to separate true positives from false positives. A random predictor would score 0.5; the closer the value is to 1, the better the predictions. In this case, the evaluation returns an area under the ROC curve of roughly 0.81.

Evaluator predictions

Evaluate the predictions accuracy
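Scoring the test predictions reuses the same evaluator (a sketch):

// Area under the ROC curve on the held-out test set; closer to 1.0 is better.
val areaUnderROC = evaluator.evaluate(predictions)
println(s"Area under ROC = $areaUnderROC")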

Below, we calculate some more metrics. The number of false/true positive and negative predictions is also useful:

  • True positives are how often the model correctly predicted delayed flights.
  • False positives are how often the model incorrectly predicted delayed flights.
  • True negatives indicate how often the model correctly predicted not delayed flights.
  • False negatives indicate how often the model incorrectly predicted not delayed flights.

Calculate some more metrics
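A sketch of those counts, comparing the label column with the prediction column:

import spark.implicits._

val lp = predictions.select("delayed", "prediction")

val truePositives  = lp.filter($"delayed" === 1.0 && $"prediction" === 1.0).count()
val falsePositives = lp.filter($"delayed" === 0.0 && $"prediction" === 1.0).count()
val trueNegatives  = lp.filter($"delayed" === 0.0 && $"prediction" === 0.0).count()
val falseNegatives = lp.filter($"delayed" === 1.0 && $"prediction" === 0.0).count()

println(s"TP=$truePositives FP=$falsePositives TN=$trueNegatives FN=$falseNegatives")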


Save the Model

We can now save our fitted Pipeline for later use with streaming events. This saves both the feature extraction stage and the decision tree model chosen by model tuning.

Save the Model to the file system for later use
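A sketch of the save, using the same ../cfModel path that the reload example below uses:

// Persists the fitted CrossValidatorModel (all pipeline stages plus the chosen tree).
cvModel.write.overwrite().save("../cfModel")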

The result of saving the pipeline model is a JSON file for metadata and Parquet files for the model data. We can reload the model with the load command; the original and reloaded models are the same:

val sameCVModel = CrossValidatorModel.load("../cfModel")


Running the Code

All of the components of the use case architecture we just discussed can run on the same cluster with the MapR Converged Data Platform.

