May 14, 2023

Distributed Query Execution in Firebolt


Modern analytics workloads have become an integral part of many businesses, enabling them to gather insights from their data and make better decisions. However, managing these workloads can be challenging, because transforming and serving analytics data place very different demands on the underlying system. Transforming involves sifting through vast amounts of data, while serving requires an engine capable of high concurrency and low latency.

To address these requirements, data engineering teams often rely on separate systems - a general-purpose data warehouse for transforming and an accelerator for serving. For example, consider a social media analytics application that needs to process vast amounts of data, extract insights, and serve real-time analytics dashboards to users. Such systems typically leverage a general-purpose data warehouse like Snowflake or Teradata for large-scale data processing and an accelerator like Druid, Pinot, or ClickHouse for serving data. Two purpose-built platforms are not inherently bad, but this approach leads to the need for specialized skills, fragmented data silos, redundant data and data flows, and skyrocketing costs. What if you could unify the capabilities offered by these two different systems?

At Firebolt, we aim to bridge this gap and serve low-latency, high-concurrency, data-intensive applications with the ease of use of a general-purpose data warehouse. The system we have built handles point lookups in milliseconds but also scales to complex, distributed queries. In this blog, we focus on distributed query execution as an integral part of Firebolt.

The Problem

In a distributed SQL database, the query engine needs to break the user's query down into small chunks of work and schedule them across the cluster. Let's take a hypothetical engine with multiple compute nodes. When a transformation job is run, a query orchestrator of some sort sends subsets of the query plan to nodes in the cluster. An example would be an aggregation on a large fact table: the orchestrator schedules pre-aggregations of the data stored locally on each node. The intermediate results of the pre-aggregation are sent to a primary node designated by the orchestrator. These partial results are then merged to calculate the final result.
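
As a purely illustrative example, a query of the following shape would be executed this way: every node pre-aggregates the rows it stores locally, and the designated primary then merges the partial results. The table and column names are made up.

  -- Illustrative high-cardinality aggregation over a large fact table.
  -- Each node computes partial counts for its local data; the designated
  -- primary merges one partial state per node for every distinct user_id.
  SELECT user_id, COUNT(*) AS events
  FROM page_views
  GROUP BY user_id;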

Query plans like this are common in accelerators. When the intermediate aggregate state is small, a query like this scales out nicely. Another example is the execution of distributed joins, where the query plan might end up broadcasting one of the tables to all nodes, performing a local join, and then merging everything on the primary at the end. Pretty straightforward so far, right?

The challenge with these examples comes down to scaling. We are looking at a primary node that has the potential to become the bottleneck in this whole process. If you deal with high-cardinality aggregations, most of the time is spent merging aggregate state on the primary, and the query engine is not using the cluster's full compute resources to process the query efficiently. As the complexity of the query increases, this problem only becomes more acute. You either pay more for your query, or you run out of memory on the primary, causing the query to fail.

The Solution - Shuffle Operator

To handle complex query shapes and large intermediate result sets, modern cloud data warehouses implement a so-called shuffle operator. The shuffle operator is used to redistribute data between stages based on a set of partitioning keys, enabling parallel processing across multiple nodes. When breaking a user query into different stages, the engine can strategically insert shuffle operators to scale out effectively. Multi-stage query execution with a shuffle operator typically involves the following steps:

  1. Accessing Base Tables: At the beginning of execution, the partitioned base tables are scanned. This stage typically involves filtering, transformation, or pre-aggregation operations that are applied to each partition of the data independently. This is similar to the initial stage in the previous example.
    Shuffle: The output of the first stage is fed to a shuffle operator, which redistributes the data. For a distributed equi-join, the engine can hash-partition the data on the join keys; for a distributed aggregation, it can hash-partition the data on the group-by keys. Different partitions can then be sent to different nodes for distributed processing in the next stage (see the sketch after this list).
  2. Distributed Operators: The second stage of the query reads from the previous shuffle operator, meaning each node is bound to one of the generated output partitions. This stage typically involves join, aggregation, or window operations. Since the shuffle operator makes sure that the same keys end up in the same partition, merging the aggregate state can happen in a fully distributed way.
    Shuffle: If there are additional stages, the shuffle operator is used again to redistribute the data between stages. This process is repeated until all stages are complete. This composability allows running queries with many complex joins and high-cardinality aggregations.
  3. Final output: The output of the final stage is collected on the primary and combined to produce the final query result. Since the result of the query can be streamed back to the user, this isn't a scalability bottleneck.
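
To make the shuffle step more concrete, here is a rough sketch of what a hash shuffle guarantees. On a hypothetical four-node cluster, the rows that node 0 receives for a distributed aggregation are exactly those matching a predicate like the one below, so every row sharing a group-by key lands on the same node and can be finalized there. The table and column names are made up, and hash() stands in for whatever deterministic hash function the engine actually uses.

  -- Rows owned by node 0 of a 4-node cluster after a hash shuffle on the
  -- group-by key. All rows with the same user_id hash to the same partition,
  -- so this node can compute the final count for its keys locally.
  SELECT user_id, COUNT(*) AS events
  FROM page_views
  WHERE abs(hash(user_id)) % 4 = 0
  GROUP BY user_id;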

Let’s take a look at a simple example with a join and aggregation.
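
A query of roughly this shape will serve as the running example; A and B are the two tables being joined, and the column names are purely illustrative.

  -- Join two large fact tables and aggregate the result
  -- (illustrative column names).
  SELECT A.customer_id, SUM(B.amount) AS total_amount
  FROM A
  JOIN B ON A.order_id = B.order_id
  GROUP BY A.customer_id;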

Assuming that A and B are very large fact tables, the join needs to happen in a distributed fashion. As a result, we need to make sure that the join keys are aligned across the different nodes in the cluster. Before both sides of the join, we insert a shuffle that is hash-partitioned on the join keys. It is important that the partitioning function is the same for both sides of the join, so that tuples from one relation find their corresponding join partners from the other relation when the join is performed in a fully distributed way. Within each partition produced by the shuffle operators, the join can then happen locally on one of the nodes in the cluster.

Finally, the result of the join for each partition is fed into a partial aggregation that computes the local sums for each value of the aggregation key. This aggregate state is then fed into the next shuffle operator. The purpose of the pre-aggregation is to reduce the amount of data sent across the network by combining all local occurrences of the aggregate keys’ values. Overall, this approach ensures that multiple stages process data efficiently while leveraging all the parallelism available in the cluster.
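
As a rough way to see why the pre-aggregation is safe, the distributed plan is conceptually equivalent to nesting the aggregation: the inner query below corresponds to the per-node partial aggregation, and the outer query to the final merge after the shuffle. This rewrite happens inside the engine rather than in user SQL, and the names are the same illustrative ones as above.

  -- Outer query: final merge of partial sums (runs per shuffle partition).
  SELECT customer_id, SUM(partial_sum) AS total_amount
  FROM (
      -- Inner query: partial aggregation over each node's share of the
      -- join result, reducing the data shipped across the network.
      SELECT A.customer_id, SUM(B.amount) AS partial_sum
      FROM A
      JOIN B ON A.order_id = B.order_id
      GROUP BY A.customer_id
  ) AS partial
  GROUP BY customer_id;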

However, shuffle operators don’t magically solve all problems. They can impose additional network traffic due to the need to move data between nodes, and repartitioning after each stage can consume additional resources. From an implementation and execution perspective, shuffle needs to be handled in a smart and efficient manner. 

First and foremost, we try to minimize the number of shuffles we insert while still allowing for high scalability. If a shuffle can be avoided by, for example, broadcasting a small intermediate result set of a dimension table, the Firebolt query optimizer does this in a transparent way.
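
For instance, in a query along the following lines (names are again made up), the filtered dimension table typically produces a small intermediate result, so broadcasting it to every node is cheaper than shuffling the large fact table.

  -- The large fact table stays in place on each node; only the small,
  -- filtered dimension result is broadcast, so no shuffle of the fact
  -- table is required for the join.
  SELECT d.country, SUM(f.amount) AS revenue
  FROM sales AS f
  JOIN dim_geo AS d ON f.geo_id = d.geo_id
  WHERE d.region = 'EMEA'
  GROUP BY d.country;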

If we do have to shuffle data, the implementation of our shuffle operator is highly optimized for low-latency analytics. We send data directly over the network to the consumers, overlapping computation as much as possible. This means that while the nodes are still scanning the base tables, they are already shuffling data and kicking off work for the next stage, for example by building a hash table for a join. This approach reduces latency, with the trade-off being query resiliency.

Firebolt also uses a highly efficient indexed and compressed file format to store data. Sparse indexes are used to enable fast, direct access to granular ranges of data without performing full table scans. In the previous example, if there are additional filters in the form of a ‘WHERE’ clause, Firebolt transparently narrows down the scanned data range using these indexes. Similarly, the use of custom aggregating indexes can reduce the amount of data that needs to be scanned and aggregated.
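
To make this concrete, a filtered variant of the earlier query might look like the one below; with a sparse index covering the filter column, Firebolt can prune the scan to the matching ranges, and an aggregating index defined along these lines can answer the aggregation without touching the raw rows. The DDL is only a hedged sketch with made-up names, so refer to the Firebolt documentation for the exact syntax.

  -- Filtered query: sparse indexes narrow the scan to matching data ranges.
  SELECT A.customer_id, SUM(B.amount) AS total_amount
  FROM A
  JOIN B ON A.order_id = B.order_id
  WHERE A.order_date >= '2023-01-01'
  GROUP BY A.customer_id;

  -- Sketch of an aggregating index that pre-materializes an aggregation
  -- (illustrative; exact syntax may differ).
  CREATE AGGREGATING INDEX b_amount_by_order ON B (
      order_id,
      SUM(amount)
  );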

Combining our efficient file format, indexes and distributed execution layer yields a query engine that bridges the gap between custom accelerators and more traditional cloud data warehouses. By aggressively pruning data and using indexes, Firebolt minimizes the computation required for every query. This allows for high concurrency and low latency. When the engine does need to handle complex, data-intensive queries, Firebolt scales out seamlessly to use the resources of the entire cluster. 

User benefits

All of these optimizations happen seamlessly behind the scenes; users never have to reason about shuffle or multi-stage execution. As a user of Firebolt, this means that if your data volume doubles, you have the option of doubling the number of nodes to keep execution time about the same. This enables linear scalability as data volume grows.

Our shuffle-based architecture also improves the utilization of compute resources. A user running the transformation of a large fact table was able to reduce processing time from 1.5 hours on 60 nodes with Presto down to 26 minutes on 40 nodes with Firebolt. Distributed query processing in Firebolt reduces resource contention and hence improves overall performance.
