Spark Operations Guide#
Intro#
In the first section of this guide, we will import data into VAST-DB tables, covering migration and other use cases in which the data exists as tabular files - either locally or on S3 - and should be ingested into VAST-DB tables.
The second part walks you through common SQL operations to demonstrate basic and advanced SQL usage on VastDB tables, using the spark-sql driver.
Prerequisites#
Spark 3.4.1 with Scala 2.13 (download)
Java 1.8.0
VAST 4.7 or above
Spark is up and running with a master and worker(s), and configured with VAST-DB
VAST-DB bucket(s) created, and users granted the needed identity policy and roles to use database buckets
S3 user access & secret keys on the VAST cluster (KB article)
Tabular identity policy with the proper permissions (KB article)
This is an example identity policy which allows all S3 actions on all resources/buckets:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": "s3:*",
      "Resource": "*"
    }
  ]
}
Spark Connector - Configuring VastDB with vanilla Spark#
Configuring the Spark connector jars#
The Spark connector jars can be found on one of your VAST cluster's CNODEs,
specifically inside the vast_platform Docker container.
Copy the Spark connector jars from the container to a /tmp directory and create a tar.gz:
docker cp vast_platform:/vast/spark3 /tmp/spark3
tar -czf spark_vastdb.tar.gz /tmp/spark3/*
Copy the archive to the host where Spark is running and extract it.
Place the jars in a directory which is easy to access, such as /opt/spark_vastdb.
The examples below show how a new Spark session can be submitted together with the spark_vastdb jars directory.
Configuring the Spark global config with VastDB#
The Spark defaults config (typically conf/spark-defaults.conf) should include the following configuration block in order to interact with VastDB:
endpoint is the IP address of the VAST cluster
data_endpoints is the list of all IPs in the virtual IP pool
#[VastDB]
spark.ndb.endpoint=http://172.19.124.1
spark.ndb.data_endpoints=http://172.19.124.1,http://172.19.124.2,http://172.19.124.3,http://172.19.124.4,http://172.19.124.5,http://172.19.124.6,http://172.19.124.7,http://172.19.124.8
spark.ndb.access_key_id=RW60SSX5SI..
spark.ndb.secret_access_key=/XSDz0N8S2FXuR..
spark.ndb.num_of_splits=8
spark.ndb.num_of_sub_splits=8
spark.ndb.rowgroups_per_subsplit=1
spark.ndb.query_data_rows_per_split=4000000
spark.ndb.retry_max_count=3
spark.ndb.retry_sleep_duration=1
spark.ndb.parallel_import=true
spark.ndb.dynamic_filter_compaction_threshold=100
spark.ndb.dynamic_filtering_wait_timeout=2
spark.sql.catalog.ndb=spark.sql.catalog.ndb.VastCatalog
spark.sql.extensions=ndb.NDBSparkSessionExtension
spark.python.authenticate.socketTimeout=1m
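Alternatively, the same spark.ndb properties can be supplied per session rather than in the global config, for example from the SparkSession builder. The sketch below assumes the property names above are honored when set programmatically; the endpoints and keys are placeholders for your cluster's values.

from pyspark.sql import SparkSession

# Sketch: supplying the VastDB connector properties per session instead of via spark-defaults.conf.
# Endpoint addresses and keys below are placeholders - use your cluster's values.
spark = (
    SparkSession.builder
    .appName("VastDB session")
    .config("spark.ndb.endpoint", "http://172.19.124.1")
    .config("spark.ndb.data_endpoints", "http://172.19.124.1,http://172.19.124.2")
    .config("spark.ndb.access_key_id", "<ACCESS_KEY>")
    .config("spark.ndb.secret_access_key", "<SECRET_KEY>")
    .config("spark.sql.catalog.ndb", "spark.sql.catalog.ndb.VastCatalog")
    .config("spark.sql.extensions", "ndb.NDBSparkSessionExtension")
    .getOrCreate()
)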
Running the PySpark data-frame insert#
The ingestion uses the DataFrame write method (see the Apache Spark docs). We are going to use a short PySpark script to make it simpler to follow.
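Before the full script, here is a minimal sketch of the core read-and-append pattern it relies on; the file path and the ndb catalog table name below are placeholders.

from pyspark.sql import SparkSession

# Sketch: read a Parquet file into a DataFrame and append it to a VastDB table.
# The path and the bucket/schema/table names are placeholders.
spark = SparkSession.builder.appName("VastDB write sketch").getOrCreate()

df = spark.read.parquet("/path/to/file.parquet")
df.write.mode("append").saveAsTable("`ndb`.`my-bucket`.`my_schema`.`my_table`")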
Importing multiple Parquet files to a VAST-DB table#
One way to run the script:
spark-submit --master spark://master-ip:7077 \
<pyspark_ingest_to_vastdb_df.py> \
<bucket-name> <database-name> <table-name> <files-dir>
The script loops (recursively) through all files with the 'parquet' extension in the directory you pass, reads each one, and ingests the resulting DataFrame into a VastDB table.
The bucket needs to be created in advance from the VAST VMS UI, but the schema and table can be created by the script by passing the names you want.
NOTE: If the schema and table already exist, the data is appended to the table, provided the column schemas match.
The script accepts the following arguments from the user:
VAST-DB bucket name
DB/schema name
Table name
The Parquet files directory (absolute path)
from pyspark.sql import SparkSession
import sys
import os
import glob
import time

if __name__ == "__main__":
    if len(sys.argv) != 5:
        print("Usage: spark-submit <pyspark_ingest_to_vastdb_df.py> <bucket-name> <database-name> <table-name> <files-dir>", file=sys.stderr)
        exit(-1)

    bucket_name, database_name, table_name, files_dir = sys.argv[1:5]

    spark = SparkSession.builder \
        .appName("PySpark Data Ingestion to VASTDB Catalog") \
        .getOrCreate()

    # Creating the database (schema)
    spark.sql(f"CREATE DATABASE IF NOT EXISTS `ndb`.`{bucket_name}`.`{database_name}`")

    # Base directory containing Parquet files
    read_directory = files_dir
    catalog_table_name = f"`ndb`.`{bucket_name}`.`{database_name}`.`{table_name}`"

    # Ingest data for each Parquet file in the directory
    total_start_time = time.time()
    for file_path in glob.glob(os.path.join(read_directory, '**', '*.parquet'), recursive=True):
        try:
            file_start_time = time.time()
            df = spark.read.parquet(file_path)
            df.write.mode("append").saveAsTable(catalog_table_name)
            file_end_time = time.time()
            print(f"File {os.path.basename(file_path)} ingested in {file_end_time - file_start_time} seconds")
        except Exception as e:
            print(f"An error occurred while processing {file_path}: {str(e)}", file=sys.stderr)

    total_end_time = time.time()
    print(f"Total ingestion time: {total_end_time - total_start_time} seconds")

    spark.stop()
Querying with Spark SQL in VAST Database with TPC-DS data#
This section demonstrates using Spark SQL / PySpark SQL to query tables in the VAST Database. The examples illustrate various SQL operations, from basic to complex.
Spawning a Spark session using the PySpark driver with VastDB#
PySpark is a Python interface for Apache Spark, allowing data processing and analysis using Spark's features within Python scripts.
spark-submit --master spark://master-ip:7077 \
--driver-class-path $(echo /opt/spark_vastdb/*.jar | tr ' ' ':') \
--conf spark.executor.extraClassPath=$(echo /opt/spark_vastdb/*.jar | tr ' ' ':') \
--jars $(echo /opt/spark_vastdb/*.jar | tr ' ' ',') \
--conf spark.executor.userClassPathFirst=true \
--conf spark.driver.userClassPathFirst=true pyspark_transaction_example.py
Example: Running PySpark SQL with an explicit transaction#
In PySpark, SQL functionality enables you to run SQL queries on DataFrames, allowing for complex data manipulation and analysis using familiar SQL syntax within a Python environment.
Using the spark.sql method you can run the same SQL operations / SELECTs within a Python script.
This approach also enables the use of VastDB explicit transactions, to ensure ACID operations.
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("VastDB SQL Transaction Example").getOrCreate()

try:
    # Start transaction
    spark.sql("SELECT ndb.create_tx()")

    # Execute two insert queries atomically
    spark.sql("""
        INSERT INTO `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer`
        (c_customer_id, c_first_name, c_last_name)
        VALUES ('1003', 'John', 'Foo')
    """)

    spark.sql("""
        INSERT INTO `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer`
        (c_customer_id, c_first_name, c_last_name)
        VALUES ('1004', 'Jane', 'Bar')
    """)

    # Commit the transaction
    spark.sql("SELECT ndb.commit_tx()")

    # Select to show added rows
    updated_rows = spark.sql("""
        SELECT c_customer_id, c_first_name, c_last_name
        FROM `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer`
        WHERE c_customer_id IN ('1003', '1004')
    """)
    updated_rows.show()

except Exception as e:
    print("An error occurred:", e)
    # Rollback the transaction in case of an error
    spark.sql("SELECT ndb.rollback_tx()")

# Stop the Spark session
spark.stop()
Spawning a Spark session using the spark-sql driver with VastDB#
Here we show how to use the spark-sql CLI - a command-line tool for directly executing SQL queries on a Spark cluster configured with the VastDB Spark connector.
spark-sql --master spark://master-ip:7077 \
--driver-class-path $(echo /opt/spark_vastdb/*.jar | tr ' ' ':') \
--conf spark.executor.extraClassPath=$(echo /opt/spark_vastdb/*.jar | tr ' ' ':') \
--jars $(echo /opt/spark_vastdb/*.jar | tr ' ' ',') \
--conf spark.executor.userClassPathFirst=true \
--conf spark.driver.userClassPathFirst=true \
--conf spark.sql.catalogImplementation=in-memory
Basic Data Retrieval#
Querying a Single Table
Operation: Retrieve specific columns.
SELECT c_customer_id, c_first_name, c_last_name
FROM `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer`
LIMIT 10;
Explanation: Retrieves the first 10 records from the customer table, including customer ID, first name, and last name.
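The same statement can also be issued from PySpark; a short sketch, assuming spark is a session configured with the VastDB catalog as shown earlier:

# Sketch: running the same query via spark.sql from a configured PySpark session.
rows = spark.sql("""
    SELECT c_customer_id, c_first_name, c_last_name
    FROM `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer`
    LIMIT 10
""")
rows.show()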
Filtering Data
Operation: Select data with conditions.
SELECT * FROM `ndb`.`vastdb-bucket`.`tpcds_schema`.`store_sales`
WHERE ss_ticket_number = 12345;
Explanation: Retrieves records from store_sales where the ticket number is 12345.
Aggregate Functions#
Summarizing Data
Operation: Aggregate data.
SELECT cs_item_sk, SUM(cs_net_profit) as total_profit
FROM `ndb`.`vastdb-bucket`.`tpcds_schema`.`catalog_sales`
GROUP BY cs_item_sk;
Explanation: Calculates the total net profit for each item in catalog_sales.
Join Operations#
Simple Join
Operation: Combining data from multiple tables.
SELECT c.c_customer_id, c.c_first_name, c.c_last_name, ca.ca_street_name, ca.ca_city
FROM `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer` c
JOIN `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer_address` ca
ON c.c_current_addr_sk = ca.ca_address_sk;
Explanation: Joins customer and customer_address tables to retrieve customer details along with their addresses.
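For comparison, the same join can be expressed with the DataFrame API; a sketch, assuming spark is a session configured with the VastDB catalog:

# Sketch: the customer/address join expressed with the DataFrame API.
customers = spark.table("`ndb`.`vastdb-bucket`.`tpcds_schema`.`customer`")
addresses = spark.table("`ndb`.`vastdb-bucket`.`tpcds_schema`.`customer_address`")

joined = (
    customers.join(addresses, customers.c_current_addr_sk == addresses.ca_address_sk)
    .select("c_customer_id", "c_first_name", "c_last_name", "ca_street_name", "ca_city")
)
joined.show()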
Complex Queries#
Multi-table Join with Filtering and Sorting
Operation: Advanced join with filtering and ordering.
SELECT
c.c_customer_id,
c.c_first_name,
c.c_last_name,
ss.ss_sales_price,
ss.ss_quantity,
i.i_item_id,
i.i_item_desc,
i.i_category
FROM
`ndb`.`vastdb-bucket`.`tpcds_schema`.`customer` c
JOIN
`ndb`.`vastdb-bucket`.`tpcds_schema`.`store_sales` ss
ON c.c_customer_sk = ss.ss_customer_sk
JOIN
`ndb`.`vastdb-bucket`.`tpcds_schema`.`item` i
ON ss.ss_item_sk = i.i_item_sk
ORDER BY
ss.ss_sales_price DESC;
Explanation: A multi-join query using the customer, store_sales, and item tables retrieves a detailed view of sales transactions, combining customer information with details of the items they've purchased.
Data & Schema Manipulations#
Inserting Data
Insert a Single Row
INSERT INTO `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer`
(c_customer_id, c_first_name, c_last_name)
VALUES ('1001', 'John', 'Doe');
Insert Multiple Rows
INSERT INTO `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer`
(c_customer_id, c_first_name, c_last_name)
VALUES ('1002', 'Alice', 'Smith'), ('1003', 'Bob', 'Johnson');
Deleting Data
Delete Specific Rows
DELETE FROM `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer`
WHERE c_customer_id = '1003';
Delete with Condition
DELETE FROM `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer`
WHERE c_last_name = 'Johnson';
Altering Table Schema#
Add a New Column
ALTER TABLE `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer`
ADD COLUMN c_phone_number STRING;
Drop an Existing Column
ALTER TABLE `ndb`.`vastdb-bucket`.`tpcds_schema`.`customer`
DROP COLUMN c_phone_number;
Monitoring SQL operations on Spark-driver UI#
The Spark UI, accessible at http://[driver-node]:4040, is an essential tool for monitoring Spark SQL queries.
SQL Tab: Displays a list of executed Spark SQL queries with status, duration, and detailed execution plans.
Query Details: Displays insights into logical and physical plans, job and stage info, and execution metrics like time and memory usage.
Performance Metrics: Shows execution times, task counts, shuffle read/write metrics, and memory usage for identifying bottlenecks.
Resource Utilization: Helps in tuning Spark resource allocation by displaying how each executor contributes to query execution.
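If you are unsure which address the driver bound, the UI URL can also be read from a running session; a short sketch, assuming an active spark session:

# Sketch: print the URL of the Spark UI served by the current driver.
print(spark.sparkContext.uiWebUrl)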