general

chain(self, func, *args, **kwargs)

Applies a given function to the current Column and returns the result.

This method allows for chaining operations on a Column object by applying a custom function with additional arguments. It's particularly useful for creating complex transformations or applying user-defined functions to a Column.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `self` | `Column` | The current Column object. | required |
| `func` | `callable` | The function to apply to the Column. | required |
| `*args` | | Variable length argument list to pass to the function. | `()` |
| `**kwargs` | | Arbitrary keyword arguments to pass to the function. | `{}` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Column` | `Column` | A new Column object resulting from applying the function. |

Example
>>> df = spark.createDataFrame([("hello",)], ["text"])
>>> def custom_upper(col):
...     return F.upper(col)
>>> result = df.withColumn("upper_text", df.text.chain(custom_upper))
>>> result.show()
+-----+----------+
| text|upper_text|
+-----+----------+
|hello|     HELLO|
+-----+----------+

>>> def add_prefix(col, prefix):
...     return F.concat(F.lit(prefix), col)
>>> result = df.withColumn("prefixed_text", df.text.chain(add_prefix, prefix="Pre: "))
>>> result.show()
+-----+-------------+
| text|prefixed_text|
+-----+-------------+
|hello|   Pre: hello|
+-----+-------------+
Note

The function passed to chain should expect a Column as its first argument, followed by any additional arguments specified in the chain call.

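Since `chain` is simply `func(self, *args, **kwargs)`, calls can be composed back to back. A minimal sketch (assuming `pysparky.functions.general` has been imported so the `chain` extension is registered on `Column`, and that an active Spark session is available):

```python
from pyspark.sql import SparkSession, functions as F
import pysparky.functions.general  # noqa: F401 -- registers the Column extensions

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("hello",)], ["text"])

# Chain two transformations: uppercase, then keep the first three characters.
result = df.withColumn(
    "short_upper",
    df.text.chain(F.upper).chain(lambda c, n: F.substring(c, 1, n), 3),
)
result.show()
# +-----+-----------+
# | text|short_upper|
# +-----+-----------+
# |hello|        HEL|
# +-----+-----------+
```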
Source code in pysparky/functions/general.py
@decorator.extension_enabler(Column)
def chain(self, func, *args, **kwargs) -> Column:
    """
    Applies a given function to the current Column and returns the result.

    This method allows for chaining operations on a Column object by applying
    a custom function with additional arguments. It's particularly useful for
    creating complex transformations or applying user-defined functions to a Column.

    Args:
        self (Column): The current Column object.
        func (callable): The function to apply to the Column.
        *args: Variable length argument list to pass to the function.
        **kwargs: Arbitrary keyword arguments to pass to the function.

    Returns:
        Column: A new Column object resulting from applying the function.

    Example:
        ```python
        >>> df = spark.createDataFrame([("hello",)], ["text"])
        >>> def custom_upper(col):
        ...     return F.upper(col)
        >>> result = df.withColumn("upper_text", df.text.chain(custom_upper))
        >>> result.show()
        +-----+----------+
        | text|upper_text|
        +-----+----------+
        |hello|     HELLO|
        +-----+----------+

        >>> def add_prefix(col, prefix):
        ...     return F.concat(F.lit(prefix), col)
        >>> result = df.withColumn("prefixed_text", df.text.chain(add_prefix, prefix="Pre: "))
        >>> result.show()
        +-----+-------------+
        | text|prefixed_text|
        +-----+-------------+
        |hello|   Pre: hello|
        +-----+-------------+
        ```

    Note:
        The function passed to `chain` should expect a Column as its first argument,
        followed by any additional arguments specified in the `chain` call.
    """
    return func(self, *args, **kwargs)

get_value_from_map(column_or_name, dict_)

Retrieves a value from a map (dictionary) using a key derived from a specified column in a DataFrame.

This function creates a map from the provided dictionary and then looks up the value in the map corresponding to the key that matches the value in the specified column.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `column_or_name` | `ColumnOrName` | The column, or name of the column, whose value is used as the lookup key in the map. | required |
| `dict_` | `dict` | A dictionary whose keys and values populate the map. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Column` | `Column` | A PySpark Column object representing the value retrieved from the map. |

Example
>>> mapping = {1: 'a', 2: 'b'}
>>> column_name = 'key_column'
>>> df = spark.createDataFrame([(1,), (2,)], ['key_column'])
>>> df.withColumn('value', get_value_from_map(column_name, mapping)).show()
+----------+-----+
|key_column|value|
+----------+-----+
|         1|    a|
|         2|    b|
+----------+-----+
Source code in pysparky/functions/general.py
@decorator.extension_enabler(Column)
def get_value_from_map(column_or_name: ColumnOrName, dict_: dict) -> Column:
    """
    Retrieves a value from a map (dictionary) using a key derived from a specified column in a DataFrame.

    This function creates a map from the provided dictionary and then looks up the value in the map
    corresponding to the key that matches the value in the specified column.

    Args:
        column_or_name (ColumnOrName): The column, or name of the column, whose value is used as the lookup key in the map.
        dict_ (dict): A dictionary where keys and values are the elements to be used in the map.

    Returns:
        Column: A PySpark Column object representing the value retrieved from the map.

    Example:
        ```python
        >>> mapping = {1: 'a', 2: 'b'}
        >>> column_name = 'key_column'
        >>> df = spark.createDataFrame([(1,), (2,)], ['key_column'])
        >>> df.withColumn('value', get_value_from_map(column_name, mapping)).show()
        +----------+-----+
        |key_column|value|
        +----------+-----+
        |         1|    a|
        |         2|    b|
        +----------+-----+
        ```
    """
    (column,) = ensure_column(column_or_name)

    return utils.create_map_from_dict(dict_)[column]

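Internally this builds a literal map column from the dictionary via `utils.create_map_from_dict` and indexes it with the key column. A minimal sketch of an equivalent helper built directly on `F.create_map` (the package's actual implementation may differ):

```python
import itertools
from pyspark.sql import SparkSession, Column, functions as F

def create_map_from_dict_sketch(dict_: dict) -> Column:
    # Flatten {k1: v1, k2: v2} into [k1, v1, k2, v2], lift each element with
    # F.lit, and build a single MapType column that can be indexed by another column.
    return F.create_map(*(F.lit(x) for x in itertools.chain.from_iterable(dict_.items())))

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,)], ["key_column"])
# Indexing the map column with the key column yields the mapped value (null if absent).
df.withColumn("value", create_map_from_dict_sketch({1: "a", 2: "b"})[F.col("key_column")]).show()
```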
lower_(col)

This serves as a simple example of how this package works.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `col` | `Column` | The column to be lowercased. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Column` | `Column` | A lowercased column. |

Example
>>> df = spark.createDataFrame([("Hello",)], ["text"])
>>> df.select(lower_(F.col("text"))).show()
+-----------+
|lower(text)|
+-----------+
|      hello|
+-----------+
Source code in pysparky/functions/general.py
@decorator.extension_enabler(Column)
def lower_(col: Column) -> Column:
    """
    This serves as a simple example of how this package works.

    Args:
        col (Column): The column to be lowercased.

    Returns:
        Column: A lowercased column.

    Example:
        ```python
        >>> df = spark.createDataFrame([("Hello",)], ["text"])
        >>> df.select(lower_(F.col("text"))).show()
        +-----------+
        |lower(text)|
        +-----------+
        |      hello|
        +-----------+
        ```
    """
    return F.lower(col)

replace_strings_to_none(column_or_name, list_of_null_string, customize_output=None)

Replaces values in a column that match any string in a given list with None (or a custom output).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `column_or_name` | `ColumnOrName` | The column to check for matching string values. | required |
| `list_of_null_string` | `list[str]` | The string values to replace when matched. | required |
| `customize_output` | `Any` | The value substituted for matched strings. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Column` | `Column` | A Spark DataFrame column with the values replaced. |

Example
>>> df = spark.createDataFrame([("",), ("foo",), (None,)], ["col"])
>>> df.select(replace_strings_to_none(F.col("col"), [""]).alias("cleaned")).show()
+-------+
|cleaned|
+-------+
|   null|
|    foo|
|   null|
+-------+
Source code in pysparky/functions/general.py
@decorator.extension_enabler(Column)
def replace_strings_to_none(
    column_or_name: ColumnOrName,
    list_of_null_string: list[str],
    customize_output: Any = None,
) -> pyspark.sql.Column:
    """
    Replaces values that match any string in a given list with None (or a custom output).

    Args:
        column_or_name (ColumnOrName): The column to check for matching string values.
        list_of_null_string (list[str]): The string values to replace when matched.
        customize_output (Any): The value substituted for matched strings. Defaults to None.

    Returns:
        Column: A Spark DataFrame column with the values replaced.

    Example:
        ```python
        >>> df = spark.createDataFrame([("",), ("foo",), (None,)], ["col"])
        >>> df.select(replace_strings_to_none(F.col("col"), [""]).alias("cleaned")).show()
        +-------+
        |cleaned|
        +-------+
        |   null|
        |    foo|
        |   null|
        +-------+
        ```
    """

    (column,) = ensure_column(column_or_name)

    return F.when(column.isin(list_of_null_string), customize_output).otherwise(column)

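Because `customize_output` can be any value, the same function can also standardise null-like markers to a sentinel instead of None. A small usage sketch (the data and the `"<missing>"` sentinel are illustrative):

```python
from pyspark.sql import SparkSession, functions as F
from pysparky.functions.general import replace_strings_to_none

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("N/A",), ("foo",), ("NULL",)], ["col"])

# Map the null-like markers "N/A" and "NULL" to the sentinel string "<missing>".
df.select(
    replace_strings_to_none(F.col("col"), ["N/A", "NULL"], customize_output="<missing>").alias("cleaned")
).show()
# +---------+
# |  cleaned|
# +---------+
# |<missing>|
# |      foo|
# |<missing>|
# +---------+
```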
single_space_and_trim(column_or_name)

Replaces multiple white spaces with a single space and trims the column.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `column_or_name` | `ColumnOrName` | The column to be adjusted. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Column` | `Column` | A trimmed column with single spaces. |

Example
>>> df = spark.createDataFrame([("  foo   bar  ",)], ["text"])
>>> df.select(single_space_and_trim(F.col("text")).alias("cleaned")).show()
+-------+
|cleaned|
+-------+
|foo bar|
+-------+
Source code in pysparky/functions/general.py
@decorator.extension_enabler(Column)
def single_space_and_trim(column_or_name: ColumnOrName) -> Column:
    """
    Replaces multiple white spaces with a single space and trims the column.

    Args:
        column_or_name (ColumnOrName): The column to be adjusted.

    Returns:
        Column: A trimmed column with single spaces.

    Example:
        ```python
        >>> df = spark.createDataFrame([("  foo   bar  ",)], ["text"])
        >>> df.select(single_space_and_trim(F.col("text")).alias("cleaned")).show()
        +-------+
        |cleaned|
        +-------+
        |foo bar|
        +-------+
        ```
    """

    return F.trim(F.regexp_replace(column_or_name, r"\s+", " "))

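Because the pattern `\s+` matches any whitespace run, tabs and newlines are collapsed as well, not just spaces. A quick sketch:

```python
from pyspark.sql import SparkSession, functions as F
from pysparky.functions.general import single_space_and_trim

spark = SparkSession.builder.getOrCreate()
# Mixed tabs, a newline, and trailing spaces all collapse to single spaces.
df = spark.createDataFrame([("foo\t\tbar\n baz  ",)], ["text"])
df.select(single_space_and_trim(F.col("text")).alias("cleaned")).show()
# +-----------+
# |    cleaned|
# +-----------+
# |foo bar baz|
# +-----------+
```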
when_mapping(column_or_name, dict_)

Applies a series of conditional mappings to a PySpark Column based on a dictionary of conditions and values.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `column_or_name` | `ColumnOrName` | The PySpark Column (or column name) to which the conditional mappings are applied. | required |
| `dict_` | `dict` | A dictionary where keys are the match conditions and values are the corresponding results. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Column` | `Column` | A new PySpark Column with the conditional mappings applied. |

Example
>>> df = spark.createDataFrame([("A",), ("B",), ("C",)], ["category"])
>>> mapping = {"A": 1, "B": 2}
>>> df.select("category", when_mapping(F.col("category"), mapping).alias("mapped")).show()
+--------+------+
|category|mapped|
+--------+------+
|       A|     1|
|       B|     2|
|       C|  null|
+--------+------+
Source code in pysparky/functions/general.py
@decorator.extension_enabler(Column)
def when_mapping(column_or_name: ColumnOrName, dict_: dict) -> Column:
    """
    Applies a series of conditional mappings to a PySpark Column based on a dictionary of conditions and values.

    Args:
        column_or_name (ColumnOrName): The PySpark Column (or column name) to which the conditional mappings will be applied.
        dict_ (dict): A dictionary where keys are the conditions and values are the corresponding results.

    Returns:
        Column: A new PySpark Column with the conditional mappings applied.

    Example:
        ```python
        >>> df = spark.createDataFrame([("A",), ("B",), ("C",)], ["category"])
        >>> mapping = {"A": 1, "B": 2}
        >>> df.select("category", when_mapping(F.col("category"), mapping).alias("mapped")).show()
        +--------+------+
        |category|mapped|
        +--------+------+
        |       A|     1|
        |       B|     2|
        |       C|  null|
        +--------+------+
        ```
    """
    (column,) = ensure_column(column_or_name)

    def reducer(result_column: Column, condition_value: tuple[Any, Any]) -> Column:
        condition, value = condition_value
        return result_column.when(column == condition, value)

    # Seed the reduce with the `F` module itself: the first step calls the
    # module-level `F.when(...)`, and every later step chains `.when(...)` on
    # the resulting Column, building F.when(...).when(...)... over the dict items.
    result_column: Column = functools.reduce(reducer, dict_.items(), F)  # type: ignore
    return result_column
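
For a mapping of `{"A": 1, "B": 2}`, the reduce above is equivalent to writing the `when` chain out by hand. A sketch of the expanded expression:

```python
from pyspark.sql import functions as F

# Equivalent hand-written chain for mapping = {"A": 1, "B": 2}.
# Keys without a match fall through to null because no .otherwise() is attached.
mapped = (
    F.when(F.col("category") == "A", 1)
     .when(F.col("category") == "B", 2)
)
```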