Big Data, Small Machine: The Magic of Polars Streaming and more Data tips
Polars series #4 #edition24
What’s on the list today?
Polars Series, Part 4: Streaming in Polars
Data Engineering Tips
Efficient way to use withColumn in PySpark
Multi-Column ALTER in Databricks
🐻Polars Series – Part 4: Streaming
Welcome back to the Polars journey! This is the fourth part in a five-part series exploring Polars as a DataFrame library.
Part 1 - An Introduction to Polars
Part 2 - Lazy Execution in Polars
Part 3 - Expressions and Transformations
In the previous editions we explored Polars' lazy engine; now it's time to unlock the streaming engine.
🌟 Why Streaming?
Processing large datasets often comes with memory constraints. Polars offers a streaming execution engine that lets you work with datasets larger than your memory (RAM) by processing data in small batches instead of materializing it all at once.
The power of big data, without needing huge compute.
This is made possible when you use scan_csv or scan_parquet and execute your lazy query with:
df = query.collect(engine="streaming")
💡 scan_* functions are required — eager read_* won’t work with streaming.
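For instance, here is a minimal sketch with a CSV source (the file path and column name are placeholders, not part of the flights dataset used below):

import polars as pl

# Build a lazy query over a large CSV; nothing is loaded into memory yet.
query = (
    pl.scan_csv("data/large_file.csv")   # placeholder path
    .filter(pl.col("amount") > 0)        # placeholder column
)

# Execute with the streaming engine: rows are processed in batches.
df = query.collect(engine="streaming")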
📈 Real example
To demonstrate this, we first need a sufficiently large file. Using Polars, we can artificially expand the small flights sample dataset into a roughly 5 GB file.
import polars as pl
# Read the original small Parquet file
df = pl.read_parquet("data/flights.parquet")
# Repeat the data 1000 times
bigger_df = pl.concat([df] * 1000)
# Save to a new Parquet file
bigger_df.write_parquet("data/big_flights.parquet")
Next, we can run a streaming aggregation on the large dataset to calculate the total number of delayed flights per carrier.
import polars as pl
q = (
    pl.scan_parquet("data/big_flights.parquet")
    .filter(pl.col("dep_delay") > 0)
    .group_by("carrier")
    .agg(pl.len().alias("delayed_flights"))
)
result = q.collect(engine="streaming")
print(result)
The streaming engine partitions the query into manageable batches, with the batch size influenced by the machine's CPU count and the memory requirements per row in the query.
To check how many threads Polars will use (by default this matches the number of CPUs on your machine), call this function.
import polars as pl
# Size of the Polars thread pool (equals the CPU count by default)
pl.thread_pool_size()
The current formula for generating a chunk size is:
thread_factor = max(12 / n_threads, 1)
STREAMING_CHUNK_SIZE = max(50_000 / n_cols * thread_factor, 1000)
The thread_factor is the greater of 1 and 12 divided by the number of threads: with 12 or more CPUs it is 1, and it grows as the CPU count drops.
The chunk size is then the maximum of the following two values:
1000
50,000 divided by the number of DataFrame columns, scaled by thread_factor.
This formula is based on the Polars source code and may be subject to change as the library evolves.
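To make this concrete, here is a small helper that reproduces the quoted formula (a sketch only; the constants and the calculation live in the Polars source and may change):

def streaming_chunk_size(n_cols: int, n_threads: int) -> int:
    # Reproduces the formula quoted above; subject to change upstream.
    thread_factor = max(12 / n_threads, 1)
    return int(max(50_000 / n_cols * thread_factor, 1_000))

# Example: a 20-column DataFrame on an 8-thread machine
print(streaming_chunk_size(n_cols=20, n_threads=8))  # 3750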
📈 When Should You Use It?
Large files that don’t fit in memory
Production pipelines where predictable memory usage is key
When you want fast startup time (especially for CSV/Parquet)
⚠️ What’s Supported?
Streaming is still under active development and will keep evolving. You can find the operations that support streaming in the Polars documentation.
💡 TL;DR
Polars Streaming enables scalable, memory-efficient data transformations — making it a powerful choice for production-grade pipelines without relying on heavyweight distributed frameworks.
For batch or near-real-time processing at scale, it offers the performance of big data engines with the simplicity of a single-node setup.
Thanks for checking out Part 4 of the series. Stay tuned for the next one. The example code is available on GitHub.
Data Engineering Tips
🚀 Stop chaining withColumn in PySpark
Each .withColumn() call in your transformation creates a new DataFrame and adds a Project node to the logical plan. When used repeatedly, especially in loops, this leads to deeply nested logical plans that can unnecessarily strain the optimizer and inflate the DAG.
Instead of multiple .withColumn() calls, consolidate your transformations into a single .select() statement or use .withColumns() (available in PySpark 3.3+). This approach constructs the desired schema in one go, reducing the overhead of multiple plan analyses.
Tip credit Andrei Agaronian!
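For illustration, here is a minimal sketch of the refactor (the DataFrame and column names are made up for the example):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Toy DataFrame for the example.
df = spark.createDataFrame([(1, 10.0), (2, 20.0)], ["id", "amount"])

# Option 1: add all derived columns in a single pass (PySpark 3.3+).
with_cols = df.withColumns(
    {
        "amount_x2": F.col("amount") * 2,
        "is_large": F.col("amount") > 15,
    }
)

# Option 2: build the target schema explicitly with one select.
selected = df.select(
    "id",
    "amount",
    (F.col("amount") * 2).alias("amount_x2"),
    (F.col("amount") > 15).alias("is_large"),
)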
🚀 Multi-Column ALTER TABLE Support in Databricks (DBR 16.3+)
Databricks now supports altering multiple columns in a single ALTER TABLE statement, which makes table management smoother by cutting down the number of separate DDL statements you need to run.
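As an illustrative sketch (run from a Databricks notebook on DBR 16.3 or later, where spark is the provided SparkSession): the table and column names are placeholders, and the exact multi-column syntax shown is an assumption, so verify it against the ALTER TABLE ... ALTER COLUMN reference in the Databricks documentation.

# Placeholder table and columns; the multi-column syntax is an assumption --
# check the Databricks ALTER TABLE ... ALTER COLUMN docs before relying on it.
spark.sql("""
    ALTER TABLE catalog.schema.orders ALTER COLUMN
        order_id    COMMENT 'Primary key of the order',
        customer_id SET NOT NULL
""")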