Kafka via Spark Streaming

This project provides an example of importing data from Kafka to VastDB using Spark Streaming and the VastDB Python SDK.

The main benefit of this approach is that it works with any version of Spark (there is no dependency on the VastDB Spark library).

!pip install --quiet pyspark kafka vastdb pyarrow
Note: you may need to restart the kernel to use updated packages.
# Configure Kafka consumer parameters (replace with your details)
brokers = ""
topic = ""

# Configure VastDB connection details (replace with your credentials)
vastdb_endpoint = ""
access_key_id = ""
secret_access_key = ""
bucket_name = ""
schema_name = ""
table_name = ""
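
Instead of hard-coding these values in the notebook, they could be read from environment variables. A minimal sketch (the variable names below are illustrative, not required by the SDK):

import os

# Hypothetical environment variable names - adjust to your own conventions
brokers = os.environ.get("KAFKA_BROKERS", "")
topic = os.environ.get("KAFKA_TOPIC", "")
vastdb_endpoint = os.environ.get("VASTDB_ENDPOINT", "")
access_key_id = os.environ.get("VASTDB_ACCESS_KEY_ID", "")
secret_access_key = os.environ.get("VASTDB_SECRET_ACCESS_KEY", "")
bucket_name = os.environ.get("VASTDB_BUCKET", "")
schema_name = os.environ.get("VASTDB_SCHEMA", "")
table_name = os.environ.get("VASTDB_TABLE", "")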

Verify that we can connect to Kafka

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
import vastdb
import os

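# PYSPARK_SUBMIT_ARGS must be set before the SparkSession is created;
# the spark-sql-kafka package version should match the Spark version in use (3.5.2 here).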
os.environ['PYSPARK_SUBMIT_ARGS'] = \
    '--master local[*] --deploy-mode client --driver-memory 2g --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2 pyspark-shell'

# Create SparkSession
spark = SparkSession.builder.appName("KafkaToVastDB") \
        .getOrCreate()

print("SparkSession created successfully")

# Verify the Kafka package
packages = spark.sparkContext.getConf().get("spark.jars.packages")
print(f"Loaded packages: {packages}")

# Only show errors, not warnings
spark.sparkContext.setLogLevel("ERROR")
spark
spark
SparkSession created successfully
Loaded packages: None

SparkSession - in-memory

SparkContext
  Spark UI
  Version: v3.5.2
  Master: local[*]
  AppName: KafkaToVastDB
# Run a simple Kafka query
try:
    df = spark.read.format("kafka") \
        .option("kafka.bootstrap.servers", brokers) \
        .option("subscribe", topic) \
        .load() \
        .limit(5)
    
    df.show()
    print("Kafka integration is working")
except Exception as e:
    print(f"Error reading from Kafka: {e}")
    raise
+--------------------+--------------------+-------------+---------+------+--------------------+-------------+
|                 key|               value|        topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+-------------+---------+------+--------------------+-------------+
|[35 36 63 31 33 3...|[7B 22 5F 61 69 7...|vastdb_tester|        0|     0|2024-08-05 17:58:...|            0|
|[65 66 36 30 39 6...|[7B 22 5F 61 69 7...|vastdb_tester|        0|     1|2024-08-05 17:58:...|            0|
|[32 38 38 32 63 6...|[7B 22 5F 61 69 7...|vastdb_tester|        0|     2|2024-08-05 17:58:...|            0|
|[63 39 38 62 66 3...|[7B 22 5F 61 69 7...|vastdb_tester|        0|     3|2024-08-05 17:58:...|            0|
|[63 37 34 34 62 3...|[7B 22 5F 61 69 7...|vastdb_tester|        0|     4|2024-08-05 17:58:...|            0|
+--------------------+--------------------+-------------+---------+------+--------------------+-------------+

Kafka integration is working
                                                                                

Verify that we can connect to Vast DB

session = vastdb.connect(
        timeout=60,
        endpoint=vastdb_endpoint, 
        access=access_key_id, 
        secret=secret_access_key
    )
print("Connected to VastDB")
Connected to VastDB
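
As an optional sanity check before streaming, a short transaction can confirm that the target bucket is reachable (a sketch using only SDK calls that also appear later in this notebook):

# Optional: confirm the target bucket is visible before streaming into it
with session.transaction() as tx:
    bucket = tx.bucket(bucket_name)  # raises if the bucket is not accessible
    print(f"Bucket '{bucket_name}' is reachable")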

Verify we can perform a streaming read from Kafka

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
import vastdb
import pyarrow as pa

# Read data from Kafka using Structured Streaming
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .load()

raw_df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
raw_query = raw_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

try:
    raw_query.awaitTermination(20)  # Run for a short time to inspect the output
    raw_query.stop()
except Exception as e:
    pass # Ignore the exception
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+
|key|value|
+---+-----+
+---+-----+
                                                                                
Successfully wrote 1000 rows to vastdb/airbyte_demo/spark_streaming
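
Batch 0 above is empty because the query starts from the latest offsets, so only messages produced after the stream starts are picked up during the 20-second window. To replay messages already in the topic, the same query can start from the earliest offsets instead; a minimal sketch:

# Same streaming read as above, but replaying the topic from the beginning
replay_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "earliest") \
    .load()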

Consume from Kafka and write to Vast DB

# Define a schema for incoming Kafka messages
# (if JSON parsing below is enabled, these fields should match the message payload)
schema = StructType([
    StructField("key", StringType(), True),
    StructField("value", StringType(), True)
])

# Optionally parse JSON messages into columns:
# value_df = df.selectExpr("CAST(value AS STRING)")
# parsed_df = value_df.select(from_json(col("value"), schema).alias("parsed_value"))
# parsed_df = parsed_df.select("parsed_value.*")

# For now, just save to the DB as key/value strings
parsed_df = raw_df


class StreamProcessor:
    def __init__(self, await_termination_seconds=None):
        self.query = None
        self.await_termination_seconds = await_termination_seconds

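    # Called by foreachBatch once per micro-batch; df is a static DataFrame
    # holding only the rows received in that batch.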
    def process_batch(self, df, epochId):
        
        if df.count() == 0:
            return

        # Connect to VastDB and write data
        try:
            session = vastdb.connect(
                timeout=60,
                endpoint=vastdb_endpoint, 
                access=access_key_id, 
                secret=secret_access_key
            )
        except Exception as e:
            raise Exception(f"Error connecting to VastDB") from e
        
        pa_table = pa.Table.from_pandas(df.toPandas())
        pa_schema = pa_table.schema
        
        with session.transaction() as tx:
            bucket = tx.bucket(bucket_name)
            dbschema = bucket.schema(schema_name, fail_if_missing=False)
            if dbschema is None:
                dbschema = bucket.create_schema(schema_name)
                print(f"Created DB schema: {schema_name}")

            table = dbschema.table(name=table_name, fail_if_missing=False)
            if table is None:
                try:
                    table = dbschema.create_table(table_name=table_name, columns=pa_schema)
                    print(f"Created table: {table_name} with columns: {table.columns()}")
                except Exception as e:
                    raise Exception(f"Error creating table: {table_name} from pa_table {pa_table} with schema {pa_schema}")
                
            # Write data to VastDB table
            try:
                table.insert(pa_table)
                print(f"Successfully wrote {df.count()} rows to {bucket_name}/{schema_name}/{table_name}")
            except Exception as e:
                raise Exception(f"Error inserting data into VastDB") from e

    def start(self):
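        # Trigger a micro-batch every 10 seconds; Spark hands each batch to process_batch above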
        self.query = parsed_df.writeStream \
            .foreachBatch(self.process_batch) \
            .outputMode("append") \
            .trigger(processingTime='10 seconds') \
            .start()
        self.query.awaitTermination(self.await_termination_seconds)

# Create an instance of StreamProcessor and start the stream
processor = StreamProcessor(await_termination_seconds=60)
processor.start()
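
Once the stream has processed some batches, the ingested rows can be read back through the SDK to confirm the write. A minimal sketch, assuming table.select() returns a PyArrow record-batch reader as in the VastDB SDK examples:

# Read the table back to verify the ingest (sketch)
with session.transaction() as tx:
    table = tx.bucket(bucket_name).schema(schema_name).table(table_name)
    result = table.select().read_all()  # pyarrow.Table containing the selected rows
    print(f"{table_name} now holds {result.num_rows} rows")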