Design a Real-Time ETA Prediction System Using Kafka, DynamoDB, and Rockset

These are strange times. Cities are in lockdown, and few are venturing outside. Therefore, the increased use of on-demand logistics services, like online food delivery, doesn't come as a surprise.

Most of these applications provide near real-time ETA tracking once you place an order. Building a scalable, distributed, real-time ETA prediction system is a tough task, but what if we could simplify its design? We'll break the system into pieces so that each component is responsible for one primary job.

Let's look at the components that make up the system.

  1. Delivery driver/rider app - The Android/iOS app installed on a delivery person's device.
  2. Customer app - The Android/iOS app installed on a customer's device.
  3. Rockset - The query engine powering all the models and services.
  4. Message queue - Used for transferring data between various components. For this example, we will use Kafka.
  5. Key-value storage - Used for storing orders and parameters for the model. For this example, we will use DynamoDB.

Inputs to the Model

Driver Location

To get an accurate ETA estimate, you need the delivery person's position, specifically the latitude and longitude. You can get this easily from the device's GPS: a call to the device's GPS provider returns the latitude, the longitude, and the accuracy of the location in meters.

You can run a background service in the app that retrieves the GPS coordinates every 10 seconds. Raw coordinates, however, are too fine-grained to make a prediction: they almost never repeat exactly, so they cannot be matched against historical deliveries. To coarsen them, we use geohashes. A geohash is a standardized N-character string that identifies a rectangular cell of area M on the Earth's surface. The longer the hash, the smaller the cell: each additional character shrinks the cell's area by a factor of 32, so a larger N means a much smaller area M.

Many libraries convert a latitude-longitude pair to a geohash. Here we use geo by davidmoten to get a 6-7 character geohash.
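
geo by davidmoten is a Java library. To show what such a library computes, here is a minimal from-scratch geohash encoder in Python; it is an illustrative sketch, not that library's API. It repeatedly halves the longitude and latitude ranges, interleaving one bit from each (longitude first), and emits one base-32 character for every 5 bits.

```python
# Minimal geohash encoder (illustrative sketch, not the davidmoten/geo API).
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet (no a, i, l, o)


def encode_geohash(lat: float, lng: float, precision: int = 7) -> str:
    """Encode a latitude/longitude pair as a geohash of `precision` characters."""
    lat_range = [-90.0, 90.0]
    lng_range = [-180.0, 180.0]
    chars = []
    bits_in_char = 0
    char_value = 0
    use_lng = True  # bits alternate: longitude, latitude, longitude, ...

    while len(chars) < precision:
        rng, value = (lng_range, lng) if use_lng else (lat_range, lat)
        mid = (rng[0] + rng[1]) / 2
        if value >= mid:
            char_value = (char_value << 1) | 1  # upper half -> bit 1
            rng[0] = mid
        else:
            char_value <<= 1  # lower half -> bit 0
            rng[1] = mid
        use_lng = not use_lng
        bits_in_char += 1
        if bits_in_char == 5:  # every 5 bits become one base-32 character
            chars.append(BASE32[char_value])
            bits_in_char = 0
            char_value = 0
    return "".join(chars)


if __name__ == "__main__":
    print(encode_geohash(42.6, -5.6, precision=5))  # ezs42
```

Because each character only refines the cell described by the previous ones, truncating a 7-character geohash to 6 characters yields the larger cell that contains it, which is why nearby points share prefixes.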

The service then pushes the geohash, along with the raw coordinates, to a Kafka topic. Rockset ingests data from this topic into a collection called locations.
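
A minimal sketch of the message the background service might publish for each GPS fix. The field names order_id, lat, lng, and timestamp match the columns read by the calculate_speed Query Lambda later in this article; geohash and accuracy_m are assumed extra fields, and the function name is hypothetical.

```python
import json
import time
from typing import Optional


def build_location_event(order_id: str, lat: float, lng: float, geohash: str,
                         accuracy_m: float, timestamp: Optional[int] = None) -> bytes:
    """Serialize one GPS fix as a JSON message for the locations topic."""
    event = {
        "order_id": order_id,
        "lat": lat,
        "lng": lng,
        "geohash": geohash,
        "accuracy_m": accuracy_m,  # accuracy reported by the GPS provider
        # Epoch seconds (an assumption); the unit must stay consistent because
        # speed is computed from differences between timestamps.
        "timestamp": int(time.time()) if timestamp is None else timestamp,
    }
    return json.dumps(event).encode("utf-8")
```

With the kafka-python client, the bytes can be sent with producer.send(topic, key=order_id.encode(), value=event), where the topic name is yours to choose. Keying by order_id sends every fix for an order to the same partition, so Kafka preserves their relative order.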

Orders

The orders placed by customers are stored in DynamoDB for further processing. An order generally goes through a life cycle of several states between being placed and being delivered.

Each state change is written to DynamoDB along with additional data such as the source location, destination location, and order details. Once an order is delivered, its actual time of arrival is also stored in the database.

Rockset also ingests updates from the DynamoDB orders table into a collection called orders.
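
The calculate_ETA Query Lambda later in this article reads source_geohash, destination_geohash, ta, and time_i from the orders collection. Below is a minimal sketch of the attributes written once an order is delivered. It assumes TA is the pickup-to-delivery duration in seconds and time_i is the delivery timestamp; the article does not define these units. The Order type and the status value are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Order:
    order_id: str
    source_geohash: str       # where the delivery starts (e.g. the restaurant)
    destination_geohash: str  # the customer's location
    picked_up_at: int         # epoch seconds (assumed)
    delivered_at: int         # epoch seconds (assumed)


def delivered_item(order: Order) -> dict:
    """Build the item written to the orders table when an order is delivered."""
    if order.delivered_at < order.picked_up_at:
        raise ValueError("delivery cannot precede pickup")
    return {
        "order_id": order.order_id,
        "status": "DELIVERED",  # hypothetical state name
        "source_geohash": order.source_geohash,
        "destination_geohash": order.destination_geohash,
        "ta": order.delivered_at - order.picked_up_at,  # duration in seconds
        "time_i": order.delivered_at,  # used to order TAs from newest to oldest
    }
```

With boto3, the item can be written with boto3.resource("dynamodb").Table("orders").put_item(Item=item). All values here are strings or integers, which the boto3 resource layer accepts directly (floats would need to be converted to Decimal).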

ML Model

Exponential Smoothing

The orders table gives us the actual time of arrival for each order, along with its source and destination. We will refer to this as TA. Taking the mean of all TAs whose source matches the delivery person's latest location and whose destination matches the customer's location gives an approximate ETA. However, a plain mean is not very accurate: it weighs old and recent deliveries equally, so it doesn't account for changing factors such as new construction in the area or newer, shorter routes to the destination.

To account for these, we need a prediction model that is simple, easy to debug, and reasonably accurate.

This is where exponential smoothing comes into play. An exponentially smoothed value is calculated using the formula:

$$S_t = \alpha X_t + (1 - \alpha) S_{t-1}$$

where

  • $S_t$ is the smoothed value at time $t$
  • $X_t$ is the actual value at time $t$
  • $\alpha$ is the smoothing factor, between 0 and 1

A larger $\alpha$ gives more weight to recent observations, so the estimate adapts faster to changes such as a new route.

In our context, $S_t$ represents the ETA and $X_t$ represents the most recent actual time of arrival for a source-destination pair in our orders table:

$$\mathrm{ETA}_t = \alpha \, \mathrm{TA}_t + (1 - \alpha) \, \mathrm{ETA}_{t-1}$$
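
Unrolling the recursion over the $n$ most recent TAs for a source-destination pair shows what a single query has to compute:

$$
\begin{aligned}
\mathrm{ETA}_t &= \alpha\,\mathrm{TA}_t + (1-\alpha)\,\mathrm{ETA}_{t-1} \\
&= \alpha\,\mathrm{TA}_t + \alpha(1-\alpha)\,\mathrm{TA}_{t-1} + (1-\alpha)^2\,\mathrm{ETA}_{t-2} \\
&= \alpha \sum_{j=0}^{n-1} (1-\alpha)^{j}\,\mathrm{TA}_{t-j} + (1-\alpha)^{n}\,\mathrm{ETA}_{t-n}
\end{aligned}
$$

Seeding the recursion with the oldest of these observations, $\mathrm{ETA}_{t-n} = \mathrm{TA}_{t-n+1}$, leaves an expression that depends only on stored TAs. Numbering orders from newest (idx = j + 1 = 1) to oldest (idx = n), the sum corresponds to SUM(term) in the calculate_ETA Query Lambda below, $n$ to MAX(idx), and the seed to MIN_BY(ta_i, time_i).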

Rockset

The serving layer for the current system needs to satisfy three primary criteria:

  1. Ability to handle millions of writes per minute - Each delivery person's app pushes GPS coordinates every 5-10 seconds, and each update can lead to a new ETA. A typical large-scale food delivery company has almost 100K delivery persons.
  2. Minimal data fetch latency - For a great UX, the customer app should show a new ETA as soon as it is computed.
  3. Ability to handle schema changes on the fly - We may want to store additional metadata, such as ETA prediction accuracy and model version, in the future, and we don't want to create a new data source whenever we add a new field.
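
The write volume in the first criterion follows directly from the fleet size $N$ and the reporting interval $\Delta t$ in seconds:

$$\text{writes per minute} = N \cdot \frac{60}{\Delta t}, \qquad 100{,}000 \cdot \frac{60}{5} = 1{,}200{,}000, \qquad 100{,}000 \cdot \frac{60}{10} = 600{,}000$$

So a 5-second reporting interval alone puts the system above a million writes per minute, before counting order updates from DynamoDB.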

Rockset satisfies all of them. It has:

  1. Dynamic Scaling - More resources are added as and when needed to handle large volumes of data.
  2. Distributed Query Processing - Parallelisation of queries across multiple nodes to minimize latency
  3. Schemaless Ingest - to support schema changes on the fly.

Rockset has a built-in connector to Apache Kafka. We can use this Kafka connector to ingest location data of the delivery person.

To perform exponential smoothing in Rockset, we create two Query Lambdas. Query Lambdas in Rockset are named, parameterized SQL queries stored in Rockset that can be executed from a dedicated REST endpoint.
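
Since Query Lambdas are executed over HTTP, here is a minimal sketch that only builds the request for one. The endpoint path (/v1/orgs/self/ws/{workspace}/lambdas/{name}/versions/{version}), the ApiKey authorization header, and the "parameters" body shape are our reading of Rockset's REST API and should be checked against its documentation. API_SERVER is a placeholder, the function name is hypothetical, and the workspace commons is taken from the queries below.

```python
import json
from typing import Any, Dict, Tuple

# Placeholders (assumptions): use your own Rockset API server and workspace.
API_SERVER = "https://api.example.rockset.com"
WORKSPACE = "commons"


def query_lambda_request(
    lambda_name: str,
    version: str,
    api_key: str,
    params: Dict[str, Tuple[str, Any]],
) -> Tuple[str, Dict[str, str], str]:
    """Build (url, headers, json_body) to execute one version of a Query Lambda.

    params maps each parameter name to a (type, value) pair, for example
    {"alpha": ("float", 0.3)}. Path, header and body shape are assumptions
    based on Rockset's REST API and should be verified before use.
    """
    url = (
        f"{API_SERVER}/v1/orgs/self/ws/{WORKSPACE}"
        f"/lambdas/{lambda_name}/versions/{version}"
    )
    headers = {
        "Authorization": f"ApiKey {api_key}",
        "Content-Type": "application/json",
    }
    body = {
        "parameters": [
            # Values are sent as strings together with their declared type.
            {"name": name, "type": ptype, "value": str(value)}
            for name, (ptype, value) in params.items()
        ]
    }
    return url, headers, json.dumps(body)
```

The result can be sent with requests.post(url, headers=headers, data=body). Keeping request construction separate from sending makes it easy to unit test and to swap HTTP clients.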

1. calculate_ETA: This Query Lambda takes alpha, source, and destination as parameters and returns an exponentially smoothed ETA. It runs the following query:

```sql
-- Closed form of ETA_t = alpha * TA_t + (1 - alpha) * ETA_(t-1),
-- seeded with the oldest TA for this source/destination pair.
SELECT
    (:alpha * SUM(term)) + (POW((1 - :alpha), MAX(idx)) * MIN_BY(ta_i, time_i)) AS ans
FROM
    (
        (
            SELECT
                order_id,
                ta_i,
                -- weight (1 - alpha)^(idx - 1); idx = 1 is the most recent order
                (ta_i * POW((1 - :alpha), (idx - 1))) AS term,
                time_i,
                idx
            FROM
                (
                    SELECT
                        order_id,
                        CAST(ta AS int) AS ta_i,
                        time_i,
                        ROW_NUMBER() OVER(
                            ORDER BY
                                time_i DESC, order_id ASC
                        ) AS idx
                    FROM
                        commons.orders_fixed
                    WHERE
                        source_geohash = :source
                        AND
                        destination_geohash = :destination
                    ORDER BY
                        time_i DESC, order_id ASC
                ) AS idx
        ) AS terms
    )
```
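
As a sanity check on the query's closed form, this Python mirror compares it with the plain recursion. The function names and sample TAs are hypothetical; the second function follows the query step by step (idx = 1 for the newest order, MAX(idx) = n, MIN_BY(ta_i, time_i) = the oldest TA).

```python
from typing import List


def smooth_recursive(tas_oldest_first: List[float], alpha: float) -> float:
    """ETA_t = alpha * TA_t + (1 - alpha) * ETA_(t-1), seeded with the oldest TA."""
    eta = tas_oldest_first[0]
    for ta in tas_oldest_first:
        eta = alpha * ta + (1 - alpha) * eta
    return eta


def smooth_like_calculate_eta(tas_newest_first: List[float], alpha: float) -> float:
    """Mirror of the calculate_ETA query, with idx = 1 for the newest TA."""
    n = len(tas_newest_first)  # MAX(idx)
    total = sum(ta * (1 - alpha) ** (idx - 1)  # SUM(term)
                for idx, ta in enumerate(tas_newest_first, start=1))
    oldest = tas_newest_first[-1]  # MIN_BY(ta_i, time_i)
    return alpha * total + (1 - alpha) ** n * oldest


if __name__ == "__main__":
    tas = [600, 660, 540, 720]  # hypothetical TAs in seconds, oldest first
    print(smooth_recursive(tas, 0.5))                 # 652.5
    print(smooth_like_calculate_eta(tas[::-1], 0.5))  # 652.5
```

Both paths give the same value, which confirms that the single aggregate query is equivalent to running the recursion over the pair's full history.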



2. calculate_speed: This Query Lambda takes order_id as a parameter and returns the average speed of the delivery person while in transit. It runs the following query:

```sql
SELECT
    -- average of per-segment speeds: distance / elapsed time between consecutive fixes
    AVG(ST_DISTANCE(prev_geo, geo) / (ts - prev_ts)) AS speed
FROM
    (
        SELECT
            geo,
            LEAD(geo, 1) OVER(
                ORDER BY
                    ts DESC
            ) AS prev_geo,
            ts,
            LEAD(ts, 1) OVER(
                ORDER BY
                    ts DESC
            ) AS prev_ts
        FROM
            (
                SELECT
                    ST_GEOGPOINT(CAST(lng AS double), CAST(lat AS double)) AS geo,
                    order_id,
                    CAST(timestamp AS int) AS ts
                FROM
                    commons.locations
                WHERE
                    order_id = :order_id
            ) AS ts
    ) AS speed
WHERE
    -- drops the oldest fix (prev_ts is NULL) and duplicate timestamps (no division by zero)
    ts > prev_ts
```
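
To make the speed calculation concrete, here is a Python mirror of it. It uses the haversine formula as a stand-in for Rockset's ST_DISTANCE (an approximation we chose, not Rockset's implementation) and assumes timestamps in seconds; the function names are hypothetical.

```python
import math
from typing import List, Tuple

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius; haversine assumes a sphere


def haversine_m(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    """Great-circle distance in meters between two latitude/longitude points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lng2 - lng1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def average_speed(fixes: List[Tuple[int, float, float]]) -> float:
    """Mean of segment speeds (m/s) over (timestamp_s, lat, lng) fixes for one order.

    Only consecutive pairs are averaged (n fixes give n - 1 segments), and
    pairs with identical timestamps are skipped to avoid dividing by zero.
    """
    ordered = sorted(fixes)  # oldest first
    speeds = [
        haversine_m(lat0, lng0, lat1, lng1) / (t1 - t0)
        for (t0, lat0, lng0), (t1, lat1, lng1) in zip(ordered, ordered[1:])
        if t1 > t0
    ]
    if not speeds:
        raise ValueError("need at least two fixes with distinct timestamps")
    return sum(speeds) / len(speeds)
```

Averaging per-segment speeds (rather than total distance over total time) matches the query's shape; either choice is reasonable, but they differ when the rider stops for long periods.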



Predict the ETA

The customer app initiates the request to predict the ETA. It passes the order id in the API call.

The request goes to the query service, which performs the following steps:

  1. Fetch the latest parameters Alpha and Beta from DynamoDB. Alpha is the smoothing factor, and Beta is the weight assigned to the historical ETA when calculating the final ETA (see step 6).
  2. Fetch the destination geohash for the order ID.
  3. Fetch the delivery person's current geohash from the locations collection.
  4. Trigger the calculate_ETA Query Lambda in Rockset, passing Alpha as the smoothing factor, the driver's geohash as the source geohash, and the destination geohash from step 2. We call the result the historical ETA.
  5. Trigger the calculate_speed Query Lambda in Rockset with the current order ID.
  6. Calculate the predicted ETA in the query service as:

$$\mathrm{ETA}_{\text{predicted}} = \beta \cdot \mathrm{ETA}_{\text{historical}} + (1 - \beta) \cdot \frac{\operatorname{distance}(\text{driver}, \text{destination})}{\text{speed}}$$

The predicted ETA is then returned to the customer app.
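
Step 6 is a simple weighted blend. A minimal sketch, assuming the historical ETA is in seconds, the distance in meters (for example, the great-circle distance between the driver and the destination), and the speed in meters per second. The function name is hypothetical, and the fallback for a zero speed is our own addition rather than part of the original design.

```python
def predicted_eta(historical_eta_s: float, distance_m: float,
                  speed_mps: float, beta: float) -> float:
    """Predicted ETA = beta * historical ETA + (1 - beta) * distance / speed."""
    if not 0.0 <= beta <= 1.0:
        raise ValueError("beta must be in [0, 1]")
    if speed_mps <= 0:
        # Assumed fallback: with no usable speed (e.g. rider not moving yet),
        # rely on the historical ETA alone instead of dividing by zero.
        return historical_eta_s
    return beta * historical_eta_s + (1 - beta) * distance_m / speed_mps
```

For example, a historical ETA of 900 s, 3 km left at 10 m/s, and Beta = 0.25 give 0.25 · 900 + 0.75 · 300 = 450 s.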

Feedback Loop

ML models need retraining to keep their predictions accurate. In our scenario, retraining is necessary to account for changing weather conditions, festivals, etc. This is where the parameter tuning service comes into play.

Parameter Tuning Service

Once an ETA is predicted, you can store the predicted ETA and the actual time of arrival in a collection called predictions. The main reason to store this data in Rockset rather than another datastore is to build a real-time dashboard that measures the model's accuracy. This helps ensure that customers do not see absurd ETA values in their apps.
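
A minimal sketch of the metrics such a dashboard might plot, assuming each record in the predictions collection yields a (predicted ETA, actual TA) pair in seconds. The function name and the 5-minute default tolerance are hypothetical choices.

```python
from typing import Iterable, Tuple


def accuracy_summary(pairs: Iterable[Tuple[float, float]],
                     tolerance_s: float = 300.0) -> dict:
    """Summarize (predicted_eta_s, actual_ta_s) pairs for an accuracy dashboard.

    Returns the mean absolute error and the share of predictions that were
    within tolerance_s of the actual arrival time.
    """
    errors = [abs(pred - actual) for pred, actual in pairs]
    if not errors:
        return {"count": 0, "mae_s": None, "within_tolerance": None}
    return {
        "count": len(errors),
        "mae_s": sum(errors) / len(errors),
        "within_tolerance": sum(e <= tolerance_s for e in errors) / len(errors),
    }
```

The within-tolerance share is often easier to act on than the mean error, because a few very late deliveries can inflate the mean while most customers still see a sensible ETA.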

The next question is how to determine the smoothing factor Alpha and the weight Beta. To solve this, we create a parameter tuning service, which is simply a Flink batch job. It fetches the historical ETAs and TAs for orders from the past 7-30 days and uses the difference between them to calculate appropriate Alpha and Beta values. This can be done using a simple model such as logistic regression.
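
The text mentions logistic regression; because Alpha and Beta are continuous weights in [0, 1] rather than class probabilities, this sketch swaps in two simpler techniques instead: a grid search for Alpha that minimizes the one-step-ahead forecast error, and a closed-form least-squares fit for Beta. It is plain Python for illustration (in the described design this logic would run inside the Flink batch job), and all names are hypothetical.

```python
from typing import Dict, List, Sequence, Tuple


def one_step_errors(tas: Sequence[float], alpha: float) -> List[float]:
    """Absolute errors when the smoothed value so far predicts the next TA."""
    errors = []
    eta = tas[0]  # seed with the oldest observation
    for ta in tas[1:]:
        errors.append(abs(ta - eta))
        eta = alpha * ta + (1 - alpha) * eta
    return errors


def tune_alpha(history: Dict[str, List[float]], grid: Sequence[float]) -> float:
    """Pick the alpha in `grid` with the lowest mean one-step-ahead error.

    history maps a source-destination key to its TAs, oldest first.
    """
    best_alpha, best_err = grid[0], float("inf")
    for alpha in grid:
        errs = [e for tas in history.values() if len(tas) > 1
                for e in one_step_errors(tas, alpha)]
        if errs and sum(errs) / len(errs) < best_err:
            best_alpha, best_err = alpha, sum(errs) / len(errs)
    return best_alpha


def fit_beta(rows: List[Tuple[float, float, float]]) -> float:
    """Least-squares beta for actual ~ beta * hist + (1 - beta) * route.

    rows are (historical_eta, distance_over_speed_eta, actual_ta). Rearranging
    gives actual - route = beta * (hist - route), a one-parameter regression.
    """
    num = sum((a - r) * (h - r) for h, r, a in rows)
    den = sum((h - r) ** 2 for h, r, a in rows)
    # If hist == route everywhere, beta has no effect; 0.5 is an arbitrary default.
    beta = num / den if den else 0.5
    return min(1.0, max(0.0, beta))  # keep beta a valid weight
```

On a history where arrival times jump from 600 s to 900 s, a grid of [0.2, 0.8] selects 0.8, since a larger Alpha tracks the change faster.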

Once the service calculates Alpha and Beta, it stores them in a DynamoDB table named smoothing_parameters. The query service fetches the parameters from this table when it receives a request from the customer app.

You can retrain the parameter tuning model once a week using the ETA data in the predictions collection.

Conclusion

The architecture is designed to handle more than a million requests per minute while scaling on the fly as the application grows. It also lets developers swap or add components, for example adding new features (e.g., weather) or a filter layer that refines the ETA predictions. Here, Rockset helps us meet three primary requirements:

  1. Low-latency complex queries - Rockset allows us to make complicated queries such as exponential smoothing with just an API call. This is done by leveraging Query Lambdas. The Lambdas also support parameters that allow us to query for different locations.
  2. Highly scalable real-time ingestion - If you have approximately 100K drivers on your platform and each of their apps sends a GPS location every 5 seconds, then you are dealing with a throughput of 1.2 million requests per minute. Rockset allows us to query this data within seconds of events occurring.
  3. Data from multiple sources - Rockset allows us to ingest from multiple sources, such as Kafka and DynamoDB, using fully managed connectors that require minimal configuration.

Published at DZone with permission of Kartik Khare.

Opinions expressed by DZone contributors are their own.
