Spark Upserts#

For this example, we will use Spark Connect.

from pyspark.sql import SparkSession
# Open (or reuse) a Spark Connect session against the remote endpoint.
spark = (
    SparkSession.builder
    .appName("SparkConnectExample")
    .remote("sc://10.141.16.107:15002")
    .getOrCreate()
)
import pandas as pd

Specify the objects we are working with.

# Bucket (database), schema and table that the rest of the example targets.
db, schema, table = "main-bucket", "cc_schema", "adida"

# Fully qualified table name under the `ndb` catalog. The bucket name is
# backtick-quoted (presumably because it contains a hyphen).
catalog_table_name = ".".join(("ndb", f"`{db}`", schema, table))

Show schemas#

Verify the schema exists.

# show() prints the table itself and returns None, which is why the output
# below ends with a trailing "None" line from the surrounding print().
schemas_df = spark.sql("SHOW SCHEMAS IN ndb")
print(schemas_df.show(truncate=False))
+---------------------------+
|namespace                  |
+---------------------------+
|`main-bucket`.yuvalschema  |
|`main-bucket`.cc_schema    |
|streamingdb.streamingschema|
+---------------------------+

None

Read Data From Table#

Verify the data in the table.

# catalog_table_name expands to the same ndb.`{db}`.{schema}.{table} path.
# show() returns None, so print() adds a trailing "None" after the table.
current_rows_df = spark.sql(f"SELECT * FROM {catalog_table_name}")
print(current_rows_df.show(truncate=False))
+-------+---+-------------+
|Name   |Age|City         |
+-------+---+-------------+
|Bob    |30 |San Francisco|
|David  |28 |Chicago      |
|Charlie|35 |Los Angeles  |
|Alice  |25 |New York     |
+-------+---+-------------+

None

Creating source_df#

Create a dataset with an update and a new row.

from pyspark.sql import Row

# Build the upsert input: one row that changes an existing record and one
# row that does not exist in the table yet.
source_data = [
    Row(Name=name, Age=age, City=city)
    for name, age, city in (
        ("Charlie", 36, "San Diego"),  # existing, modified (Age + City changed)
        ("Eve", 23, "Austin"),         # new row
    )
]
source_df = spark.createDataFrame(source_data)

Apply Upsert Function#

Use the vastdb_upsert and generate_where_clause_arbitrary library functions. These are defined at the end of the notebook.

# Match source rows to target rows on the Name column, then upsert.
upsert_condition = generate_where_clause_arbitrary(source_df, ["Name"], ["Name"])
vastdb_upsert(catalog_table_name, source_df, upsert_condition)

Examine the Results#

# Re-read the table to confirm the upsert; catalog_table_name expands to the
# same ndb.`{db}`.{schema}.{table} path. show() returns None, hence the
# trailing "None" printed after the table.
upserted_rows_df = spark.sql(f"SELECT * FROM {catalog_table_name}")
print(upserted_rows_df.show(truncate=False))
+-------+---+-------------+
|Name   |Age|City         |
+-------+---+-------------+
|Bob    |30 |San Francisco|
|David  |28 |Chicago      |
|Alice  |25 |New York     |
|Charlie|36 |San Diego    |
|Eve    |23 |Austin       |
+-------+---+-------------+

None

Library Functions#

NOTE: These will be moved to the vastdb SDK so they can be made available by:

! pip install vastdb
from vastdb.spark import vastdb_upsert, generate_where_clause_arbitrary

Upsert Function#

def vastdb_upsert(target_table_name, source_df, sql_where_cond):
    """Upsert ``source_df`` into ``target_table_name`` via delete-then-append.

    Rows in the target table that satisfy ``sql_where_cond`` are deleted and
    every row of ``source_df`` is then appended to the table.

    NOTE: the delete and the append are two separate statements. Nothing in
    this function restores the deleted rows if the append fails afterwards.

    Args:
        target_table_name: Fully qualified name of the table to modify.
        source_df: DataFrame holding the new and updated rows.
        sql_where_cond: SQL boolean expression (without the ``WHERE``
            keyword) selecting the target rows to replace. Every row of
            ``source_df`` must satisfy it as well.

    Raises:
        ValueError: If some rows of ``source_df`` do not satisfy
            ``sql_where_cond``.
    """
    # Verify that every source row matches the WHERE condition before
    # touching the target table.
    total_rows = source_df.count()
    matching_rows = source_df.filter(sql_where_cond).count()
    if total_rows > matching_rows:
        raise ValueError(
            f"{total_rows - matching_rows} of {total_rows} source rows do not "
            f"match the upsert condition: {sql_where_cond}"
        )

    # Delete matching records
    spark.sql("DELETE from " + target_table_name + " WHERE " + sql_where_cond)

    # Insert updated records
    source_df.write.mode("append").saveAsTable(target_table_name)
