
Streaming large datasets in Polars

One major advantage of Polars over Pandas is that working with larger-than-memory datasets can be as easy as adding a single argument to a function call. However, streaming doesn’t work in all cases. In this post I introduce how streaming works and how to work around some challenges you may face.

Want to get going with Polars? This post is an extract from my Up & Running with Polars course - learn more here or check out the preview of the first chapters.

Streaming in Polars

Firstly, what is streaming? Streaming is the name Polars gives to processing data in batches rather than all at once. In effect, Polars loops through the data and processes each batch in turn. In contrast, when we use the default of processing all the data at once, we say Polars is using the standard engine.

Streaming allows us to work with larger-than-memory datasets because Polars can work with many small batches of data rather than one large dataset. We don’t even need the final output of the query to fit in memory as we can use one of the sink_ methods (see below) to write the output to a Parquet or other file on disk.

Streaming only occurs in Polars when we use lazy mode. We have to tell Polars explicitly that we want the streaming engine by passing streaming=True when we evaluate a lazy query, as we'll see below.

Not all lazy queries can be executed in streaming mode. If you ask for streaming on a query that the streaming engine cannot execute, Polars falls back to running the query with the standard engine.

Streaming in practice

In this example lazy query we:

  • do a lazy scan of a CSV file
  • group by a column and
  • calculate the mean of another column.

We execute this lazy query with the streaming engine by passing streaming=True to collect.

query = pl.scan_csv("iris.csv").group_by("species").agg(
    pl.col("sepal_width").mean().alias("mean_sepal_width")
)
query.collect(streaming=True)

Polars doesn’t guarantee that a query will be executed with the streaming engine when we pass streaming=True. It’s always a good idea to check that your query is actually running with the streaming engine.

We can do this check by printing the optimised query plan with the explain method. The optimised query plan is different for the streaming engine than for the standard engine so we pass streaming=True to explain as well.

query.explain(streaming=True)

which gives us the following output:

--- STREAMING
AGGREGATE
	[col("sepal_width").mean().alias("mean_sepal_width")] BY [col("species")] FROM

    Csv SCAN iris.csv
    PROJECT 2/5 COLUMNS  --- END STREAMING

  DF []; PROJECT */0 COLUMNS; SELECTION: "None"

We see that the whole query is in the block that begins with --- STREAMING and ends with --- END STREAMING. This tells us that the whole query is running in streaming mode.

The streaming engine breaks the query up into batches. The size of the batches is determined by the number of CPUs on your machine and the amount of memory each row in your query requires. I explored this in more detail in this post.
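
If the automatic batch size doesn't suit your workload you can override it. Here is a minimal sketch, assuming the pl.Config.set_streaming_chunk_size option available in recent Polars versions (the value 50_000 is just an illustrative choice):

import polars as pl

# Override the batch size the streaming engine uses. Polars otherwise
# derives this from the CPU count and the estimated memory per row.
pl.Config.set_streaming_chunk_size(50_000)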

Streaming directly to file

If the output of a query is still too large to fit in memory we can write the output directly to a file on disk. We do this by using one of the sink_ methods. In this example we write the output to a Parquet file.

query.sink_parquet("iris.parquet")

We don't have to pass streaming=True here because sink_parquet only runs with the streaming engine. If some part of the query cannot run in streaming mode, sink_parquet fails with an error message.
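
Parquet is not the only sink format. A LazyFrame also has sink_csv and sink_ipc methods in recent Polars versions that work the same way, so assuming the query from the examples above we could instead write:

# Hypothetical output file names; these sinks also run only with the
# streaming engine.
query.sink_csv("iris_summary.csv")
query.sink_ipc("iris_summary.arrow")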

Why doesn’t streaming work with all queries?

Streaming doesn’t work with all queries. In this example we do a cum_sum on a column in the dataset as follows:

query = (
    pl.scan_csv("iris.csv")
    .with_columns(
        pl.col("sepal_width").cum_sum().alias("cum_sum_sepal_width")
    )
)

When we print the query plan with explain(streaming=True) we get the following output:

1
2
3
4
5
6
7
8
 WITH_COLUMNS:
 [col("sepal_width").cum_sum().alias("cum_sum_sepal_width")]
  --- STREAMING

  Csv SCAN iris.csv
  PROJECT */5 COLUMNS  --- END STREAMING

    DF []; PROJECT */0 COLUMNS; SELECTION: "None"

and we see that the WITH_COLUMNS part of the query plan with cum_sum is not in the streaming block.

So why does this query not run in streaming mode? To understand this, be aware that the Polars developers have to implement each operation separately in the streaming engine, where an operation could be a method like group_by or an expression like mean. As a general principle, operations that can be applied to one batch at a time and then gathered together are better suited to the streaming engine than operations where one batch requires information from other batches.

For example, if we take the sum of a column we can do this batchwise by taking the sum of each batch and then summing the partial results from all batches. However, if we want the cumulative sum with cum_sum, we need to compute a cumulative sum of the values in the first batch, pass the final value on to the second batch, and so on. This dependency between batches makes cum_sum more challenging to implement in the streaming engine.
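
To make this concrete, here is a toy sketch in plain Python with a hypothetical batch split. The sum combines independent partial results, while the cumulative sum has to carry state from one batch to the next:

# Three hypothetical batches of one column's values
batches = [[1.0, 2.0], [3.0, 4.0], [5.0]]

# sum: reduce each batch independently, then combine the partial results
total = sum(sum(batch) for batch in batches)  # 15.0

# cum_sum: the running total must be carried from batch to batch, so the
# batches cannot be processed independently
running = 0.0
cumulative = []
for batch in batches:
    for value in batch:
        running += value
        cumulative.append(running)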

Troubleshooting streaming queries

If you are having challenges executing a query in streaming mode I recommend the following steps:

  • Check the query plan with explain(streaming=True) to see which parts of the query can be executed in streaming mode
  • If part of the query cannot be executed in streaming mode and you are not sure which operation is responsible, start from a simpler query and add operations back one at a time until you find the one causing the issue (see the sketch after this list)
  • If you find an operation that prevents streaming, try to defer it until a point in the query where the size of the data has been reduced, for example after a filter or a group_by
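
As a sketch of that incremental approach, using the iris.csv example from above, we can build the query up one operation at a time and print each stage's plan:

import polars as pl

lf = pl.scan_csv("iris.csv")
print(lf.explain(streaming=True))  # the scan alone streams

# add the suspect operation back in and check the plan again
lf = lf.with_columns(pl.col("sepal_width").cum_sum().alias("cum_sum_sepal_width"))
print(lf.explain(streaming=True))  # cum_sum falls outside the streaming block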

In some cases there is a workaround for queries that do not work in streaming mode. For example, let's say we have the following DataFrame with time series data:

from datetime import datetime
import polars as pl

df = (
    pl.DataFrame(
        {
            "datetime": pl.datetime_range(
                datetime(2024, 1, 1), datetime(2024, 1, 3), "1h", eager=True
            )
        }
    )
    .with_row_index()
)
print(df)
shape: (49, 2)
┌───────┬─────────────────────┐
│ index ┆ datetime            │
│ ---   ┆ ---                 │
│ u32   ┆ datetime[μs]        │
╞═══════╪═════════════════════╡
│ 0     ┆ 2024-01-01 00:00:00 │
│ 1     ┆ 2024-01-01 01:00:00 │
│ 2     ┆ 2024-01-01 02:00:00 │
│ 3     ┆ 2024-01-01 03:00:00 │
│ …     ┆ …                   │
│ 45    ┆ 2024-01-02 21:00:00 │
│ 46    ┆ 2024-01-02 22:00:00 │
│ 47    ┆ 2024-01-02 23:00:00 │
│ 48    ┆ 2024-01-03 00:00:00 │
└───────┴─────────────────────┘

We want to do a daily group by on the data. Normally we would use the group_by_dynamic method for a temporal group by. However, this method does not at present work in streaming mode.

We can, however, still do this in streaming mode by doing a with_columns to extract the date from our datetimes and then doing a regular group_by on the date column.

groupby_query = (
    df
    .lazy()
    .with_columns(pl.col("datetime").dt.date().alias("date"))
    .group_by("date")
    .agg(pl.col("index").mean())
)

If we print the streaming query plan we get the following output:

--- STREAMING
AGGREGATE
	[col("index").mean()] BY [col("date")] FROM
   WITH_COLUMNS:
   [col("datetime").dt.date().alias("date")]
    DF ["index", "datetime"]; PROJECT 2/2 COLUMNS; SELECTION: "None"  --- END STREAMING

  DF []; PROJECT */0 COLUMNS; SELECTION: "None"

and we see here that both the WITH_COLUMNS and the AGGREGATE parts of the query are inside the streaming block, so the full query will run in streaming mode.
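
We can then execute the workaround query with the streaming engine as before:

groupby_query.collect(streaming=True)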

You may be able to identify similar transformations that you can do to your query to get it to run in streaming mode.

Next steps

That’s my quick introduction to the core ideas of working with streaming mode in Polars. Get in touch on social media if you have any questions or comments.

If you would like more detailed support with your challenges, I provide consulting on optimising your data processing pipelines with Polars. You can also check out my online course to get up and running with Polars.


This post is licensed under CC BY 4.0 by the author.