Building Real-Time Analytics Dashboards With Postgres and Citus
Learn how to build a real-time analytics dashboard with the Citus extension to Postgres, which works well for users where a data warehouse may not work nearly as well.
Citus scales out Postgres for a number of different use cases, both as a system of record and as a system of engagement. One use case we're seeing implemented a lot these days is using the Citus database to power customer-facing real-time analytics dashboards, even when dealing with billions of events per day. Dashboards and pipelines are easy to handle when you're at 10 GB of data; as you grow, even basic operations like a count of unique users require non-trivial engineering work to perform well.
Citus is a good fit for these types of event dashboards because of its ability to ingest large amounts of data, perform rollups concurrently, mix raw unrolled-up data with pre-aggregated data, and support a large number of concurrent users. Adding all these capabilities together, the Citus extension to Postgres works well for end users where a data warehouse may not work nearly as well.
We've talked some here about various parts of building a real-time customer-facing dashboard, but today, we thought we'd go one step further and give you a guide for doing it end-to-end.
The first step is data ingestion. In any POC, we generally start with single-row inserts: they are the quickest thing to set up, and they let us start ingesting data easily at tens of thousands of events per second. Before production, we move to a micro-batching approach.
With \copy, the Postgres bulk loading utility, we're able to ingest millions of events per second, if needed. \copy is fully transactional, and with Citus the load is distributed across all nodes, making your Citus coordinator node less of a bottleneck than you would expect.
Most event pipelines we've observed have some upstream process already handling events, such as Kafka or Kinesis, which makes batching easy to put in place. As for the batching process itself, you can batch either on a time basis (say, every minute, every five minutes, or even hourly, depending on your requirements) or every X records. Your batches don't have to be hundreds of thousands of events; even something like every 1,000 records can give you a nice performance boost, as you can have multiple \copy processes running in parallel.
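As a rough sketch of the two ingestion styles, run from psql: the example assumes the events table defined in the next section, and the file name events_batch.csv and the sample values are hypothetical. The CSV columns are assumed to be in the same order as the table's columns.

```sql
-- Single-row insert: the simplest thing to set up for a POC.
-- The values here are made up for illustration.
INSERT INTO events (id, timestamp, customer_id, event_type, country, browser, device_id, session_id)
VALUES (1, '2018-01-01 00:00:00', 1, 'click', 'US', 'chrome', 1001, 5001);

-- Micro-batch: load a whole file of events in one transaction.
-- \copy is a psql meta-command, so it must fit on a single line.
-- 'events_batch.csv' is a hypothetical file written by the upstream consumer.
\copy events FROM 'events_batch.csv' WITH (FORMAT csv)
```

Several such \copy commands can run at once from separate sessions, one per batch file.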
Structuring Your Raw Events
Your raw event table will vary based on your use case, but there are some commonalities across most. In almost all cases, you have the time of the event and some customer identifier associated with it. Typically, there will be some categorization and details about the event. These details can be broken out as columns or could also be contained within a JSONB datatype. For this example dashboard, we'll use the following schema:
CREATE TABLE events(
    id bigint,
    timestamp timestamp,
    customer_id bigint,
    event_type varchar,
    country varchar,
    browser varchar,
    device_id bigint,
    session_id bigint
);

SELECT create_distributed_table('events','customer_id');
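For comparison, here is a sketch of the JSONB alternative mentioned above. The table name events_jsonb and the details column are hypothetical and are not used in the rest of this guide; the idea is only to show event details living in a single JSONB column instead of separate columns.

```sql
-- Hypothetical variant of the raw table: details kept in one JSONB column.
CREATE TABLE events_jsonb (
    id bigint,
    timestamp timestamp,
    customer_id bigint,
    event_type varchar,
    details jsonb   -- e.g. {"country": "US", "browser": "chrome"}
);

SELECT create_distributed_table('events_jsonb', 'customer_id');

-- Reading a detail back out with the ->> operator (returns text).
SELECT details->>'country' AS country, count(*) AS event_count
FROM events_jsonb
WHERE customer_id = 1
GROUP BY 1;
```

Explicit columns, as in the schema used here, keep the roll-up queries simpler; JSONB is handy when the set of details varies by event type.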
Rolling Up Your Data
Once you've got some raw data coming in, you can start rolling it up. To do this, we're going to create roll-up tables at several granularities. The five-minute and hourly tables are shown here; a daily table follows the same pattern.
CREATE TABLE rollup_events_5min (
    customer_id bigint,
    event_type varchar,
    country varchar,
    browser varchar,
    minute timestamp,
    event_count bigint,
    device_distinct_count hll,
    session_distinct_count hll
);

CREATE UNIQUE INDEX rollup_events_5min_unique_idx
    ON rollup_events_5min(customer_id,event_type,country,browser,minute);

SELECT create_distributed_table('rollup_events_5min','customer_id');

CREATE TABLE rollup_events_1hr (
    customer_id bigint,
    event_type varchar,
    country varchar,
    browser varchar,
    hour timestamp,
    event_count bigint,
    device_distinct_count hll,
    session_distinct_count hll
);

CREATE UNIQUE INDEX rollup_events_1hr_unique_idx
    ON rollup_events_1hr(customer_id,event_type,country,browser,hour);

SELECT create_distributed_table('rollup_events_1hr','customer_id');
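The daily roll-up table is mentioned but not shown. A minimal sketch, assuming it mirrors the hourly table, could look like the following. The names rollup_events_1day and day are assumptions, and filling it from the hourly table (rather than from raw events) is one possible design, not necessarily the author's.

```sql
-- Hypothetical daily roll-up table, mirroring rollup_events_1hr.
CREATE TABLE rollup_events_1day (
    customer_id bigint,
    event_type varchar,
    country varchar,
    browser varchar,
    day timestamp,
    event_count bigint,
    device_distinct_count hll,
    session_distinct_count hll
);

CREATE UNIQUE INDEX rollup_events_1day_unique_idx
    ON rollup_events_1day(customer_id,event_type,country,browser,day);

SELECT create_distributed_table('rollup_events_1day','customer_id');

-- Roll yesterday's hourly rows up into one daily row per dimension set.
-- hll_union_agg merges the hourly sketches; re-running overwrites the day.
INSERT INTO rollup_events_1day
SELECT customer_id, event_type, country, browser,
       date_trunc('day', hour) AS day,
       sum(event_count)::bigint AS event_count,
       hll_union_agg(device_distinct_count) AS device_distinct_count,
       hll_union_agg(session_distinct_count) AS session_distinct_count
FROM rollup_events_1hr
WHERE hour >= date_trunc('day', now()::timestamp) - interval '1 day'
  AND hour <  date_trunc('day', now()::timestamp)
GROUP BY customer_id, event_type, country, browser, day
ON CONFLICT (customer_id,event_type,country,browser,day)
DO UPDATE SET
    event_count = excluded.event_count,
    device_distinct_count = excluded.device_distinct_count,
    session_distinct_count = excluded.session_distinct_count;
```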
One thing you'll notice in our roll-up tables is the use of the HLL (HyperLogLog) data type. HyperLogLog is a sketch algorithm that estimates the number of distinct elements in a set using a small, fixed amount of memory. HLL sketches can be merged, so unions across buckets (for example, combining hourly sketches into a daily one) are cheap, and intersections can be estimated from the unions. This makes HLL incredibly useful for the types of reports your dashboard may generate.
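To get a feel for the HLL type outside of the roll-up tables, here is a small standalone sketch using functions from the postgresql-hll extension. The number ranges are arbitrary test data, and all results are estimates rather than exact counts. On a Citus cluster the extension needs to be available on every node.

```sql
CREATE EXTENSION IF NOT EXISTS hll;

-- Approximate distinct count of 10,000 distinct values.
SELECT hll_cardinality(hll_add_agg(hll_hash_bigint(v))) AS approx_distinct
FROM generate_series(1::bigint, 10000::bigint) AS v;

-- Two overlapping sets: 1..6000 and 4001..10000.
-- The || operator merges two sketches (a union); the intersection is
-- estimated by inclusion-exclusion: |A| + |B| - |A union B|.
WITH a AS (
    SELECT hll_add_agg(hll_hash_bigint(v)) AS h
    FROM generate_series(1::bigint, 6000::bigint) AS v
), b AS (
    SELECT hll_add_agg(hll_hash_bigint(v)) AS h
    FROM generate_series(4001::bigint, 10000::bigint) AS v
)
SELECT hll_cardinality(a.h)        AS a_count,
       hll_cardinality(b.h)        AS b_count,
       hll_cardinality(a.h || b.h) AS union_count,
       hll_cardinality(a.h) + hll_cardinality(b.h)
         - hll_cardinality(a.h || b.h) AS approx_intersection
FROM a, b;
```

Because every term is itself an estimate, the intersection figure carries more error than the union, especially when the overlap is small relative to the sets.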
In the above example, we have chosen (customer_id, event_type, country, browser, minute/hour) as the dimensions on which we evaluate metrics such as event_count, device_distinct_count, session_distinct_count, etc. Based on your query workload and performance requirements, you can choose the dimensions that make sense for you. If needed, you can create multiple roll-up tables to serve different query types (just don't go too crazy with a table for every dimension).
Ideally, you should choose dimensions that give you suitable compression (more than 5-10x) compared to the raw tables. Based on our customers' experiences, we have, at times, seen orders of magnitude in compression after roll-ups, up to 100x or 1000x.
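One rough way to check the compression you are actually getting is to compare row counts and on-disk sizes of the raw and roll-up tables once both hold data for the same period. This sketch assumes a Citus version that provides citus_total_relation_size.

```sql
-- Row counts: the ratio raw_rows / rollup_rows is the row-level compression.
SELECT count(*) AS raw_rows FROM events;
SELECT count(*) AS rollup_rows FROM rollup_events_5min;

-- On-disk size across all shards, including indexes.
SELECT pg_size_pretty(citus_total_relation_size('events'))             AS raw_size,
       pg_size_pretty(citus_total_relation_size('rollup_events_5min')) AS rollup_size;
```

Note that each roll-up row carries two HLL sketches, so the size ratio can be noticeably lower than the row-count ratio.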
For our roll-up query, we're going to do an INSERT INTO... SELECT, which will run across all the nodes in our cluster and parallelize.
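Note: Our raw tables and roll-up tables are sharded on the same key. In this case, the sharding key is customer_id, which is more or less a proxy for customer/tenant ID. Because the tables are co-located, the INSERT INTO... SELECT can run on each node against local shards without moving data across the network. Other granular columns can also be chosen as shard keys depending on the use case.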
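If you want to confirm that the raw and roll-up tables really are co-located, one option is to look them up in the Citus metadata table pg_dist_partition. This is a sketch for inspection only; tables that share a colocationid are co-located.

```sql
-- All three tables should report the same colocationid.
SELECT logicalrelid, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('events'::regclass,
                       'rollup_events_5min'::regclass,
                       'rollup_events_1hr'::regclass);

-- Co-location can also be requested explicitly when distributing a table,
-- instead of relying on the default behavior:
-- SELECT create_distributed_table('rollup_events_5min', 'customer_id',
--                                 colocate_with => 'events');
```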
To perform our roll-up, we'll create the function:
CREATE OR REPLACE FUNCTION compute_rollups_every_5min(start_time TIMESTAMP, end_time TIMESTAMP)
RETURNS void
LANGUAGE PLPGSQL
AS $function$
BEGIN
  RAISE NOTICE 'Computing 5min rollups from % to % (excluded)', start_time, end_time;

  -- The window is half-open [start_time, end_time) so that consecutive runs
  -- never process the same event twice.
  -- The 5-minute upsert overwrites an existing bucket, so the window should
  -- cover whole 5-minute buckets.
  RAISE NOTICE 'Aggregating data into 5 min rollup table';
  EXECUTE $$
    INSERT INTO rollup_events_5min
    SELECT customer_id,
           event_type,
           country,
           browser,
           date_trunc('seconds', (timestamp - TIMESTAMP 'epoch') / 300) * 300 + TIMESTAMP 'epoch' AS minute,
           count(*) AS event_count,
           hll_add_agg(hll_hash_bigint(device_id)) AS device_distinct_count,
           hll_add_agg(hll_hash_bigint(session_id)) AS session_distinct_count
    FROM events
    WHERE timestamp >= $1 AND timestamp < $2
    GROUP BY customer_id, event_type, country, browser, minute
    ON CONFLICT (customer_id,event_type,country,browser,minute)
    DO UPDATE SET
      event_count = excluded.event_count,
      device_distinct_count = excluded.device_distinct_count,
      session_distinct_count = excluded.session_distinct_count;$$
  USING start_time, end_time;

  -- The hourly upsert is additive: each run adds its window's counts and
  -- merges its HLL sketches (||) into the existing hourly row.
  RAISE NOTICE 'Aggregating/Upserting into 1 hr rollup table';
  EXECUTE $$
    INSERT INTO rollup_events_1hr
    SELECT customer_id,
           event_type,
           country,
           browser,
           date_trunc('hour', timestamp) AS hour,
           count(*) AS event_count,
           hll_add_agg(hll_hash_bigint(device_id)) AS device_distinct_count,
           hll_add_agg(hll_hash_bigint(session_id)) AS session_distinct_count
    FROM events
    WHERE timestamp >= $1 AND timestamp < $2
    GROUP BY customer_id, event_type, country, browser, hour
    ON CONFLICT (customer_id,event_type,country,browser,hour)
    DO UPDATE SET
      event_count = rollup_events_1hr.event_count + excluded.event_count,
      device_distinct_count = rollup_events_1hr.device_distinct_count || excluded.device_distinct_count,
      session_distinct_count = rollup_events_1hr.session_distinct_count || excluded.session_distinct_count;$$
  USING start_time, end_time;
END;
$function$;
For rolling up the last five minutes of data, we can now trigger our function:
SELECT compute_rollups_every_5min((now()-interval '5 minutes')::timestamp, now()::timestamp);
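Since the five-minute upsert replaces an existing bucket rather than adding to it, you may prefer to call the function with a window snapped to bucket boundaries, so that each run covers exactly one complete bucket. A sketch of that, assuming PostgreSQL 14 or later for date_bin:

```sql
-- Roll up the most recent *complete* 5-minute bucket.
-- date_bin snaps now() down to a 5-minute boundary counted from the epoch,
-- which matches the bucket boundaries computed inside the function.
SELECT compute_rollups_every_5min(
    date_bin('5 minutes', now()::timestamp, TIMESTAMP 'epoch') - interval '5 minutes',
    date_bin('5 minutes', now()::timestamp, TIMESTAMP 'epoch')
);
```

The trade-off is up to five minutes of extra latency before an event shows up in the roll-up tables.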
Automating Your Roll-Ups
Scheduling your own background job to run and perform the roll-ups is an option. When using Citus, you can just as easily schedule a job directly in your database since the database is doing all the heavy lifting. With pg_cron, you can schedule the function compute_rollups_every_5min to run every five minutes on top of the raw data.
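A minimal sketch of that schedule with pg_cron, assuming the extension is installed on the coordinator and listed in shared_preload_libraries. The window passed to the function simply reuses the call shown earlier.

```sql
CREATE EXTENSION IF NOT EXISTS pg_cron;

-- Run the roll-up function every five minutes (standard cron syntax).
SELECT cron.schedule(
    '*/5 * * * *',
    $$SELECT compute_rollups_every_5min((now() - interval '5 minutes')::timestamp, now()::timestamp)$$
);

-- Inspect the scheduled jobs; a job can be removed with cron.unschedule(jobid).
SELECT jobid, schedule, command FROM cron.job;
```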
Querying the Real-Time Events Dashboard
With our system now ingesting data and performing roll-ups, we can get to the fun part: querying. Because the roll-up tables are significantly smaller than the raw data, queries against them are fast. All the while, Citus is continually rolling up the raw data and storing it in an efficient data type (HyperLogLog) to let you compose all sorts of reports based on the data.
Let's look at a few examples of queries you may want to run:
-- Get me the total number of events and count of distinct devices in the last 5 minutes
-- (hll values are merged with hll_union_agg; sum() is not defined for the hll type)
SELECT sum(event_count),
       hll_cardinality(hll_union_agg(device_distinct_count))
FROM rollup_events_5min
WHERE minute >= now() - interval '5 minutes'
  AND minute <= now()
  AND customer_id = 1;

-- Get me the count of distinct sessions over the last week
SELECT sum(event_count),
       hll_cardinality(hll_union_agg(session_distinct_count))
FROM rollup_events_1hr
WHERE hour >= date_trunc('day', now()) - interval '7 days'
  AND hour <= now()
  AND customer_id = 1;

-- Get me the trend of my app usage in the last 2 days broken by hour
SELECT hour,
       sum(event_count) event_count,
       hll_cardinality(hll_union_agg(device_distinct_count)) device_count,
       hll_cardinality(hll_union_agg(session_distinct_count)) session_count
FROM rollup_events_1hr
WHERE hour >= date_trunc('day', now()) - interval '2 days'
  AND hour <= now()
  AND customer_id = 1
GROUP BY hour;
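The same roll-up table can answer breakdowns along any of its dimensions. As one more sketch, here is a top-countries report for a single customer; it merges the sketches with hll_union_agg from the postgresql-hll extension, and the one-day window and the limit of 10 are arbitrary choices.

```sql
-- Top 10 countries by approximate distinct devices over the last day.
SELECT country,
       sum(event_count) AS event_count,
       hll_cardinality(hll_union_agg(device_distinct_count)) AS device_count
FROM rollup_events_1hr
WHERE hour >= now() - interval '1 day'
  AND customer_id = 1
GROUP BY country
ORDER BY device_count DESC
LIMIT 10;
```

Because the query filters on customer_id, the distribution column, Citus can route it to the single shard holding that customer's rows.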
Data is everywhere in this day and age, and the key to being successful is being able to derive insights from your data. By making data available in real time, you're able to create more value for your users. With Citus, you can have both real-time ingestion and real-time reporting with high concurrency without having to throw out SQL, which is the lingua franca when it comes to data.
Published at DZone with permission of Craig Kerstiens, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.