I have a function check_fun where the user inputs one table and additional argumments of column names. This function later on evaluates some checks depending on which variables has been inputed in the function call. This works fine but each evaluation and append takes a lot  of time. How can I rewrite it in one call?
Data
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, when, count
import pandas as pd
# Sample data
data = [(1, 12, 1, 5), (6, 8, 1, 6), (7, 15, 1, 7), (4, 9, 1, 12), (10, 11, 1, 9)]
columns = ["a", "b", "c", "d"]
df = spark.createDataFrame(data, columns)
The original function
def check_fun(df,
          a_input: str = None, 
          b_input: str = None,
          c_input: str = None,   
          d_input: str = None):
    
    columns = ['check', 'description', 'count']
    all_res = pd.DataFrame(columns = columns)
    
    # check1
    if a_input is not None and b_input is not None:
        check1_count = df.filter(col(a_input) > col(b_input)).count()
        check_1_res   = pd.DataFrame([['check1', 'a > b', check1_count]],
                                       columns = columns)
        all_res = all_res.append(check_1_res)
    # check2
    if a_input is not None and c_input is not None:
        check2_count = df.filter(col(a_input) > col(c_input)).count()
        check_2_res   = pd.DataFrame([['check2', 'a > c', check2_count]],
                                       columns = columns)
        all_res = all_res.append(check_2_res)
    # check3
    if a_input is not None and d_input is not None:
        check3_count = df.filter(col(a_input) > col(d_input)).count()
        check_3_res   = pd.DataFrame([['check3', 'a > d', check3_count]],
                                     columns=columns)
        all_res = all_res.append(check_3_res)
    # check4
    if b_input is not None and c_input is not None:
        check4_count = df.filter(col(a_input) <  col(d_input)).count()
        check_4_res   = pd.DataFrame([['check4', 'a <  d', check4_count]],
                                     columns=columns)
        all_res = all_res.append(check_4_res)
    return(all_res)
How I tried solving it:
 a = "a"
 b = "b"
 c = "c"
 d = "d"    
 df.agg(
    when(a is not None and b is not None, sum(when(col(a) > col(b), 1).otherwise(0)).otherwise(None).alias('check1')
)).show()
But this returns an error...
I have a function check_fun where the user inputs one table and additional argumments of column names. This function later on evaluates some checks depending on which variables has been inputed in the function call. This works fine but each evaluation and append takes a lot  of time. How can I rewrite it in one call?
Data
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, when, count
import pandas as pd
# Sample data
data = [(1, 12, 1, 5), (6, 8, 1, 6), (7, 15, 1, 7), (4, 9, 1, 12), (10, 11, 1, 9)]
columns = ["a", "b", "c", "d"]
df = spark.createDataFrame(data, columns)
The original function
def check_fun(df,
          a_input: str = None, 
          b_input: str = None,
          c_input: str = None,   
          d_input: str = None):
    
    columns = ['check', 'description', 'count']
    all_res = pd.DataFrame(columns = columns)
    
    # check1
    if a_input is not None and b_input is not None:
        check1_count = df.filter(col(a_input) > col(b_input)).count()
        check_1_res   = pd.DataFrame([['check1', 'a > b', check1_count]],
                                       columns = columns)
        all_res = all_res.append(check_1_res)
    # check2
    if a_input is not None and c_input is not None:
        check2_count = df.filter(col(a_input) > col(c_input)).count()
        check_2_res   = pd.DataFrame([['check2', 'a > c', check2_count]],
                                       columns = columns)
        all_res = all_res.append(check_2_res)
    # check3
    if a_input is not None and d_input is not None:
        check3_count = df.filter(col(a_input) > col(d_input)).count()
        check_3_res   = pd.DataFrame([['check3', 'a > d', check3_count]],
                                     columns=columns)
        all_res = all_res.append(check_3_res)
    # check4
    if b_input is not None and c_input is not None:
        check4_count = df.filter(col(a_input) <  col(d_input)).count()
        check_4_res   = pd.DataFrame([['check4', 'a <  d', check4_count]],
                                     columns=columns)
        all_res = all_res.append(check_4_res)
    return(all_res)
How I tried solving it:
 a = "a"
 b = "b"
 c = "c"
 d = "d"    
 df.agg(
    when(a is not None and b is not None, sum(when(col(a) > col(b), 1).otherwise(0)).otherwise(None).alias('check1')
)).show()
But this returns an error...
You're probably after something like this:
def check_fun(df, a=None, b=None, c=None, d=None):
    columns = ['check', 'description', 'count']
    results = [
        [check_name, description, df.filter(col(x) > col(y)).count()]
        for x, y, check_name, description in [
            (a, b, "check1", "a > b"),
            (a, c, "check2", "a > c"),
            (a, d, "check3", "a > d"),
            (d, a, "check4", "a < d")
        ]
        if x is not None and y is not None
    ]
    return pd.DataFrame(results, columns=columns) if results else pd.DataFrame(columns=columns)
Depending on the correct interpretation of 'check4', which had an inconsistency in the code you shared.


ifstatement:if b_input is not None and c_input is not None:- however, you're comparinga < din the logic that follows? – Grismar Commented Jan 29 at 10:32ifstatement, or the logic that follows? You can edit the question to fix it, and people won't need to refer to the comments to make sense of it. – Grismar Commented Jan 29 at 12:53