
Reading from S3 with Polars

In this post we see how to read and write CSV and Parquet files on S3 with Polars. We also see how to filter the file on S3 before downloading it, to reduce the amount of data transferred across the network.

Want to learn more? Check out my Data Analysis with Polars course using this discount code

Writing a CSV file to S3

We will create a simple DataFrame with 3 columns. We will write this to a CSV file in S3 using s3fs. The s3fs library allows you to read and write files to S3 with similar syntax to working on a local file system.

import polars as pl
import s3fs

bucket_name = "my_bucket"
key = "test_write.csv"

fs = s3fs.S3FileSystem()
df = pl.DataFrame(
    {
        "foo": [1, 2, 3, 4, 5],
        "bar": [6, 7, 8, 9, 10],
        "ham": ["a", "b", "c", "d", "e"],
    }
)
# Write the CSV straight to S3 via the s3fs file handle
with fs.open(f"{bucket_name}/{key}", mode="wb") as f:
    df.write_csv(f)

Of course, we could also have written this as a Parquet file to reduce the file size and make subsequent reads faster.
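
As a sketch, the Parquet version of the write only differs in the method call and the key (test_write.parquet here is just an illustrative name), reusing the fs, df and bucket_name objects from the snippet above:

# Write the same DataFrame as Parquet via an s3fs file handle
with fs.open(f"{bucket_name}/test_write.parquet", mode="wb") as f:
    df.write_parquet(f)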

Reading a CSV file from S3

We can use Polars to read the file back from S3 with pl.read_csv:

df_down = pl.read_csv(f"s3://{bucket_name}/{key}")
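
This relies on Polars resolving the s3:// path via fsspec under the hood. If that does not work in your setup, a fallback sketch (reusing the fs object from the write example) is to open the file with s3fs and pass the handle to Polars:

# Open the object with s3fs and let Polars read from the file handle
with fs.open(f"{bucket_name}/{key}", mode="rb") as f:
    df_down = pl.read_csv(f)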

However, reading the whole file is wasteful when we only want a subset of the rows. In this case we can use the boto3 library to apply a filter on S3 so that only the matching rows are downloaded.

The key method within boto3 is select_object_content. This allows us to apply a SQL filter to the file on S3 before downloading it. It also requires us to pass some more information about how the file is serialised - whether it is a CSV or Parquet file, whether it is compressed etc. - and how the downloaded data should be serialised.

In this example we filter the CSV file on S3 to only return rows where the ham column is equal to "a". I present the full code below and then explain each part in turn.

import io

import boto3
import polars as pl

bucket_name = "my_bucket"
key = "test_write.csv"

# Create a boto3 client to interface with S3
s3 = boto3.client("s3")
# Define the SQL statement to filter the CSV data
sql_expression = "SELECT * FROM s3object s WHERE ham = 'a'"

# Use SelectObjectContent to filter the CSV data before downloading it
s3_object = s3.select_object_content(
    Bucket=bucket_name,
    Key=key,
    ExpressionType="SQL",
    Expression=sql_expression,
    InputSerialization={"CSV": {"FileHeaderInfo": "USE"}, "CompressionType": "NONE"},
    OutputSerialization={"CSV": {}},
)
# Create a StringIO object to hold the filtered data
output = io.StringIO()

# Iterate over the filtered CSV data and write it to the StringIO object
for event in s3_object["Payload"]:
    if "Records" in event:
        records = event["Records"]["Payload"].decode("utf-8")
        output.write(records)

# Rewind the StringIO object to the beginning
output.seek(0)

df = pl.read_csv(output)

We began by creating the SQL query string:

sql_expression = "SELECT * FROM s3object s WHERE ham = 'a'"

This uses the S3 Select SQL syntax, which differs in a few ways from how you would write a query in, say, Postgres.

  • The s3object keyword refers to the object on S3.
  • The s is an alias for the object.
  • The WHERE clause is the filter condition.

Note that you will of course be charged for the compute to filter the data.
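
S3 Select also supports projecting specific columns, CAST expressions and LIMIT, which can cut the transferred data down further. A sketch of a variant query, illustrative only and assuming the same CSV with a header row:

# Return only two columns for the first three rows where foo is greater than 2
sql_expression = (
    "SELECT s.foo, s.ham FROM s3object s "
    "WHERE CAST(s.foo AS INTEGER) > 2 LIMIT 3"
)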

We then created the s3_object by calling s3.select_object_content. This takes a number of arguments:

s3_object = s3.select_object_content(
    Bucket=bucket_name,
    Key=key,
    ExpressionType="SQL",
    Expression=sql_expression,
    InputSerialization={"CSV": {"FileHeaderInfo": "USE"}, "CompressionType": "NONE"},
    OutputSerialization={"CSV": {}},
)

In the InputSerialization argument we passed a dictionary telling S3 Select how to read our CSV file: that it has a header row and that it is not compressed.
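
The same argument covers other input formats. As a sketch, assuming such objects exist at the corresponding keys, a gzipped CSV or a Parquet object could be described like this:

# Gzipped CSV input with a header row
input_csv_gz = {"CSV": {"FileHeaderInfo": "USE"}, "CompressionType": "GZIP"}
# Parquet input needs no CSV-specific options
input_parquet = {"Parquet": {}}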

In the OutputSerialization argument we passed a dictionary telling it how to serialise the data it returns, in this case as CSV. Unfortunately, the only output serialisation options at the time of writing are CSV and JSON, so even if the input is a Parquet file you cannot get Parquet back.
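
If JSON output is more convenient, a sketch of the change (with the rest of the call unchanged) is to swap the OutputSerialization dictionary and then parse the collected text with pl.read_ndjson instead of pl.read_csv. Note that with a CSV input the values come back as strings unless you CAST them in the query:

# Newline-delimited JSON output instead of CSV
output_serialization = {"JSON": {"RecordDelimiter": "\n"}}
# ...collect the records into `output` as before, then:
# df = pl.read_ndjson(output)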

We then created a StringIO object from Python's built-in io library to hold the data returned by s3.select_object_content, and extracted the records from the response:

# Iterate over the filtered CSV data and write it to the StringIO object
for event in s3_object["Payload"]:
    if "Records" in event:
        # Decode the bytes for each line to a string
        records = event["Records"]["Payload"].decode("utf-8")
        # Write the string to the StringIO object
        output.write(records)

# Rewind the StringIO object to the beginning
output.seek(0)

With this StringIO object we can then create a Polars DataFrame:

df = pl.read_csv(output)

This approach can be much faster than reading the whole file from S3 if you are only returning a subset of the data.

Wrap-up

In this post we have seen how to read and write files on S3 with Polars. This is a fast-developing area so I’m sure I’ll be back to update this post in the future as Polars does more of this natively.

There are more sophisticated ways to manage data on S3. For example, you could use a data lake tool like Delta Lake to manage your data on S3. These tools allow you to manage your data in a more structured way and to perform operations like upserts and deletes. See this post by Matthew Powers for an intro to using Delta Lake with Polars.

Want to learn more? Check out my Data Analysis with Polars course on Udemy with this discount code

Next steps

Want to know more about Polars for high performance data science? Then you can:

This post is licensed under CC BY 4.0 by the author.