Handling Real-Time Updates in ClickHouse

In this article, see how to handle real-time updates in ClickHouse by looking at a use case.

Mutable data is generally unwelcome in OLAP databases. ClickHouse is no exception to the rule. Like some other OLAP products, ClickHouse did not even support updates originally. Later on, updates were added, but like many other things they were added in a “ClickHouse way.” 

Even now, ClickHouse updates are asynchronous, which makes them difficult to use in interactive applications. Still, in many use cases users need to apply modifications to existing data and expect to see the effect immediately. Can ClickHouse do that? Sure it can.  

A Short History of ClickHouse Updates

Back in 2016, the ClickHouse team published an article titled “How To Update Data in ClickHouse.” ClickHouse did not support data modifications at that time. Only special insert structures could be used in order to emulate updates, and data had to be dropped by partitions. 

Under the pressure of GDPR requirements, the ClickHouse team delivered UPDATEs and DELETEs in 2018. The follow-up article, Updates and Deletes in ClickHouse, is still one of the most read articles on the Altinity blog. Those asynchronous, non-atomic updates are implemented as ALTER TABLE UPDATE statements and can potentially shuffle a lot of data. This is useful for bulk operations and infrequent updates, when immediate results are not needed. “Normal” SQL updates are still missing in ClickHouse, though they reliably appear in the roadmap every year. If real-time update behavior is required, we have to use a different approach. Let’s consider a practical use case and compare different ways of doing it in ClickHouse.

Use Case

Consider a system that generates various kinds of alerts. Users or machine learning algorithms query the database from time to time to review new alerts and acknowledge them. Acknowledgment operations need to modify the alert record in the database. Once acknowledged, alerts should disappear from the users’ views. This looks like an OLTP operation that is alien to ClickHouse.

Since we cannot use updates, we will have to insert a modified record instead. Once two records are in the database, we need an efficient way to get the latest one. For that we will try three different approaches:

  • ReplacingMergeTree

  • Aggregate functions

  • AggregatingMergeTree

ReplacingMergeTree

Let’s start by creating a table that stores alerts.

CREATE TABLE alerts(
  tenant_id     UInt32,
  alert_id      String,
  timestamp     DateTime Codec(Delta, LZ4),
  alert_data    String,
  acked         UInt8 DEFAULT 0,
  ack_time      DateTime DEFAULT toDateTime(0),
  ack_user      LowCardinality(String) DEFAULT ''
)
ENGINE = ReplacingMergeTree(ack_time)
PARTITION BY tuple()
ORDER BY (tenant_id, timestamp, alert_id);

For simplicity, all alert-specific columns are packed into a generic ‘alert_data’ column, but you can imagine that an alert may contain dozens or even hundreds of columns. Also, ‘alert_id’ is a random string in our example.

Note the ReplacingMergeTree engine. ReplacingMergeTree is a special table engine that replaces data by primary key (ORDER BY): a newer version of a row replaces the older one with the same key value. ‘Newness’ is determined by a version column, ‘ack_time’ in our case. The replacement is performed during background merge operations. It does not happen immediately, and there is no guarantee that it happens at all, so consistency of the query results is a concern. ClickHouse has a special syntax to work with such tables, though, and we will be using it in the queries below.
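To make the replacement rule concrete, here is a minimal Python sketch of what a merge does to rows that share a sorting key. It is only an in-memory model, not ClickHouse code: the function name `replacing_merge` and the sample rows are hypothetical, and the tie-breaking rule (the row inserted last wins when versions are equal) is stated as an assumption about the engine's behavior.

```python
from typing import Dict, List, Tuple

# Hypothetical in-memory model of a ReplacingMergeTree(ack_time) merge.
Row = Dict[str, object]


def replacing_merge(rows: List[Row], key_cols: Tuple[str, ...], version_col: str) -> List[Row]:
    """Keep one row per sorting key: the one with the highest version.

    Assumption: on a version tie, the row inserted last wins.
    """
    latest: Dict[tuple, Row] = {}
    for row in rows:  # rows are visited in insertion order
        key = tuple(row[c] for c in key_cols)
        current = latest.get(key)
        if current is None or row[version_col] >= current[version_col]:
            latest[key] = row
    return list(latest.values())


rows = [
    {"tenant_id": 451, "timestamp": 100, "alert_id": "a", "acked": 0, "ack_time": 0},
    {"tenant_id": 451, "timestamp": 200, "alert_id": "b", "acked": 0, "ack_time": 0},
    # The "update": same key as the first row, but a newer ack_time.
    {"tenant_id": 451, "timestamp": 100, "alert_id": "a", "acked": 1, "ack_time": 900},
]

merged = replacing_merge(rows, ("tenant_id", "timestamp", "alert_id"), "ack_time")
for row in merged:
    print(row["alert_id"], row["acked"], row["ack_time"])
```

Until such a merge actually runs, both versions of alert "a" are stored, which is why the queries have to deal with duplicates themselves.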

Before we run queries, let’s fill the table with some data. We generate 10M alerts for 1000 tenants:

INSERT INTO alerts(tenant_id, alert_id, timestamp, alert_data)
SELECT
  toUInt32(rand(1)%1000+1) AS tenant_id,
  randomPrintableASCII(64) as alert_id,
  toDateTime('2020-01-01 00:00:00') + rand(2)%(3600*24*30) as timestamp,
  randomPrintableASCII(1024) as alert_data
FROM numbers(10000000);

Next, let’s acknowledge 99% of alerts, providing new values for ‘acked’, ‘ack_user’ and ‘ack_time’ columns. Instead of an update, we just insert a new row.

INSERT INTO alerts (tenant_id, alert_id, timestamp, alert_data, acked, ack_user, ack_time)
SELECT tenant_id, alert_id, timestamp, alert_data,
  1 as acked,
  concat('user', toString(rand()%1000)) as ack_user,
  now() as ack_time
-- 98 out of every 99 hash buckets, i.e. roughly 99% of alerts
FROM alerts WHERE cityHash64(alert_id) % 99 != 0;

If we query this table now, we will see something like:

SELECT count() FROM alerts

┌──count()─┐
│ 19898060 │
└──────────┘

1 rows in set. Elapsed: 0.008 sec

So we definitely have both acknowledged and non-acknowledged rows in the table: the replacement has not happened yet. In order to see the ‘real’ data, we have to add the FINAL keyword.

SELECT count() FROM alerts FINAL

┌──count()─┐
│ 10000000 │
└──────────┘

1 rows in set. Elapsed: 3.693 sec. Processed 19.90 million rows, 1.71 GB (5.39 million rows/s., 463.39 MB/s.)

The count is correct now, but look at the query time! With FINAL, ClickHouse has to scan all rows and merge them by primary key at query time. That produces the correct answer but with a lot of overhead. Let’s see if we can do better by filtering only rows that have not been acknowledged.

SELECT count() FROM alerts FINAL WHERE NOT acked

┌─count()─┐
│  101940 │
└─────────┘

1 rows in set. Elapsed: 3.570 sec. Processed 19.07 million rows, 1.64 GB (5.34 million rows/s., 459.38 MB/s.)

The query time and the amount of data processed are almost the same, even though the count is much smaller. Filtering does not help to speed up the query. As the table size grows, the cost may become even more substantial. It does not scale.

Note: for the sake of readability, all queries and query times are presented as if they were run in ‘clickhouse-client’. In fact, we ran each query multiple times to make sure the results were consistent, and confirmed them with the ‘clickhouse-benchmark’ utility as well.

OK, querying the entire table is not that helpful. Can we still use ReplacingMergeTree for our use case? Let’s pick a random tenant_id and select all records that have not been acknowledged yet; imagine there is a dashboard that the user is looking at. I like Ray Bradbury, so I picked 451. Since ‘alert_data’ is just random garbage, we will calculate a checksum and use it to confirm that the results are the same across multiple approaches:

SELECT
  count(),
  sum(cityHash64(*)) AS data
FROM alerts FINAL
WHERE (tenant_id = 451) AND (NOT acked)

┌─count()─┬─────────────────data─┐
│      90 │ 18441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.278 sec. Processed 106.50 thousand rows, 119.52 MB (383.45 thousand rows/s., 430.33 MB/s.)

That was pretty fast! In 278 ms we could query all non-acknowledged data. Why is it fast this time? The difference is in the filter condition. ‘tenant_id’ is part of the primary key, so ClickHouse can filter data before applying FINAL. In this case, ReplacingMergeTree becomes efficient.
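The difference between the two filters can be illustrated with a small Python model. This is only a sketch of the logic, not of ClickHouse internals: `latest_per_key` is a hypothetical stand-in for FINAL, and the key is shortened to (tenant_id, alert_id) to keep the sample small. A filter on a key column can be applied before deduplication without changing the answer, while a filter on ‘acked’ applied too early would bring stale versions back.

```python
def latest_per_key(rows):
    """Model of FINAL: keep the highest ack_time row for each (tenant_id, alert_id)."""
    latest = {}
    for row in rows:
        key = (row["tenant_id"], row["alert_id"])
        if key not in latest or row["ack_time"] >= latest[key]["ack_time"]:
            latest[key] = row
    return list(latest.values())


rows = [
    {"tenant_id": 451, "alert_id": "a", "acked": 0, "ack_time": 0},
    {"tenant_id": 451, "alert_id": "a", "acked": 1, "ack_time": 900},
    {"tenant_id": 451, "alert_id": "b", "acked": 0, "ack_time": 0},
    {"tenant_id": 7, "alert_id": "c", "acked": 0, "ack_time": 0},
]

# Key filter: filtering before or after deduplication gives the same rows,
# but filtering first leaves far fewer rows to deduplicate.
key_first = latest_per_key([r for r in rows if r["tenant_id"] == 451])
key_last = [r for r in latest_per_key(rows) if r["tenant_id"] == 451]

# Non-key filter: filtering first brings back the stale version of alert "a".
acked_first = latest_per_key([r for r in rows if not r["acked"]])
acked_last = [r for r in latest_per_key(rows) if not r["acked"]]

print(key_first == key_last)
print(len(acked_first), len(acked_last))
```

In this model the ‘NOT acked’ condition has to wait until every version of every row has been merged, which matches the full-table cost seen in the earlier query.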

Let’s try a user filter as well and query the number of alerts acknowledged by a particular user. The cardinality of the column is the same: we have 1000 users and can try user451.

SELECT count() FROM alerts FINAL
WHERE (ack_user = 'user451') AND acked

┌─count()─┐
│    9725 │
└─────────┘

1 rows in set. Elapsed: 4.778 sec. Processed 19.04 million rows, 1.69 GB (3.98 million rows/s., 353.21 MB/s.)

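That is very slow now because the index is not used. ClickHouse scanned all 19.04 million rows. Note that we cannot add ‘ack_user’ to the index, since that would break ReplacingMergeTree semantics: the acknowledged and non-acknowledged versions of an alert would no longer share the same key and would never replace each other. We can do a trick with PREWHERE, though: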

SELECT count() FROM alerts FINAL
PREWHERE (ack_user = 'user451') AND acked

┌─count()─┐
│    9725 │
└─────────┘

1 rows in set. Elapsed: 0.639 sec. Processed 19.04 million rows, 942.40 MB (29.80 million rows/s., 1.48 GB/s.)

PREWHERE is a special hint for ClickHouse to apply a filter differently. Usually ClickHouse is smart enough to move conditions to PREWHERE automatically, so the user does not need to care. It did not happen this time, so it is good that we checked.

Aggregate Functions

ClickHouse is known for supporting a wide variety of aggregate functions. In the latest versions it has more than 100. Combined with 9 aggregate function combinators (see https://clickhouse.tech/docs/en/query_language/agg_functions/combinators/), this gives enormous flexibility to an experienced user. For this use case, we do not need anything advanced, and will be using only 3 functions: ‘argMax’, ‘max’ and ‘any’.

The same query for the 451st tenant can be executed with an ‘argMax’ aggregate function as follows:

SELECT count(), sum(cityHash64(*)) data FROM (
  SELECT tenant_id, alert_id, timestamp,
         argMax(alert_data, ack_time) alert_data,
         argMax(acked, ack_time) acked,
         max(ack_time) ack_time_,
         argMax(ack_user, ack_time) ack_user
  FROM alerts
  GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;

┌─count()─┬─────────────────data─┐
│      90 │ 18441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.059 sec. Processed 73.73 thousand rows, 82.74 MB (1.25 million rows/s., 1.40 GB/s.)

Same result, same number of rows, but more than 4 times better performance! This is ClickHouse aggregation efficiency. The downside is that the query becomes more complex. But we can make it simpler.
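For readers less familiar with ‘argMax’, the following Python sketch mimics what the subquery computes: for every (tenant_id, alert_id, timestamp) group, each column takes the value from the row with the largest ‘ack_time’. The helper `arg_max` and the sample rows are hypothetical and only illustrate the semantics.

```python
from collections import defaultdict


def arg_max(pairs):
    """Return the value paired with the largest ordering key, like argMax(value, key)."""
    return max(pairs, key=lambda pair: pair[1])[0]


rows = [
    # (tenant_id, alert_id, timestamp, alert_data, acked, ack_time, ack_user)
    (451, "a", 100, "payload-a", 0, 0, ""),
    (451, "a", 100, "payload-a", 1, 900, "user1"),
    (451, "b", 200, "payload-b", 0, 0, ""),
]

# GROUP BY tenant_id, alert_id, timestamp
groups = defaultdict(list)
for tenant_id, alert_id, timestamp, alert_data, acked, ack_time, ack_user in rows:
    groups[(tenant_id, alert_id, timestamp)].append((alert_data, acked, ack_time, ack_user))

# One "latest state" per group, each column taken at the maximum ack_time.
latest = {
    key: {
        "alert_data": arg_max([(v[0], v[2]) for v in versions]),
        "acked": arg_max([(v[1], v[2]) for v in versions]),
        "ack_time": max(v[2] for v in versions),
        "ack_user": arg_max([(v[3], v[2]) for v in versions]),
    }
    for key, versions in groups.items()
}

# Outer WHERE NOT acked
unacked = [key for key, state in latest.items() if not state["acked"]]
print(unacked)
```

Unlike FINAL, this is ordinary aggregation: the outer filter sees exactly one row per alert, whether or not any background merge has happened.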

Note that when acknowledging an alert, we only update 3 columns:

  • acked: 0 => 1

  • ack_time: 0 => now()

  • ack_user: '' => 'user1'

In all 3 cases, the column value increases! So instead of the somewhat bulky ‘argMax’ we can use ‘max’. Since we do not change ‘alert_data’, we do not need any actual aggregation on this column. ClickHouse has a nice ‘any’ aggregate function for this purpose. It picks any value without extra overhead:

SELECT count(), sum(cityHash64(*)) data FROM (
  SELECT tenant_id, alert_id, timestamp,
    any(alert_data) alert_data,
    max(acked) acked,
    max(ack_time) ack_time,
    max(ack_user) ack_user
  FROM alerts
  GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;

┌─count()─┬─────────────────data─┐
│      90 │ 18441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.055 sec. Processed 73.73 thousand rows, 82.74 MB (1.34 million rows/s., 1.50 GB/s.)

The query becomes simpler, and it is slightly faster! The reason is that with the ‘any’ function, ClickHouse does not need to calculate ‘argMax’ on the ‘alert_data’ column!
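The claim that ‘max’ can stand in for ‘argMax’ rests on every acknowledgment column moving from a smaller value to a larger one. A quick Python check of that reasoning, with made-up sample values (`EPOCH` stands in for toDateTime(0), and the acknowledgment time is arbitrary):

```python
from datetime import datetime

EPOCH = datetime(1970, 1, 1)  # stands in for toDateTime(0)

before = {"acked": 0, "ack_time": EPOCH, "ack_user": ""}
after = {"acked": 1, "ack_time": datetime(2020, 1, 15, 12, 0, 0), "ack_user": "user1"}

# Column-wise max over both versions, as max(acked), max(ack_time), max(ack_user) would do.
merged = {col: max(before[col], after[col]) for col in before}

print(merged == after)
```

The shortcut only holds while every update increases the value. If an alert could later be un-acknowledged (‘acked’ going back from 1 to 0), ‘max’ would keep reporting the old state, and ‘argMax’ by a version column would be needed again.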

AggregatingMergeTree

AggregatingMergeTree is one of the most powerful ClickHouse features. When coupled with materialized views, it enables real-time data aggregation. Since we used aggregate functions in the previous approach, can we make it even better with AggregatingMergeTree? Actually, it’s not much of an improvement. 

We only update a row once, so there are only two rows to aggregate per group. For this scenario, AggregatingMergeTree is not the best option. We can do a trick, however. We know that alerts are always inserted as non-acknowledged first, and then become acknowledged. Once a user acknowledges the alert, only 3 columns need to be modified. Can we save disk space and improve performance if we do not duplicate data for the other columns?

Let’s create a table that implements the aggregation using the ‘max’ aggregate function. Instead of ‘max’, we could also use ‘any’, but that would require the columns to be Nullable, since ‘any’ would then pick a non-null value.

DROP TABLE IF EXISTS alerts_amt_max;
CREATE TABLE alerts_amt_max (
  tenant_id     UInt32,
  alert_id      String,
  timestamp     DateTime Codec(Delta, LZ4),
  alert_data    SimpleAggregateFunction(max, String),
  acked         SimpleAggregateFunction(max, UInt8),
  ack_time      SimpleAggregateFunction(max, DateTime),
  ack_user      SimpleAggregateFunction(max, LowCardinality(String))
)
Engine = AggregatingMergeTree()
ORDER BY (tenant_id, timestamp, alert_id);

Since the original data was random, we will populate the new table using existing data from ‘alerts’. We will do it in two inserts, like before, one for non-acknowledged alerts and another one for acknowledged:

INSERT INTO alerts_amt_max SELECT * FROM alerts WHERE NOT acked;
INSERT INTO alerts_amt_max
SELECT tenant_id, alert_id, timestamp,
  '' as alert_data,
  acked, ack_time, ack_user
FROM alerts WHERE acked;

Note that we insert an empty string instead of ‘alert_data’ for acknowledged events. We know that data does not change, and we can store it only once! The aggregate function will fill the gap. In a real application, we can just skip all columns that do not change and let them get default values.
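How the aggregate function "fills the gap" can be sketched in a few lines of Python. This is a simplified model of merging rows with a column-wise ‘max’, not the engine itself; `merge_with_max` and the sample values are hypothetical.

```python
def merge_with_max(rows, key_cols, value_cols):
    """Collapse rows sharing a key by taking the column-wise max of the value columns."""
    merged = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        if key not in merged:
            merged[key] = dict(row)
        else:
            for col in value_cols:
                merged[key][col] = max(merged[key][col], row[col])
    return list(merged.values())


rows = [
    # First insert: the full alert, not yet acknowledged.
    {"tenant_id": 451, "alert_id": "a", "timestamp": 100,
     "alert_data": "payload-a", "acked": 0, "ack_time": 0, "ack_user": ""},
    # Second insert: only the acknowledgment columns carry data.
    {"tenant_id": 451, "alert_id": "a", "timestamp": 100,
     "alert_data": "", "acked": 1, "ack_time": 900, "ack_user": "user1"},
]

result = merge_with_max(
    rows,
    key_cols=("tenant_id", "timestamp", "alert_id"),
    value_cols=("alert_data", "acked", "ack_time", "ack_user"),
)
print(result[0]["alert_data"], result[0]["acked"], result[0]["ack_user"])
```

The payload survives because any non-empty string compares greater than the empty string, so ‘max’ always prefers the stored ‘alert_data’ over the placeholder.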

Once we have the data, let’s check the data sizes first:

SELECT
    table,
    sum(rows) AS r,
    sum(data_compressed_bytes) AS c,
    sum(data_uncompressed_bytes) AS uc,
    uc / c AS ratio
FROM system.parts
WHERE active AND (database = 'last_state')
GROUP BY table

┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬──────────────ratio─┐
│ alerts         │ 19039439 │ 20926009562 │ 21049307710 │ 1.0058921003373666 │
│ alerts_amt_max │ 19039439 │ 10723636061 │ 10902048178 │ 1.0166372782501314 │
└────────────────┴──────────┴─────────────┴─────────────┴────────────────────┘

Well, we have almost no compression, thanks to the random strings. But the aggregate table is two times smaller, since we do not have to store ‘alert_data’ twice.
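As a quick sanity check of the "two times smaller" observation, the compressed sizes from the system.parts output can be compared directly. The numbers below are copied from that output; the variable names are just for illustration.

```python
# Compressed bytes reported by system.parts for the two tables.
alerts_bytes = 20926009562
alerts_amt_max_bytes = 10723636061

size_ratio = alerts_amt_max_bytes / alerts_bytes
saved_gb = (alerts_bytes - alerts_amt_max_bytes) / 1e9

print(f"alerts_amt_max is {size_ratio:.0%} of alerts, saving about {saved_gb:.1f} GB")
```

The aggregate table is slightly more than half the size rather than exactly half, which is consistent with the second row still storing the key and acknowledgment columns for each acknowledged alert.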

Now let’s try the query over the aggregate table:

SELECT count(), sum(cityHash64(*)) data FROM (
   SELECT tenant_id, alert_id, timestamp,
          max(alert_data) alert_data,
          max(acked) acked,
          max(ack_time) ack_time,
          max(ack_user) ack_user
     FROM alerts_amt_max
   GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;

┌─count()─┬─────────────────data─┐
│      90 │ 18441617166277032220 │
└─────────┴──────────────────────┘

1 rows in set. Elapsed: 0.036 sec. Processed 73.73 thousand rows, 40.75 MB (2.04 million rows/s., 1.13 GB/s.)

Thanks to AggregatingMergeTree, we process less data (40MB vs 82MB before) and it is now even more efficient. 

Materializing the Update

ClickHouse will do its best to merge data in the background, removing duplicate rows and performing aggregation. Sometimes, however, it makes sense to force the merge, for example in order to release disk space. This can be done with the OPTIMIZE FINAL statement. OPTIMIZE is a blocking and expensive operation, so it should not be run too often. Let’s see if it makes any difference for the query performance.

OPTIMIZE TABLE alerts FINAL
Ok.
0 rows in set. Elapsed: 105.675 sec.

OPTIMIZE TABLE alerts_amt_max FINAL
Ok.
0 rows in set. Elapsed: 70.121 sec.

After OPTIMIZE FINAL, both tables have the same number of rows and identical data.

┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬────────────ratio─┐
│ alerts         │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
│ alerts_amt_max │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
└────────────────┴──────────┴─────────────┴─────────────┴──────────────────┘

The difference in performance between different approaches becomes less notable. Here is the summary table:


Approach                    After inserts (sec)    After OPTIMIZE FINAL (sec)
ReplacingMergeTree FINAL    0.278                  0.037
argMax                      0.059                  0.034
any/max                     0.055                  0.029
AggregatingMergeTree        0.036                  0.026

Conclusion

ClickHouse provides a rich toolset to handle real-time updates such as ReplacingMergeTree, CollapsingMergeTree (not reviewed here), AggregatingMergeTree and aggregate functions. All those approaches have three common properties:

  • Data is “modified” by inserting the new version. Inserts in ClickHouse are extremely fast.

  • There are efficient ways to emulate update semantics similar to those of OLTP databases.

  • However, the actual modification does not happen immediately.

The choice of the particular approach depends on the application use case. ReplacingMergeTree is straightforward and the most convenient for the user but may only be used for small to medium-sized tables or if the data is always queried by the primary key. The use of aggregate functions gives more flexibility and performance but requires quite a lot of query rewriting. And finally, AggregatingMergeTree allows storage savings by keeping only the modified columns. These are good tools to have in the arsenal of a ClickHouse database designer, to be applied when needed.

Published at DZone with permission of Alexander Zaitsev. See the original article here.
