Building Real-Time Analytics Dashboards With Postgres and Citus


Learn how to build a real-time analytics dashboard with the Citus extension to Postgres, which works well for use cases where a data warehouse may not work nearly as well.


Citus scales out Postgres for a number of different use cases, both as a system of record and as a system of engagement. One use case we're seeing implemented a lot these days is using the Citus database to power customer-facing real-time analytics dashboards, even when dealing with billions of events per day. Dashboards and pipelines are easy to handle when you're at 10 GB of data; as you grow, even basic operations like a count of unique users require non-trivial engineering work to perform well.

Citus is a good fit for these types of event dashboards because of its ability to ingest large amounts of data, perform rollups concurrently, mix raw unrolled-up data with pre-aggregated data, and support a large number of concurrent users. Adding all these capabilities together, the Citus extension to Postgres works well for end users where a data warehouse may not work nearly as well.

We've talked some here about various parts of building a real-time customer-facing dashboard, but today, we thought we'd go one step further and give you a guide for doing it end-to-end.

Ingesting Data

The first step is data ingestion. In any POC, we generally start with single-record inserts, since they're the quickest thing to put in place and can easily ingest data at tens of thousands of events per second. Before production, though, we move to a micro-batching approach.

With the \copy Postgres bulk loading utility, we're able to ingest millions of events per second if needed. \copy is fully transactional, and with Citus, the load is distributed across all nodes, making your Citus coordinator node less of a bottleneck than you would expect.

Most event pipelines we've observed have some upstream process already handling events, such as Kafka or Kinesis, which makes batching easy to put in place. As for the batching itself, you can batch either on a time basis (say, every minute, every five minutes, or even hourly, depending on your requirements) or every X records. Your batches don't have to be hundreds of thousands of events; even batching every 1,000 records can give you a nice performance boost, as you can have multiple \copy processes running in parallel.
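As a minimal sketch, a micro-batch load with \copy from psql might look like the following (events_batch.csv is a hypothetical file produced by your Kafka or Kinesis consumer):

-- Load one micro-batch; the CSV columns match the events table defined in the next section
\copy events FROM 'events_batch.csv' WITH (FORMAT csv)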

Structuring Your Raw Events

Your raw event table will vary based on your use case, but there are some commonalities across most. In almost all cases, you have the time of the event and some customer identifier associated with it. Typically, there will be some categorization and details about the event. These details can be broken out as columns or could also be contained within a JSONB datatype. For this example dashboard, we'll use the following schema:

CREATE TABLE events(
  id bigint,
  timestamp timestamp,
  customer_id bigint,
  event_type varchar,
  country varchar,
  browser varchar,
  device_id bigint,
  session_id bigint
);

SELECT create_distributed_table('events','customer_id');

Rolling Up Your Data

Once you've got some raw data coming in, you can start rolling it up. To do this, we're going to create roll-up tables: one for five-minute intervals and one for hourly roll-ups (you can add daily roll-up tables in the same way if you need them).

CREATE TABLE rollup_events_5min (
 customer_id bigint,
 event_type varchar,
 country varchar,
 browser varchar,
 minute timestamp,
 event_count bigint,
 device_distinct_count hll,
 session_distinct_count hll
);

CREATE UNIQUE INDEX rollup_events_5min_unique_idx ON rollup_events_5min(customer_id,event_type,country,browser,minute);

SELECT create_distributed_table('rollup_events_5min','customer_id');

CREATE TABLE rollup_events_1hr (
 customer_id bigint,
 event_type varchar,
 country varchar,
 browser varchar,
 hour timestamp,
 event_count bigint,
 device_distinct_count hll,
 session_distinct_count hll
);
CREATE UNIQUE INDEX rollup_events_1hr_unique_idx ON rollup_events_1hr(customer_id,event_type,country,browser,hour);

SELECT create_distributed_table('rollup_events_1hr','customer_id');

One thing you'll notice in our roll-up tables is the use of the HLL (HyperLogLog) data type. HyperLogLog is a sketch data structure that stores approximate distinct counts in a small, fixed amount of space. It also makes it easy to compute unions, intersections, etc. across various buckets, making it incredibly useful for the types of reports your dashboard may generate.
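Note that the hll column type comes from the postgresql-hll extension, so it needs to be enabled (on the coordinator and the worker nodes) before you create the roll-up tables above; a minimal sketch, assuming the package is installed:

-- Enable the HyperLogLog data type (run wherever the extension isn't yet enabled)
CREATE EXTENSION IF NOT EXISTS hll;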

In the above example, we have chosen (customer_id, event_type, country, browser, minute/hour) as the dimensions on which we evaluate metrics such as event_count, device_distinct_count, session_distinct_count, etc. Based on your query workload and performance requirements, you can choose the dimensions that make sense for you. If needed, you can create multiple roll-up tables to serve different query types (just don't go too crazy with a table for every dimension).

Ideally, you should choose dimensions for which you get suitable compression (more than 5-10x) compared to the raw tables. Based on our customers' experiences, we have at times seen compression of several orders of magnitude after roll-ups, up to 100x or even 1,000x.
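As a rough, hypothetical sanity check on compression, you can compare row counts between the raw table and a roll-up table over the same window for a given customer:

-- Hypothetical check: raw rows vs. rolled-up rows over the same day for one customer
SELECT count(*) AS raw_rows
FROM events
WHERE customer_id = 1 AND timestamp >= now() - interval '1 day';

SELECT count(*) AS rollup_rows
FROM rollup_events_5min
WHERE customer_id = 1 AND minute >= now() - interval '1 day';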

For our roll-up query, we're going to do an INSERT INTO ... SELECT, which will run in parallel across all the nodes in our cluster.

Note: Our raw tables and roll-up tables are sharded on the same key. In this case, the sharding key is the customer_id, which is more or less a proxy for customer/tenant ID. Other granular columns can also be chosen as shard keys depending on use case.

To perform our roll-up, we'll create the function:

CREATE OR REPLACE FUNCTION compute_rollups_every_5min(start_time TIMESTAMP, end_time TIMESTAMP) RETURNS void LANGUAGE PLPGSQL AS $function$
BEGIN
  RAISE NOTICE 'Computing 5min rollups from % to % (excluded)', start_time, end_time;

RAISE NOTICE 'Aggregating data into 5 min rollup table';
EXECUTE $$
INSERT INTO rollup_events_5min
SELECT customer_id,
   event_type,
   country,
   browser,
   date_trunc('seconds', (timestamp - TIMESTAMP 'epoch') / 300) * 300 + TIMESTAMP 'epoch' AS minute,
   count(*) as event_count,
   hll_add_agg(hll_hash_bigint(device_id)) as device_distinct_count,
   hll_add_agg(hll_hash_bigint(session_id)) as session_distinct_count
FROM events WHERE timestamp >= $1 AND timestamp < $2
GROUP BY
   customer_id,
   event_type,
   country,
   browser,
   minute
ON CONFLICT (customer_id,event_type,country,browser,minute)
DO UPDATE
SET
   event_count = excluded.event_count,
   device_distinct_count = excluded.device_distinct_count,
   session_distinct_count = excluded.session_distinct_count;$$
USING start_time, end_time;

RAISE NOTICE 'Aggregating/Upserting into 1 hr rollup table';
EXECUTE $$
INSERT INTO rollup_events_1hr
SELECT customer_id,
   event_type,
   country,
   browser,
   date_trunc('hour', timestamp) as hour,
   count(*) as event_count,
   hll_add_agg(hll_hash_bigint(device_id)) as device_distinct_count,
   hll_add_agg(hll_hash_bigint(session_id)) as session_distinct_count
FROM events WHERE timestamp >= $1 AND timestamp < $2
GROUP BY
   customer_id,
   event_type,
   country,
   browser,
   hour
ON CONFLICT (customer_id,event_type,country,browser,hour)
DO UPDATE
SET
   event_count = rollup_events_1hr.event_count + excluded.event_count,
   device_distinct_count = rollup_events_1hr.device_distinct_count || excluded.device_distinct_count,
   session_distinct_count = rollup_events_1hr.session_distinct_count || excluded.session_distinct_count;$$
USING start_time, end_time;
END;
$function$;

To roll up the last five minutes of data, we can now call our function:

SELECT compute_rollups_every_5min((now()-interval '5 minutes')::timestamp, now()::timestamp);

Automating Your Roll-Ups

Scheduling your own background job to run and perform the roll-ups is an option. But when using Citus, you can just as easily schedule the job directly in your database, since the database is doing all the heavy lifting. With pg_cron, you can schedule the compute_rollups_every_5min function to run every five minutes on top of the raw data.
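A minimal sketch, assuming the pg_cron extension is installed and configured on the coordinator node:

-- Enable pg_cron (assumes shared_preload_libraries already includes pg_cron)
CREATE EXTENSION IF NOT EXISTS pg_cron;

-- Run the roll-up function every five minutes over the trailing window
SELECT cron.schedule('*/5 * * * *',
  $$SELECT compute_rollups_every_5min((now() - interval '5 minutes')::timestamp, now()::timestamp)$$);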

Querying the Real-Time Events Dashboard

With our system now in place, ingesting data and performing roll-ups, we can get to the fun part: querying. Because all of our roll-up tables are significantly smaller than the raw data, queries against them are fast. And all the while, Citus is continually rolling up the raw data and storing it in an efficient data type (HyperLogLog) that lets you compose all sorts of reports on top of it.

Let's look at a few examples of queries you may want to run:

-- Get the total number of events and the count of distinct devices in the last 5 minutes

SELECT sum(event_count), hll_cardinality(sum(device_distinct_count))
FROM rollup_events_5min
WHERE minute >= now() - interval '5 minutes' AND minute <= now() AND customer_id = 1;

-- Get the count of distinct sessions over the last week

SELECT sum(event_count), hll_cardinality(sum(session_distinct_count))
FROM rollup_events_1hr
WHERE hour >= date_trunc('day', now()) - interval '7 days' AND hour <= now() AND customer_id = 1;

-- Get the trend of my app usage over the last 2 days, broken down by hour

SELECT hour,
       sum(event_count) AS event_count,
       hll_cardinality(sum(device_distinct_count)) AS device_count,
       hll_cardinality(sum(session_distinct_count)) AS session_count
FROM rollup_events_1hr
WHERE hour >= date_trunc('day', now()) - interval '2 days' AND hour <= now() AND customer_id = 1
GROUP BY hour;
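As mentioned earlier, you can also mix pre-aggregated data with raw events that haven't been rolled up yet. A hypothetical sketch (ignoring exact five-minute bucket alignment) that counts events over the last hour by combining the roll-up table with the not-yet-rolled-up tail of the raw events table:

-- Hypothetical: rolled-up counts for older buckets plus raw events newer than the last roll-up
SELECT sum(event_count) AS events_last_hour FROM (
  SELECT event_count
  FROM rollup_events_5min
  WHERE customer_id = 1
    AND minute >= now() - interval '1 hour'
    AND minute < now() - interval '5 minutes'
  UNION ALL
  SELECT count(*) AS event_count
  FROM events
  WHERE customer_id = 1
    AND timestamp >= now() - interval '5 minutes'
) combined;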

Data is everywhere in this day and age, and the key to being successful is being able to derive insights from it. By making data available in real time, you're able to create more value for your users. With Citus, you can have both real-time ingestion and real-time reporting with high concurrency, without having to throw out SQL, which is the lingua franca when it comes to data.
