
Dedup

get_latest_record_from_column(sdf, window_partition_column_name, window_order_by_column_names, window_function=F.row_number)

Fetches the most recent record for each partition of a DataFrame, using the given sort order to rank records; the first-ranked record per partition is kept.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sdf` | `DataFrame` | The DataFrame to process. | required |
| `window_partition_column_name` | `str` | The column used to partition the DataFrame. | required |
| `window_order_by_column_names` | `str` or `list[str]` | The column(s) used to sort the DataFrame. | required |
| `window_function` | `Callable` | The window function used to rank records. | `F.row_number` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `DataFrame` | `DataFrame` | A DataFrame with the most recent record for each partition. |

Source code in pysparky/transformations/dedup.py
@decorator.extension_enabler(DataFrame)
def get_latest_record_from_column(
    sdf: DataFrame,
    window_partition_column_name: str,
    window_order_by_column_names: str | list[str],
    window_function: Callable = F.row_number,
) -> DataFrame:
    """
    Fetches the most recent record for each partition of a DataFrame, using the given sort order to rank records; the first-ranked record per partition is kept.

    Parameters:
        sdf (DataFrame): The DataFrame to process.
        window_partition_column_name (str): The column used to partition the DataFrame.
        window_order_by_column_names (str | list): The column(s) used to sort the DataFrame.
        window_function (Callable, optional): The window function for ranking records. Defaults to F.row_number.

    Returns:
        DataFrame: A DataFrame with the most recent record for each partition.
    """
    window_order_by_column_names = enabler.ensure_list(window_order_by_column_names)

    return (
        sdf.withColumn(
            "temp",
            window_function().over(
                Window.partitionBy(window_partition_column_name).orderBy(
                    *window_order_by_column_names
                )
            ),
        )
        .filter(F.col("temp") == 1)
        .drop("temp")
    )
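
Since this function has no example in its docstring, here is a minimal usage sketch. The column names and sample data are illustrative, and it assumes a descending Column expression is accepted inside the order-by list (the documented type is `str | list`), so that the first-ranked row per partition is the most recent one.

```python
from pyspark.sql import SparkSession, functions as F
from pysparky.transformations.dedup import get_latest_record_from_column

spark = SparkSession.builder.getOrCreate()

# Illustrative data: two events for u1, one for u2.
data = [
    ("u1", "2024-01-01", 10),
    ("u1", "2024-03-01", 30),
    ("u2", "2024-02-01", 20),
]
sdf = spark.createDataFrame(data, ["user_id", "event_date", "amount"])

# Order descending on event_date so the most recent row is ranked first
# and therefore kept by the row_number() == 1 filter.
latest_sdf = get_latest_record_from_column(
    sdf,
    window_partition_column_name="user_id",
    window_order_by_column_names=[F.col("event_date").desc()],
)
latest_sdf.show()
```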

get_only_duplicate_record(sdf, column_name)

Retrieves only the duplicate records from the input DataFrame based on the specified column.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sdf` | `DataFrame` | The input Spark DataFrame. | required |
| `column_name` | `str` | The column name to check for duplicates. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `DataFrame` | `DataFrame` | A DataFrame containing only the duplicate records. |

Examples:

>>> data = [(1, "A"), (2, "B"), (3, "C"), (1, "A"), (4, "D"), (2, "B")]
>>> sdf = spark.createDataFrame(data, ["id", "value"])
>>> duplicate_records = get_only_duplicate_record(sdf, "id")
>>> duplicate_records.show()
+---+-----+
| id|value|
+---+-----+
|  1|    A|
|  1|    A|
|  2|    B|
|  2|    B|
+---+-----+
Source code in pysparky/transformations/dedup.py
def get_only_duplicate_record(sdf: DataFrame, column_name: str) -> DataFrame:
    """
    Retrieves only the duplicate records from the input DataFrame
    based on the specified column.

    Args:
        sdf (DataFrame): The input Spark DataFrame.
        column_name (str): The column name to check for duplicates.

    Returns:
        DataFrame: A DataFrame containing only the duplicate records.

    Examples:
        ``` py
        >>> data = [(1, "A"), (2, "B"), (3, "C"), (1, "A"), (4, "D"), (2, "B")]
        >>> sdf = spark.createDataFrame(data, ["id", "value"])
        >>> duplicate_records = get_only_duplicate_record(sdf, "id")
        >>> duplicate_records.show()
        +---+-----+
        | id|value|
        +---+-----+
        |  1|    A|
        |  1|    A|
        |  2|    B|
        |  2|    B|
        +---+-----+
        ```
    """
    _, duplicate_records_sdf = quarantine_duplicate_record(sdf, column_name)
    return duplicate_records_sdf

get_only_unique_record(sdf, column_name)

Retrieves only the unique records from the input DataFrame based on the specified column.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sdf` | `DataFrame` | The input Spark DataFrame. | required |
| `column_name` | `str` | The column name to check for duplicates. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `DataFrame` | `DataFrame` | A DataFrame containing only the unique records. |

Examples:

>>> data = [(1, "A"), (2, "B"), (3, "C"), (1, "A"), (4, "D"), (2, "B")]
>>> sdf = spark.createDataFrame(data, ["id", "value"])
>>> unique_records = get_only_unique_record(sdf, "id")
>>> unique_records.show()
+---+-----+
| id|value|
+---+-----+
|  3|    C|
|  4|    D|
+---+-----+
Source code in pysparky/transformations/dedup.py
def get_only_unique_record(sdf: DataFrame, column_name: str) -> DataFrame:
    """
    Retrieves only the unique records from the input DataFrame
    based on the specified column.

    Args:
        sdf (DataFrame): The input Spark DataFrame.
        column_name (str): The column name to check for duplicates.

    Returns:
        DataFrame: A DataFrame containing only the unique records.

    Examples:
        ``` py
        >>> data = [(1, "A"), (2, "B"), (3, "C"), (1, "A"), (4, "D"), (2, "B")]
        >>> sdf = spark.createDataFrame(data, ["id", "value"])
        >>> unique_records = get_only_unique_record(sdf, "id")
        >>> unique_records.show()
        +---+-----+
        | id|value|
        +---+-----+
        |  3|    C|
        |  4|    D|
        +---+-----+
        ```
    """
    unique_records_sdf, _ = quarantine_duplicate_record(sdf, column_name)
    return unique_records_sdf

quarantine_duplicate_record(sdf, column_name)

Splits the input DataFrame into two DataFrames: one containing unique records based on the specified column, and the other containing duplicate records.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sdf` | `DataFrame` | The input Spark DataFrame. | required |
| `column_name` | `str` | The column name to check for duplicates. | required |

Returns:

| Type | Description |
| --- | --- |
| `tuple[DataFrame, DataFrame]` | A tuple of two DataFrames: the first contains unique records, and the second contains duplicate records. |

Examples:

>>> data = [(1, "A"), (2, "B"), (3, "C"), (1, "A"), (4, "D"), (2, "B")]
>>> sdf = spark.createDataFrame(data, ["id", "value"])
>>> unique_records, duplicate_records = quarantine_duplicate_record(sdf, "id")
>>> unique_records.show()
+---+-----+
| id|value|
+---+-----+
|  3|    C|
|  4|    D|
+---+-----+
>>> duplicate_records.show()
+---+-----+
| id|value|
+---+-----+
|  1|    A|
|  1|    A|
|  2|    B|
|  2|    B|
+---+-----+
Source code in pysparky/transformations/dedup.py
def quarantine_duplicate_record(
    sdf: DataFrame, column_name: str
) -> tuple[DataFrame, DataFrame]:
    """
    Splits the input DataFrame into two DataFrames: one containing unique records
    based on the specified column, and the other containing duplicate records.

    Args:
        sdf (DataFrame): The input Spark DataFrame.
        column_name (str): The column name to check for duplicates.

    Returns:
        tuple[DataFrame, DataFrame]: A tuple containing two DataFrames. The first DataFrame
        contains unique records, and the second DataFrame contains duplicate records.

    Examples:
        ``` python
        >>> data = [(1, "A"), (2, "B"), (3, "C"), (1, "A"), (4, "D"), (2, "B")]
        >>> sdf = spark.createDataFrame(data, ["id", "value"])
        >>> unique_records, duplicate_records = quarantine_duplicate_record(sdf, "id")
        >>> unique_records.show()
        +---+-----+
        | id|value|
        +---+-----+
        |  3|    C|
        |  4|    D|
        +---+-----+
        >>> duplicate_records.show()
        +---+-----+
        | id|value|
        +---+-----+
        |  1|    A|
        |  1|    A|
        |  2|    B|
        |  2|    B|
        +---+-----+
        ```
    """
    window_spec = Window.partitionBy(column_name)
    with_count_sdf = sdf.withColumn("count", F.count(column_name).over(window_spec))
    unique_records_sdf = with_count_sdf.filter(F.col("count") == 1).drop("count")
    duplicate_records_sdf = with_count_sdf.filter(F.col("count") > 1).drop("count")

    return unique_records_sdf, duplicate_records_sdf
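
As an end-to-end sketch, the functions above can be combined to quarantine duplicated keys, keep only the newest row among them, and recombine with the already-unique rows. The sample data, the `updated_at` timestamp column, and the descending Column expression in the order-by list are illustrative assumptions, not part of the library's documented examples.

```python
from pyspark.sql import SparkSession, functions as F
from pysparky.transformations.dedup import (
    get_latest_record_from_column,
    quarantine_duplicate_record,
)

spark = SparkSession.builder.getOrCreate()

data = [
    (1, "A", "2024-01-01"),
    (1, "A", "2024-02-01"),
    (2, "B", "2024-03-01"),
    (3, "C", "2024-01-15"),
]
sdf = spark.createDataFrame(data, ["id", "value", "updated_at"])

# Split rows whose id appears once from rows whose id appears more than once.
unique_sdf, duplicate_sdf = quarantine_duplicate_record(sdf, "id")

# Among the duplicated ids, keep only the newest row per id (descending order,
# assuming a Column expression is accepted in the order-by list).
latest_of_duplicates_sdf = get_latest_record_from_column(
    duplicate_sdf,
    window_partition_column_name="id",
    window_order_by_column_names=[F.col("updated_at").desc()],
)

# Recombine: every id now appears exactly once.
deduped_sdf = unique_sdf.unionByName(latest_of_duplicates_sdf)
deduped_sdf.show()
```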