I'm building a low-code SQL generation tool that takes filter objects (column, operator, value, etc.) and compiles them to SQL. Ideally, users will be able to specify window functions, which I'm trying to build with subqueries. One use case is a sliding-window query that sums transaction amounts over n days and flags transactions where that sum goes above a threshold.
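To check the generated SQL against known numbers, a plain-Python version of this use case can help. This is a minimal sketch. It assumes each transaction is a (timestamp, amount) pair and that a row's window covers every row timestamped from window before it up to and including its own timestamp. That is the span PostgreSQL's RANGE BETWEEN INTERVAL '30 days' PRECEDING AND CURRENT ROW frame covers for a 30-day window, and rows sharing the current timestamp count too. The name rolling_sum_with_flags and the strict greater-than comparison are my own illustrative choices.

```python
from datetime import datetime, timedelta


def rolling_sum_with_flags(rows, window, threshold):
    """Reference (O(n^2)) sliding-window sum for checking generated SQL.

    rows: list of (timestamp, amount) tuples, in any order.
    window: timedelta saying how far back each row's window reaches.
    threshold: a row is flagged when its window total is strictly greater.
    Returns (timestamp, amount, window_total, flagged) tuples in input order.
    """
    results = []
    for ts, amount in rows:
        window_start = ts - window
        # Include every row in [ts - window, ts], like RANGE ... AND CURRENT ROW,
        # which also counts rows that share the current timestamp.
        total = sum(a for t, a in rows if window_start <= t <= ts)
        results.append((ts, amount, total, total > threshold))
    return results


if __name__ == "__main__":
    sample = [
        (datetime(2024, 1, 1), 100),
        (datetime(2024, 1, 15), 200),
        (datetime(2024, 2, 10), 50),
    ]
    for row in rolling_sum_with_flags(sample, timedelta(days=30), threshold=250):
        print(row)
```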
Here is the function, which is one step in a query-building pipeline:
from sqlalchemy import select, func, inspect, text, MetaData, Table
from sqlalchemy.orm import sessionmaker, aliased


def apply_aggregation(query, aggregate_operator: str, time_window: str, time_column: str, column_name: str, filter_table: Table):
    """
    Apply aggregation to the query (e.g., SUM).
    """
    # func.<name> returns a generator for any name, so getattr() never returns None;
    # check against an explicit allow-list instead.
    if aggregate_operator.lower() not in {"sum", "avg", "min", "max", "count"}:
        raise ValueError(f"Unsupported aggregate function: {aggregate_operator}")
    agg_func = getattr(func, aggregate_operator.lower())

    # Get the target column for aggregation
    target_col = filter_table.c.get(column_name)
    if target_col is None or (time_column not in filter_table.c):
        raise ValueError(f"Invalid column names: {column_name} or {time_column}")

    # Convert the time window into an integer (days)
    try:
        time_window_days = int(time_window.split()[0])  # Assumes format "X days"
    except (ValueError, IndexError):
        raise ValueError(f"Invalid time window format: {time_window}") from None

    # Define the window function with RANGE INTERVAL
    rolling_agg = agg_func(target_col).over(
        partition_by=text("1"),  # No partitioning
        order_by=filter_table.c[time_column],
        # Does not work: renders RANGE BETWEEN :param_1 PRECEDING AND CURRENT ROW
        range_=(-time_window_days, 0)
        # Also does not work:
        # range_=(text(f"INTERVAL '{time_window}' PRECEDING"), text("CURRENT ROW"))
    ).label("rolling_agg")

    # Build the subquery with the window function
    subquery = (
        select(
            filter_table.c.id,  # Keep primary key
            filter_table.c[time_column],  # Keep time column for reference
            rolling_agg,
        )
        .subquery()
    )

    # Add the subquery's window column to the original query
    # (note: select_from() adds no join condition here)
    query = query.add_columns(subquery.c.rolling_agg).select_from(subquery)
    return query
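The parsing step in apply_aggregation keeps only the leading number, so a window such as "2 weeks" would be treated as 2 days. Below is a stricter sketch that returns a datetime.timedelta. The name parse_time_window and the accepted units are hypothetical choices. A timedelta also fits the SQL side, since psycopg2 sends datetime.timedelta parameters to PostgreSQL as interval values.

```python
import re
from datetime import timedelta

# Hypothetical accepted format: "<integer> <unit>", e.g. "30 days" or "1 week".
_WINDOW_PATTERN = re.compile(r"^\s*(\d+)\s+(days?|hours?|weeks?)\s*$", re.IGNORECASE)
# Map singular unit names to datetime.timedelta keyword arguments.
_UNIT_TO_KWARG = {"day": "days", "hour": "hours", "week": "weeks"}


def parse_time_window(time_window: str) -> timedelta:
    """Parse strings such as "30 days" into a timedelta, rejecting anything else."""
    match = _WINDOW_PATTERN.match(time_window)
    if match is None:
        raise ValueError(f"Invalid time window format: {time_window!r}")
    amount = int(match.group(1))
    unit = match.group(2).lower().rstrip("s")  # "days" -> "day"
    return timedelta(**{_UNIT_TO_KWARG[unit]: amount})
```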
And here's how I'm using it:
metadata = MetaData()
te_table = Table("transaction_entries", metadata, autoload_with=engine)
query = select(te_table)
query = apply_aggregation(
    query,
    filters["aggregate_operator"],
    filters["time_window"],
    filters["time_column"],
    filters["column_name"],
    te_table,  # simple case where the filter table is the target table
)
print(query)
rs = session.execute(query)
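Separately from the RANGE problem, add_columns(...).select_from(subquery) in apply_aggregation puts the subquery into the FROM list without a join condition. That is why the printed SQL ends in "AS anon_1, transaction_entries", which pairs every table row with every subquery row. The sketch below joins the subquery back on id and adds the threshold flag. It assumes the subquery exposes id and rolling_agg columns, as the one in apply_aggregation does. attach_rolling_agg and the over_threshold label are hypothetical names.

```python
from sqlalchemy import Column, DateTime, Integer, MetaData, Numeric, Table, func, select
from sqlalchemy.dialects import postgresql


def attach_rolling_agg(table, window_subquery, threshold):
    """Join a window subquery back to its source table by id and add a flag column.

    Assumes window_subquery exposes "id" and "rolling_agg" columns, like the
    subquery built inside apply_aggregation.
    """
    return select(
        table,
        window_subquery.c.rolling_agg,
        # True when the rolling aggregate exceeds the threshold.
        (window_subquery.c.rolling_agg > threshold).label("over_threshold"),
    ).join_from(table, window_subquery, table.c.id == window_subquery.c.id)


if __name__ == "__main__":
    metadata = MetaData()
    te_table = Table(
        "transaction_entries",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("created_at", DateTime),
        Column("amount", Numeric),
    )
    # Placeholder window with no explicit frame (a running total), just to show the join.
    window_sq = select(
        te_table.c.id,
        func.sum(te_table.c.amount).over(order_by=te_table.c.created_at).label("rolling_agg"),
    ).subquery()
    print(attach_rolling_agg(te_table, window_sq, 1000).compile(dialect=postgresql.dialect()))
```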
Desired result
SELECT transaction_entries.id, ...other transaction_entries fields, anon_1.rolling_agg
FROM (SELECT transaction_entries.id AS id, transaction_entries.created_at AS created_at, sum(transaction_entries.amount) OVER (PARTITION BY 1 ORDER BY transaction_entries.created_at RANGE INTERVAL :param_1 PRECEDING) AS rolling_agg
FROM transaction_entries) AS anon_1, transaction_entries
Actual result
SELECT transaction_entries.id, ...other transaction_entries fields, anon_1.rolling_agg
FROM (SELECT transaction_entries.id AS id, transaction_entries.created_at AS created_at, sum(transaction_entries.amount) OVER (PARTITION BY 1 ORDER BY transaction_entries.created_at RANGE BETWEEN :param_1 PRECEDING AND CURRENT ROW) AS rolling_agg
FROM transaction_entries) AS anon_1, transaction_entries
When I try to change range_ to an interval, I get a FeatureNotSupported error:
FeatureNotSupported: RANGE with offset PRECEDING/FOLLOWING is not supported for column type timestamp without time zone and offset type integer
LINE 2: ...R BY transaction_entries.created_at RANGE BETWEEN 30 PRECEDI...
^
HINT: Cast the offset value to an appropriate type.
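As far as I can tell, over(range_=...) only accepts integers or None, and the integer is sent as a bound parameter (the :param_1 in the actual result). That bound integer is the offset PostgreSQL rejects. One workaround is to write the whole OVER clause as literal SQL with an INTERVAL offset. PostgreSQL 11 and later accept RANGE offsets, and for a timestamp ORDER BY column the offset has to be an interval. This is a minimal sketch; rolling_agg_column and ALLOWED_AGGREGATES are hypothetical names. The aggregate name and day count are interpolated into the SQL string, so both are validated first.

```python
from sqlalchemy import Column, DateTime, Integer, MetaData, Numeric, Table, literal_column, select
from sqlalchemy.dialects import postgresql

# Hypothetical allow-list; these names are interpolated into raw SQL below.
ALLOWED_AGGREGATES = {"sum", "avg", "min", "max", "count"}


def rolling_agg_column(aggregate, target_col, time_col, days, dialect):
    """Build <agg>(col) OVER (ORDER BY time RANGE BETWEEN INTERVAL 'N days' PRECEDING AND CURRENT ROW)."""
    aggregate = aggregate.lower()
    if aggregate not in ALLOWED_AGGREGATES:
        raise ValueError(f"Unsupported aggregate function: {aggregate}")
    if not isinstance(days, int) or days < 0:
        raise ValueError(f"Invalid window length: {days!r}")
    # Compile the column references so they render as table-qualified identifiers,
    # quoted where the dialect requires it.
    target_sql = str(target_col.compile(dialect=dialect))
    time_sql = str(time_col.compile(dialect=dialect))
    return literal_column(
        f"{aggregate}({target_sql}) OVER (ORDER BY {time_sql} "
        f"RANGE BETWEEN INTERVAL '{days} days' PRECEDING AND CURRENT ROW)"
    ).label("rolling_agg")


if __name__ == "__main__":
    metadata = MetaData()
    te_table = Table(
        "transaction_entries",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("created_at", DateTime),
        Column("amount", Numeric),
    )
    pg = postgresql.dialect()
    rolling = rolling_agg_column("sum", te_table.c.amount, te_table.c.created_at, 30, pg)
    print(select(te_table.c.id, te_table.c.created_at, rolling).compile(dialect=pg))
```

Inside apply_aggregation, a column built this way could take the place of rolling_agg in the subquery, with the column objects taken from filter_table.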
How can I cast the offset to the correct type, or is there a better way to accomplish this?
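To avoid raw SQL altogether, the rolling total can also be expressed as a correlated scalar subquery over an alias of the same table, filtered to the time window. Here the interval is a bound timedelta parameter rather than interpolated text. The trade-off is one subquery per row, which is likely slower than a window function on large tables. This is a sketch; rolling_sum_subquery, the window_rows alias and the 1000 threshold are hypothetical.

```python
from datetime import timedelta

from sqlalchemy import Column, DateTime, Integer, MetaData, Numeric, Table, func, select
from sqlalchemy.dialects import postgresql


def rolling_sum_subquery(table, amount_col, time_col, window: timedelta):
    """Sum of amount_col over rows whose time_col lies in [current - window, current]."""
    inner = table.alias("window_rows")
    return (
        select(func.sum(inner.c[amount_col]))
        .where(inner.c[time_col] >= table.c[time_col] - window)
        .where(inner.c[time_col] <= table.c[time_col])
        .correlate(table)  # table refers to the enclosing query's current row
        .scalar_subquery()
        .label("rolling_agg")
    )


if __name__ == "__main__":
    metadata = MetaData()
    te_table = Table(
        "transaction_entries",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("created_at", DateTime),
        Column("amount", Numeric),
    )
    rolling = rolling_sum_subquery(te_table, "amount", "created_at", timedelta(days=30))
    # Wrap once so the flag compares the named column instead of repeating the subquery.
    with_totals = select(te_table, rolling).subquery()
    query = select(with_totals, (with_totals.c.rolling_agg > 1000).label("over_threshold"))
    print(query.compile(dialect=postgresql.dialect()))
```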