Spark Upserts#
For this example, we will use Spark Connect.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkConnectExample") \
    .remote("sc://10.141.16.107:15002") \
    .getOrCreate()
Specify the objects we are working with.
db = "main-bucket"
schema = "cc_schema"
table = "adida"
catalog_table_name = f"ndb.`{db}`.{schema}.{table}"
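As a quick sanity check, the fully qualified table name resolves to:
print(catalog_table_name)
# ndb.`main-bucket`.cc_schema.adida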
Show Schemas#
Verify the schema exists.
spark.sql("SHOW SCHEMAS IN ndb").show(truncate=False)
+---------------------------+
|namespace |
+---------------------------+
|`main-bucket`.yuvalschema |
|`main-bucket`.cc_schema |
|streamingdb.streamingschema|
+---------------------------+
Read Data From Table#
Verify the data in the table.
spark.sql(f"SELECT * FROM {catalog_table_name}").show(truncate=False)
+-------+---+-------------+
|Name |Age|City |
+-------+---+-------------+
|Bob |30 |San Francisco|
|David |28 |Chicago |
|Charlie|35 |Los Angeles |
|Alice |25 |New York |
+-------+---+-------------+
Creating source_df#
Create a source dataset with one updated existing row (Charlie) and one new row (Eve).
from pyspark.sql import Row

source_data = [
    Row(Name="Charlie", Age=36, City="San Diego"),  # existing row, modified (Age and City changed)
    Row(Name="Eve", Age=23, City="Austin"),         # new row
]
source_df = spark.createDataFrame(source_data)
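A quick preview of source_df before applying the upsert (row order may vary):
source_df.show(truncate=False)
+-------+---+---------+
|Name   |Age|City     |
+-------+---+---------+
|Charlie|36 |San Diego|
|Eve    |23 |Austin   |
+-------+---+---------+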
Apply Upsert Function#
Use the vastdb_upsert and generate_where_clause_arbitrary library functions. These are defined at the end of the notebook.
vastdb_upsert(
    catalog_table_name,
    source_df,
    generate_where_clause_arbitrary(source_df, ["Name"], ["Name"])
)
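Under the hood (see the function definitions at the end of the notebook), the call deletes any existing rows that match the generated predicate and then appends source_df. For this source_df, it is roughly equivalent to:
# Sketch of what the vastdb_upsert call above executes; the predicate is the
# string returned by generate_where_clause_arbitrary(source_df, ["Name"], ["Name"]).
spark.sql(f"DELETE FROM {catalog_table_name} WHERE (Name = 'Charlie') OR (Name = 'Eve')")
source_df.write.mode("append").saveAsTable(catalog_table_name)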
Examine the Results#
spark.sql(f"SELECT * FROM {catalog_table_name}").show(truncate=False)
+-------+---+-------------+
|Name |Age|City |
+-------+---+-------------+
|Bob |30 |San Francisco|
|David |28 |Chicago |
|Alice |25 |New York |
|Charlie|36 |San Diego |
|Eve |23 |Austin |
+-------+---+-------------+
Library Functions#
NOTE: These functions will be moved to the vastdb SDK, after which they will be available via:
! pip install vastdb
from vastdb.spark import vastdb_upsert, generate_where_clause_arbitrary
Upsert Function#
def vastdb_upsert(target_table_name, source_df, sql_where_cond):
    # Verify that every row in source_df is matched by the WHERE condition;
    # otherwise rows would be appended without their old versions being deleted.
    if source_df.count() > source_df.filter(sql_where_cond).count():
        raise Exception("Not all rows in source_df match the WHERE condition: " + sql_where_cond)
    # Delete the existing versions of the matching records.
    # Note: DELETE followed by append is not atomic; if the append fails,
    # the deleted rows are not restored.
    spark.sql("DELETE FROM " + target_table_name + " WHERE " + sql_where_cond)
    # Insert the updated records.
    source_df.write.mode("append").saveAsTable(target_table_name)
For example, the WHERE clause passed to vastdb_upsert above was generated by:
generate_where_clause_arbitrary(source_df, ["Name"], ["Name"])
"(Name = 'Charlie') OR (Name = 'Eve')"
Generate Where Clause Function#
from pyspark.sql import functions as F

def generate_where_clause_arbitrary(df, source_columns, target_columns):
    """
    Generates a SQL WHERE clause that checks whether the target columns match any
    combination of values from the specified source columns in the input DataFrame.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame containing the source values.
        source_columns (list): A list of source column names.
        target_columns (list): A list of target column names.

    Returns:
        str: A SQL WHERE clause string.
    """
    if len(source_columns) != len(target_columns):
        raise ValueError("Number of source and target columns must match.")

    source_values = [tuple(row) for row in df.select(*source_columns).distinct().collect()]
    if not source_values:
        # No source rows: return a condition that matches nothing.
        return "1=0"

    conditions = []
    for value_tuple in source_values:
        individual_conditions = []
        for i, value in enumerate(value_tuple):
            # Note: values are embedded as single-quoted string literals and are
            # not escaped, so values containing single quotes will break the SQL.
            individual_conditions.append(f"{target_columns[i]} = '{value}'")
        conditions.append("(" + " AND ".join(individual_conditions) + ")")

    where_clause = " OR ".join(conditions)
    return where_clause
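Because the generated clause embeds values as unescaped single-quoted literals, a value containing a single quote (for example, O'Brien) would produce invalid SQL. A minimal sketch of a quoting helper that could be used inside the loop above (hypothetical; not part of the library functions):
def sql_quote(value):
    # Standard SQL escaping: double any embedded single quotes,
    # then wrap the whole value in single quotes.
    return "'" + str(value).replace("'", "''") + "'"

# The condition inside the loop would then become:
# individual_conditions.append(f"{target_columns[i]} = {sql_quote(value)}")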
# Example DataFrame
data = [
    ("USA", "New York", 123),
    ("Canada", "Toronto", 456),
    ("USA", "Los Angeles", 789),
    ("France", "Paris", 101),
    ("Canada", "Ottawa", 112),
]
df = spark.createDataFrame(data, ["c_country", "c_city", "zip"])
# Example usage with country and city
source_cols = ["c_country", "c_city"]
target_cols = ["customer_country", "customer_city"]
where_clause = generate_where_clause_arbitrary(df, source_cols, target_cols)
print("Generated WHERE clause (country and city):", where_clause)
# Example usage with country, city, and zip
source_cols_all = ["c_country", "c_city", "zip"]
target_cols_all = ["customer_country", "customer_city", "customer_zip"]
where_clause_all = generate_where_clause_arbitrary(df, source_cols_all, target_cols_all)
print("Generated WHERE clause (country, city, and zip):", where_clause_all)
# Example usage with a different DataFrame.
data2 = [
    ("USA", "New York", 123, "A"),
    ("Germany", "Berlin", 456, "B"),
    ("Canada", "Toronto", 456, "C"),
    ("USA", "Los Angeles", 789, "D"),
    ("France", "Paris", 101, "E"),
    ("Canada", "Ottawa", 112, "F"),
    ("USA", "Chicago", 999, "G"),
]
df2 = spark.createDataFrame(data2, ["customer_country", "customer_city", "customer_zip", "id"])
# Example of how to use the where clause.
filtered_df = df2.where(F.expr(where_clause_all))
filtered_df.show()
Generated WHERE clause (country and city): (customer_country = 'USA' AND customer_city = 'New York') OR (customer_country = 'Canada' AND customer_city = 'Toronto') OR (customer_country = 'USA' AND customer_city = 'Los Angeles') OR (customer_country = 'France' AND customer_city = 'Paris') OR (customer_country = 'Canada' AND customer_city = 'Ottawa')
Generated WHERE clause (country, city, and zip): (customer_country = 'USA' AND customer_city = 'New York' AND customer_zip = '123') OR (customer_country = 'Canada' AND customer_city = 'Toronto' AND customer_zip = '456') OR (customer_country = 'USA' AND customer_city = 'Los Angeles' AND customer_zip = '789') OR (customer_country = 'France' AND customer_city = 'Paris' AND customer_zip = '101') OR (customer_country = 'Canada' AND customer_city = 'Ottawa' AND customer_zip = '112')
+----------------+-------------+------------+---+
|customer_country|customer_city|customer_zip| id|
+----------------+-------------+------------+---+
| USA| New York| 123| A|
| Canada| Toronto| 456| C|
| USA| Los Angeles| 789| D|
| France| Paris| 101| E|
| Canada| Ottawa| 112| F|
+----------------+-------------+------------+---+