General
agg_apply(df, agg_exprs)
Apply aggregation expressions and return a DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The input Spark DataFrame. | required |
| agg_exprs | Dict[str, Column] | A dictionary where keys are the aliases for the aggregated columns and values are the Spark aggregation expressions (Columns). | required |

Returns:

| Name | Type | Description |
|---|---|---|
| DataFrame | DataFrame | A DataFrame containing the aggregated results. |
Example
```python
>>> from pyspark.sql import functions as F
>>> df = spark.createDataFrame([(1, "A"), (2, "A"), (3, "B")], ["value", "category"])
>>> agg_exprs = {
...     "total_value": F.sum("value"),
...     "max_value": F.max("value"),
...     "count": F.count("*")
... }
>>> result = agg_apply(df, agg_exprs)
>>> result.show()
+-----------+---------+-----+
|total_value|max_value|count|
+-----------+---------+-----+
|          6|        3|    3|
+-----------+---------+-----+
```
Source code in pysparky/transformations/general.py
apply_cols(sdf, col_func, cols=None, **kwargs)
Apply a function to specified columns of a Spark DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| sdf | DataFrame | The input Spark DataFrame. | required |
| col_func | callable | The function to apply to each column. | required |
| cols | list[str] | List of column names to apply the function to. If None, applies to all columns. Defaults to None. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| DataFrame | DataFrame | A new Spark DataFrame with the function applied to the specified columns. |
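Example

No example appears in the source docstring. The following is a hypothetical stdlib sketch of the same pattern, using a list of dicts in place of a Spark DataFrame and `str.strip` in place of a Column function (`apply_cols_sketch` is an illustration, not the library function):

```python
# Stand-in rows: each dict plays the role of a DataFrame row.
rows = [{"x": " a ", "y": " b "}]

def apply_cols_sketch(rows, col_func, cols=None):
    # Apply col_func to the named columns (all columns when cols is None),
    # leaving the remaining columns untouched.
    cols = cols if cols is not None else list(rows[0])
    return [
        {k: (col_func(v) if k in cols else v) for k, v in row.items()}
        for row in rows
    ]

trimmed = apply_cols_sketch(rows, str.strip, cols=["x"])
# trimmed == [{"x": "a", "y": " b "}]
```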
Source code in pysparky/transformations/general.py
distinct_value_counts_map(sdf, column_name)
Get distinct values from a DataFrame column as a map with their counts.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| sdf | DataFrame | The input Spark DataFrame. | required |
| column_name | str | The name of the column to process. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| DataFrame | DataFrame | A DataFrame containing a single column with a map of distinct values and their counts. |
Example
```python
>>> data = [("Alice",), ("Bob",), ("Alice",), ("Eve",), (None,)]
>>> sdf = spark.createDataFrame(data, ["name"])
>>> result = distinct_value_counts_map(sdf, "name")
>>> result.show(truncate=False)
+-------------------------------------------+
|name_map                                   |
+-------------------------------------------+
|{Alice -> 2, Bob -> 1, Eve -> 1, NONE -> 1}|
+-------------------------------------------+
```
Source code in pysparky/transformations/general.py
execute_transformation_blueprint(sdf, blueprint)
Executes a transformation blueprint on a Spark DataFrame.
The transformation blueprint is a dictionary where keys are column names and values are the corresponding transformations to apply. The function applies each transformation in the order specified by the blueprint and returns the resulting DataFrame with the transformed columns.
A transformation blueprint is a dictionary whose keys are new column names and whose values are the Column expressions that compute them.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| sdf | DataFrame | The input DataFrame to be transformed. | required |
| blueprint | Dict[str, Column] | A dictionary of column names as keys and transformation functions as values. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| DataFrame | DataFrame | The resulting DataFrame with the transformed columns. |
Example
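No example appears in the source. Below is a hypothetical stdlib sketch of the blueprint idea, computing new dict keys from existing ones in blueprint order (the real function adds PySpark Columns to a DataFrame; the names here are illustrative only):

```python
# A dict stands in for a single DataFrame row.
row = {"price": 10, "qty": 3}

# Blueprint: key = new column name, value = function computing it from the row.
blueprint = {
    "total": lambda r: r["price"] * r["qty"],
    "is_bulk": lambda r: r["qty"] >= 3,
}

# Apply each transformation in blueprint order, adding the new columns.
for name, fn in blueprint.items():
    row[name] = fn(row)

# row == {"price": 10, "qty": 3, "total": 30, "is_bulk": True}
```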
Source code in pysparky/transformations/general.py
filters(sdf, conditions, operator_='and')
Apply multiple filter conditions to a Spark DataFrame.
This function takes a Spark DataFrame, a list of conditions, and an optional operator. It returns a new DataFrame with all conditions applied using the specified operator.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| sdf | DataFrame | The input Spark DataFrame to be filtered. | required |
| conditions | list[Column] | A list of Column expressions representing the filter conditions. | required |
| operator_ | Callable | The operator to use for combining conditions. | 'and' |

Returns:

| Type | Description |
|---|---|
| DataFrame | pyspark.sql.DataFrame: A new DataFrame with all filter conditions applied. |

Raises:

| Type | Description |
|---|---|
| ValueError | If an unsupported operator is provided. |
Example
```python
>>> from operator import or_
>>> from pyspark.sql.functions import col
>>> df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['id', 'letter'])
>>> conditions = [col('id') > 1, col('letter').isin(['b', 'c'])]

# Filter using AND (default behavior)
>>> filtered_df_and = filters(df, conditions)
>>> filtered_df_and.show()
+---+------+
| id|letter|
+---+------+
|  2|     b|
|  3|     c|
+---+------+

# Filter using OR
>>> filtered_df_or = filters(df, conditions, or_)
>>> filtered_df_or.show()
+---+------+
| id|letter|
+---+------+
|  2|     b|
|  3|     c|
|  1|     a|
+---+------+
```
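Combining a list of conditions with a single binary operator is the standard `functools.reduce` fold. The hypothetical sketch below shows the pattern with plain booleans (the real function folds PySpark Column expressions, e.g. with `operator.and_` or `operator.or_`):

```python
import operator
from functools import reduce

# Three stand-in "conditions"; Column expressions would fold the same way
# via the & and | operators.
conditions = [True, True, False]

combined_and = reduce(operator.and_, conditions)  # all must hold -> False
combined_or = reduce(operator.or_, conditions)    # any may hold  -> True
```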
Source code in pysparky/transformations/general.py
get_unique_values(df1, df2, column_name)
Unions two DataFrames and returns a DataFrame with unique values.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df1 | DataFrame | First DataFrame. | required |
| df2 | DataFrame | Second DataFrame. | required |
| column_name | str | The column name containing the values. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| DataFrame | DataFrame | A DataFrame with unique values. |
Example
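No example appears in the source. A hypothetical stdlib sketch of the union-then-distinct idea, with lists of dicts standing in for the two DataFrames:

```python
# Two stand-in "DataFrames" sharing a column named "id".
df1 = [{"id": 1}, {"id": 2}]
df2 = [{"id": 2}, {"id": 3}]

# Union the rows, then keep only the distinct column values.
unique_ids = sorted({row["id"] for row in df1 + df2})
# unique_ids == [1, 2, 3]
```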
Source code in pysparky/transformations/general.py
order_and_aggregate_events(df, id_col, value_col, record_no_col, values_array_col, record_nos_array_col, sort_asc=True)
Group events by id, order by record_no, and aggregate into arrays.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The input DataFrame. | required |
| id_col | str | The column name to group events by. | required |
| value_col | str | The column name for values to aggregate. | required |
| record_no_col | str | The column name for record numbers to sort by. | required |
| values_array_col | str | The column name for the aggregated values array. | required |
| record_nos_array_col | str | The column name for the aggregated record numbers array. | required |
| sort_asc | bool | Whether to sort the arrays in ascending order. Defaults to True. | True |

Returns:

| Name | Type | Description |
|---|---|---|
| DataFrame | DataFrame | A DataFrame with events aggregated into arrays and ordered by record_no. |
Example
```python
>>> data = [
...     ("A", "val1", 2),
...     ("A", "val2", 1),
...     ("B", "val3", 1),
... ]
>>> df = spark.createDataFrame(data, ["id", "value", "record_no"])
>>> result = order_and_aggregate_events(df, "id", "value", "record_no", "values", "record_nos")
>>> result.show(truncate=False)
+---+------------+----------+
|id |values      |record_nos|
+---+------------+----------+
|A  |[val2, val1]|[1, 2]    |
|B  |[val3]      |[1]       |
+---+------------+----------+
```
Source code in pysparky/transformations/general.py
set_columns_to_null_based_on_condition(df, condition_column, condition_value, target_columns)
Sets specified columns to null based on a condition in the given DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The input DataFrame. | required |
| condition_column | str | The name of the column containing the condition value. | required |
| condition_value | str | The value indicating the condition to set columns to null. | required |
| target_columns | Tuple[str] | The tuple of columns to be set to null if the condition value is found. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| DataFrame | DataFrame | The updated DataFrame with specified columns set to null based on the condition. |
Example
```python
>>> data = [
...     (1, 0, 0, 0),
...     (2, 0, 1, 0),
...     (3, 1, 1, 1),
...     (4, 1, 0, 1),
...     (5, 0, 0, 0),
... ]
>>> columns = ["ID", "Dummy1", "Dummy2", "Dummy3"]
>>> df = spark.createDataFrame(data, columns)
>>> condition_column = "Dummy1"
>>> condition_value = 1
>>> target_columns = ("Dummy2", "Dummy3")
>>> result_df = set_columns_to_null_based_on_condition(df, condition_column, condition_value, target_columns)
>>> result_df.show()
+---+------+------+------+
| ID|Dummy1|Dummy2|Dummy3|
+---+------+------+------+
|  1|     0|     0|     0|
|  2|     0|     1|     0|
|  3|     1|  null|  null|
|  4|     1|  null|  null|
|  5|     0|     0|     0|
+---+------+------+------+
```
Source code in pysparky/transformations/general.py
transforms(sdf, transformations)
Apply a series of transformations to a Spark DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| sdf | DataFrame | The input Spark DataFrame to be transformed. | required |
| transformations | list | A list of transformations, where each transformation is a tuple containing a function and a dictionary of keyword arguments to apply the function to. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| DataFrame | DataFrame | The transformed Spark DataFrame. |
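Example

No example appears in the source. The hypothetical sketch below shows the chaining pattern with a plain list in place of a DataFrame (the real function threads a PySpark DataFrame through each (function, kwargs) pair; `transforms_sketch` and the helper functions are illustrative names only):

```python
from functools import reduce

def transforms_sketch(data, transformations):
    # Fold the data through each (func, kwargs) pair in order.
    return reduce(lambda acc, t: t[0](acc, **t[1]), transformations, data)

def add_n(xs, n):
    return [x + n for x in xs]

def scale(xs, k):
    return [x * k for x in xs]

result = transforms_sketch([1, 2], [(add_n, {"n": 1}), (scale, {"k": 10})])
# result == [20, 30]
```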