Spark Predicate Pushdown#
This notebook demonstrates how to verify predicate pushdown from Spark to Vast DB.
Load Endpoint Environment Variables#
These environment variables were set when your Docker container was created.
import os
DOCKER_HOST_OR_IP = os.getenv("DOCKER_HOST_OR_IP")
VASTDB_ENDPOINT = os.getenv("VASTDB_ENDPOINT")
VASTDB_ACCESS_KEY = os.getenv("VASTDB_ACCESS_KEY")
VASTDB_SECRET_KEY = os.getenv("VASTDB_SECRET_KEY")
VASTDB_TWITTER_INGEST_BUCKET = os.getenv("VASTDB_TWITTER_INGEST_BUCKET")
VASTDB_TWITTER_INGEST_SCHEMA = os.getenv("VASTDB_TWITTER_INGEST_SCHEMA")
VASTDB_TWITTER_INGEST_TABLE = os.getenv("VASTDB_TWITTER_INGEST_TABLE")
S3_ENDPOINT = os.getenv("S3A_ENDPOINT")
S3_ACCESS_KEY = os.getenv("S3A_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("S3A_SECRET_KEY")
S3A_ICEBERG_URI = os.getenv("S3A_ICEBERG_URI")
print(f"""
---
DOCKER_HOST_OR_IP={DOCKER_HOST_OR_IP}
---
VASTDB_ENDPOINT={VASTDB_ENDPOINT}
VASTDB_ACCESS_KEY={VASTDB_ACCESS_KEY[-4:]}
VASTDB_SECRET_KEY=****{VASTDB_SECRET_KEY[-4:]}
VASTDB_TWITTER_INGEST_BUCKET={VASTDB_TWITTER_INGEST_BUCKET}
VASTDB_TWITTER_INGEST_SCHEMA={VASTDB_TWITTER_INGEST_SCHEMA}
VASTDB_TWITTER_INGEST_TABLE={VASTDB_TWITTER_INGEST_TABLE}
---
S3_ENDPOINT={S3_ENDPOINT}
S3_ACCESS_KEY={S3_ACCESS_KEY[-4:]}
S3_SECRET_KEY=****{S3_SECRET_KEY[-4:]}
S3A_ICEBERG_URI={S3A_ICEBERG_URI}
---
""")
---
DOCKER_HOST_OR_IP=10.143.11.241
---
VASTDB_ENDPOINT=http://172.200.204.2:80
VASTDB_ACCESS_KEY=QXN5
VASTDB_SECRET_KEY=****oLGr
VASTDB_TWITTER_INGEST_BUCKET=csnow-db
VASTDB_TWITTER_INGEST_SCHEMA=social_media
VASTDB_TWITTER_INGEST_TABLE=tweets
---
S3_ENDPOINT=http://172.200.204.2:80
S3_ACCESS_KEY=QXN5
S3_SECRET_KEY=****oLGr
S3A_ICEBERG_URI=s3a://csnow-bucket/iceberg/
---
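If any of the values above print as None, the Spark session below will fail in confusing ways. A minimal fail-fast check (an addition to the original notebook, not part of the container setup) is:
# Fail fast if any required environment variable is missing.
required = {
    "VASTDB_ENDPOINT": VASTDB_ENDPOINT,
    "VASTDB_ACCESS_KEY": VASTDB_ACCESS_KEY,
    "VASTDB_SECRET_KEY": VASTDB_SECRET_KEY,
    "VASTDB_TWITTER_INGEST_BUCKET": VASTDB_TWITTER_INGEST_BUCKET,
    "VASTDB_TWITTER_INGEST_SCHEMA": VASTDB_TWITTER_INGEST_SCHEMA,
    "VASTDB_TWITTER_INGEST_TABLE": VASTDB_TWITTER_INGEST_TABLE,
}
missing = [name for name, value in required.items() if not value]
assert not missing, f"Missing environment variables: {missing}"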
Specify other Environment Variables#
SPARK_APPLICATION_NAME = 'Spark Demo'
Start Spark Session#
import socket
import os
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pandas as pd
pd.set_option("max_colwidth", 150)
conf = SparkConf()
conf.setAll([
("spark.driver.host", socket.gethostbyname(socket.gethostname())),
("spark.sql.execution.arrow.pyspark.enabled", "false"),
# VASTDB
("spark.sql.catalog.ndb", 'spark.sql.catalog.ndb.VastCatalog'),
("spark.ndb.endpoint", VASTDB_ENDPOINT),
("spark.ndb.data_endpoints", VASTDB_ENDPOINT),
("spark.ndb.access_key_id", VASTDB_ACCESS_KEY),
("spark.ndb.secret_access_key", VASTDB_SECRET_KEY),
("spark.driver.extraClassPath", '/usr/local/spark/jars/spark3-vast-3.4.1-f93839bfa38a/*'),
("spark.executor.extraClassPath", '/usr/local/spark/jars/spark3-vast-3.4.1-f93839bfa38a/*'),
("spark.sql.extensions", 'ndb.NDBSparkSessionExtension'),
# ICEBERG
("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog"),
("spark.sql.catalog.iceberg.type", "hive"),
("spark.sql.catalog.iceberg.uri", f"thrift://{DOCKER_HOST_OR_IP}:9083"),
# S3A
("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
("fs.s3a.endpoint", S3_ENDPOINT),
("fs.s3a.access.key", S3_ACCESS_KEY),
("fs.s3a.secret.key", S3_SECRET_KEY),
("fs.s3a.endpoint.region", "vast"),
("fs.s3a.connection.ssl.enabled", "false"),
# Hive
("hive.metastore.uris", f"thrift://{DOCKER_HOST_OR_IP}:9083"),
])
spark = SparkSession.builder \
.master("local") \
.appName(SPARK_APPLICATION_NAME) \
.config(conf=conf) \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("DEBUG")
import logging
# Example: set the Python logging level for a specific logger.
# "com.example.HelloWorldCatalog" is a placeholder name; JVM-side Spark
# logging is controlled by sc.setLogLevel above, not by Python's logging module.
logging.getLogger("com.example.HelloWorldCatalog").setLevel(logging.DEBUG)
print("Spark successfully loaded")
Spark successfully loaded
spark
SparkSession - hive
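Before examining query plans, a quick smoke test confirms the ndb catalog can reach the table. This sketch assumes the tweets table populated by the earlier ingest steps:
# Smoke test: count rows through the ndb catalog.
table = f"ndb.`{VASTDB_TWITTER_INGEST_BUCKET}`.{VASTDB_TWITTER_INGEST_SCHEMA}.{VASTDB_TWITTER_INGEST_TABLE}"
spark.sql(f"SELECT COUNT(*) FROM {table}").show()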
Pushdown created_at (Int 64) greater than predicate#
df = spark.sql("""
SELECT * FROM ndb.`csnow-db`.social_media.tweets
WHERE created_at > 123456
""")
df.explain(True)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('created_at > 123456)
+- 'UnresolvedRelation [ndb, csnow-db, social_media, tweets], [], false
== Analyzed Logical Plan ==
created_at: bigint, id: bigint, id_str: string, text: string
Project [created_at#29L, id#30L, id_str#31, text#32]
+- Filter (created_at#29L > cast(123456 as bigint))
+- SubqueryAlias ndb.`csnow-db`.social_media.tweets
+- RelationV2[created_at#29L, id#30L, id_str#31, text#32] ndb.`csnow-db`.social_media.tweets csnow-db/social_media/tweets
== Optimized Logical Plan ==
Filter (isnotnull(created_at#29L) AND (created_at#29L > 123456))
+- RelationV2[created_at#29L, id#30L, id_str#31, text#32] csnow-db/social_media/tweets
== Physical Plan ==
*(1) ColumnarToRow
+- BatchScan csnow-db/social_media/tweets[created_at#29L, id#30L, id_str#31, text#32] VastScan{schemed_name=(csnow-db/social_media/tweets, -1935090858), pushed_down_limit=null, pushed_down_predicates=[[created_at IS NOT NULL], [created_at > 123456]]} RuntimeFilters: []
IMPORTANT:
Note the following in the Physical Plan: pushed_down_predicates=[[created_at IS NOT NULL], [created_at > 123456]]
This demonstrates that the predicate created_at > 123456
WAS pushed down to Vast DB, so the dataset is filtered BEFORE it is returned to Spark.
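Rather than scanning the explain() output by eye, you can capture the plan as a string and assert on it. explain() prints to stdout, so a small helper (an addition for convenience) redirects it:
import io
from contextlib import redirect_stdout

def plan_text(df):
    # Capture df.explain(True) output as a string so the pushed-down
    # predicates can be checked programmatically.
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain(True)
    return buf.getvalue()

assert "pushed_down_predicates=[[created_at IS NOT NULL], [created_at > 123456]]" in plan_text(df)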
Pushdown text (String) equality predicate#
df = spark.sql("""
SELECT * FROM ndb.`csnow-db`.social_media.tweets
WHERE text = 'a'
""")
df.explain(True)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('text = a)
+- 'UnresolvedRelation [ndb, csnow-db, social_media, tweets], [], false
== Analyzed Logical Plan ==
created_at: bigint, id: bigint, id_str: string, text: string
Project [created_at#41L, id#42L, id_str#43, text#44]
+- Filter (text#44 = a)
+- SubqueryAlias ndb.`csnow-db`.social_media.tweets
+- RelationV2[created_at#41L, id#42L, id_str#43, text#44] ndb.`csnow-db`.social_media.tweets csnow-db/social_media/tweets
== Optimized Logical Plan ==
Filter (isnotnull(text#44) AND (text#44 = a))
+- RelationV2[created_at#41L, id#42L, id_str#43, text#44] csnow-db/social_media/tweets
== Physical Plan ==
*(1) ColumnarToRow
+- BatchScan csnow-db/social_media/tweets[created_at#41L, id#42L, id_str#43, text#44] VastScan{schemed_name=(csnow-db/social_media/tweets, -1731650491), pushed_down_limit=null, pushed_down_predicates=[[text IS NOT NULL], [text = 'a']]} RuntimeFilters: []
IMPORTANT:
Note the following in the Physical Plan: pushed_down_predicates=[[text IS NOT NULL], [text = 'a']]
This demonstrates that the predicate text = 'a'
WAS pushed down to Vast DB, so the dataset is filtered BEFORE it is returned to Spark.
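Using the plan_text helper defined above, the same check can be automated for the equality predicate:
# The equality predicate should appear in the pushed-down list.
assert "[text = 'a']" in plan_text(df)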
Pushdown text (String) LIKE predicate#
df = spark.sql("""
SELECT * FROM ndb.`csnow-db`.social_media.tweets
WHERE text like 'a%'
""")
df.explain(True)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter 'text LIKE a%
+- 'UnresolvedRelation [ndb, csnow-db, social_media, tweets], [], false
== Analyzed Logical Plan ==
created_at: bigint, id: bigint, id_str: string, text: string
Project [created_at#53L, id#54L, id_str#55, text#56]
+- Filter text#56 LIKE a%
+- SubqueryAlias ndb.`csnow-db`.social_media.tweets
+- RelationV2[created_at#53L, id#54L, id_str#55, text#56] ndb.`csnow-db`.social_media.tweets csnow-db/social_media/tweets
== Optimized Logical Plan ==
Filter (isnotnull(text#56) AND StartsWith(text#56, a))
+- RelationV2[created_at#53L, id#54L, id_str#55, text#56] csnow-db/social_media/tweets
== Physical Plan ==
*(1) Filter StartsWith(text#56, a)
+- *(1) ColumnarToRow
+- BatchScan csnow-db/social_media/tweets[created_at#53L, id#54L, id_str#55, text#56] VastScan{schemed_name=(csnow-db/social_media/tweets, -198082486), pushed_down_limit=null, pushed_down_predicates=[[text IS NOT NULL]]} RuntimeFilters: []
IMPORTANT:
Note the following in the Physical Plan: pushed_down_predicates=[[text IS NOT NULL]]
This demonstrates that the predicate text LIKE 'a%'
was NOT pushed down to Vast DB; only the IS NOT NULL guard was. Spark applies the StartsWith filter itself after the scan, so the dataset is NOT filtered before it is returned to Spark.
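If a prefix match must be filtered server-side, one possible workaround is to rewrite LIKE 'a%' as a half-open range on the column. Whether Vast DB pushes down string range comparisons is an assumption here; verify with explain() as above:
# Sketch: rewrite the prefix match as a range predicate, which the
# connector may be able to push down (verify via df.explain(True)).
df = spark.sql("""
SELECT * FROM ndb.`csnow-db`.social_media.tweets
WHERE text >= 'a' AND text < 'b'
""")
df.explain(True)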
Pushdown text (String) substring predicate#
df = spark.sql("""
SELECT * FROM ndb.`csnow-db`.social_media.tweets
WHERE substring(text, 1, 1) = 'a'
""")
df.explain(True)
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('substring('text, 1, 1) = a)
+- 'UnresolvedRelation [ndb, csnow-db, social_media, tweets], [], false
== Analyzed Logical Plan ==
created_at: bigint, id: bigint, id_str: string, text: string
Project [created_at#65L, id#66L, id_str#67, text#68]
+- Filter (substring(text#68, 1, 1) = a)
+- SubqueryAlias ndb.`csnow-db`.social_media.tweets
+- RelationV2[created_at#65L, id#66L, id_str#67, text#68] ndb.`csnow-db`.social_media.tweets csnow-db/social_media/tweets
== Optimized Logical Plan ==
Filter (isnotnull(text#68) AND (substring(text#68, 1, 1) = a))
+- RelationV2[created_at#65L, id#66L, id_str#67, text#68] csnow-db/social_media/tweets
== Physical Plan ==
*(1) Filter (substring(text#68, 1, 1) = a)
+- *(1) ColumnarToRow
+- BatchScan csnow-db/social_media/tweets[created_at#65L, id#66L, id_str#67, text#68] VastScan{schemed_name=(csnow-db/social_media/tweets, 101095318), pushed_down_limit=null, pushed_down_predicates=[[text IS NOT NULL]]} RuntimeFilters: []
IMPORTANT:
Note the following in the Physical Plan: pushed_down_predicates=[[text IS NOT NULL]]
This demonstrates that the predicate substring(text, 1, 1) = 'a'
was NOT pushed down to Vast DB; Spark evaluates the substring filter itself after the scan, so the dataset is NOT filtered before it is returned to Spark.
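The plan_text helper also covers the negative case: only the NOT NULL guard should reach Vast DB, while the substring filter stays in Spark's own Filter operator above the scan:
# Only the IS NOT NULL guard is sent to Vast DB for this query.
assert "pushed_down_predicates=[[text IS NOT NULL]]" in plan_text(df)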
Social Media Dataset#