# Example: the WHERE clause produced for source_df when matching on Name.
generate_where_clause_arbitrary(source_df, ["Name"], ["Name"])
"(Name = 'Charlie') OR (Name = 'Eve')"

Generate Where Clause Function#

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def _sql_match_predicate(column, value):
    """Return one SQL predicate testing that ``column`` equals ``value``."""
    if value is None:
        # "col = NULL" never evaluates to true in SQL; NULLs need IS NULL.
        return f"{column} IS NULL"
    # Escape backslashes first, then single quotes, so that values such as
    # "O'Brien" cannot terminate the string literal early.
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"{column} = '{escaped}'"


def generate_where_clause_arbitrary(df, source_columns, target_columns):
    """
    Generates a SQL WHERE clause that checks if target columns match any combination of values
    from the specified source columns in the input DataFrame.

    Each distinct combination of source values becomes one parenthesised group
    of ``AND``-ed predicates; the groups are joined with ``OR``. Values are
    rendered as single-quoted string literals with embedded backslashes and
    single quotes escaped; ``None`` values are rendered as ``IS NULL``.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame containing the source values.
        source_columns (list): A list of source column names.
        target_columns (list): A list of target column names, positionally
            paired with ``source_columns``.

    Returns:
        str: A SQL WHERE clause string (without the ``WHERE`` keyword), or
        ``"1=0"`` when ``df`` has no rows.

    Raises:
        ValueError: If the two column lists differ in length.
    """
    if len(source_columns) != len(target_columns):
        raise ValueError("Number of source and target columns must match.")

    source_values = [tuple(row) for row in df.select(*source_columns).distinct().collect()]

    if not source_values:
        # No source rows: return a condition that matches nothing.
        return "1=0"

    groups = (
        " AND ".join(
            _sql_match_predicate(column, value)
            for column, value in zip(target_columns, value_tuple)
        )
        for value_tuple in source_values
    )
    return " OR ".join(f"({group})" for group in groups)



# Sample source rows: (country, city, zip).
data = [
    ("USA", "New York", 123),
    ("Canada", "Toronto", 456),
    ("USA", "Los Angeles", 789),
    ("France", "Paris", 101),
    ("Canada", "Ottawa", 112),
]
df = spark.createDataFrame(data, ["c_country", "c_city", "zip"])

# Clause built from two columns: country and city.
source_cols = ["c_country", "c_city"]
target_cols = ["customer_country", "customer_city"]
where_clause = generate_where_clause_arbitrary(df, source_cols, target_cols)
print("Generated WHERE clause (country and city):", where_clause)

# Clause built from three columns: country, city and zip.
source_cols_all = [*source_cols, "zip"]
target_cols_all = [*target_cols, "customer_zip"]
where_clause_all = generate_where_clause_arbitrary(df, source_cols_all, target_cols_all)
print("Generated WHERE clause (country, city, and zip):", where_clause_all)

# A second DataFrame whose column names are the target names used above.
data2 = [
    ("USA", "New York", 123, "A"),
    ("Germany", "Berlin", 456, "B"),
    ("Canada", "Toronto", 456, "C"),
    ("USA", "Los Angeles", 789, "D"),
    ("France", "Paris", 101, "E"),
    ("Canada", "Ottawa", 112, "F"),
    ("USA", "Chicago", 999, "G"),
]
df2 = spark.createDataFrame(data2, ["customer_country", "customer_city", "customer_zip", "id"])

# Apply the generated three-column clause as a filter expression.
filtered_df = df2.where(F.expr(where_clause_all))
filtered_df.show()
Generated WHERE clause (country and city): (customer_country = 'USA' AND customer_city = 'New York') OR (customer_country = 'Canada' AND customer_city = 'Toronto') OR (customer_country = 'USA' AND customer_city = 'Los Angeles') OR (customer_country = 'France' AND customer_city = 'Paris') OR (customer_country = 'Canada' AND customer_city = 'Ottawa')
Generated WHERE clause (country, city, and zip): (customer_country = 'USA' AND customer_city = 'New York' AND customer_zip = '123') OR (customer_country = 'Canada' AND customer_city = 'Toronto' AND customer_zip = '456') OR (customer_country = 'USA' AND customer_city = 'Los Angeles' AND customer_zip = '789') OR (customer_country = 'France' AND customer_city = 'Paris' AND customer_zip = '101') OR (customer_country = 'Canada' AND customer_city = 'Ottawa' AND customer_zip = '112')
+----------------+-------------+------------+---+
|customer_country|customer_city|customer_zip| id|
+----------------+-------------+------------+---+
|             USA|     New York|         123|  A|
|          Canada|      Toronto|         456|  C|
|             USA|  Los Angeles|         789|  D|
|          France|        Paris|         101|  E|
|          Canada|       Ottawa|         112|  F|
+----------------+-------------+------------+---+