Spark Query Time Merge (non-id)#

Handling Duplicates and Latest Updates with Apache Spark#

In this notebook, we explore how to manage duplicates at query time and retain the latest updates with the help of Apache Spark’s window functions. These powerful functions enable you to:

  • Partition the data based on one or more key fields (e.g., a unique identifier or combinations like first_name and last_name).

  • Order the records by a timestamp (or any other field) to determine the most recent entry.

  • Apply row numbering to isolate and filter the latest record for each unique combination of key fields.

By leveraging this approach, we can efficiently handle large datasets, even when records are insert-only or identified by non-ID fields, while ensuring that only the most up-to-date information is processed.

Handling Multiple Non-ID Fields (e.g., first_name and last_name)#

In cases where records are identified by a combination of multiple fields (like first_name and last_name), the window function can be adjusted to partition by both of these fields. This approach will allow us to select the latest record for each unique combination of first_name and last_name.

1. Setup Spark Session#

To begin with, we initialize a Spark session, which will allow us to interact with Spark using DataFrames and SQL.

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("UpsertExample") \
    .getOrCreate()

2. Sample Data#

We’ll create data where records are identified by both first_name and last_name, and some records may have duplicate combinations with different timestamps.

from pyspark.sql import Row

# Sample data with possible duplicates based on first_name and last_name
data = [
    Row(first_name="Alice", last_name="Smith", timestamp="2024-11-18 10:00:00"),
    Row(first_name="Bob", last_name="Johnson", timestamp="2024-11-18 11:00:00"),
    Row(first_name="Alice", last_name="Smith", timestamp="2024-11-18 12:00:00"),  # Duplicate with later timestamp
    Row(first_name="Charlie", last_name="Brown", timestamp="2024-11-18 09:00:00"),
    Row(first_name="Bob", last_name="Johnson", timestamp="2024-11-18 12:00:00")  # Duplicate with later timestamp
]

# Create DataFrame
df = spark.createDataFrame(data)
df.show(truncate=False)
+----------+---------+-------------------+
|first_name|last_name|timestamp          |
+----------+---------+-------------------+
|Alice     |Smith    |2024-11-18 10:00:00|
|Bob       |Johnson  |2024-11-18 11:00:00|
|Alice     |Smith    |2024-11-18 12:00:00|
|Charlie   |Brown    |2024-11-18 09:00:00|
|Bob       |Johnson  |2024-11-18 12:00:00|
+----------+---------+-------------------+
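
Note that the timestamp column is stored as a plain string here. Because the values follow a zero-padded yyyy-MM-dd HH:mm:ss format, lexicographic ordering happens to match chronological ordering, so the examples below work as-is. If the format can vary, casting to a real timestamp type is safer; an optional sketch:

from pyspark.sql.functions import to_timestamp

# Optional: cast the string column to TimestampType so ordering is
# chronological regardless of string formatting; the remaining cells
# behave the same either way for this sample data
df = df.withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss"))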

3. Window Function with Multiple Partition Keys#

Now, instead of partitioning by id, we will partition by both first_name and last_name. We will still sort by timestamp in descending order to pick the latest record for each unique combination.

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Define a window specification to partition by 'first_name' and 'last_name' and order by 'timestamp' descending
window_spec = Window.partitionBy("first_name", "last_name").orderBy(col("timestamp").desc())

# Add a row number column to select the latest record for each (first_name, last_name) combination
df_with_rownum = df.withColumn("row_num", row_number().over(window_spec))

# Filter the rows where row_num = 1, which gives us the latest record for each combination
latest_df = df_with_rownum.filter(col("row_num") == 1).drop("row_num")

# Show the result
latest_df.show(truncate=False)
+----------+---------+-------------------+
|first_name|last_name|timestamp          |
+----------+---------+-------------------+
|Alice     |Smith    |2024-11-18 12:00:00|
|Bob       |Johnson  |2024-11-18 12:00:00|
|Charlie   |Brown    |2024-11-18 09:00:00|
+----------+---------+-------------------+
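
As a quick sanity check, we can confirm that no (first_name, last_name) combination appears more than once after the filter; the query below should return an empty result:

# Any combination appearing more than once would indicate the deduplication failed
latest_df.groupBy("first_name", "last_name") \
    .count() \
    .filter(col("count") > 1) \
    .show()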

4. Why Use Multiple Partition Keys?#

When dealing with multiple non-ID fields (such as first_name and last_name), the logic changes slightly, but the concept remains the same. The window function allows us to:

  1. Partition by Multiple Fields:

    • Window.partitionBy("first_name", "last_name") groups the data by both first_name and last_name, effectively treating this combination as a unique key.

    • This ensures that we consider records with the same first_name and last_name as duplicates, even if their id is not available.

  2. Sort Within Each Partition:

    • We use orderBy(col("timestamp").desc()) to sort records within each partition (group of the same first_name and last_name) by timestamp in descending order. This ensures that the most recent record comes first.

  3. Assign Row Numbers:

    • row_number().over(window_spec) assigns a row number to each record within its partition. The most recent record will have the row number 1 because we sorted by the timestamp in descending order.

  4. Filter for Latest Record:

    • By filtering row_num == 1, we select only the latest record from each partition, ensuring that only the most recent entry for each unique combination of first_name and last_name is retained.
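
The same partition, sort, and filter logic can also be expressed with Spark SQL if you prefer writing queries directly. The sketch below registers the DataFrame as a temporary view; the view name people is just illustrative:

# Register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("people")

latest_sql_df = spark.sql("""
    SELECT first_name, last_name, timestamp
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY first_name, last_name
                   ORDER BY timestamp DESC
               ) AS row_num
        FROM people
    ) ranked
    WHERE row_num = 1
""")

latest_sql_df.show(truncate=False)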

Optimizing Data Processing with Filters#

Reading the entire dataset is usually not practical or necessary, especially when dealing with large-scale data. In real-world scenarios, it’s common to apply filters to narrow down the scope of data processing. For instance:

  1. Filter by Time Window:
    You can include a filter to process only data within a specific time range. This is particularly useful for incremental processing or handling recent changes:

    filtered_df = df.filter((col("timestamp") >= "2024-01-01") & (col("timestamp") <= "2024-01-31"))
    

    For example, combined with the window logic from above:

    filtered_df = df.filter((col("timestamp") >= "2024-01-01") & (col("timestamp") <= "2024-01-31"))
    window_spec = Window.partitionBy("first_name", "last_name").orderBy(col("timestamp").desc())
    df_with_rownum = filtered_df.withColumn("row_num", row_number().over(window_spec))
    
  2. Filter by Entity:
    When working with data tied to a specific entity (e.g., a user, customer, or region), you can filter by the relevant entity ID or group:

    filtered_df = df.filter(col("entity_id") == "12345")
    

    For example, combined with the window logic from above:

    filtered_df = df.filter(col("entity_id") == "12345")
    window_spec = Window.partitionBy("first_name", "last_name").orderBy(col("timestamp").desc())
    df_with_rownum = filtered_df.withColumn("row_num", row_number().over(window_spec))
    

By applying these filters before leveraging window functions, you can optimize performance, reduce resource usage, and focus on the subset of data that matters most for your use case.
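
As a concrete illustration, combining an early filter with a partitioned storage layout lets Spark prune partitions and push the predicate down before the window function runs. The path and the event_date partition column below are hypothetical, a minimal sketch assuming the data is stored as date-partitioned Parquet:

# Hypothetical layout: Parquet data partitioned by an event_date column
events_df = spark.read.parquet("s3://my-bucket/events/")

# Filtering on the partition column first lets Spark skip irrelevant
# partitions entirely (partition pruning) before the window is applied
recent_df = events_df.filter(col("event_date") >= "2024-01-01")

window_spec = Window.partitionBy("first_name", "last_name").orderBy(col("timestamp").desc())
latest_recent_df = (
    recent_df.withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)
    .drop("row_num")
)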

Conclusion: Handling Multiple Fields in Upsert Logic#

In this updated example, we demonstrated how to handle multiple non-ID fields (first_name and last_name) as keys to identify records. The window function’s flexibility allows you to partition by any combination of fields, not just a single identifier, making it suitable for scenarios where records might not have a unique ID but can be identified by other fields.

By using partitioning, sorting, and row numbering, we can efficiently deduplicate and ensure only the latest records are kept, regardless of whether the data has a unique identifier like id or is based on other attributes like names.
