Spark Predicate Pushdown#

This notebook demonstrates how to verify predicate pushdown from Spark to Vast DB.

Load Endpoint Environment Variables#

These environment variables were set when your Docker container was created.

import os

DOCKER_HOST_OR_IP = os.getenv("DOCKER_HOST_OR_IP")

VASTDB_ENDPOINT = os.getenv("VASTDB_ENDPOINT")
VASTDB_ACCESS_KEY = os.getenv("VASTDB_ACCESS_KEY")
VASTDB_SECRET_KEY = os.getenv("VASTDB_SECRET_KEY")

VASTDB_TWITTER_INGEST_BUCKET = os.getenv("VASTDB_TWITTER_INGEST_BUCKET")
VASTDB_TWITTER_INGEST_SCHEMA = os.getenv("VASTDB_TWITTER_INGEST_SCHEMA")
VASTDB_TWITTER_INGEST_TABLE = os.getenv("VASTDB_TWITTER_INGEST_TABLE")

S3_ENDPOINT = os.getenv("S3A_ENDPOINT")
S3_ACCESS_KEY = os.getenv("S3A_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("S3A_SECRET_KEY")

S3A_ICEBERG_URI = os.getenv("S3A_ICEBERG_URI")

print(f"""
---
DOCKER_HOST_OR_IP={DOCKER_HOST_OR_IP}
---
VASTDB_ENDPOINT={VASTDB_ENDPOINT}
VASTDB_ACCESS_KEY={VASTDB_ACCESS_KEY[-4:]}
VASTDB_SECRET_KEY=****{VASTDB_SECRET_KEY[-4:]}
VASTDB_TWITTER_INGEST_BUCKET={VASTDB_TWITTER_INGEST_BUCKET}
VASTDB_TWITTER_INGEST_SCHEMA={VASTDB_TWITTER_INGEST_SCHEMA}
VASTDB_TWITTER_INGEST_TABLE={VASTDB_TWITTER_INGEST_TABLE}
---
S3_ENDPOINT={S3_ENDPOINT}
S3_ACCESS_KEY={S3_ACCESS_KEY[-4:]}
S3_SECRET_KEY=****{S3_SECRET_KEY[-4:]}
S3A_ICEBERG_URI={S3A_ICEBERG_URI}
---
""")
---
DOCKER_HOST_OR_IP=10.143.11.241
---
VASTDB_ENDPOINT=http://172.200.204.2:80
VASTDB_ACCESS_KEY=QXN5
VASTDB_SECRET_KEY=****oLGr
VASTDB_TWITTER_INGEST_BUCKET=csnow-db
VASTDB_TWITTER_INGEST_SCHEMA=social_media
VASTDB_TWITTER_INGEST_TABLE=tweets
---
S3_ENDPOINT=http://172.200.204.2:80
S3_ACCESS_KEY=QXN5
S3_SECRET_KEY=****oLGr
S3A_ICEBERG_URI=s3a://csnow-bucket/iceberg/
---

Specify other Environment Variables#

SPARK_APPLICATION_NAME = 'Spark Demo'

Start Spark Session#

import socket
import os
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pandas as pd
pd.set_option("display.max_colwidth", 150)

conf = SparkConf()
conf.setAll([
    ("spark.driver.host", socket.gethostbyname(socket.gethostname())),
    ("spark.sql.execution.arrow.pyspark.enabled", "false"),
    # VASTDB
    ("spark.sql.catalog.ndb", 'spark.sql.catalog.ndb.VastCatalog'),
    ("spark.ndb.endpoint", VASTDB_ENDPOINT),
    ("spark.ndb.data_endpoints", VASTDB_ENDPOINT),
    ("spark.ndb.access_key_id", VASTDB_ACCESS_KEY),
    ("spark.ndb.secret_access_key", VASTDB_SECRET_KEY),
    ("spark.driver.extraClassPath", '/usr/local/spark/jars/spark3-vast-3.4.1-f93839bfa38a/*'),
    ("spark.executor.extraClassPath", '/usr/local/spark/jars/spark3-vast-3.4.1-f93839bfa38a/*'),
    ("spark.sql.extensions", 'ndb.NDBSparkSessionExtension'),
    # ICEBERG
    ("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.iceberg.type", "hive"),
    ("spark.sql.catalog.iceberg.uri", f"thrift://{DOCKER_HOST_OR_IP}:9083"),
    # S3A
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("fs.s3a.endpoint", S3_ENDPOINT),
    ("fs.s3a.access.key", S3_ACCESS_KEY),
    ("fs.s3a.secret.key", S3_SECRET_KEY),
    ("fs.s3a.endpoint.region", "vast"),
    ("fs.s3a.connection.ssl.enabled", "false"),
    # Hive
    ("hive.metastore.uris", f"thrift://{DOCKER_HOST_OR_IP}:9083"),
])

spark = SparkSession.builder \
    .master("local") \
    .appName(SPARK_APPLICATION_NAME) \
    .config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("DEBUG")  # verbose, but useful for tracing connector activity

# Raise the log level for a specific JVM class/package via log4j.
# Note: Python's `logging` module does not affect Spark's JVM-side loggers.
log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getLogger("com.example.HelloWorldCatalog").setLevel(log4j.Level.DEBUG)

print("Spark successfully loaded")
Spark successfully loaded
spark

SparkSession - hive

SparkContext
Version: v3.4.4
Master: local
AppName: Spark Demo
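
Before querying, it can be useful to confirm that the ndb catalog is reachable. This is a minimal sanity check, assuming the Vast catalog supports Spark's standard namespace listing; if it does not, the table query in the next section is the real test.

# Optional sanity check: list the schemas visible in the Vast DB bucket.
# Assumes the ndb catalog implements Spark's SupportsNamespaces interface.
spark.sql(f"SHOW NAMESPACES IN ndb.`{VASTDB_TWITTER_INGEST_BUCKET}`").show(truncate=False)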

Social Media Dataset#

spark.sql("""
SELECT * FROM ndb.`csnow-db`.social_media.tweets
""").show(truncate=False)
+-------------+--------------------+--------------------+---------------------------------------------------------+
|created_at   |id                  |id_str              |text                                                     |
+-------------+--------------------+--------------------+---------------------------------------------------------+
|1732208274768|-7643870142078251013|-7643870142078251013|so excited about how clever MobileDevelopment is!        |
|1732208274768|1020245429851211082 |1020245429851211082 |can't believe how interesting AI is!                     |
|1732208274768|2617468679894608857 |2617468679894608857 |prepared for how beautiful CloudEngineer is!             |
|1732208274869|-3100632458456822   |-3100632458456822   |eager to see how amazing WebDevelopment is!              |
|1732208274869|-6907312571603061960|-6907312571603061960|looking forward to see how lovely SoftwareDevelopment is!|
|1732208274869|-3012906547616722072|-3012906547616722072|finally got how beautiful Agile is!                      |
|1732208274869|-4196485847106647115|-4196485847106647115|eager to see how beautiful Agile is!                     |
|1732208274869|-4032196169367521327|-4032196169367521327|can't wait to see how fantastic Security is!             |
|1732208274869|-6418918754352525203|-6418918754352525203|inspired by how lovely Security is!                      |
|1732208274869|1603970341201682038 |1603970341201682038 |impressed with how nice DevOpsEngineering is!            |
|1732208274869|5381813079630643176 |5381813079630643176 |inspired by how kind IoT is!                             |
|1732208274869|-6993575410907140870|-6993575410907140870|just discovered how fantastic OpenSource is!             |
|1732208274869|-8297742997571818506|-8297742997571818506|blown away by how brilliant SecurityEngineering is!      |
|1732208274869|-680635112296625023 |-680635112296625023 |just discovered how perfect AI is!                       |
|1732208274971|-4712879233424671375|-4712879233424671375|can't wait to see how beautiful Blockchain is!           |
|1732208274971|1126266839692793283 |1126266839692793283 |eager to see how clever CloudEngineer is!                |
|1732208274971|7004216700398130448 |7004216700398130448 |blown away by how interesting Agile is!                  |
|1732208274971|6523371880127967106 |6523371880127967106 |looking forward to see how funny OpenSource is!          |
|1732208274971|-1253123398442262397|-1253123398442262397|just discovered how nice OpenSource is!                  |
|1732208274971|882386532907502832  |882386532907502832  |amazed by how funny WebDevelopment is!                   |
+-------------+--------------------+--------------------+---------------------------------------------------------+
only showing top 20 rows

Pushdown created_at (int64) greater-than predicate#

df = spark.sql("""
SELECT * FROM ndb.`csnow-db`.social_media.tweets
WHERE created_at > 123456
""")

df.explain(True)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('created_at > 123456)
   +- 'UnresolvedRelation [ndb, csnow-db, social_media, tweets], [], false

== Analyzed Logical Plan ==
created_at: bigint, id: bigint, id_str: string, text: string
Project [created_at#29L, id#30L, id_str#31, text#32]
+- Filter (created_at#29L > cast(123456 as bigint))
   +- SubqueryAlias ndb.`csnow-db`.social_media.tweets
      +- RelationV2[created_at#29L, id#30L, id_str#31, text#32] ndb.`csnow-db`.social_media.tweets csnow-db/social_media/tweets

== Optimized Logical Plan ==
Filter (isnotnull(created_at#29L) AND (created_at#29L > 123456))
+- RelationV2[created_at#29L, id#30L, id_str#31, text#32] csnow-db/social_media/tweets

== Physical Plan ==
*(1) ColumnarToRow
+- BatchScan csnow-db/social_media/tweets[created_at#29L, id#30L, id_str#31, text#32] VastScan{schemed_name=(csnow-db/social_media/tweets, -1935090858), pushed_down_limit=null, pushed_down_predicates=[[created_at IS NOT NULL], [created_at > 123456]]} RuntimeFilters: []

IMPORTANT:

Note the following in the Physical Plan: pushed_down_predicates=[[created_at IS NOT NULL], [created_at > 123456]]

This demonstrates that the predicate created_at > 123456 WAS pushed down to Vast DB, which can therefore filter the dataset BEFORE returning it to Spark.
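
Rather than eyeballing the plan each time, this check can be automated. The helper below is written for this notebook (it is not part of the Spark or Vast DB APIs): it captures the output of df.explain(), which prints to stdout, and extracts the pushed_down_predicates fragment of the VastScan. It assumes the VastScan{...} formatting shown above, which may differ in other connector versions.

import io
from contextlib import redirect_stdout

def pushed_down_predicates(df):
    """Return the pushed_down_predicates fragment of df's physical plan, or ''."""
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain(True)  # explain() prints the plan instead of returning it
    plan = buf.getvalue()
    marker = "pushed_down_predicates="
    if marker not in plan:
        return ""
    start = plan.index(marker) + len(marker)
    return plan[start:plan.index("}", start)]

# The created_at predicate from the query above should appear in the list.
assert "created_at > 123456" in pushed_down_predicates(df)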

Pushdown text (String) equality predicate#

df = spark.sql("""
SELECT * FROM ndb.`csnow-db`.social_media.tweets
WHERE text = 'a'
""")

df.explain(True)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('text = a)
   +- 'UnresolvedRelation [ndb, csnow-db, social_media, tweets], [], false

== Analyzed Logical Plan ==
created_at: bigint, id: bigint, id_str: string, text: string
Project [created_at#41L, id#42L, id_str#43, text#44]
+- Filter (text#44 = a)
   +- SubqueryAlias ndb.`csnow-db`.social_media.tweets
      +- RelationV2[created_at#41L, id#42L, id_str#43, text#44] ndb.`csnow-db`.social_media.tweets csnow-db/social_media/tweets

== Optimized Logical Plan ==
Filter (isnotnull(text#44) AND (text#44 = a))
+- RelationV2[created_at#41L, id#42L, id_str#43, text#44] csnow-db/social_media/tweets

== Physical Plan ==
*(1) ColumnarToRow
+- BatchScan csnow-db/social_media/tweets[created_at#41L, id#42L, id_str#43, text#44] VastScan{schemed_name=(csnow-db/social_media/tweets, -1731650491), pushed_down_limit=null, pushed_down_predicates=[[text IS NOT NULL], [text = 'a']]} RuntimeFilters: []

IMPORTANT:

Note the following in the Physical Plan: pushed_down_predicates=[[text IS NOT NULL], [text = 'a']]

This demonstrates that the predicate text = 'a' WAS pushed down to Vast DB, which can therefore filter the dataset BEFORE returning it to Spark.

Pushdown text (String) LIKE predicate#

df = spark.sql("""
SELECT * FROM ndb.`csnow-db`.social_media.tweets
WHERE text like 'a%'
""")

df.explain(True)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter 'text LIKE a%
   +- 'UnresolvedRelation [ndb, csnow-db, social_media, tweets], [], false

== Analyzed Logical Plan ==
created_at: bigint, id: bigint, id_str: string, text: string
Project [created_at#53L, id#54L, id_str#55, text#56]
+- Filter text#56 LIKE a%
   +- SubqueryAlias ndb.`csnow-db`.social_media.tweets
      +- RelationV2[created_at#53L, id#54L, id_str#55, text#56] ndb.`csnow-db`.social_media.tweets csnow-db/social_media/tweets

== Optimized Logical Plan ==
Filter (isnotnull(text#56) AND StartsWith(text#56, a))
+- RelationV2[created_at#53L, id#54L, id_str#55, text#56] csnow-db/social_media/tweets

== Physical Plan ==
*(1) Filter StartsWith(text#56, a)
+- *(1) ColumnarToRow
   +- BatchScan csnow-db/social_media/tweets[created_at#53L, id#54L, id_str#55, text#56] VastScan{schemed_name=(csnow-db/social_media/tweets, -198082486), pushed_down_limit=null, pushed_down_predicates=[[text IS NOT NULL]]} RuntimeFilters: []

IMPORTANT:

Note the following in the Physical Plan: pushed_down_predicates=[[text IS NOT NULL]]

This demonstrates that the predicate text LIKE 'a%' was NOT pushed down to Vast DB: only the IS NOT NULL check was. Spark evaluates the StartsWith filter itself, so the dataset is not filtered at the source before being returned to Spark. A possible workaround is sketched below.
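
When a prefix match must be filtered at the source, one common rewrite is to express LIKE 'a%' as a range comparison, since simple comparison predicates are pushdown candidates (as seen with created_at above). This is a sketch, not a documented Vast DB feature; whether the connector pushes down string range comparisons should itself be verified with explain().

# Hypothetical rewrite: a prefix match as a half-open range.
# Under binary collation, 'a' <= text < 'b' matches exactly the
# strings that start with 'a'.
df = spark.sql("""
SELECT * FROM ndb.`csnow-db`.social_media.tweets
WHERE text >= 'a' AND text < 'b'
""")
df.explain(True)  # check pushed_down_predicates to confirm the rewrite helped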

Pushdown text (String) substring predicate#

df = spark.sql("""
SELECT * FROM ndb.`csnow-db`.social_media.tweets
WHERE substring(text, 1, 1) = 'a'
""")

df.explain(True)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('substring('text, 1, 1) = a)
   +- 'UnresolvedRelation [ndb, csnow-db, social_media, tweets], [], false

== Analyzed Logical Plan ==
created_at: bigint, id: bigint, id_str: string, text: string
Project [created_at#65L, id#66L, id_str#67, text#68]
+- Filter (substring(text#68, 1, 1) = a)
   +- SubqueryAlias ndb.`csnow-db`.social_media.tweets
      +- RelationV2[created_at#65L, id#66L, id_str#67, text#68] ndb.`csnow-db`.social_media.tweets csnow-db/social_media/tweets

== Optimized Logical Plan ==
Filter (isnotnull(text#68) AND (substring(text#68, 1, 1) = a))
+- RelationV2[created_at#65L, id#66L, id_str#67, text#68] csnow-db/social_media/tweets

== Physical Plan ==
*(1) Filter (substring(text#68, 1, 1) = a)
+- *(1) ColumnarToRow
   +- BatchScan csnow-db/social_media/tweets[created_at#65L, id#66L, id_str#67, text#68] VastScan{schemed_name=(csnow-db/social_media/tweets, 101095318), pushed_down_limit=null, pushed_down_predicates=[[text IS NOT NULL]]} RuntimeFilters: []

IMPORTANT:

Note the following in the Physical Plan: pushed_down_predicates=[[text IS NOT NULL]]

This demonstrates that the predicate substring(text, 1, 1) = 'a' was NOT pushed down to Vast DB: only the IS NOT NULL check was. Spark evaluates the substring filter itself, so the dataset is not filtered at the source before being returned to Spark.
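
Function-based predicates generally cannot be pushed down, because the source would have to evaluate the function. A mitigation sketch, under the same assumptions as the LIKE section above: pair the exact filter with an equivalent coarse range predicate, so Vast DB can prune rows at the source while Spark re-checks the precise condition on what remains.

# Hypothetical mitigation: add a pushdown-friendly range predicate
# alongside the exact substring filter. If string comparisons push
# down, Vast DB prunes on the range; Spark re-checks substring().
df = spark.sql("""
SELECT * FROM ndb.`csnow-db`.social_media.tweets
WHERE text >= 'a' AND text < 'b'     -- pushdown candidate
  AND substring(text, 1, 1) = 'a'    -- evaluated by Spark
""")
df.explain(True)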