python - Is there a way to apply timestamp ranges/intervals to a window function in SQLAlchemy with PostgreSQL? - Stack Overflow

admin, 2025-04-17

I'm building a low-code SQL generation tool that takes filter objects (column, operator, value, etc.) and compiles them to SQL. Ideally, users will be able to specify window functions, which I'm trying to build with subqueries. One use case is a sliding-window query that sums transaction amounts over the last n days and flags transactions where that rolling sum goes above a threshold.
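For reference, the intended semantics can be pinned down in plain Python before worrying about SQL generation. This is a minimal sketch, not part of the original tool: the name `rolling_flags`, the `(id, timestamp, amount)` row layout and the strict `>` threshold test are all assumptions. It mimics a PostgreSQL `RANGE` frame ending at `CURRENT ROW`, so the window is measured in time rather than in rows, and rows sharing the current timestamp are counted too.

```python
from datetime import datetime, timedelta


def rolling_flags(rows, window, threshold):
    """Reference semantics for a trailing time-window sum (hypothetical helper).

    rows: iterable of (id, timestamp, amount) tuples.
    Returns a list of (id, rolling_sum, flagged) tuples in input order.
    """
    rows = list(rows)
    result = []
    for row_id, ts, _amount in rows:
        # Like RANGE BETWEEN <window> PRECEDING AND CURRENT ROW: every row whose
        # timestamp lies in [ts - window, ts], including peers with the same ts.
        total = sum(amount for _, t, amount in rows if ts - window <= t <= ts)
        result.append((row_id, total, total > threshold))
    return result


rows = [
    (1, datetime(2025, 1, 1), 50),
    (2, datetime(2025, 1, 10), 60),
    (3, datetime(2025, 2, 15), 10),
]
print(rolling_flags(rows, timedelta(days=30), threshold=100))
# [(1, 50, False), (2, 110, True), (3, 10, False)]
```

The quadratic scan is only meant as a test oracle for comparing against the generated SQL on small fixtures.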

Here is the function, which is one step in a query-building pipeline:

from sqlalchemy import select, func, inspect, text, MetaData, Table
from sqlalchemy.orm import sessionmaker, aliased



def apply_aggregation(query, aggregate_operator: str, time_window: str, time_column: str, column_name: str, filter_table: Table):
    """
    Apply aggregation to the query (e.g., SUM).
    """
    agg_func = getattr(func, aggregate_operator.lower(), None)
    if not agg_func:
        raise ValueError(f"Unsupported aggregate function: {aggregate_operator}")

    # Get the target column for aggregation
    target_col = filter_table.c.get(column_name)
    if target_col is None or (time_column not in filter_table.c):
        raise ValueError(f"Invalid column names: {column_name} or {time_column}")

    # Convert the time window into an integer (days)
    try:
        time_window_days = int(time_window.split()[0])  # Assumes format "X days"
    except ValueError:
        raise ValueError(f"Invalid time window format: {time_window}")

    # Define the window function with RANGE INTERVAL
    rolling_agg = agg_func(target_col).over(
        partition_by=text("1"),  # No partitioning
        order_by=filter_table.c[time_column],
        # Does not work:
        range_=(-time_window_days, 0)
        # Also does not work:
        # range_=(text(f"INTERVAL '{time_window}' PRECEDING"), text("CURRENT ROW"))
    ).label("rolling_agg")

    # Build the subquery with the window function
    subquery = (
        select(
            filter_table.c.id,  # Keep primary key
            filter_table.c[time_column],  # Keep time column for reference
            rolling_agg
        )
        .subquery()
    )

    # Join the subquery back to the original query
    query = query.add_columns(subquery.c.rolling_agg).select_from(subquery)

    return query
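One workaround that stays inside SQLAlchemy's integer-only `range_` is to order by the timestamp converted to seconds with `EXTRACT(epoch FROM ...)`, so the frame offset becomes a plain number of seconds. This is a sketch under assumptions: the inline `Table` definition stands in for the reflected `transaction_entries` table, and `rolling_sum_by_epoch` is a hypothetical helper. On PostgreSQL, `EXTRACT` returns `numeric` (PostgreSQL 14+) or `double precision` (older versions), and both should accept a numeric `RANGE` offset.

```python
from sqlalchemy import Column, DateTime, Integer, MetaData, Numeric, Table, extract, func, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()
# Hypothetical stand-in for the reflected transaction_entries table
te_table = Table(
    "transaction_entries",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("created_at", DateTime),
    Column("amount", Numeric),
)

SECONDS_PER_DAY = 86_400


def rolling_sum_by_epoch(table, time_column, column_name, days):
    """Hypothetical helper: SUM over a trailing window of `days`, offset in seconds."""
    # Order by seconds since the epoch so the RANGE offset is a plain number,
    # which SQLAlchemy's integer-only range_ parameter can express.
    epoch = extract("epoch", table.c[time_column])
    return func.sum(table.c[column_name]).over(
        order_by=epoch,
        range_=(-days * SECONDS_PER_DAY, 0),  # <N seconds> PRECEDING AND CURRENT ROW
    ).label("rolling_agg")


stmt = select(te_table.c.id, rolling_sum_by_epoch(te_table, "created_at", "amount", 30))
# literal_binds inlines the offset so the printed SQL shows the actual number
print(stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
```

For `timestamp without time zone` columns, 30 days expressed as 2,592,000 seconds should match `INTERVAL '30 days'`; with `timestamptz` the two can differ by an hour across daylight-saving changes. Ordering by an expression may also keep PostgreSQL from using a plain index on `created_at` for the window sort.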

And here's how I'm using it:

metadata = MetaData()
te_table = Table('transaction_entries', metadata, autoload_with=engine)

query = select(te_table)
query = apply_aggregation(query,
                          filters["aggregate_operator"],
                          filters["time_window"],
                          filters["time_column"],
                          filters["column_name"],
                          te_table)  # simple case where the filter table is the target table
print(query)
rs = session.execute(query)
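Because `filters["time_window"]` comes from user input, it may be worth validating it before it reaches the query. The current parser, `int(time_window.split()[0])`, silently reads "30 weeks" as 30 days, and the commented-out `text(f"INTERVAL '{time_window}' PRECEDING")` attempt would splice the raw string into the SQL. A stricter sketch follows; `parse_time_window` and the accepted unit list are hypothetical choices, not part of the original pipeline.

```python
import re
from datetime import timedelta

# Hypothetical whitelist of units the UI may send; maps to timedelta keywords
_UNITS = {"minute": "minutes", "hour": "hours", "day": "days", "week": "weeks"}
_PATTERN = re.compile(r"^\s*(\d+)\s*(minute|hour|day|week)s?\s*$", re.IGNORECASE)


def parse_time_window(time_window: str) -> timedelta:
    """Parse strings like '30 days' or '12 hours' into a timedelta.

    Anything that does not match the strict pattern is rejected, so no
    free-form text can leak into the generated SQL.
    """
    match = _PATTERN.match(time_window)
    if match is None:
        raise ValueError(f"Invalid time window format: {time_window!r}")
    amount, unit = int(match.group(1)), match.group(2).lower()
    return timedelta(**{_UNITS[unit]: amount})


print(parse_time_window("30 days"))  # 30 days, 0:00:00
```

Returning a `timedelta` keeps the unit explicit; `int(window.total_seconds())` gives whole seconds and `window.days` whole days when a numeric offset is needed.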

Desired result

SELECT transaction_entries.id, ...other transaction_entries fields, anon_1.rolling_agg 
FROM (SELECT transaction_entries.id AS id, transaction_entries.created_at AS created_at, sum(transaction_entries.amount) OVER (PARTITION BY 1 ORDER BY transaction_entries.created_at RANGE INTERVAL :param_1 PRECEDING) AS rolling_agg 
FROM transaction_entries) AS anon_1, transaction_entries

Actual result

SELECT transaction_entries.id, ...other transaction_entries fields, anon_1.rolling_agg 
FROM (SELECT transaction_entries.id AS id, transaction_entries.created_at AS created_at, sum(transaction_entries.amount) OVER (PARTITION BY 1 ORDER BY transaction_entries.created_at RANGE BETWEEN :param_1 PRECEDING AND CURRENT ROW) AS rolling_agg 
FROM transaction_entries) AS anon_1, transaction_entries
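Note that both the desired and the actual SQL end in `FROM (...) AS anon_1, transaction_entries` with no join condition, which PostgreSQL evaluates as a cross join: every transaction row is paired with every row of the subquery. `select_from()` only adds a FROM entry, so an explicit join on the primary key should be needed to get one `rolling_agg` per transaction. A sketch using `Select.join_from()`, assuming `id` is the primary key (as the original comment says), with an inline stand-in table and a plain running sum to keep the focus on the join:

```python
from sqlalchemy import Column, DateTime, Integer, MetaData, Numeric, Table, func, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()
# Hypothetical stand-in for the reflected transaction_entries table
te_table = Table(
    "transaction_entries",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("created_at", DateTime),
    Column("amount", Numeric),
)

# Window subquery keyed by the primary key (a plain running sum for brevity)
windowed = select(
    te_table.c.id,
    func.sum(te_table.c.amount).over(order_by=te_table.c.created_at).label("rolling_agg"),
).subquery("windowed")

# join_from() emits an explicit JOIN ... ON instead of the comma-separated
# FROM list produced by select_from(), matching each row to its own window value
query = (
    select(te_table)
    .add_columns(windowed.c.rolling_agg)
    .join_from(te_table, windowed, windowed.c.id == te_table.c.id)
)
print(query.compile(dialect=postgresql.dialect()))
```

The subquery is named `windowed` only to make the printed SQL easier to read; an anonymous `.subquery()` should join the same way.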

When I try to use range_ to express an interval, I get a FeatureNotSupported error:

FeatureNotSupported: RANGE with offset PRECEDING/FOLLOWING is not supported for column type timestamp without time zone and offset type integer
LINE 2: ...R BY transaction_entries.created_at RANGE BETWEEN 30 PRECEDI...
                                                             ^
HINT:  Cast the offset value to an appropriate type.
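The error is raised by PostgreSQL: when the ORDER BY column is a `timestamp`, the `RANGE` offset has to be an `interval` (for example `INTERVAL '30 days'`), which is what the HINT's "cast the offset" refers to. As far as I can tell, SQLAlchemy 1.4/2.0 passes `range_` values through `int()`, so neither an interval nor a `text()` fragment can get through that parameter. One workaround is a small custom construct built with the `sqlalchemy.ext.compiler` extension. `IntervalRangeOver` is a hypothetical name, it only covers a trailing N-day window ending at `CURRENT ROW`, and the table is an inline stand-in.

```python
from sqlalchemy import Column, DateTime, Integer, MetaData, Numeric, Table, func, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import ColumnElement

metadata = MetaData()
# Hypothetical stand-in for the reflected transaction_entries table
te_table = Table(
    "transaction_entries",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("created_at", DateTime),
    Column("amount", Numeric),
)


class IntervalRangeOver(ColumnElement):
    """Hypothetical construct rendering
    <agg> OVER (ORDER BY <col> RANGE BETWEEN INTERVAL 'N days' PRECEDING AND CURRENT ROW)
    """

    inherit_cache = False  # no cache key is defined, so opt out of statement caching

    def __init__(self, agg, order_by, days):
        self.agg = agg
        self.order_by = order_by
        self.days = int(days)  # forcing an int keeps arbitrary text out of the SQL
        self.type = agg.type


@compiles(IntervalRangeOver)
def _compile_interval_range_over(element, compiler, **kw):
    # Let the compiler render the aggregate and ORDER BY column (quoting, tables),
    # and only hand-write the interval frame clause
    return "%s OVER (ORDER BY %s RANGE BETWEEN INTERVAL '%d days' PRECEDING AND CURRENT ROW)" % (
        compiler.process(element.agg, **kw),
        compiler.process(element.order_by, **kw),
        element.days,
    )


rolling_agg = IntervalRangeOver(
    func.sum(te_table.c.amount), te_table.c.created_at, 30
).label("rolling_agg")
stmt = select(te_table.c.id, te_table.c.created_at, rolling_agg)
print(stmt.compile(dialect=postgresql.dialect()))
```

Inside `apply_aggregation`, the `.over(...)` call could then be replaced with `IntervalRangeOver(agg_func(target_col), filter_table.c[time_column], time_window_days)`. The sketch drops `PARTITION BY 1`, which partitions by a constant and so does not change the result; a real partition column would need an extra parameter.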

How can I cast the offset to the correct type, or is there a better way to accomplish this?
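At the SQL level, the cast the HINT asks for can also be applied to a bound parameter, so the window string never has to be formatted into the statement. When a hand-written statement is acceptable, a `text()` construct is one way to sketch that; the parameter name `time_window` and the query shape are illustrative, with column names mirroring the question.

```python
from sqlalchemy import text
from sqlalchemy.dialects import postgresql

# Plain-SQL sketch: the frame offset is a bound parameter cast to interval,
# which is the offset type PostgreSQL expects for a timestamp ORDER BY column.
rolling_sql = text(
    """
    SELECT id,
           created_at,
           sum(amount) OVER (
               ORDER BY created_at
               RANGE BETWEEN CAST(:time_window AS interval) PRECEDING AND CURRENT ROW
           ) AS rolling_agg
    FROM transaction_entries
    """
).bindparams(time_window="30 days")

compiled = rolling_sql.compile(dialect=postgresql.dialect())
print(compiled)
print(compiled.params)  # {'time_window': '30 days'}
```

`CAST(... AS interval)` is used instead of the `::interval` shorthand to keep the colon-based `:time_window` parameter syntax unambiguous.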
