General
apply_cols(sdf, col_func, cols=None, **kwargs)
Apply a function to specified columns of a Spark DataFrame.
Parameters:

Name | Type | Description | Default
---|---|---|---
sdf | DataFrame | The input Spark DataFrame. | required
col_func | callable | The function to apply to each column. | required
cols | list[str] | List of column names to apply the function to. If None, applies to all columns. Defaults to None. | None
Returns:

Name | Type | Description
---|---|---
DataFrame | DataFrame | A new Spark DataFrame with the function applied to the specified columns.
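Example (a hedged sketch; the column names and the use of pyspark.sql.functions.trim are illustrative, assuming col_func receives each selected column and returns a transformed Column):
>>> from pyspark.sql import functions as F
>>> sdf = spark.createDataFrame([(" Alice ", 1), (" Bob ", 2)], ["name", "id"])
>>> result = apply_cols(sdf, F.trim, cols=["name"])
>>> result.show()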
Source code in pysparky/transformations/general.py
distinct_value_counts_map(sdf, column_name)
Get distinct values from a DataFrame column as a map with their counts.
Parameters:

Name | Type | Description | Default
---|---|---|---
sdf | DataFrame | The input Spark DataFrame. | required
column_name | str | The name of the column to process. | required
Returns:

Name | Type | Description
---|---|---
DataFrame | DataFrame | A DataFrame containing a single column with a map of distinct values and their counts.
Examples:
>>> data = [("Alice",), ("Bob",), ("Alice",), ("Eve",), (None,)]
>>> sdf = spark.createDataFrame(data, ["name"])
>>> result = distinct_value_counts_map(sdf, "name")
>>> result.show(truncate=False)
+-------------------------------------------+
|name_map                                   |
+-------------------------------------------+
|{Alice -> 2, Bob -> 1, Eve -> 1, NONE -> 1}|
+-------------------------------------------+
Source code in pysparky/transformations/general.py
execute_transformation_blueprint(sdf, blueprint)
Executes a transformation blueprint on a Spark DataFrame.
The transformation blueprint is a dictionary where keys are column names and values are the corresponding transformations to apply. The function applies each transformation in the order specified by the blueprint and returns the resulting DataFrame with the transformed columns.
A transformation blueprint is a dictionary where each key is a new column name and each value is the Column expression used to compute it.
Parameters:

Name | Type | Description | Default
---|---|---|---
sdf | DataFrame | The input DataFrame to be transformed. | required
blueprint | Dict[str, Column] | A dictionary of column names as keys and transformation functions as values. | required
Returns:

Name | Type | Description
---|---|---
DataFrame | DataFrame | The resulting DataFrame with the transformed columns.
Example
sdf.transform(execute_transformation_blueprint, processing_blueprint).show()
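A fuller sketch (hypothetical column names; assumes spark is an active SparkSession and that each blueprint value is a Column expression, matching the description above):
>>> from pyspark.sql import functions as F
>>> sdf = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "name"])
>>> processing_blueprint = {
...     "name_upper": F.upper(F.col("name")),
...     "id_plus_one": F.col("id") + 1,
... }
>>> sdf.transform(execute_transformation_blueprint, processing_blueprint).show()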
Source code in pysparky/transformations/general.py
filters(sdf, conditions, operator_='and')
Apply multiple filter conditions to a Spark DataFrame.
This function takes a Spark DataFrame, a list of conditions, and an optional operator. It returns a new DataFrame with all conditions applied using the specified operator.
Parameters:

Name | Type | Description | Default
---|---|---|---
sdf | DataFrame | The input Spark DataFrame to be filtered. | required
conditions | list[Column] | A list of Column expressions representing the filter conditions. | required
operator_ | Callable | The operator to use for combining conditions. | 'and'
Returns:

Type | Description
---|---
DataFrame | pyspark.sql.DataFrame: A new DataFrame with all filter conditions applied.
Raises:

Type | Description
---|---
ValueError | If an unsupported operator is provided.
Examples:
>>> from operator import or_
>>> from pyspark.sql.functions import col
>>> df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['id', 'letter'])
>>> conditions = [col('id') > 1, col('letter').isin(['b', 'c'])]
Filter using AND (default behavior)
>>> filtered_df_and = filters(df, conditions)
>>> filtered_df_and.show()
+---+------+
| id|letter|
+---+------+
| 2| b|
| 3| c|
+---+------+
Filter using OR
>>> filtered_df_or = filters(df, conditions, or_)
>>> filtered_df_or.show()
+---+------+
| id|letter|
+---+------+
| 2| b|
| 3| c|
| 1| a|
+---+------+
Source code in pysparky/transformations/general.py
get_unique_values(df1, df2, column_name)
Unions two DataFrames and returns a DataFrame with unique values.
Parameters:

Name | Type | Description | Default
---|---|---|---
df1 | DataFrame | First DataFrame. | required
df2 | DataFrame | Second DataFrame. | required
column_name | str | The column name containing the values. | required
Returns:

Name | Type | Description
---|---|---
DataFrame | DataFrame | A DataFrame with unique values.
Examples:
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("UniqueValues").getOrCreate()
>>> df1 = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
>>> df2 = spark.createDataFrame([(3,), (4,), (5,)], ["value"])
>>> unique_values = get_unique_values(df1, df2, "value")
>>> unique_values.show()
Source code in pysparky/transformations/general.py
set_columns_to_null_based_on_condition(df, condition_column, condition_value, target_columns)
Sets specified columns to null based on a condition in the given DataFrame.
Parameters:

Name | Type | Description | Default
---|---|---|---
df | DataFrame | The input DataFrame. | required
condition_column | str | The name of the column containing the condition value. | required
condition_value | str | The value indicating the condition to set columns to null. | required
target_columns | Tuple[str] | The tuple of columns to be set as null if the condition value is found. | required
Returns:

Name | Type | Description
---|---|---
DataFrame | DataFrame | The updated DataFrame with specified columns set to null based on the condition.
Examples:
>>> data = [
... (1, 0, 0, 0),
... (2, 0, 1, 0),
... (3, 1, 1, 1),
... (4, 1, 0, 1),
... (5, 0, 0, 0),
... ]
>>> columns = ["ID", "Dummy1", "Dummy2", "Dummy3"]
>>> df = spark.createDataFrame(data, columns)
>>> condition_column = "Dummy1"
>>> condition_value = 1
>>> target_columns = ("Dummy2", "Dummy3")
>>> result_df = set_columns_to_null_based_on_condition(df, condition_column, condition_value, target_columns)
>>> result_df.show()
+---+------+-------+-------+
| ID|Dummy1|Dummy2 |Dummy3 |
+---+------+-------+-------+
| 1| 0| 0| 0|
| 2| 0| 1| 0|
| 3| 1| null| null|
| 4| 1| null| null|
| 5| 0| 0| 0|
+---+------+-------+-------+
Source code in pysparky/transformations/general.py
transforms(sdf, transformations)
Apply a series of transformations to a Spark DataFrame.
Parameters:

Name | Type | Description | Default
---|---|---|---
sdf | DataFrame | The input Spark DataFrame to be transformed. | required
transformations | list | A list of transformations, where each transformation is a tuple containing a function and a dictionary of keyword arguments to pass to that function. | required
Returns:

Name | Type | Description
---|---|---
DataFrame | DataFrame | The transformed Spark DataFrame.
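Example (a hedged sketch; assumes each listed function takes the DataFrame as its first argument and accepts the given keyword arguments, as with apply_cols and filters above):
>>> from pyspark.sql import functions as F
>>> sdf = spark.createDataFrame([(1, " a "), (2, " b ")], ["id", "name"])
>>> transformations = [
...     (apply_cols, {"col_func": F.trim, "cols": ["name"]}),
...     (filters, {"conditions": [F.col("id") > 1]}),
... ]
>>> result = transforms(sdf, transformations)
>>> result.show()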