Deep Dive: Window Functions in Couchbase Analytics
Let's take a look at Window Functions in Couchbase Analytics.
Couchbase Server 6.5 Beta brings a host of new features to the leading NoSQL database. One of the key additions to the N1QL query language is support for window functions. These functions were originally introduced in the SQL:2003 standard and provide a performant way of answering many complex business queries. Window functions were discussed in an earlier series of posts (listed at the end of this article). In this installment, we’ll dive deep into their implementation in Couchbase Analytics.
The Couchbase Analytics service is designed to handle complex ad-hoc queries in the Couchbase data platform. Its key component is a massively parallel processing (MPP) query engine that runs on a separate set of nodes in the cluster to guarantee workload isolation for the operational data nodes. Data is ingested into Analytics using the DCP change protocol and is hash-partitioned among all available Analytics nodes. The MPP query processor divides a single query into subtasks and schedules them to run in parallel on all nodes, repartitioning data if necessary.
Window functions are also evaluated in a distributed, partition-parallel fashion by the Analytics query engine. The query compiler creates an execution plan that contains several operators working together to compute the result of the window function call. This execution plan is then sent to all Analytics nodes in the cluster where each operator works on a partition of the input data. The execution engine coordinates operator execution and delivers the query result to the client. For example, consider the following query, which ranks employees in each department by their salaries.
/* Couchbase N1QL query */
SELECT RANK() OVER (PARTITION BY department_id ORDER BY salary DESC) AS rank,
       employee_id, department_id, salary
FROM employee
The query processor evaluates this function in three steps as illustrated by Figure 2.
- After the data is selected from the employee dataset, it is repartitioned according to the PARTITION BY subclause of the OVER clause. The initial data layout might have each department’s records scattered across different storage partitions on several Analytics nodes. After the repartitioning step, all employee records from a single department arrive in the same computation partition. The repartitioning step is executed in parallel on all nodes/partitions in the cluster. In the most common Analytics configuration, there is a one-to-one relationship between the number of data partitions and the number of CPU cores available in the cluster.
- Records within each department are sorted according to the ORDER BY subclause of the OVER clause. Once each department’s records have arrived at their corresponding computation partitions, the query processor starts sorting the data. This sorting step is also performed in parallel on all Analytics nodes.
- The RANK() function is then computed on sorted records within each department. This particular function only needs to look at the current record and compare it with the previous one, so it can be evaluated in a streaming fashion without requiring any additional data materialization.
Executing these steps in parallel across all available nodes enables Analytics to utilize all computational resources of the cluster. This allows Analytics to achieve linear scalability as more nodes are added to meet the required performance targets.
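The three steps can be sketched in a few lines of Python. This is only an illustration of the idea, not the Analytics implementation: the sample records, the function names, and the use of crc32 as the partitioning hash are all assumptions made for the example, and the partitions are processed one after another rather than in parallel.

```python
from zlib import crc32

# Hypothetical sample data; field names follow the query in the article.
EMPLOYEES = [
    {"employee_id": 1, "department_id": "eng", "salary": 120},
    {"employee_id": 2, "department_id": "eng", "salary": 150},
    {"employee_id": 3, "department_id": "eng", "salary": 150},
    {"employee_id": 4, "department_id": "ops", "salary": 90},
    {"employee_id": 5, "department_id": "ops", "salary": 110},
]


def repartition(records, num_partitions):
    """Step 1: route each record to a partition chosen by hashing department_id."""
    partitions = [[] for _ in range(num_partitions)]
    for rec in records:
        # crc32 is a stand-in for the engine's hash function (an assumption).
        target = crc32(rec["department_id"].encode()) % num_partitions
        partitions[target].append(rec)
    return partitions


def sort_partition(partition):
    """Step 2: order by department, then by salary in descending order."""
    return sorted(partition, key=lambda r: (r["department_id"], -r["salary"]))


def stream_rank(sorted_records):
    """Step 3: compute RANK() in one pass, remembering only the previous record."""
    prev_dept = prev_salary = None
    position = rank = 0
    for rec in sorted_records:
        if rec["department_id"] != prev_dept:
            position = 0  # a new department starts a new logical partition
        position += 1
        if position == 1 or rec["salary"] != prev_salary:
            rank = position  # tied records keep the rank of the first tied record
        prev_dept, prev_salary = rec["department_id"], rec["salary"]
        yield {**rec, "rank": rank}


def ranked_employees(records, num_partitions=2):
    """Run the three steps over every partition and collect the output."""
    result = []
    for partition in repartition(records, num_partitions):
        # A real MPP engine would process these partitions in parallel.
        result.extend(stream_rank(sort_partition(partition)))
    return result
```

Because all records of a department hash to the same partition, the ranks do not depend on how many partitions are used; only the amount of work per partition changes.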
Let’s see how the above stages can be identified within a query execution plan. The Analytics explain plan feature was described in a previous post, so here we only focus on the plan fragment related to the window function evaluation. (Recall that Analytics query plans are to be read bottom-up.)
Data is read from the employee dataset by the “data-scan” operator and is passed to the “exchange” operator which is responsible for data repartitioning. The repartitioning field is “department_id” as requested by the PARTITION BY subclause. The “order” operator then sorts the data according to the ORDER BY subclause. Finally, the “window-aggregate” operator computes the RANK() function.
Notice how the “physical-operator” value for this operator is set to “WINDOW_STREAM”, which means that the operator works in a streaming manner and does not require any additional data materialization. The “execution-mode” field is set to “PARTITIONED” for all operators, so they will all be executed on all available computation partitions in the cluster.
Evaluation of some window functions might require information pertaining to a whole logical partition (its total number of tuples for the NTILE() and PERCENT_RANK() functions, for example) or multiple iterations over the whole partition (when computing window frames for aggregate functions). Such functions are processed by non-streaming window operators.
A non-streaming window operator is identified by a “physical-operator” value of “WINDOW” in the query execution plan. The operator materializes one logical partition at a time, then starts the window function computation for each tuple in that partition. In order to handle arbitrary amounts of incoming data, the operator follows the memory management model of the Analytics execution engine. The query planner assigns a memory budget to each operator. This budget cannot be exceeded during query execution.
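To see why a function such as NTILE() cannot be streamed, consider the following Python sketch. It is a simplified model under stated assumptions, not the Analytics operator: the function name ntile_window, the sample rows, and the tuple layout are made up for the example, and the input is assumed to be already sorted by partition key and order key.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical (department_id, salary) rows, pre-sorted as the "order" operator would.
ROWS = [
    ("eng", 150), ("eng", 140), ("eng", 130), ("eng", 120), ("eng", 110),
    ("ops", 110), ("ops", 90),
]


def ntile_window(sorted_rows, num_tiles, key=itemgetter(0)):
    """Yield (row, tile) pairs, materializing one logical partition at a time."""
    for _, group in groupby(sorted_rows, key=key):
        partition = list(group)  # the whole partition is buffered here
        size = len(partition)    # total tuple count, unknown while streaming
        base, extra = divmod(size, num_tiles)
        index = 0
        for tile in range(1, num_tiles + 1):
            # The first `extra` tiles receive one additional row.
            tile_size = base + (1 if tile <= extra else 0)
            for row in partition[index:index + tile_size]:
                yield row, tile
            index += tile_size
```

With two tiles, the five "eng" rows are split into tiles of three and two rows, which can only be decided once the partition size is known. In this sketch the buffered partition lives entirely in a Python list; the real operator has to respect its memory budget instead.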
Operational data beyond the budget is spilled to disk by each operator and read back later when memory becomes available. A query usually consists of multiple operators and therefore has a global memory budget that cannot be exceeded at runtime. The Analytics query processor implements resource-based load control for incoming queries, only admitting those that can be executed within the available memory across all nodes.
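The budget-and-spill idea can be illustrated with a small Python sketch. Everything here is an assumption made for illustration: the SpillingBuffer class and the can_admit function are hypothetical names, the budget is counted in records rather than bytes, and a temporary file with pickle stands in for the engine's own spill format.

```python
import pickle
import tempfile


class SpillingBuffer:
    """Keeps up to `budget` records in memory and spills the rest to a temp file."""

    def __init__(self, budget):
        self.budget = budget    # maximum number of records held in memory
        self.in_memory = []
        self.spill_file = None  # created lazily on the first spill
        self.spilled = 0

    def append(self, record):
        if len(self.in_memory) < self.budget:
            self.in_memory.append(record)
            return
        if self.spill_file is None:
            self.spill_file = tempfile.TemporaryFile()
        self.spill_file.seek(0, 2)  # always write at the end of the file
        pickle.dump(record, self.spill_file)
        self.spilled += 1

    def __iter__(self):
        # Records come back in insertion order: memory first, then disk.
        yield from self.in_memory
        if self.spill_file is not None:
            self.spill_file.seek(0)
            for _ in range(self.spilled):
                yield pickle.load(self.spill_file)


def can_admit(operator_budgets, available_memory):
    """Admit a query only if the sum of its operator budgets fits in memory."""
    return sum(operator_budgets) <= available_memory
```

The buffer never holds more than its budget in memory regardless of input size, and the admission check mirrors the rule that a query's combined budget must fit within what is currently available.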
N1QL for Analytics also places fewer restrictions on the syntactic context of window function calls. Unlike standard SQL, queries in N1QL for Analytics permit window functions in WHERE and HAVING clauses as well as in N1QL-specific LET clauses.
Our original query, for example, can be easily modified to return only the top-ranked employee in each department:
/* Couchbase N1QL query */
SELECT employee_id, department_id, salary
FROM employee
WHERE RANK() OVER (PARTITION BY department_id ORDER BY salary DESC) = 1
To conclude, window functions in Analytics provide a powerful mechanism for parallel data analysis and reporting. The Couchbase N1QL query language allows users to easily evaluate those functions directly on their application’s JSON data, thereby avoiding complex ETL processing.
 Announcing Couchbase Server 6.5 Beta – What’s New and Improved
 On Par with Window Functions
 Get a Bigger Picture with N1QL Window Functions and CTE
 Window functions in Couchbase Analytics
 Announcing Couchbase Server 6.0 with Analytics
 Couchbase’s History of Everything: DCP
 Murtadha Al Hubail, Ali Alsuliman, Michael Blow, Michael Carey, Dmitry Lychagin, Ian Maxon, and Till Westmann. Couchbase Analytics: NoETL for Scalable NoSQL Data Analysis. PVLDB, 12(12): 2275-2286, 2019
 Couchbase Analytics: Under the Hood – Connect Silicon Valley 2018
 Analytics Explain Plan – Part 1
Published at DZone with permission of Dmitry Lychagin.