
Make Crucial Predictions as Data Comes


Flink to the rescue! Make real-time predictions.


[Image: a fat squirrel with a nut in its mouth. Caption: Flink: as fast as a squirrel]

If you follow the hottest topics in IT these days, you have likely heard about Streaming Machine Learning, i.e. moving AI to streaming scenarios and combining real-time capabilities with new Artificial Intelligence techniques. You will also have noticed how little research exists on this topic, despite the growing interest in it.

If we dig a little deeper, we realize that a step is missing: today's well-known streaming applications still don't handle Model Serving properly, and companies still lean on the lambda architecture to reach that goal. Suppose a bank has a frequently updated, batch-trained Machine Learning model (e.g. an optimized Gradient Descent applied to past buffer overflow attack attempts) and wants to deploy it directly to its own canary distributed IDS, backed by the streaming system, in order to get real-time feedback on the quality of the model. Ideally, the bank should be able to automatically load the trained model into the IDS and use it in real time to compute predictions on incoming events, achieving persistent and always up-to-date coverage, online fraud detection, and saving a lot of money.


Unfortunately, the bank is usually forced to distribute the model across the infrastructure with a pre-defined layout and, most of the time, has to deploy the weight vector directly and compute predictions by hard-coding math instructions on it. Given this cumbersome reality, the bank will fall back on the good old safe parallel batch job, which inspects persisted events as they become available on disk. To close this gap, we present Flink-JPMML (repo), a new open-source Scala library for serving models and computing streaming predictions at scale on the Apache Flink real-time engine.

As Fast as Squirrels

Apache Flink is an open-source, distributed, streaming-first processing engine. It provides high availability and exactly-once consistency, as well as real-time complex event processing at very large scale. Flink also provides batch computation as a special case of streaming. Radicalbit uses Flink at its core, and it still impresses us with its efficiency, robustness, and scalability, which make it a perfect fit for the core of a Kappa architecture.

PMML stands for Predictive Model Markup Language, a well-established standard for persisting Machine Learning models across different systems. PMML is based on an XML format that can describe trained unsupervised, supervised, probabilistic, and deep learning models, so that a trained model can be persisted independently of the system that produced it and imported or exported by any compliant system. We use the JPMML-evaluator library to adopt the standard within Flink-JPMML. With that in place, we're ready to get our hands dirty.
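To make the format a bit more concrete, here is a minimal sketch that reads a hand-written, heavily abbreviated PMML-like fragment with the JDK's built-in XML parser. The fragment is for illustration only and is not a complete, schema-valid PMML file; the names PmmlPeek, sample, and modelName are made up for this example, and this is not how Flink-JPMML or JPMML-evaluator load models.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilderFactory

object PmmlPeek {
  // Abbreviated, hand-written fragment (assumption for illustration); a real
  // PMML file carries a full data dictionary, mining schema and model parameters.
  val sample: String =
    """<PMML version="4.2">
      |  <DataDictionary numberOfFields="1">
      |    <DataField name="sepal_length" optype="continuous" dataType="double"/>
      |  </DataDictionary>
      |  <ClusteringModel modelName="k-means" functionName="clustering" numberOfClusters="3"/>
      |</PMML>""".stripMargin

  // Returns the modelName attribute of the first ClusteringModel element, if any.
  def modelName(xml: String): Option[String] = {
    val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    val doc = builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
    val nodes = doc.getElementsByTagName("ClusteringModel")
    if (nodes.getLength == 0) None
    else Option(nodes.item(0).getAttributes.getNamedItem("modelName")).map(_.getNodeValue)
  }

  def main(args: Array[String]): Unit =
    println(modelName(sample)) // prints Some(k-means)
}
```

Because the model is plain XML, any system with an XML parser can at least inspect it; evaluating it correctly is the job of a dedicated library such as JPMML-evaluator.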

User-Defined Predictions Like Flink API

First of all, to run Flink-JPMML, add the following dependency. If you use sbt:

 libraryDependencies += "io.radicalbit" %% "flink-jpmml-scala" % "0.6.3" 

For Maven users:

<dependencies>
  <dependency>
    <groupId>io.radicalbit</groupId>
    <artifactId>flink-jpmml-scala</artifactId>
    <version>0.6.3</version>
  </dependency>
</dependencies>


You will probably also need to publish the library locally. To do that, follow these steps:

  1. Launch the sbt shell from the project's root directory: > sbt
  2. Switch to the flink-jpmml-scala project: > project flink-jpmml-scala
  3. Publish the library to your local repository: > publishLocal

At this point, flink-jpmml expects the scala-core, flink-streaming, and flink-clients libraries to be provided. Let’s go ahead. Wherever your PMML model resides, just provide the path to it.

 val sourcePath = "/path/to/your/pmml/model.xml" 

This is the only thing you need to take care of: Flink-JPMML automatically works out the distributed backend through Flink by means of a dedicated ModelReader.

import io.radicalbit.flink.pmml.scala.api.reader.ModelReader

val modelReader = ModelReader(sourcePath)


Now, let's define an input stream.

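import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.streaming.api.scala._

// field order (color before timestamp) matches the sample output shown later
case class IrisInput(pLength: Double, pWidth: Double, sLength: Double, sWidth: Double, color: Int, timestamp: Long, prediction: Option[String]) {
  // only the four measurements are features for the model
  def toVector: Vector = DenseVector(pLength, pWidth, sLength, sWidth)
}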

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[IrisInput] = yourIrisSource(env)


Here we go. The following import

 import io.radicalbit.flink.pmml.scala._ 

extends Flink's DataStream with the evaluate method. In short, it gives you the tool for computing streaming predictions in real time.

import io.radicalbit.flink.pmml.scala._
import org.apache.flink.ml.math.Vector

val out = events.evaluate(modelReader) { (event, model) =>
  // the Flink PMML model must be evaluated against Flink Vectors
  val vectorEvent: Vector = event.toVector

  // now we can call the predict method of model: PmmlModel
  val prediction = model.predict(vectorEvent)

  // the Prediction container holds the result as an ADT called Score
  prediction match {
    case Prediction(Score(value)) =>
      // return the event with the updated prediction
      event.copy(prediction = Some(computeKind(value)))
    case Prediction(EmptyScore) =>
      // no prediction was possible: log it and return the event unchanged
      logger.info("It was not possible to predict event {}", event)
      event
  }
}

out.print()

env.execute("Flink JPMML simple execution.")

// maps the numeric cluster id returned by the model to a readable label
private def computeKind(value: Double): String = {
  value match {
    case 1.0 => "Iris-setosa"
    case 2.0 => "Iris-versicolor"
    case 3.0 => "Iris-virginica"
    case _ => "Other"
  }
}
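The pattern match in the example works because the prediction result is an algebraic data type. The names Prediction, Score, and EmptyScore come from the example; the definitions below (including the Target trait and the Iris class) are simplified stand-ins assumed for this sketch, not the library's actual source.

```scala
object ScoreAdtSketch {
  // Simplified stand-ins for the library's types (assumed shape, not its source).
  sealed trait Target
  final case class Score(value: Double) extends Target
  case object EmptyScore extends Target
  final case class Prediction(value: Target)

  // Hypothetical, trimmed-down event type for this sketch.
  final case class Iris(sepalLength: Double, prediction: Option[String])

  def computeKind(value: Double): String = value match {
    case 1.0 => "Iris-setosa"
    case 2.0 => "Iris-versicolor"
    case 3.0 => "Iris-virginica"
    case _   => "Other"
  }

  // Target is sealed, so the compiler can warn if a case is left unhandled.
  def enrich(event: Iris, prediction: Prediction): Iris = prediction match {
    case Prediction(Score(value)) => event.copy(prediction = Some(computeKind(value)))
    case Prediction(EmptyScore)   => event
  }

  def main(args: Array[String]): Unit = {
    println(enrich(Iris(5.1, None), Prediction(Score(1.0))))
    println(enrich(Iris(5.1, None), Prediction(EmptyScore)))
  }
}
```

Modeling the "no result" case as its own object, rather than as a null or a magic number, forces the caller to decide explicitly what to do with events that could not be scored.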


Now, you can take the sample PMML clustering model available here; the only change required is to add class as an output parameter. So let's simply add

<MiningField name="class" invalidValueTreatment="asIs" usageType="predicted"/>


to the mining fields list. Then, add class as output:

<Output>
  <OutputField name="PCluster" optype="class" dataType="integer" targetField="class" feature="entityId"/>
</Output>


At this point, we’re ready to execute our job. Flink-JPMML will log a message about the loading state:

 19/09/10 14:33:11 INFO package$RichDataStream$$anon$1: Model has been read successfully, model name: k-means 

Finally, here is the operator output for some random flowers.

IrisInput(5.7,1.8,2.5,0.7, 34, 1495635020923, Some(Other))
IrisInput(5.5,3.8,5.2,4.3, 93, 1495635020233, Some(Iris-setosa))
IrisInput(4.3,2.3,2.0,3.1, 122, 1495635020100, Some(Other))
IrisInput(5.1,5.7,4.8,2.1,255, 1495635020583, Some(Iris-versicolor))
IrisInput(4.2,0.8,0.9,2.6, 0, 1495635020921, Some(Iris-virginica))


Flink-JPMML also offers a shortcut for running quick predictions over a DataStream of Flink vectors. It works as follows:

val vectorStream = events.map(_.toVector)
// the result is a stream of (prediction, input vector) pairs
val predictions: DataStream[(Prediction, Vector)] = vectorStream.evaluate(modelReader)


This is extremely useful when the user needs to apply specific math preprocessing before the evaluation and only the prediction result is required (e.g. for model quality assessment).

What Happens Behind the Scenes?

Behind its simple, easy-to-use API, flink-jpmml tries to make the most of the performance that makes Flink one of the most powerful distributed processing engines available today.

The Reader

The ModelReader object retrieves the PMML model from any distributed file system supported by Flink (e.g. HDFS, Alluxio). The model reader instance is shipped to the Task Managers, which use its API only at operator materialization time: in other words, the model is read lazily.
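The idea of shipping a lightweight reader and deferring the actual read can be sketched in plain Scala. The PathReader class below is hypothetical and reads from the local file system only; it is meant to illustrate lazy reading in general, not the library's ModelReader.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

object LazyReaderSketch {
  // Hypothetical reader: only the path is part of its serialized form.
  final class PathReader(val path: String) extends Serializable {
    // @transient keeps the content out of serialization; lazy defers the read
    // until the first access, i.e. on whichever process actually needs it.
    @transient lazy val content: String =
      new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit = {
    val file = Files.createTempFile("model", ".xml")
    val reader = new PathReader(file.toString) // nothing has been read yet
    Files.write(file, "<PMML/>".getBytes(StandardCharsets.UTF_8))
    println(reader.content) // the file is read here, after it was written
  }
}
```

Since the content is only read on first access, creating and distributing the reader stays cheap no matter how large the model file is.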

The Model

The library lets Flink load the model through a singleton loader per Task Manager, so the model is read once per TM regardless of the number of sub-tasks running on it. This optimization lets Flink scale model evaluation in a thread-safe way, which matters because even fairly basic PMML files can grow to several hundred MB.
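A rough way to picture a per-process singleton loader is a cache shared by all threads of the JVM. The sketch below is an assumption about the general technique, not the library's implementation: the Model class, the loads counter, and getOrLoad are invented for illustration.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object ModelCacheSketch {
  // Stand-in for an expensive, fully parsed model.
  final case class Model(path: String)

  // Counts how many times the (pretend) expensive load really happens.
  val loads = new AtomicInteger(0)
  private val cache = new ConcurrentHashMap[String, Model]()

  // computeIfAbsent applies the loader atomically, at most once per key,
  // even when several threads ask for the same model at the same time.
  def getOrLoad(path: String): Model =
    cache.computeIfAbsent(path, new java.util.function.Function[String, Model] {
      override def apply(p: String): Model = {
        loads.incrementAndGet()
        Model(p)
      }
    })

  def main(args: Array[String]): Unit = {
    // Four threads play the role of sub-tasks sharing one process.
    val threads = (1 to 4).map(_ => new Thread(new Runnable {
      override def run(): Unit = getOrLoad("/path/to/your/pmml/model.xml")
    }))
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(loads.get()) // prints 1
  }
}
```

With a model of several hundred MB, loading once per process instead of once per sub-task saves both memory and start-up time.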

Evaluation as UDF

The evaluate method is built on an underlying FlatMap implementation and is enriched by the user-defined function described above, which the user provides as a partial function. Originally, the idea was to create something à la FlinkML, i.e. a core object shaped by strategy patterns, so that predictions could be computed just as with typical ML libraries.

But, at the end of the day, we’re performing a streaming task, so the user gets the unbounded input events and the model as an instance of PmmlModel. Flink-JPMML only requires the user to compute the prediction, but the UDF can apply any kind of custom operation, and any serializable output type is allowed.
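The shape of such an operator can be approximated on a plain Scala Iterator. This is a sketch under stated assumptions: the Model here is a toy threshold classifier rather than a PMML evaluator, and dropping events whose function throws is a choice made for this example, not necessarily what the library does.

```scala
import scala.util.Try

object EvaluateSketch {
  // Toy stand-in model: a threshold on the sum of the features.
  final case class Model(threshold: Double) {
    def predict(features: Seq[Double]): Double =
      if (features.sum > threshold) 1.0 else 0.0
  }

  // Applies the user function to each event together with the model.
  // flatMap over an Option emits one output per successful call and
  // nothing for events whose function throws.
  def evaluate[T, R](events: Iterator[T], model: Model)(udf: (T, Model) => R): Iterator[R] =
    events.flatMap(event => Try(udf(event, model)).toOption)

  def main(args: Array[String]): Unit = {
    val events = Iterator(Seq(1.0, 2.0), Seq(0.1, 0.2))
    // The user decides the output type: here, a pair of event and prediction.
    val out = evaluate(events, Model(1.0)) { (event, model) =>
      (event, model.predict(event))
    }
    out.foreach(println)
  }
}
```

Handing the user both the event and the model keeps the operator generic: the same plumbing works whether the function returns the enriched event, just the score, or something else entirely.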

Closing

We introduced Flink-JPMML, a scalable, lightweight library that exploits Apache Flink's capabilities as a real-time processing engine and offers a new way to serve any Machine Learning model exported with the PMML standard. In the next post, we will discuss how Flink-JPMML lets the user manage NaN values and describe how the library handles failures; we will also explain the reasoning behind the choice of Flink vectors and outline the steps we plan to take to keep improving the library.

We’d be really pleased to welcome new contributors to Flink-JPMML: just check the repository and the open issues.

