
Reading from S3 with Polars

Updated October 2023

In this post we see how to read and write from a CSV or Parquet file in S3 with Polars. We also see how to filter the file on S3 before downloading it to reduce the amount of data transferred across the network.

Want to get going with Polars? This post is an extract from my Up & Running with Polars course - learn more here or check out the preview of the first chapters

Writing a file to S3

We will create a simple DataFrame with 3 columns. We will write this to both a CSV and Parquet file in S3 using s3fs. The s3fs library allows you to read and write files to S3 with similar syntax to working on a local file system.

import polars as pl
import s3fs

bucket_name = "my_bucket"
csv_key = "test_write.csv"
parquet_key = "test_write.parquet"

# s3fs lets us work with S3 objects much like local files
fs = s3fs.S3FileSystem()
df = pl.DataFrame(
    {
        "foo": [1, 2, 3, 4, 5],
        "bar": [6, 7, 8, 9, 10],
        "ham": ["a", "b", "c", "d", "e"],
    }
)
# Write the DataFrame to S3 as CSV and as Parquet
with fs.open(f"{bucket_name}/{csv_key}", mode="wb") as f:
    df.write_csv(f)
with fs.open(f"{bucket_name}/{parquet_key}", mode="wb") as f:
    df.write_parquet(f)

I recommend the Parquet format if you have the choice, as it produces smaller files, preserves dtypes and makes subsequent reads faster.
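
If you want to check the size difference for yourself, s3fs can report the size of each object we just wrote - a minimal sketch reusing the fs, bucket_name, csv_key and parquet_key names from the block above. Bear in mind that for a tiny five-row frame like this the Parquet metadata overhead can dominate; the size advantage shows up on realistically sized data.

# Compare the size in bytes of the CSV and Parquet objects on S3
csv_size = fs.info(f"{bucket_name}/{csv_key}")["size"]
parquet_size = fs.info(f"{bucket_name}/{parquet_key}")["size"]
print(f"CSV: {csv_size} bytes, Parquet: {parquet_size} bytes")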

Reading a file from S3

We can use Polars to read the files back from S3 with pl.read_csv and pl.read_parquet.

df_csv = pl.read_csv(f"s3://{bucket_name}/{csv_key}")
df_parquet = pl.read_parquet(f"s3://{bucket_name}/{parquet_key}")

Internally Polars reads the remote file into an in-memory buffer using fsspec and then parses the buffer into a DataFrame. This is a fast approach but it does mean that the whole file is downloaded into memory. That is fine for small files but can be slow and memory intensive for large files.
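
Roughly equivalent code, if you want to see the buffer-then-parse step spelled out - a sketch reusing the bucket_name and csv_key names from the writing example above:

import fsspec
import polars as pl

# Download the remote object into a buffer and let Polars parse it;
# this mirrors the buffer-then-parse approach described above
with fsspec.open(f"s3://{bucket_name}/{csv_key}", mode="rb") as f:
    df_csv = pl.read_csv(f)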

However, reading the whole file is wasteful when we only want to read a subset of rows. With a Parquet file we can instead scan the file on S3 and only read the rows we need.

Scanning a file on S3 with query optimisation

With a Parquet file we can scan the file on S3 and build a lazy query. The Polars query optimiser applies:

  • predicate pushdown, meaning that any row-filter conditions are applied as the file is read from S3, and
  • projection pushdown, meaning that if only a subset of columns is required then only those columns are read from S3.

We can do this with pl.scan_parquet. This may also require some cloud-storage-provider-specific options to be passed. In the first instance Polars tries to get these from environment variables, but we can override them with the storage_options argument.

import polars as pl

source = "s3://bucket/*.parquet"

storage_options = {
    "aws_access_key_id": "<secret>",
    "aws_secret_access_key": "<secret>",
    "aws_region": "eu-west-1",
}
df = (
    # Scan the file on S3
    pl.scan_parquet(source, storage_options=storage_options)
    # Apply a filter condition
    .filter(pl.col("id") > 100)
    # Select only the columns we need
    .select("id", "value")
)

With scan_parquet Polars does an async read of the Parquet file using the Rust object_store library under the hood.
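
To see the optimisation in action you can print the optimised query plan and then execute it - a short sketch building on the df lazy query above (explain and collect are standard LazyFrame methods):

# Show the optimised query plan; the filter and the column selection
# should appear as part of the Parquet scan itself
print(df.explain())

# Execute the query - only the selected columns (and, where row-group
# statistics allow, only the matching row groups) are read from S3
result = df.collect()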

Applying filters to a CSV file

At this point in time (October 2023) Polars does not support scanning a CSV file on S3. In this case we can use the boto3 library to apply a filter condition on S3 before downloading the data.

The key method within boto3 is select_object_content. This allows us to apply a SQL filter to the file on S3 before downloading it. It also requires us to pass some more information about how the file is serialised - whether it is a CSV or Parquet file, whether it is compressed etc. - and how the downloaded data should be serialised.

In this example we filter the CSV file on S3 to only return rows where the ham column is equal to "a". I present the full code below and then explain each part in turn.

import io

import boto3
import polars as pl

bucket_name = "my_bucket"
key = "test_write.csv"

# Create a boto3 client to interface with S3
s3 = boto3.client("s3")
# Define the SQL statement to filter the CSV data
sql_expression = "SELECT * FROM s3object s WHERE ham = 'a'"

# Use SelectObjectContent to filter the CSV data before downloading it
s3_object = s3.select_object_content(
    Bucket=bucket_name,
    Key=key,
    ExpressionType="SQL",
    Expression=sql_expression,
    InputSerialization={"CSV": {"FileHeaderInfo": "USE"}, "CompressionType": "NONE"},
    OutputSerialization={"CSV": {}},
)
# Create a StringIO object to hold the filtered data
output = io.StringIO()

# Iterate over the filtered CSV data and write it to the StringIO object
for event in s3_object["Payload"]:
    if "Records" in event:
        records = event["Records"]["Payload"].decode("utf-8")
        output.write(records)

# Rewind the StringIO object to the beginning
output.seek(0)

# S3 Select output has no header row, so supply the column names ourselves
df = pl.read_csv(output, has_header=False, new_columns=["foo", "bar", "ham"])

We began by creating the SQL query string:

sql_expression = "SELECT * FROM s3object s WHERE ham = 'a'"

This uses the S3 Select SQL syntax which has some differences from how you would write a query in, say, Postgres.

  • The s3object keyword refers to the object on S3.
  • The s is an alias for the object.
  • The WHERE clause is the filter condition.
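
A couple of variations on the query string, for illustration (a sketch; note that S3 Select reads CSV fields as strings, so numeric comparisons need a CAST):

# Return only some columns rather than *
sql_projection = "SELECT s.foo, s.ham FROM s3object s"

# CSV fields arrive as strings, so cast before comparing numerically
sql_numeric = "SELECT * FROM s3object s WHERE CAST(s.foo AS INT) > 2"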

Note that you will of course be charged for the compute to filter the data.

We then created the s3_object by calling s3.select_object_content. This takes a number of arguments:

s3_object = s3.select_object_content(
    Bucket=bucket_name,
    Key=key,
    ExpressionType="SQL",
    Expression=sql_expression,
    InputSerialization={"CSV": {"FileHeaderInfo": "USE"}, "CompressionType": "NONE"},
    OutputSerialization={"CSV": {}},
)

In the InputSerialization argument we passed a dictionary with some arguments to tell it how to read our CSV file. For example we told it that the file has a header row and that it is not compressed.

In the OutputSerialization argument we passed a dictionary telling it how to serialise the data it returns - in this case as CSV. Unfortunately, the only output serialisation options at the time of writing are CSV and JSON, so even if the input is a Parquet file you cannot get a Parquet file back.
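
As an illustration, here is what the same call looks like if we ask for line-delimited JSON instead - a sketch reusing the s3 client, bucket_name, key and sql_expression from the example above. One nice side effect is that the JSON records carry the column names as keys, which the CSV output does not.

# Ask S3 Select to return newline-delimited JSON rather than CSV
s3_object_json = s3.select_object_content(
    Bucket=bucket_name,
    Key=key,
    ExpressionType="SQL",
    Expression=sql_expression,
    InputSerialization={"CSV": {"FileHeaderInfo": "USE"}, "CompressionType": "NONE"},
    OutputSerialization={"JSON": {"RecordDelimiter": "\n"}},
)

# Collect the JSON records into a buffer and parse them with Polars
json_output = io.BytesIO()
for event in s3_object_json["Payload"]:
    if "Records" in event:
        json_output.write(event["Records"]["Payload"])
json_output.seek(0)
df_json = pl.read_ndjson(json_output)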

We then created a StringIO object from Python’s built-in io library to hold the data returned by the s3.select_object_content method, and extracted the filtered records from the response:

# Iterate over the filtered CSV data and write it to the StringIO object
for event in s3_object["Payload"]:
    if "Records" in event:
        # Decode the bytes for each line to a string
        records = event["Records"]["Payload"].decode("utf-8")
        # Write the string to the StringIO object
        output.write(records)

# Rewind the StringIO object to the beginning
output.seek(0)

With this StringIO object we can then create a Polars DataFrame. Since the S3 Select output does not include a header row, we pass the column names explicitly:

df = pl.read_csv(output, has_header=False, new_columns=["foo", "bar", "ham"])

This approach can be much faster than reading the whole file from S3 if you are only returning a subset of the data.

Wrap-up

In this post we have seen how to read and write files from S3 with Polars. This is a fast-developing area so I’m sure I’ll be back to update this post in the future (again!) as Polars does more of this natively.

There are more sophisticated ways to manage data on S3. For example, you could use a data lake tool like Delta Lake to manage your data on S3. These tools allow you to manage your data in a more structured way and to perform operations like upserts and deletes. See this post by Matthew Powers for an intro to using Delta Lake with Polars.
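
As a taste, Polars can already read a Delta table directly with pl.read_delta - a minimal sketch, assuming the deltalake package is installed, reusing the storage_options dictionary from the scan_parquet example, and with a purely illustrative table path:

import polars as pl

# Read a Delta table on S3 into a DataFrame (requires the deltalake package);
# the table path here is illustrative only
df_delta = pl.read_delta("s3://my_bucket/my_delta_table", storage_options=storage_options)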

Want to get going with Polars? This post is an extract from my Up & Running with Polars course - learn more here or check out the preview of the first chapters

Next steps

Want to know more about Polars for high performance data science? Then you can:

This post is licensed under CC BY 4.0 by the author.