Spark Query Time Merge (id)#
Handling Duplicates and Latest Updates with Apache Spark#
In this notebook, we explore how to manage duplicates at query time and retain the latest updates with the help of Apache Spark’s window functions. These powerful functions enable you to:
Partition the data based on a unique identifier (id).
Order the records by a timestamp (or another field that determines the most recent entry).
Apply row numbering to isolate and filter the latest record.
By leveraging this approach, we can efficiently handle large datasets while ensuring that only the most up-to-date information is processed.
1. Setup Spark Session#
To begin with, we initialize a Spark session, which will allow us to interact with Spark using DataFrames and SQL.
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("UpsertExample") \
    .getOrCreate()
2. Sample Data#
Let’s create some sample data that simulates a scenario where records with the same id are inserted multiple times, but with different timestamps.
from pyspark.sql import Row
# Sample data with possible duplicates
data = [
Row(id=1, name="Alice", timestamp="2024-11-18 10:00:00"),
Row(id=2, name="Bob", timestamp="2024-11-18 11:00:00"),
Row(id=1, name="Alice Updated", timestamp="2024-11-18 12:00:00"), # Duplicate ID with later timestamp
Row(id=3, name="Charlie", timestamp="2024-11-18 09:00:00"),
Row(id=2, name="Bob Updated", timestamp="2024-11-18 12:00:00") # Duplicate ID with later timestamp
]
# Create DataFrame
df = spark.createDataFrame(data)
df.show(truncate=False)
+---+-------------+-------------------+
|id |name |timestamp |
+---+-------------+-------------------+
|1 |Alice |2024-11-18 10:00:00|
|2 |Bob |2024-11-18 11:00:00|
|1 |Alice Updated|2024-11-18 12:00:00|
|3 |Charlie |2024-11-18 09:00:00|
|2 |Bob Updated |2024-11-18 12:00:00|
+---+-------------+-------------------+
3. Remove Duplicates and Keep the Latest Record#
Next, we’ll use a window function to remove duplicates and keep only the latest record for each id.
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
# Define a window specification to partition by 'id' and order by 'timestamp' descending (latest first)
window_spec = Window.partitionBy("id").orderBy(col("timestamp").desc())
# Add a row number column to select the latest record for each 'id'
df_with_rownum = df.withColumn("row_num", row_number().over(window_spec))
# Filter the rows where row_num = 1, which gives us the latest record for each id
latest_df = df_with_rownum.filter(col("row_num") == 1).drop("row_num")
# Show the result
latest_df.show(truncate=False)
+---+-------------+-------------------+
|id |name |timestamp |
+---+-------------+-------------------+
|1 |Alice Updated|2024-11-18 12:00:00|
|2 |Bob Updated |2024-11-18 12:00:00|
|3 |Charlie |2024-11-18 09:00:00|
+---+-------------+-------------------+
4. Why is the Window Function Required?#
The window function is crucial for handling this scenario. Here’s why:
a) Partitioning Data#
The Window.partitionBy("id")
part of the window function groups the data by the id
column. This allows us to handle each id
independently and apply operations (like sorting by timestamp) to only the records with the same id
.
b) Sorting Within Partitions#
The orderBy(col("timestamp").desc())
ensures that records within each partition (grouped by id
) are ordered by timestamp
in descending order. This sorting is important because we need to select the most recent (latest) record for each id
.
c) Row Number Assignment#
By using row_number().over(window_spec), we assign a row number to each record in its partition. This row number reflects the position of each record in the descending timestamp order, so the most recent record for each id receives row number 1.
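To see this concretely, we can inspect the intermediate DataFrame before filtering; given the sample data above, each id should show its most recent record with row_num = 1 and any earlier duplicate with row_num = 2.
# Inspect the intermediate result: within each 'id' partition the newest
# record carries row_num = 1, while older duplicates get 2, 3, ...
df_with_rownum.orderBy("id", "row_num").show(truncate=False)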
d) Filtering for the Latest Record#
After assigning the row numbers, we filter for the rows where row_num == 1. This ensures that we keep only the latest record for each id, effectively removing any earlier duplicates.
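The same deduplication can also be expressed in Spark SQL using ROW_NUMBER() inside a subquery. The sketch below is an equivalent formulation for readers who prefer SQL; the temporary view name events is arbitrary and not part of the pipeline above.
# Equivalent Spark SQL formulation of the query-time deduplication
df.createOrReplaceTempView("events")  # view name chosen for this example

latest_sql_df = spark.sql("""
    SELECT id, name, timestamp
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY id ORDER BY timestamp DESC) AS row_num
        FROM events
    ) t
    WHERE row_num = 1
""")
latest_sql_df.show(truncate=False)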
Optimizing Data Processing with Filters#
Reading the entire dataset is usually not practical or necessary, especially when dealing with large-scale data. In real-world scenarios, it’s common to apply filters to narrow down the scope of data processing. For instance:
Filter by Time Window:
You can include a filter to process only data within a specific time range. This is particularly useful for incremental processing or handling recent changes. For example:
filtered_df = df.filter((col("timestamp") >= "2024-01-01") & (col("timestamp") <= "2024-01-31"))
window_spec = Window.partitionBy("id").orderBy(col("timestamp").desc())
df_with_rownum = filtered_df.withColumn("row_num", row_number().over(window_spec))
Filter by Entity:
When working with data tied to a specific entity (e.g., a user, customer, or region), you can filter by the relevant entity ID or group. For example:
filtered_df = df.filter(col("entity_id") == "12345")
window_spec = Window.partitionBy("id").orderBy(col("timestamp").desc())
df_with_rownum = filtered_df.withColumn("row_num", row_number().over(window_spec))
By applying these filters before leveraging window functions, you can optimize performance, reduce resource usage, and focus on the subset of data that matters most for your use case.
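As a rough end-to-end sketch, both kinds of filters can be combined with the window-based deduplication from section 3. The entity_id column and the January 2024 date range below are purely illustrative and assume the source DataFrame actually contains such a column.
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Illustrative sketch: assumes 'df' has an 'entity_id' column and that the
# January 2024 window matches the desired processing scope.
filtered_df = (
    df.filter((col("timestamp") >= "2024-01-01") & (col("timestamp") <= "2024-01-31"))
      .filter(col("entity_id") == "12345")
)

window_spec = Window.partitionBy("id").orderBy(col("timestamp").desc())
latest_filtered_df = (
    filtered_df
    .withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)
    .drop("row_num")
)
latest_filtered_df.show(truncate=False)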
Conclusion#
In this notebook, we demonstrated how to handle duplicates and keep the latest updates using Apache Spark’s window functions. The window function allows you to:
Partition data by a unique identifier (id).
Sort the records by a timestamp (or any other field that determines the latest version).
Use row numbering to identify and retain only the most recent record.
This method is highly scalable and efficient, especially when working with large datasets, and ensures that only the most recent data is processed.