(If you’re concerned: I handwrite all my articles; none of this post was written by an LLM. I value your time!)
tl;dr: I built my own distributed query engine; this article is me yapping about the non-technical ideas and motivations that drove me to complete this project, and how I went about doing it. (What is a query engine?)
There's also a technical architecture section, where I'll give a very high-level walkthrough of how our engine works, plus a quick crash course on all the concepts I had to dive into over the course of this project.
- However, if you're looking for a full, proper step-by-step guide to build what I built, I've documented that here.
- I'd also recommend actually reading the book “How Query Engines Work”.
- You can view all the code, and see the full set of features of the engine, in the GitHub repository.
Intro
After spending my fall quarter of sophomore year in the terrible rite of passage of recruiting, I decided to treat myself this quarter by spending time doing what I love the most – building. While I shipped many smaller projects that I’m extremely proud of, I also wanted to work on something large-scale that challenged me.
I eventually settled on adapting the Kotlin-based query engine taught in the book “How Query Engines Work” into Rust, while also adding my own distributed execution & observability layer, as both were things I’ve always wanted to dive deep into but never had the chance to.
The Approach
A lot of this project was motivated by the fact that this year felt like the year that coding agents got really good, to the extent that I spent a lot of time ruminating on what the world of software would look like after I graduated, and the type of engineer I wanted to be. The most straightforward conclusion I came to was that engineers would now be expected to ship much more ambitious projects, and I figured this would be a good time to experiment with “pair-programming” with my agent. Specifically, I wanted to observe how A.I. affects my ability to code, think and learn when given an unfamiliar, ambitious goal.
Here's how I thought about things:
- I wanted to read every single line of code the LLM generated. Even with the speed & reliability of the latest models, I believe it's important for an engineer to be responsible for every line of code committed to a repository. So I did that, except for reading the tests and a couple of markdown files here and there (sorry!)
- This was an extremely important rule to follow. On the days when I was pressed for time and skimmed the files, I always ended up not really understanding what I had written, and I'm sure some lapses are still present in the repository. There is a direct correlation between how hard your brain works and how much you learn, and I (thankfully) made an internal promise to myself that when it comes to learning, slow is fast, and fast is slow.
- Learning always involves copying & building a shitty version first. I got my start in web-development by copying code written by Colt Steele line by line on his Udemy course, which explains why I felt comfortable reading the code written by an LLM first instead of writing it myself.
- Caveat: This worked out for me 80% of the time. In the other 20% of cases, I knew from basic code smell that something was off, and I realized that with LLM-generated development you do lose the “assurance” of correctness that learning from an expert instructor would have given you. I’ve had to refactor / rewrite code a few times because of this, but overall this LLM-driven approach still worked for me.
- Another Caveat: I didn’t go into this completely blind: I had completed a good amount of rustlings before diving into this project, and I had a rough idea of what query engines & distributed systems were like, just not to the extent I was satisfied with.
- Lastly (and lowkey most importantly), I documented everything I learnt from the LLM. Every concept or syntax or pattern I didn’t understand was added to a markdown document for me to review later, and I felt like this really concretized the growth I’ve had throughout this project.
On benchmarks and performance
If I were to be completely honest, I was a little disappointed by my benchmarks. I ran two different benchmarks on 15 million rows from the TPC-H dataset, on c6i.xlarge EC2 instances. On local execution, we lost to pandas on one benchmark but won on the other. On distributed execution, we got absolutely demolished (I suspect the networking overhead of coordinating all the workers had a greater effect on performance, and we would likely need a much bigger dataset to see any gains from distributed execution). It genuinely surprised me, though my expectations were clearly not based on any legitimate experience with building or working with query engines.
That said, I had no intention of building a performant query engine. The book “How Query Engines Work” is meant to be a beginner's guide to query engines, not a production-level one. This is in line with my belief that learning always starts with building one “shitty rep” of something. I also did not read the benchmark section of the book properly HAHA.
The main priority was to learn, and I was intentional about it. I wanted to learn a little bit of everything, rather than everything of a little thing. The nitty-gritty details of a query engine were less of a priority to me than the big-picture flow of how it worked and how distributed execution could be added to it. To that end, I de-scoped many optimizations I could have made and made sure I spent my time focused on the areas of the project I really cared about. Which explains the performance. LOL
Some query optimizations I descoped but could've added:
- Predicate push-down
- Cost-based optimization (including join ordering, physical-operator choice by cost)
- Constant folding
- Dead column elimination
- Limit push-down
- Partition pruning
- Join reordering
- Advanced join optimizations (spill, Bloom filters, etc.)
- Distributed optimizations (shuffle minimization, broadcast-join selection, stats-based planning)
Okay! Now onto the fun stuff.
Technical Architecture
Understanding the query engine
Let's start from the top. To understand how the query engine works, we first look at what it's meant to do, and how a user can interact with it.
We first provide the user a high-level dataframe API that lets them construct their own queries. This looks something like:
// 1) Data source
let df = ctx.parquet(orders_path)?; // local file
// 2) Operators (transformations)
let result_df = df
    .filter(
        col("o_orderdate")
            .gt_eq(lit("1995-01-01"))
            .and(col("o_orderdate").lt_eq(lit("1995-12-31")))
            .and(col("o_orderstatus").eq(lit("F")))
            .and(col("o_totalprice").gt(lit(200000.0_f64))),
    )?
    .project(vec![
        col("o_orderkey").into(),
        col("o_custkey").into(),
        col("o_orderdate").into(),
        col("o_totalprice").into(),
        col("o_orderstatus").into(),
    ])?;
// 3) Execute
let _batches = result_df.collect_async().await?;
Given more time, I would have probably added another layer of abstraction so users can use a higher-level language like Python to construct their queries, but the idea is there!
Looking at this code example, we can see a few things going on.
- We are getting data from our data source
- We are performing certain operations on our data
- We execute these operations asynchronously and collect the results
We can use this structure to guide our implementation. Our query engine consists of layers:
- Data Source Layer: To let users connect a relevant external data source. This can be a CSV / Parquet file, fetched from a local filesystem, or an S3 bucket in the cloud.
- Logical Plan: Based on the user's queries, logical plans describe what computation we want to perform on our data source
- Query Optimizer: While it is possible to execute queries exactly in the order the user laid out for us, we tend to get performance gains by separating what the user wants from how we plan to do it. An easy example to understand is projection push-down: if we can observe from the logical plan that a user is going to do a projection (i.e. only select a few of the columns) at any point in the plan, it makes sense to skip reading the unneeded columns when we scan our data source, so we use this "new", optimized logical plan instead.
- Physical Plan: A physical plan turns this logical plan into executable operators and physical expressions. This is how we separate "what the user wants" from "what we're actually going to do".
- Execution Layer: Finally, we have the execution layer! This layer takes your physical plan, actually evaluates it against your input source data, and returns your results.
Building the query engine
Now that we understand what we want to build, we can look into how we want to build it. Here are some concepts to keep in mind:
Traits
The entire codebase of the query engine is structured around traits. This should intuitively make sense: the way our layers interface / interact with each other should always follow a strict contract, but each unique entity within a layer might still have its own way of doing things while following this contract.
Our data follows a sequence of stages: data frame → logical plan → optimized logical plan → physical plan → async execution → get results, and traits are the typed seams between those stages.
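To make that concrete, here's a heavily simplified sketch of the kind of seams I mean. The trait and method names here are illustrative rather than the exact ones from my codebase:

use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Each layer only talks to the next one through a contract like these
// (simplified signatures):
trait LogicalPlan {
    fn schema(&self) -> SchemaRef;                   // what columns this node produces
    fn children(&self) -> Vec<Arc<dyn LogicalPlan>>; // the inputs this node consumes
}

trait PhysicalPlan {
    // Evaluate one partition of this operator and return its record batches.
    fn execute(&self, partition: usize) -> Result<Vec<RecordBatch>, ArrowError>;
}

A Parquet scan, a filter, and a projection can then each be their own struct, as long as they implement the right trait.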
Parallelism with Rayon
A big part of making a query engine run fast is having some sort of data-parallelism model. A good way to think about this is: can my data be partitioned? If the rows of my dataset aren't at all related to each other, won't it be faster if I run the computation of each set of rows in parallel, then combine the individual results at the end?
For this project, I didn't spend too much time worrying about the data parallelism model. For local parquet data sources, I simply split the datasets by row group and then used Rayon for parallelizing scans of my data source.
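Roughly, the idea looks like this (read_row_group is a hypothetical helper standing in for the real per-row-group reader):

use rayon::prelude::*;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Scan every row group of the file on Rayon's thread pool in parallel,
// then gather the resulting batches back into a single Vec.
fn parallel_scan(path: &str, num_row_groups: usize) -> Result<Vec<RecordBatch>, ArrowError> {
    (0..num_row_groups)
        .into_par_iter()
        .map(|rg| read_row_group(path, rg)) // hypothetical: reads one row group into a batch
        .collect()
}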
Multi-threading with Tokio
In Rust's Tokio runtime model, our default runtime worker threads are meant to juggle many lightweight tasks that yield often. To avoid occupying / tying up these threads, CPU-heavy tasks are offloaded to a separate bounded, blocking thread pool (spawn_blocking), allowing the runtime threads to schedule other tasks, since they only need to await the result of the offloaded tasks.
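The pattern, in a rough sketch (execute_partition stands in for whatever synchronous, CPU-heavy work the operator actually does):

// Push the CPU-bound work onto Tokio's blocking pool so the async worker
// threads stay free to schedule other tasks; here we only await the result.
let handle = tokio::task::spawn_blocking(move || {
    execute_partition(partition) // hypothetical synchronous, CPU-heavy helper
});
let batches = handle.await??; // first ? = join error, second ? = the helper's own Result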
Why go distributed?
For the scope of this project, I lowkey just wanted to do it so I could get my hands dirty with some distributed systems! Like I mentioned in the benchmarks section, the overhead of distributed execution more than offset any potential performance gains I got.
When should you / should you not go distributed? This section from "How Query Engines Work" covers it very well:
- Dataset size: If your data fits comfortably on one machine, parallel execution on that machine will almost always be faster than distributing across a cluster. Network transfer is orders of magnitude slower than memory access. The break-even point depends on your hardware, but datasets under a few hundred gigabytes rarely benefit from distribution.
- Compute requirements: Some queries are compute-intensive enough that a single machine cannot process them fast enough. Machine learning training, complex simulations, or queries with expensive user-defined functions may need more CPU cores than any single machine provides.
- Storage location: If data already lives in a distributed file system like HDFS or an object store like S3, it may be more efficient to move computation to where the data lives rather than pulling all data to a single machine.
- Fault tolerance: For long-running queries (hours or days), the probability of a single machine failing becomes significant. Distributed execution can checkpoint progress and recover from failures, while a single-machine query would have to restart from scratch.
- For typical analytical queries on datasets under a terabyte, a single well-configured machine with parallel execution often outperforms a distributed cluster.
Understanding distributed execution
Our distributed execution layer consists of three key entities: the Client, the Scheduler, and the Workers. These entities expose services that enable them to communicate with each other to make everything work, but they also have their own internal services for better separation of concerns.
The big picture
- Clients send logical plans to the scheduler, and expect to receive a result stream back.
- Schedulers accept plans from the client, break plans down into tasks, which they then assign to workers. Finally, they proxy the stream they receive from the workers back to the client.
- Workers register themselves with the scheduler to indicate their availability, and then execute the tasks assigned to them by the scheduler.
The scheduler
The scheduler is where things get relatively complex. It exposes two main services:
- One service to handle the plans sent to them by the clients, let's call this the client management service
- One service to manage workers, let's call this the worker management service
These are not the real names I used in the codebase, but I think these names work better for our purposes
The client management service
Before we can explain the client management service, we need to define some terms:
- Plan: A plan / logical plan is what the scheduler receives
- Stage: A service called the stage planner splits the logical plans into stages at exchange boundaries. In this engine, new stages are introduced at joins: each side of the join becomes its own leaf stage, and the join itself is a separate downstream stage
- Exchange boundaries: They occur in a particular type of plan that involves some exchange of data. In our codebase, the Join operator requires an exchange boundary because a join needs both inputs fully shuffled (or otherwise partitioned) by the join key before rows can be matched.
- Task: A task is a concrete unit of work sent to a worker. With the exception of joins, which remain a single task, a stage is split further into tasks to exploit data parallelism in our engine. Tasks are what gets executed on workers. In our engine, for parquet files with multiple row groups, we emit one task per row group to enable intra-file parallelism
When the scheduler receives a plan, it runs a stage planner that breaks the plan down either into a singular stage (if there's no need for an exchange boundary), or multiple stages (if there's an exchange boundary required).
- In the event of a single-stage plan, we have a task planner that splits this stage into multiple row-group subtasks. We then have an assigner that picks workers from the worker registry via round robin, and the scheduler executes each task on a worker before merging the per-task streams back into one client-facing stream.
- In the event of a multi-stage plan:
  - We have a task planner that splits the stages before the exchange boundary into multiple row-group subtasks, similar to the above.
  - We have a stage coordinator that helps facilitate a distributed shuffle (more on that later!).
  - The scheduler barriers on that phase, waiting until upstream work completes, before submitting the downstream stage and streaming the final result back to the client. After the barrier, this is similar to our single-stage plan.
The worker management service
The worker management service keeps track of all available workers we could possibly assign tasks to. It exposes a service that lets workers register themselves. Workers periodically call this service as a form of "heartbeat" that lets the scheduler know the worker is still alive and available to receive tasks.
At the same time, we also spawn a Reaper on a background task, whose main role is to reap dead workers at a periodic interval. The TTL defaults to 30s (three missed heartbeats) before a worker is reaped.
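A rough sketch of what that reaper loop can look like (the registry shape and the 10-second tick here are illustrative):

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

struct WorkerInfo {
    last_heartbeat: Instant,
}

// Periodically drop any worker whose last heartbeat is older than the TTL.
async fn reap_dead_workers(registry: Arc<Mutex<HashMap<String, WorkerInfo>>>, ttl: Duration) {
    let mut tick = tokio::time::interval(Duration::from_secs(10));
    loop {
        tick.tick().await;
        let now = Instant::now();
        registry
            .lock()
            .await
            .retain(|_, worker| now.duration_since(worker.last_heartbeat) < ttl);
    }
}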
Building distributed execution
Here are some key concepts that form the backbone of our distributed execution layer
Serialization, gRPC, and the Arrow IPC Protocol
There are two types of data that we need to send across the network:
- Control-plane messages: plans, task assignments, shuffle metadata
- Data-plane payloads: our actual rows of data
We split these responsibilities across two protocols on purpose. gRPC carries small, structured control messages encoded with protobuf, while Arrow Flight streams results using the Arrow IPC protocol.
gRPC
For distributed systems, we use gRPC for our networking calls because we want reliable, structured, strongly-typed RPC calls between services. To keep the overhead low, we want our payload to be small. This is done by defining an explicit contract with gRPC + protobuf, and then generating types on both ends (as compared to sending an entire JSON payload over the network)
- First, we define our message types and RPC methods in a .proto file. This is the source of truth for what each service can send and receive. In our case, this includes messages like TaskAssignmentRequest, TaskResultResponse, WorkerRegistrationRequest, and services like SchedulerService / WorkerService.
- Next, code is generated from that .proto file for both sides. On the client side, this gives us ready-made typed functions (stubs), so we call methods like execute_task(...) instead of manually building HTTP requests. On the server side, the generated code defines exactly which RPC methods must exist (the service contract), and we implement those handlers with our application logic.
- At runtime, protobuf handles serialization/deserialization automatically. Before crossing the network, typed request structs are serialized into compact protobuf bytes; on the receiving side they are deserialized back into typed structs. This gives us schema safety and a lower payload size (sketched below).
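Calling one of those generated stubs from the scheduler then looks roughly like this (the request fields here are illustrative, not the exact ones in my .proto file):

// tonic generates a typed client from the .proto file, so assigning a task
// to a worker is just an async method call on the stub with a typed request.
let mut client = WorkerServiceClient::connect("http://worker-1:50051").await?;
let response = client
    .execute_task(TaskAssignmentRequest {
        task_id: 42,
        plan: serialized_plan_bytes, // protobuf-encoded plan for this task
    })
    .await?;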
Arrow Flight
I realise this is the first time in the article I'm mentioning Arrow. tl;dr: Arrow is what all our datatypes are built on! It's basically the industry-standard columnar memory format that allows for zero-copy data exchange and SIMD-optimized query processing.
Arrow IPC (Inter-Process Communication) is Arrow’s binary format for sending Arrow data between processes or machines while preserving Arrow’s columnar structure. We like using it because it's an efficient binary format that reduces serialization/deserialization overhead compared to generic formats.
If Arrow IPC is the payload format, Arrow Flight is the protocol for requesting and streaming those payloads. Built on top of gRPC, it's a high-performance RPC framework for transporting Arrow data streams. You can think of it as using gRPC as the transport foundation, but specialized for data exchange.
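To give a feel for the payload side, here's a small sketch of turning a RecordBatch into Arrow IPC bytes with the arrow crate's stream writer (Arrow Flight wraps this same encoding behind its gRPC methods):

use arrow::error::ArrowError;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

// Encode one batch into the Arrow IPC stream format, ready to send over the wire.
fn batch_to_ipc_bytes(batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
    let mut buf = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buf, &batch.schema())?;
        writer.write(batch)?;
        writer.finish()?;
    }
    Ok(buf)
}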
The distributed shuffle
(haven't written this part yet, sorry! truth be told, I don't even know if anyone is going to read this article LOL. I'm currently on holiday rn and lowkey I'm just gonna write this when I come back; I'm too tired to keep working over spring break. if you see this please lmk HAHHAHAHA)
Exponential backoff & retries
To keep unpredictable network-related errors from messing everything up, we implemented a retry wrapper (sketched below) that:
- Only retries on errors that are transient, such as tonic status codes "Unavailable" / "DeadlineExceeded"
- Backs off exponentially so we don't create a retry storm
- Adds jitter so retries don't all happen at the same time and cause a thundering herd
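Here's roughly what that wrapper looks like (the base delay, jitter range, and the shape of the closure are illustrative):

use std::time::Duration;
use rand::Rng;

// Retry an async gRPC call with exponential backoff plus jitter, but only
// when the error looks transient (Unavailable / DeadlineExceeded).
async fn with_retries<T, F, Fut>(mut call: F, max_attempts: u32) -> Result<T, tonic::Status>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, tonic::Status>>,
{
    let mut delay = Duration::from_millis(100);
    for attempt in 1..=max_attempts {
        match call().await {
            Ok(value) => return Ok(value),
            Err(status) if attempt < max_attempts && is_transient(&status) => {
                let jitter = Duration::from_millis(rand::thread_rng().gen_range(0..50));
                tokio::time::sleep(delay + jitter).await;
                delay *= 2; // exponential backoff
            }
            Err(status) => return Err(status), // non-transient, or out of attempts
        }
    }
    unreachable!("the loop always returns before falling through")
}

fn is_transient(status: &tonic::Status) -> bool {
    matches!(
        status.code(),
        tonic::Code::Unavailable | tonic::Code::DeadlineExceeded
    )
}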
Observability
If I'm being honest, I was more interested in learning how we could implement observability in our engine than in what exactly we should be observing. For that reason, I'll focus more on the implementation details than on the specific logs and metrics I collected.
There are two key components to our observability layer:
- Traces answer the question of "What happened?". A trace is the complete record of one request moving through our (distributed) system
- Metrics answer the question of "How is our system behaving?". We track three different types of metrics:
- A counter: monotonically increasing number (e.g. How many tasks have we completed?)
- A gauge: number that can go up or down (e.g. How many workers are currently alive?)
- A histogram: the distribution of a measured value (e.g. How long do our tasks take, and what does the tail look like?)
Let's dive deeper into each of them
Distributed Tracing
First, some quick definitions
- Trace: The full journey of one request, end-to-end
- Span: One unit of work within that trace
It is helpful to think of traces in terms of layers
Layer 1: The Rust tracing crate
You can think of the tracing crate in Rust like Python's logging module. The crate provides you with a macro that can automatically create a span wrapping an async function, as well as the ability to emit a structured log event inside this current span. These spans exist at a function level.
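For example, on a hypothetical worker-side function (not the exact one in the repo):

use tracing::{info, instrument};

// #[instrument] opens a span named after the function, with task_id recorded
// as a field, for as long as the future runs; info! emits events inside it.
#[instrument]
async fn execute_task(task_id: u64) {
    info!("starting task execution");
    // ... run the physical plan for this task ...
    info!("task finished");
}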
Layer 2: The tracing-subscriber
The tracing-subscriber is a subscriber registry that lets you subscribe to the spans emitted by the tracing crate. This operates at an engine level, allowing you to decide what to do with the spans collected throughout the engine.
Layer 3: OpenTelemetry (OTEL)
OpenTelemetry is a vendor-neutral standard for observability. It defines a data model for traces, metrics and logs, as well as a wire protocol for exporting that data (OTLP). This gives you the ability to instrument once, but swap higher level layers easily.
We attach an OTEL layer to our tracing-subscriber to standardize our traces to the OTEL standard.
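Wiring layers 1–3 together looks roughly like this (here tracer is whatever tracer you've built with the opentelemetry SDK, and the exact builder calls vary a bit between crate versions):

use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, Registry};

// Stack layers onto the subscriber registry: JSON logs to stdout, plus an
// OpenTelemetry layer that converts tracing spans into OTEL spans for export.
let subscriber = Registry::default()
    .with(fmt::layer().json())
    .with(tracing_opentelemetry::layer().with_tracer(tracer));
tracing::subscriber::set_global_default(subscriber)?;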
Layer 4: Jaeger
Jaeger is the trace storage and visualization backend. It accepts traces across our distributed workers, stores them, and provides a UI to search and analyze traces. Jaeger is just one option, and OTEL lets you swap to other providers without changing your code.
For Jaeger to show one unified trace, the Worker must know that its spans are children of the Scheduler's spans, even though everything runs in a different process. We solve this by injecting context into gRPC metadata headers, which brings us to the next layer.
Layer 5: W3C TraceContext
W3C TraceContext is a web standard (a W3C Recommendation) that defines exactly how trace context is encoded in HTTP/gRPC headers, via what's known as a traceparent header.
TraceContextPropagator is the OTEL implementation of the W3C standard. It knows how to:
- Inject: take the current active span's TraceID/SpanID and write them as a traceparent header
- Extract: read traceparent from incoming headers and restore the parent context so the Worker's spans become children
Scheduler                                 Worker
─────────────────────────────────────────────────────────
[Span: execute_stages]
  TraceID: abc-123
  SpanID: 456
  │
  │ inject_trace_context()
  │   → "traceparent: 00-abc123-456-01"
  │   → added to gRPC metadata
  │
  └──── gRPC ExecuteTask ────────────►
                                          extract_trace_context()
                                            ← reads "traceparent" header
                                            ← restores parent context
                                          [Span: execute_task]
                                            TraceID: abc-123      ← same!
                                            SpanID: 789           ← new
                                            ParentSpanID: 456     ← Scheduler's span
In an ideal world (not mine because I haven't linked up my workers to an OTLP exporter yet), Jaeger receives both spans, sees the TraceID matches, and draws them as parent-child in one unified flame graph.
Putting it together, the final picture becomes:
┌──────────────────────────────────────────────────────────┐
│                        YOUR CODE                         │
│   #[instrument], info!(), Span::current().record(...)    │
└─────────────────────────┬────────────────────────────────┘
                          │ emits spans/events
┌─────────────────────────▼────────────────────────────────┐
│                      tracing CRATE                       │
│   API layer — defines how to write instrumented code     │
└─────────────────────────┬────────────────────────────────┘
                          │ dispatches to subscribers
┌─────────────────────────▼────────────────────────────────┐
│               tracing-subscriber REGISTRY                │
│   ├── fmt::layer()        → JSON logs to stdout          │
│   └── OpenTelemetryLayer  → converts to OTEL spans       │
└─────────────────────────┬────────────────────────────────┘
                          │ OTEL spans
┌─────────────────────────▼────────────────────────────────┐
│                    opentelemetry SDK                      │
│   Data model: Trace, Span, Attributes, Events            │
│   Propagation: TraceContextPropagator (W3C standard)     │
└─────────────────────────┬────────────────────────────────┘
                          │ OTLP (gRPC, port 4317)
┌─────────────────────────▼────────────────────────────────┐
│                          Jaeger                           │
│   Receives, stores, visualizes traces                    │
│   UI at localhost:16686                                  │
└──────────────────────────────────────────────────────────┘
Metrics
We collect & update our metrics at the Rust source-code level. We create a store that lives in memory when our scheduler starts up, and we pass pointers to this metrics store to the various components of the scheduler that need to update metrics.
Because we only collect metrics at the scheduler, this is much simpler than distributed tracing. When we display these metrics through our UI, every call reads the same live, shared state that the rest of the scheduler is updating.
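A minimal sketch of the kind of store I mean (the fields are illustrative; the real store tracks more than this):

use std::sync::atomic::{AtomicU64, Ordering};

// One shared, in-memory store. Each scheduler component holds an
// Arc<SchedulerMetrics> and bumps the fields it cares about.
#[derive(Default)]
struct SchedulerMetrics {
    tasks_completed: AtomicU64, // counter: only ever goes up
    workers_alive: AtomicU64,   // gauge: goes up and down
}

impl SchedulerMetrics {
    fn record_task_completed(&self) {
        self.tasks_completed.fetch_add(1, Ordering::Relaxed);
    }
}

// At startup: let metrics = Arc::new(SchedulerMetrics::default());
// then each component gets an Arc::clone(&metrics).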
Wrapping it up
And that should be it! Three and a half months of building condensed into (hopefully) an enjoyable read. I hope you learnt something (I certainly have), and if you'd like to dive deeper or chat about the project, feel free to reach out to me. Alternatively, I'd love for you to explore the GitHub repository in your own time. There's still a ton more stuff I want to add to this engine, so maybe I'll update this article from time to time.
Not to short-change myself, but most of my programming experience has come from building full-stack web applications, and it was fun building something so different for a change of pace.
I think I initially hesitated to go public with this project out of fear that my "beginner-ness" would be subject to some level of scrutiny and judgement, but honestly, I'm proud of what I accomplished!
I learnt a new programming language, shipped an ambitious project, and learnt tons throughout this whole journey. Till next time!