python - Choosing an optimal partitioning strategy for a delta lake table on a laptop - Stack Overflow


I’m working on a data pipeline using Polars and Delta Lake to store and update a grouped time series on a laptop with 16GB RAM. The dataset is a daily time series with ~1,000 unique groups and 125 years of history. I cannot manage the entire dataset in a single Parquet file, so I chose Delta Lake as the storage format.

Right now, I’m trying to decide on the best partitioning strategy to balance query performance, file management, and write efficiency.

My current pipeline:

Initial dataset

  • group: String (~1,000 unique values).
  • date: Daily from 1900 to 2025.
  • 5 numeric value columns.
  • Stored in a Delta Table.
  • Estimated size: 2–5 GB.

Daily upserts:

  • A small dataset (1–30 days) is merged daily, weekly, or monthly into the Delta table.

Optimization steps:

  • Z-Ordering on group and date.
  • Vacuuming to remove unnecessary files.

A logical partitioning would be year/month/day, but that would create too many small files. I'm considering two options:

Partition by year:

  • Better for incremental updates and deletes.
  • Results in 125 partitions, leading to more files.

Partition by decade:

  • Reduces partitions to 13, meaning fewer small files.
  • Worse for incremental updates and deletes.

Since Delta Lake requires partition columns to be physically stored, both strategies would require adding an extra column (year or decade) to the dataset. However, I typically filter by date when reading the dataset, not by year or decade, so partition pruning would not help my queries. A minimal sketch of what adding such a column would look like is below.
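For illustration only (the helper name is hypothetical, not part of my pipeline), deriving a year or decade column from date before writing could look roughly like this:

import polars as pl

# Hypothetical helper: derive a partition column from `date`.
def with_partition_column(df: pl.DataFrame, by: str = "year") -> pl.DataFrame:
    if by == "year":
        return df.with_columns(pl.col("date").dt.year().alias("year"))
    # Decade bucket, e.g. 1900, 1910, ..., 2020.
    return df.with_columns((pl.col("date").dt.year() // 10 * 10).alias("decade"))

# Then write partitioned by the derived column, e.g.:
# with_partition_column(df, "year").write_delta(
#     delta_path,
#     mode="overwrite",
#     delta_write_options={"partition_by": ["year"]},
# )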

My main questions: Are there better partitioning strategies for Delta Lake in my case? How can I efficiently manage file sizes without sacrificing query performance?

Here is a similar example where no partitioning is applied, which results in a single large Parquet file:

import os
import shutil
import psutil
from pathlib import Path
from datetime import date, timedelta
import polars as pl
from deltalake import DeltaTable

def generate_data(ngroups, ndates, ncols=5, start=date(1900, 1, 1), value=1.0, eager=True) -> pl.DataFrame | pl.LazyFrame:
    """Cross-join `ngroups` group labels with `ndates` daily dates and add `ncols` constant value columns."""
    groups = pl.LazyFrame({'group': pl.arange(1, ngroups+1, dtype=pl.Int64, eager=True).cast(pl.String)})
    dates = pl.LazyFrame({'date': pl.date_range(start, start+timedelta(days=ndates-1), "1d", eager=True)})
    lf = (
        groups.join(dates, how='cross')
        .with_columns(
            [pl.lit(i*value, dtype=pl.Float64).alias(f'val_{i}') for i in range(1, ncols+1)])
    )
    return lf.collect() if eager else lf

print('Generating initial "large" dataset...')
df = generate_data(ngroups=1000, ndates=125*365, value=1.0, eager=True)
print(df.tail(3))
# ┌───────┬────────────┬───────┬───────┬───────┬───────┬───────┐
# │ group ┆ date       ┆ val_1 ┆ val_2 ┆ val_3 ┆ val_4 ┆ val_5 │
# │ ---   ┆ ---        ┆ ---   ┆ ---   ┆ ---   ┆ ---   ┆ ---   │
# │ str   ┆ date       ┆ f64   ┆ f64   ┆ f64   ┆ f64   ┆ f64   │
# ╞═══════╪════════════╪═══════╪═══════╪═══════╪═══════╪═══════╡
# │ 1000  ┆ 2024-11-28 ┆ 1.0   ┆ 2.0   ┆ 3.0   ┆ 4.0   ┆ 5.0   │
# │ 1000  ┆ 2024-11-29 ┆ 1.0   ┆ 2.0   ┆ 3.0   ┆ 4.0   ┆ 5.0   │
# │ 1000  ┆ 2024-11-30 ┆ 1.0   ┆ 2.0   ┆ 3.0   ┆ 4.0   ┆ 5.0   │
# └───────┴────────────┴───────┴───────┴───────┴───────┴───────┘
size = df.estimated_size("gb")
memory = psutil.virtual_memory().total/1024**3
print(f'  size/memory => {size:.3}gb/{memory:.3}gb => {int(100*size/memory)}%')
# size/memory => 1.99gb/15.5gb => 12%

print('Saving initial "large" dataset to delta table...')
delta_path = Path('./table/').resolve()  # <= Path to delta table
if os.path.exists(delta_path):
    shutil.rmtree(delta_path)
df.write_delta(delta_path, mode="overwrite",
               delta_write_options={"partition_by": []})
df = None  # free memory

print('Upserting delta table by "small" dataset...')
(
    generate_data(ngroups=1000, ndates=5, start=date(2024, 11, 30), value=10.0, eager=True)
    .write_delta(
        delta_path,
        mode="merge",
        delta_write_options={"partition_by": []},
        delta_merge_options={
            "predicate": "s.group = t.group AND s.date = t.date",
            "source_alias": "s",
            "target_alias": "t"})
    .when_matched_update_all()
    .when_not_matched_insert_all().execute()
)

print("Optimize zorder to sort by group & date...")
DeltaTable(delta_path).optimize.z_order(["group", "date"])

print("Remove unused files...")
DeltaTable(delta_path).vacuum(
    retention_hours=0,
    dry_run=False,
    enforce_retention_duration=False,
)

print("Update complete.")
# shutil.rmtree(delta_path)
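For completeness, this is roughly how I read the table back; I only filter on date, so I rely on file/row-group statistics (helped by the Z-ordering) rather than partition pruning. A sketch, assuming your Polars version provides scan_delta:

# Lazily scan the Delta table and filter on date; with Z-ordered files,
# per-file min/max statistics should in principle allow most data to be skipped.
recent = (
    pl.scan_delta(str(delta_path))
    .filter(pl.col("date") >= date(2024, 1, 1))
    .collect()
)
print(recent.shape)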

Edit:

Another approach could be to limit the number of rows per Parquet file using max_rows_per_file instead of partitioning. However, if the dataset is ordered by group and then date, most Parquet files would probably be rewritten completely during any incremental update. If the dataset is ordered by date and then group instead, only the latest file would be touched, reducing rewrites; unfortunately, that would require an additional sort by group and date when reading the dataset. Alternatively, setting a compressed target file size of ~10 MB might be more straightforward. A rough sketch of both ideas is below.
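Reusing generate_data and delta_path from the example above; whether write_deltalake accepts max_rows_per_file depends on your deltalake version and writer engine, and the optimize target size is given in bytes:

# Illustrative only: cap rows per newly written file
# (option availability depends on the deltalake version/engine).
generate_data(ngroups=1000, ndates=5, start=date(2024, 12, 1), value=20.0).write_delta(
    delta_path,
    mode="append",
    delta_write_options={"max_rows_per_file": 500_000},
)

# Or compact existing files toward a target size, e.g. ~10 MB compressed.
DeltaTable(delta_path).optimize.compact(target_size=10 * 1024 * 1024)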
