Adding Spark libraries#
# The IP of the host running this notebook (the Spark driver);
# executors must be able to reach it on the two ports below
SPARK_DRIVER_HOST='10.143.15.111'
SPARK_DRIVER_PORT=2222
SPARK_BLOCK_MANAGER_PORT=7777
# The IP of the Spark cluster master
SPARK_MASTER_HOST='172.200.202.20'
SPARK_MASTER_PORT=2424
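Before building a session it is worth confirming that the master is actually reachable from this host; a failed driver registration otherwise surfaces only as a slow timeout. A minimal sketch using just the standard library (the five-second timeout is an arbitrary choice):

import socket

# Raises OSError immediately if the master port cannot be reached
with socket.create_connection((SPARK_MASTER_HOST, SPARK_MASTER_PORT), timeout=5):
    print(f"Master reachable at {SPARK_MASTER_HOST}:{SPARK_MASTER_PORT}")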
%%bash
# Fetch the AWS SDK, Hadoop, and Iceberg jars the session needs
[ -d spark_jars ] || mkdir spark_jars
wget -c -q -O spark_jars/aws-java-sdk.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.12.742/aws-java-sdk-1.12.742.jar
wget -c -q -O spark_jars/hadoop-aws.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
wget -c -q -O spark_jars/hadoop-client.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client/3.3.4/hadoop-client-3.3.4.jar
wget -c -q -O spark_jars/aws-java-sdk-dynamodb.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.12.742/aws-java-sdk-dynamodb-1.12.742.jar
# The Iceberg runtime must match the cluster's Spark minor version (3.4) and Scala build (2.13)
wget -c -q -O spark_jars/iceberg-spark-runtime.jar https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.13/1.6.1/iceberg-spark-runtime-3.4_2.13-1.6.1.jar
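A truncated or interrupted download is easy to miss with `wget -q -c`. Since every jar is a zip archive, the standard library can cheaply verify that each file has an intact central directory before it ever hits the classpath:

import os
import zipfile

# A jar is a zip archive; a truncated download fails this check
for name in sorted(os.listdir('spark_jars')):
    path = os.path.join('spark_jars', name)
    print(name, 'ok' if zipfile.is_zipfile(path) else 'CORRUPT -- re-download')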
import os

# Collect the downloaded jar paths to hand to the SparkSession builder
jars_directory = 'spark_jars'
jar_files = [
    os.path.join(jars_directory, file)
    for file in os.listdir(jars_directory)
    if file.endswith(".jar")
]
jar_files
['spark_jars/aws-java-sdk.jar',
 'spark_jars/hadoop-aws.jar',
 'spark_jars/hadoop-client.jar',
 'spark_jars/aws-java-sdk-dynamodb.jar',
 'spark_jars/iceberg-spark-runtime.jar']
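One caveat: the relative paths in `jar_files` are resolved against the driver process's working directory. If the notebook kernel might be launched from somewhere other than the directory containing `spark_jars`, converting to absolute paths first is a cheap safeguard (an optional step, not part of the original setup):

# Optional: make the jar paths independent of the kernel's working directory
jar_files = [os.path.abspath(path) for path in jar_files]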
# Start from a clean local Iceberg warehouse
! rm -rf /tmp/iceberg_warehouse
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("SparkConnectionTest")
    .master(f"spark://{SPARK_MASTER_HOST}:{SPARK_MASTER_PORT}")
    # Tell the executors how to reach back to the driver running this notebook
    .config("spark.driver.host", SPARK_DRIVER_HOST)
    .config("spark.driver.port", SPARK_DRIVER_PORT)
    .config("spark.driver.blockManager.port", SPARK_BLOCK_MANAGER_PORT)
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "1g")
    # Put the downloaded jars on the driver classpath ...
    .config("spark.driver.extraClassPath", "spark_jars/*")
    # ... and distribute them to the executors
    .config("spark.jars", ",".join(jar_files))
    # Iceberg-specific configuration
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")  # filesystem-based Hadoop catalog
    .config("spark.sql.catalog.spark_catalog.warehouse", "file:///tmp/iceberg_warehouse")
    .getOrCreate()
)
print(spark.version)
3.4.1
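With the driver up, the catalog settings can be read back through `spark.conf.get` to confirm they took effect before any Iceberg work is attempted:

# Confirm the Iceberg catalog configuration is active
print(spark.conf.get("spark.sql.catalog.spark_catalog"))
print(spark.conf.get("spark.sql.catalog.spark_catalog.warehouse"))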
# Test the connection by running a simple operation
test_df = spark.sql("SELECT 1 as test")
test_df.collect()
[Row(test=1)]
# Persist the test DataFrame as an Iceberg table in the warehouse
test_df.writeTo("spark_catalog.default.test_table") \
    .using("iceberg") \
    .createOrReplace()
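The round trip can be checked from the driver's side by reading the table back through the same catalog:

# Read the Iceberg table back; expect the single row test=1
spark.table("spark_catalog.default.test_table").show()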
# Inspect the files Iceberg wrote for the new table
! find /tmp/iceberg_warehouse/default/test_table
/tmp/iceberg_warehouse/default/test_table
/tmp/iceberg_warehouse/default/test_table/metadata
/tmp/iceberg_warehouse/default/test_table/metadata/957fff18-4bb9-4465-925c-2511ced81166-m0.avro
/tmp/iceberg_warehouse/default/test_table/metadata/.957fff18-4bb9-4465-925c-2511ced81166-m0.avro.crc
/tmp/iceberg_warehouse/default/test_table/metadata/snap-5037091842914563517-1-957fff18-4bb9-4465-925c-2511ced81166.avro
/tmp/iceberg_warehouse/default/test_table/metadata/.snap-5037091842914563517-1-957fff18-4bb9-4465-925c-2511ced81166.avro.crc
/tmp/iceberg_warehouse/default/test_table/metadata/v1.metadata.json
/tmp/iceberg_warehouse/default/test_table/metadata/.v1.metadata.json.crc
/tmp/iceberg_warehouse/default/test_table/metadata/version-hint.text
/tmp/iceberg_warehouse/default/test_table/metadata/.version-hint.text.crc
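The Avro files under `metadata/` back Iceberg's queryable metadata tables. The `snapshots` table, for example, lists the snapshot the write above created:

# Iceberg exposes per-table metadata as system tables
spark.sql(
    "SELECT snapshot_id, operation FROM spark_catalog.default.test_table.snapshots"
).show(truncate=False)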
# Free up the cluster resources held by this session
spark.stop()