Kafka via Spark Streaming
This project provides an example of importing data from Kafka into VastDB using Spark Structured Streaming and the VastDB Python SDK.
The main benefit of this approach is that it works with any version of Spark, because there is no dependency on the VastDB Spark library.
!pip install --quiet pyspark kafka vastdb pyarrow
Note: you may need to restart the kernel to use updated packages.
# Configure Kafka consumer parameters (replace with your details)
brokers = ""
topic = ""
# Configure VastDB connection details (replace with your credentials)
vastdb_endpoint = ""
access_key_id = ""
secret_access_key = ""
bucket_name = ""
schema_name = ""
table_name = ""
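If you prefer not to hard-code credentials in the notebook, the same settings can be pulled from environment variables instead. A minimal sketch (the variable names below are illustrative, not part of this project):
import os
# Hypothetical environment variable names -- adjust to your own setup
brokers = os.getenv("KAFKA_BROKERS", "")
topic = os.getenv("KAFKA_TOPIC", "")
vastdb_endpoint = os.getenv("VASTDB_ENDPOINT", "")
access_key_id = os.getenv("VASTDB_ACCESS_KEY_ID", "")
secret_access_key = os.getenv("VASTDB_SECRET_ACCESS_KEY", "")
bucket_name = os.getenv("VASTDB_BUCKET", "")
schema_name = os.getenv("VASTDB_SCHEMA", "")
table_name = os.getenv("VASTDB_TABLE", "")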
Verify that we can connect to Kafka
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
import vastdb
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = \
    '--master local[*] --deploy-mode client --driver-memory 2g --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2 pyspark-shell'

# Create SparkSession
spark = SparkSession.builder.appName("KafkaToVastDB") \
    .getOrCreate()
print("SparkSession created successfully")

# Verify the Kafka package
packages = spark.sparkContext.getConf().get("spark.jars.packages")
print(f"Loaded packages: {packages}")

# Show only errors, not warnings
spark.sparkContext.setLogLevel("ERROR")
spark
SparkSession created successfully
Loaded packages: None
SparkSession - in-memory
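Loaded packages: None most likely means the --packages value passed through PYSPARK_SUBMIT_ARGS is not reflected in the spark.jars.packages configuration key. If you want the connector coordinate to be visible in the session configuration, an alternative sketch is to declare it on the builder itself (restart the kernel first, since configuration set after a session already exists is ignored):
# Alternative: declare the Kafka connector directly on the builder
spark = SparkSession.builder \
    .appName("KafkaToVastDB") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2") \
    .getOrCreate()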
# Run a simple Kafka query
try:
    df = spark.read.format("kafka") \
        .option("kafka.bootstrap.servers", brokers) \
        .option("subscribe", topic) \
        .load() \
        .limit(5)
    df.show()
    print("Kafka integration is working")
except Exception as e:
    print(f"Error reading from Kafka: {e}")
    raise
+--------------------+--------------------+-------------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+--------------------+--------------------+-------------+---------+------+--------------------+-------------+
|[35 36 63 31 33 3...|[7B 22 5F 61 69 7...|vastdb_tester| 0| 0|2024-08-05 17:58:...| 0|
|[65 66 36 30 39 6...|[7B 22 5F 61 69 7...|vastdb_tester| 0| 1|2024-08-05 17:58:...| 0|
|[32 38 38 32 63 6...|[7B 22 5F 61 69 7...|vastdb_tester| 0| 2|2024-08-05 17:58:...| 0|
|[63 39 38 62 66 3...|[7B 22 5F 61 69 7...|vastdb_tester| 0| 3|2024-08-05 17:58:...| 0|
|[63 37 34 34 62 3...|[7B 22 5F 61 69 7...|vastdb_tester| 0| 4|2024-08-05 17:58:...| 0|
+--------------------+--------------------+-------------+---------+------+--------------------+-------------+
Kafka integration is working
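The key and value columns above are raw bytes. To inspect them as readable strings in a batch read, cast them before showing (the streaming cells below use the same CAST approach):
# Decode the binary key/value columns for easier inspection
spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("subscribe", topic) \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .show(5, truncate=False)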
Verify that we can connect to Vast DB
session = vastdb.connect(
    timeout=60,
    endpoint=vastdb_endpoint,
    access=access_key_id,
    secret=secret_access_key
)
print("Connected to VastDB")
Connected to VastDB
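Optionally, you can also check that the configured bucket is reachable and whether the target DB schema already exists, using only the SDK calls that appear later in this notebook:
# Look up the configured bucket and schema inside a transaction
with session.transaction() as tx:
    bucket = tx.bucket(bucket_name)
    dbschema = bucket.schema(schema_name, fail_if_missing=False)
    status = "exists" if dbschema else "will be created by the stream below"
    print(f"Bucket '{bucket_name}' reachable; schema '{schema_name}' {status}")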
Verify we can perform a streaming read from Kafka
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
import vastdb
import pyarrow as pa
# Read data from Kafka using Structured Streaming
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

raw_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

raw_query = raw_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

try:
    raw_query.awaitTermination(20)  # Run for a short time to inspect the output
    raw_query.stop()
except Exception as e:
    pass  # Ignore the exception
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+
|key|value|
+---+-----+
+---+-----+
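An empty Batch: 0 simply means no new messages arrived during the 20 seconds the console stream was running (startingOffsets is set to latest). If you need test traffic, one option is to produce a few messages with the kafka-python client installed above; a hypothetical sketch:
import json
from kafka import KafkaProducer

# Produce a handful of JSON test messages to the configured topic
producer = KafkaProducer(
    bootstrap_servers=brokers,
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
for i in range(5):
    producer.send(topic, value={"id": i, "message": f"test message {i}"})
producer.flush()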
Consume from Kafka and write to Vast DB
# Define schema for incoming Kafka messages
schema = StructType([
    StructField("key", StringType(), True),
    StructField("value", StringType(), True)
])

# Parse JSON messages
# value_df = df.selectExpr("CAST(value AS STRING)")
# parsed_df = value_df.select(from_json(col("value"), schema).alias("parsed_value"))
# parsed_df = parsed_df.select("parsed_value.*")

# For now just save to the DB as Key, Value
parsed_df = raw_df

class StreamProcessor:
    def __init__(self, await_termination_seconds=None):
        self.query = None
        self.await_termination_seconds = await_termination_seconds

    def process_batch(self, df, epochId):
        # Skip empty micro-batches
        if df.count() == 0:
            return

        # Connect to VastDB and write data
        try:
            session = vastdb.connect(
                timeout=60,
                endpoint=vastdb_endpoint,
                access=access_key_id,
                secret=secret_access_key
            )
        except Exception as e:
            raise Exception("Error connecting to VastDB") from e

        # Convert the micro-batch to an Arrow table
        pa_table = pa.Table.from_pandas(df.toPandas())
        pa_schema = pa_table.schema

        with session.transaction() as tx:
            bucket = tx.bucket(bucket_name)

            # Create the DB schema if it does not exist yet
            dbschema = bucket.schema(schema_name, fail_if_missing=False)
            if dbschema is None:
                dbschema = bucket.create_schema(schema_name)
                print(f"Created DB schema: {schema_name}")

            # Create the table if it does not exist yet
            table = dbschema.table(name=table_name, fail_if_missing=False)
            if table is None:
                try:
                    table = dbschema.create_table(table_name=table_name, columns=pa_schema)
                    print(f"Created table: {table_name} with columns: {table.columns()}")
                except Exception as e:
                    raise Exception(
                        f"Error creating table: {table_name} from pa_table {pa_table} with schema {pa_schema}"
                    ) from e

            # Write data to VastDB table
            try:
                table.insert(pa_table)
                print(f"Successfully wrote {df.count()} rows to {bucket_name}/{schema_name}/{table_name}")
            except Exception as e:
                raise Exception("Error inserting data into VastDB") from e

    def start(self):
        self.query = parsed_df.writeStream \
            .foreachBatch(self.process_batch) \
            .outputMode("append") \
            .trigger(processingTime='10 seconds') \
            .start()
        self.query.awaitTermination(self.await_termination_seconds)
# Create an instance of StreamProcessor and start the stream
processor = StreamProcessor(await_termination_seconds=60)
processor.start()
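When the stream finishes (or after stopping it manually), you can read the table back to confirm the rows landed. This sketch assumes table.select() returns an Arrow record-batch reader, as in the VastDB Python SDK:
# Stop the streaming query if it is still running
if processor.query is not None and processor.query.isActive:
    processor.query.stop()

# Read the table back and show a few rows
with session.transaction() as tx:
    table = tx.bucket(bucket_name).schema(schema_name).table(table_name)
    result = table.select().read_all()
    print(f"{result.num_rows} rows in {bucket_name}/{schema_name}/{table_name}")
    print(result.to_pandas().head())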