chain(self, func, *args, **kwargs)

Applies a given function to the current Column and returns the result.

This method allows for chaining operations on a Column object by applying a custom function with additional arguments. It's particularly useful for creating complex transformations or applying user-defined functions to a Column.


Name Type Description Default
self Column

The current Column object.

func callable

The function to apply to the Column.


*args

Variable-length positional arguments passed to the function.

**kwargs

Arbitrary keyword arguments passed to the function.



Name Type Description
Column Column

A new Column object resulting from applying the function.


>>> from pyspark.sql import functions as F
>>> df = spark.createDataFrame([("hello",)], ["text"])
>>> def custom_upper(col):
...     return F.upper(col)
>>> result = df.withColumn("upper_text", df.text.chain(custom_upper))
>>> result.show()
+-----+----------+
| text|upper_text|
+-----+----------+
|hello|     HELLO|
+-----+----------+
>>> def add_prefix(col, prefix):
...     return F.concat(F.lit(prefix), col)
>>> result = df.withColumn("prefixed_text", df.text.chain(add_prefix, prefix="Pre: "))
>>> result.show()
+-----+-------------+
| text|prefixed_text|
+-----+-------------+
|hello|   Pre: hello|
+-----+-------------+

The function passed to chain should expect a Column as its first argument, followed by any additional arguments specified in the chain call.

def chain(self, func, *args, **kwargs) -> Column:
    return func(self, *args, **kwargs)
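The forwarding behaviour can be tried without a Spark session. The sketch below uses a hypothetical `Value` class as a stand-in for Column (it is not part of the package or of PySpark) and gives it the same one-line `chain` body, to show that the object is passed first and that positional and keyword arguments are forwarded unchanged.

```python
from typing import Any, Callable


class Value:
    """Hypothetical stand-in for a Column; it only wraps a Python string."""

    def __init__(self, text: str) -> None:
        self.text = text

    def chain(self, func: Callable[..., "Value"], *args: Any, **kwargs: Any) -> "Value":
        # Same body as the chain source above: pass self first, forward the rest.
        return func(self, *args, **kwargs)


def upper(value: Value) -> Value:
    return Value(value.text.upper())


def add_prefix(value: Value, prefix: str) -> Value:
    return Value(prefix + value.text)


# Each call returns a new Value, so calls can be strung together left to right.
result = Value("hello").chain(upper).chain(add_prefix, prefix="Pre: ")
print(result.text)  # Pre: HELLO
```

Because `chain` returns whatever the function returns, the chain only keeps going as long as each function returns an object that itself has a `chain` method.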

get_value_from_map(column_or_name, dict_)

Retrieves a value from a map (dictionary) using a key derived from a specified column in a DataFrame.

This function creates a map from the provided dictionary and then looks up the value in the map corresponding to the key that matches the value in the specified column.


Name Type Description Default
column_or_name ColumnOrName

The column, or the name of the column, in the DataFrame whose value will be used as the key to look up in the map.

dict_ dict

A dictionary where keys and values are the elements to be used in the map.



Name Type Description
Column Column

A PySpark Column object representing the value retrieved from the map.


>>> mapping = {1: 'a', 2: 'b'}
>>> column_name = 'key_column'
>>> df = spark.createDataFrame([(1,), (2,)], ['key_column'])
>>> df.withColumn('value', get_value_from_map(column_name, mapping)).show()
+----------+-----+
|key_column|value|
+----------+-----+
|         1|    a|
|         2|    b|
+----------+-----+
def get_value_from_map(column_or_name: ColumnOrName, dict_: dict) -> Column:
    (column,) = ensure_column(column_or_name)

    return utils.create_map_from_dict(dict_)[column]
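The body of `utils.create_map_from_dict` is not shown on this page, so what follows is an assumption about one possible approach, not the package's implementation. PySpark's `F.create_map` takes alternating key and value columns, so a common idiom is to flatten the dictionary's items into that order first. The flattening step alone can be checked in plain Python:

```python
from itertools import chain

mapping = {1: "a", 2: "b"}

# Flatten {k1: v1, k2: v2} into [k1, v1, k2, v2], the order F.create_map expects.
flattened = list(chain.from_iterable(mapping.items()))
print(flattened)  # [1, 'a', 2, 'b']

# With PySpark available, each element would typically be wrapped in F.lit
# and the sequence passed on, for example (assumed, not taken from the package):
#     F.create_map(*[F.lit(x) for x in flattened])[F.col("key_column")]
```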


lower_(col)

This serves as a simple example of how this package works.

def lower_(col: Column) -> Column:
    return F.lower(col)

replace_strings_to_none(column_or_name, list_of_null_string, customize_output=None)

Replaces every value in a column that matches one of the strings in list_of_null_string with customize_output, which is None by default.


Name Type Description Default
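column_or_name ColumnOrName

The column to check for the strings listed in list_of_null_string.

list_of_null_string list[str]

The string values to be replaced.

customize_output Any

The value written in place of a matched string. Defaults to None.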



Name Type Description
Column Column

A Spark DataFrame column with the values replaced.

def replace_strings_to_none(
    column_or_name: ColumnOrName,
    list_of_null_string: list[str],
    customize_output: Any = None,
) -> pyspark.sql.Column:
    (column,) = ensure_column(column_or_name)

    return F.when(column.isin(list_of_null_string), customize_output).otherwise(column)
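The row-level effect of the `when(...).otherwise(...)` expression can be illustrated in plain Python. The function name `replace_strings_to_none_py` and the sample null strings are hypothetical and exist only for this sketch. It applies the same rule to a single value rather than to a Column.

```python
from typing import Any, Optional


def replace_strings_to_none_py(
    value: Optional[str],
    list_of_null_string: list[str],
    customize_output: Any = None,
) -> Any:
    """Hypothetical plain-Python analogue of the when/otherwise expression."""
    # Mirrors: F.when(column.isin(list_of_null_string), customize_output).otherwise(column)
    if value in list_of_null_string:
        return customize_output
    return value


null_strings = ["", "N/A", "null"]  # sample values chosen for illustration
cleaned = [replace_strings_to_none_py(v, null_strings) for v in ["", "N/A", "abc", None, " "]]
print(cleaned)  # [None, None, 'abc', None, ' ']
```

The match is exact, so a value such as `" "` is kept unless it appears in the list verbatim.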


single_space_and_trim(column_or_name)

Replaces each run of whitespace (spaces, tabs, newlines) with a single space and trims the column.


Name Type Description Default
column_or_name ColumnOrName

The column, or the name of the column, to be adjusted.



Name Type Description
Column Column

A trimmed column with single spaces.

def single_space_and_trim(column_or_name: ColumnOrName) -> Column:
    return F.trim(F.regexp_replace(column_or_name, r"\s+", " "))
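The same pattern can be tried on a plain string with Python's `re` module. This is only an approximation of the Spark expression: Spark evaluates the regular expression with Java's engine, and the `re.ASCII` flag is used here on the assumption that it gives `\s` the same ASCII-only meaning. The helper name `single_space_and_trim_py` is hypothetical.

```python
import re


def single_space_and_trim_py(text: str) -> str:
    """Hypothetical plain-Python analogue of single_space_and_trim."""
    # Collapse every run of whitespace (spaces, tabs, newlines) into one space.
    collapsed = re.sub(r"\s+", " ", text, flags=re.ASCII)
    # After collapsing, only single spaces can remain at the ends; remove them.
    return collapsed.strip(" ")


print(repr(single_space_and_trim_py("  hello \t\n world  ")))  # 'hello world'
```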

when_mapping(column_or_name, dict_)

Applies a series of conditional mappings to a PySpark Column based on a dictionary of conditions and values.


Name Type Description Default
column_or_name ColumnOrName

The PySpark Column, or the name of the column, to which the conditional mappings will be applied.

dict_ dict

A dictionary whose keys are the values compared for equality against the column and whose values are the corresponding results.



Name Type Description
Column Column

A new PySpark Column with the conditional mappings applied.

def when_mapping(column_or_name: ColumnOrName, dict_: dict) -> Column:
    (column,) = ensure_column(column_or_name)

    def reducer(result_column: Column, condition_value: tuple[Any, Any]) -> Column:
        condition, value = condition_value
        return result_column.when(column == condition, value)

    result_column: Column = functools.reduce(reducer, dict_.items(), F)  # type: ignore
    return result_column
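The `functools.reduce` call folds the dictionary's items into one chain of `when` calls, starting from a seed object. The sketch below reproduces that fold in plain Python with a hypothetical `Branches` class standing in for both the seed `F` and the resulting Column. It is an illustration under those assumptions, not PySpark code.

```python
import functools
from typing import Any


class Branches:
    """Hypothetical stand-in for a when-chain; records (condition, value) pairs."""

    def __init__(self, pairs: tuple = ()) -> None:
        self.pairs = pairs

    def when(self, condition: Any, value: Any) -> "Branches":
        # Return a new object with one more branch, as Column.when does.
        return Branches(self.pairs + ((condition, value),))

    def evaluate(self, cell: Any) -> Any:
        # The first matching branch wins; no match yields None, just as a
        # when-chain without otherwise yields null.
        for condition, value in self.pairs:
            if cell == condition:
                return value
        return None


def when_mapping_py(dict_: dict) -> Branches:
    def reducer(result: Branches, condition_value: tuple) -> Branches:
        condition, value = condition_value
        return result.when(condition, value)

    # Branches() plays the role of the seed F in the source above.
    return functools.reduce(reducer, dict_.items(), Branches())


branches = when_mapping_py({"a": 1, "b": 2})
print(branches.pairs)          # (('a', 1), ('b', 2))
print(branches.evaluate("b"))  # 2
print(branches.evaluate("z"))  # None
```

Branches are added in the dictionary's insertion order, and a value that matches none of the keys maps to None.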