I'm working on a data pipeline that uses Polars and Delta Lake to store and update a grouped time series on a laptop with 16 GB RAM. The dataset is a daily time series with ~1,000 unique groups and 125 years of history. I cannot manage the entire dataset in a single Parquet file, so I chose Delta Lake as the storage format.
Right now, I'm trying to decide on a partitioning strategy that balances query performance, file management, and write efficiency.
My current pipeline:
Initial dataset
Daily upserts:
Optimization steps:
A logical partitioning would be year/month/day, but that would create too many small files. I'm considering two options:
Partition by year:
Partition by decade:
Since Delta Lake requires partition columns to be stored as real columns, both strategies would mean adding an extra column (year or decade) to the dataset. However, I typically filter by date when reading the dataset, not by year or decade, so partition pruning would not help my queries.
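For reference, the partition-by-year variant would look roughly like this (a sketch only: the derived year column and the redundant year filter are my own additions, and decade would work the same way):

df = df.with_columns(pl.col("date").dt.year().alias("year"))  # derived partition column
df.write_delta(
    delta_path,
    mode="overwrite",
    delta_write_options={"partition_by": ["year"]},
)
# Reads filter on date, so pruning only kicks in if the filter is repeated on the partition column:
lf = (
    pl.scan_delta(str(delta_path))
    .filter(pl.col("date") >= date(2020, 1, 1))
    .filter(pl.col("year") >= 2020)  # redundant, but enables partition pruning
)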
My main questions: Are there better partitioning strategies for Delta Lake in my case? How can I efficiently manage file sizes without sacrificing query performance?
Here is a similar example where no partitioning is applied, which results in a single large Parquet file:
import os
import shutil
import psutil
from pathlib import Path
from datetime import date, timedelta
import polars as pl
from deltalake import DeltaTable
def generate_data(ngroups, ndates, ncols=5, start=date(1900, 1, 1), value=1.0, eager=True) -> pl.DataFrame | pl.LazyFrame:
    # Cross join of all groups with all dates, plus ncols constant value columns.
    groups = pl.LazyFrame({'group': pl.arange(1, ngroups+1, dtype=pl.Int64, eager=True).cast(pl.String)})
    dates = pl.LazyFrame({'date': pl.date_range(start, start+timedelta(days=ndates-1), "1d", eager=True)})
    lf = (
        groups.join(dates, how='cross')
        .with_columns(
            [pl.lit(i*value, dtype=pl.Float64).alias(f'val_{i}') for i in range(1, ncols+1)])
    )
    return lf.collect() if eager else lf
print('Generating initial "large" dataset...')
df = generate_data(ngroups=1000, ndates=125*365, value=1.0, eager=True)
print(df.tail(3))
# ┌───────┬────────────┬───────┬───────┬───────┬───────┬───────┐
# │ group ┆ date ┆ val_1 ┆ val_2 ┆ val_3 ┆ val_4 ┆ val_5 │
# │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
# │ str ┆ date ┆ f64 ┆ f64 ┆ f64 ┆ f64 ┆ f64 │
# ╞═══════╪════════════╪═══════╪═══════╪═══════╪═══════╪═══════╡
# │ 1000 ┆ 2024-11-28 ┆ 1.0 ┆ 2.0 ┆ 3.0 ┆ 4.0 ┆ 5.0 │
# │ 1000 ┆ 2024-11-29 ┆ 1.0 ┆ 2.0 ┆ 3.0 ┆ 4.0 ┆ 5.0 │
# │ 1000 ┆ 2024-11-30 ┆ 1.0 ┆ 2.0 ┆ 3.0 ┆ 4.0 ┆ 5.0 │
# └───────┴────────────┴───────┴───────┴───────┴───────┴───────┘
size = df.estimated_size("gb")
memory = psutil.virtual_memory().total/1024**3
print(f' size/memory => {size:.3}gb/{memory:.3}gb => {int(100*size/memory)}%')
# size/memory => 1.99gb/15.5gb => 12%
print('Saving initial "large" dataset to delta table...')
delta_path = Path('./table/').resolve()  # <= Path to delta table
if os.path.exists(delta_path):
    shutil.rmtree(delta_path)
df.write_delta(delta_path, mode="overwrite",
               delta_write_options={"partition_by": []})
df = None # free memory
print('Upserting delta table by "small" dataset...')
(
    # Upsert: update rows that match on (group, date), insert the rest.
    generate_data(ngroups=1000, ndates=5, start=date(2024, 11, 30), value=10.0, eager=True)
    .write_delta(
        delta_path,
        mode="merge",
        delta_write_options={"partition_by": []},
        delta_merge_options={
            "predicate": "s.group = t.group AND s.date = t.date",
            "source_alias": "s",
            "target_alias": "t"})
    .when_matched_update_all()
    .when_not_matched_insert_all().execute()
)
print("Optimize zorder to sort by group & date...")
DeltaTable(delta_path).optimize.z_order(["group", "date"])
print("Remove unused files...")
DeltaTable(delta_path).vacuum(
    retention_hours=0,
    dry_run=False,
    enforce_retention_duration=False,
)
print("Update complete.")
# shutil.rmtree(delta_path)
Edit:
Another approach could be to limit the number of rows per Parquet file with max_rows_per_file instead of partitioning. However, if the dataset is ordered by group and then date, most Parquet files would probably be rewritten completely on every incremental update. If it is stored ordered by date and then group instead, only the latest file would be touched, reducing rewrites, but reads would then need an extra sort by group and date. Alternatively, setting the compressed target_file_size to ~10 MB might be more straightforward.
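Something like this (a sketch only, assuming a deltalake version where write_deltalake accepts target_file_size and optimize accepts target_size; ~10 MB is just the figure mentioned above):

df.write_delta(
    delta_path,
    mode="overwrite",
    delta_write_options={"target_file_size": 10 * 1024 * 1024},  # aim for ~10 MB files
)
# Compaction / z-ordering can target the same file size afterwards:
DeltaTable(delta_path).optimize.z_order(["group", "date"], target_size=10 * 1024 * 1024)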