{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Kafka via Spark Streaming\n", "\n", "This project provides and example of importing from Kafka to VastDB using Spark Streaming and the Vast DB Python SDK.\n", "\n", "The main benefits of this approach is that it can use any version of Spark (no dependency on VastDB Spark Library)." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Note: you may need to restart the kernel to use updated packages.\n" ] } ], "source": [ "!pip install --quiet pyspark kafka vastdb pyarrow" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "# Configure Kafka consumer parameters (replace with your details)\n", "brokers = \"\"\n", "topic = \"\"\n", "\n", "# Configure VastDB connection details (replace with your credentials)\n", "vastdb_endpoint = \"\"\n", "access_key_id = \"\"\n", "secret_access_key = \"\"\n", "bucket_name = \"\"\n", "schema_name = \"\"\n", "table_name = \"\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Verify that we can connect to Kafka" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "SparkSession created successfully\n", "Loaded packages: None\n" ] }, { "data": { "text/html": [ "\n", "
\n", "

SparkSession - in-memory

\n", " \n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v3.5.2
\n", "
Master
\n", "
local[*]
\n", "
AppName
\n", "
KafkaToVastDB
\n", "
\n", "
\n", " \n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql import SparkSession\n", "from pyspark.sql.functions import from_json\n", "from pyspark.sql.types import StructType, StructField, StringType, TimestampType\n", "import vastdb\n", "import os\n", "\n", "os.environ['PYSPARK_SUBMIT_ARGS'] = \\\n", " '--master local[*] --deploy-mode client --driver-memory 2g --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2 pyspark-shell'\n", "\n", "# Create SparkSession\n", "spark = SparkSession.builder.appName(\"KafkaToVastDB\") \\\n", " .getOrCreate()\n", "\n", "print(\"SparkSession created successfully\")\n", "\n", "# Verify the Kafka package\n", "packages = spark.sparkContext.getConf().get(\"spark.jars.packages\")\n", "print(f\"Loaded packages: {packages}\")\n", "\n", "sqlContext = SparkSession(spark)\n", "#Dont Show warning only error\n", "spark.sparkContext.setLogLevel(\"ERROR\")\n", "spark" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 24:> (0 + 1) / 1][Stage 27:> (0 + 3) / 3]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+--------------------+-------------+---------+------+--------------------+-------------+\n", "| key| value| topic|partition|offset| timestamp|timestampType|\n", "+--------------------+--------------------+-------------+---------+------+--------------------+-------------+\n", "|[35 36 63 31 33 3...|[7B 22 5F 61 69 7...|vastdb_tester| 0| 0|2024-08-05 17:58:...| 0|\n", "|[65 66 36 30 39 6...|[7B 22 5F 61 69 7...|vastdb_tester| 0| 1|2024-08-05 17:58:...| 0|\n", "|[32 38 38 32 63 6...|[7B 22 5F 61 69 7...|vastdb_tester| 0| 2|2024-08-05 17:58:...| 0|\n", "|[63 39 38 62 66 3...|[7B 22 5F 61 69 7...|vastdb_tester| 0| 3|2024-08-05 17:58:...| 0|\n", "|[63 37 34 34 62 3...|[7B 22 5F 61 69 7...|vastdb_tester| 0| 4|2024-08-05 17:58:...| 0|\n", "+--------------------+--------------------+-------------+---------+------+--------------------+-------------+\n", "\n", "Kafka integration is working\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "\n", "# Run a simple Kafka query\n", "try:\n", " df = spark.read.format(\"kafka\") \\\n", " .option(\"kafka.bootstrap.servers\", brokers) \\\n", " .option(\"subscribe\", topic) \\\n", " .load() \\\n", " .limit(5)\n", " \n", " df.show()\n", " print(\"Kafka integration is working\")\n", "except Exception as e:\n", " print(f\"Error reading from Kafka: {e}\")\n", " raise(e)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Verify that we can connect to Vast DB" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Connected to VastDB\n" ] } ], "source": [ "session = vastdb.connect(\n", " timeout=60,\n", " endpoint=vastdb_endpoint, \n", " access=access_key_id, \n", " secret=secret_access_key\n", " )\n", "print(\"Connected to VastDB\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Verify we can perform a streaming read from Kaka" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 28:> (0 + 3) / 3]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "-------------------------------------------\n", "Batch: 0\n", "-------------------------------------------\n", "+---+-----+\n", "|key|value|\n", "+---+-----+\n", "+---+-----+\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Successfully wrote 1000 rows to vastdb/airbyte_demo/spark_streaming\n" ] } ], "source": [ "from pyspark.sql.functions import from_json, col\n", "from pyspark.sql.types import StructType, StructField, StringType\n", "import vastdb\n", "import pyarrow as pa\n", "\n", "# Read data from Kafka using Structured Streaming\n", "df = spark \\\n", " .readStream \\\n", " .format(\"kafka\") \\\n", " .option(\"kafka.bootstrap.servers\", brokers) \\\n", " .option(\"subscribe\", topic) \\\n", " .option(\"startingOffsets\", \"latest\") \\\n", " .load()\n", "\n", "raw_df = df.selectExpr(\"CAST(key AS STRING)\", \"CAST(value AS STRING)\")\n", "raw_query = raw_df.writeStream \\\n", " .outputMode(\"append\") \\\n", " .format(\"console\") \\\n", " .start()\n", "\n", "try:\n", " raw_query.awaitTermination(20) # Run for a short time to inspect the output\n", " raw_query.stop()\n", "except Exception as e:\n", " pass # Ignore the exception" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Consume from Kafka and write to Vast DB" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "# Define schema for incoming Kafka messages \n", "schema = StructType([\n", " StructField(\"key\", StringType(), True),\n", " StructField(\"value\", StringType(), True)\n", "])\n", "\n", "# Parse JSON messages\n", "# value_df = df.selectExpr(\"CAST(value AS STRING)\")\n", "# parsed_df = value_df.select(from_json(col(\"value\"), schema).alias(\"parsed_value\"))\n", "# parsed_df = parsed_df.select(\"parsed_value.*\")\n", "\n", "# For now just save to the DB as Key, Value\n", "parsed_df = raw_df\n", "\n", "\n", "class StreamProcessor:\n", " def __init__(self, await_termination_seconds=None):\n", " self.query = None\n", " self.await_termination_seconds = await_termination_seconds\n", "\n", " def process_batch(self, df, epochId):\n", " \n", " if df.count() == 0:\n", " return\n", "\n", " # Connect to VastDB and write data\n", " try:\n", " session = vastdb.connect(\n", " timeout=60,\n", " endpoint=vastdb_endpoint, \n", " access=access_key_id, \n", " secret=secret_access_key\n", " )\n", " except Exception as e:\n", " raise Exception(f\"Error connecting to VastDB\") from e\n", " \n", " pa_table = pa.Table.from_pandas(df.toPandas())\n", " pa_schema = pa_table.schema\n", " \n", " with session.transaction() as tx:\n", " bucket = tx.bucket(bucket_name)\n", " dbschema = bucket.schema(schema_name, fail_if_missing=False)\n", " if dbschema is None:\n", " dbschema = bucket.create_schema(schema_name)\n", " print(f\"Created DB schema: {schema_name}\")\n", "\n", " table = dbschema.table(name=table_name, fail_if_missing=False)\n", " if table is None:\n", " try:\n", " table = dbschema.create_table(table_name=table_name, columns=pa_schema)\n", " print(f\"Created table: {table_name} with columns: {table.columns()}\")\n", " except Exception as e:\n", " raise Exception(f\"Error creating table: {table_name} from pa_table {pa_table} with schema {pa_schema}\")\n", " \n", " # Write data to VastDB table\n", " try:\n", " table.insert(pa_table)\n", " print(f\"Successfully wrote {df.count()} rows to {bucket_name}/{schema_name}/{table_name}\")\n", " except Exception as e:\n", " raise Exception(f\"Error inserting data into VastDB\") from e\n", "\n", " def start(self):\n", " self.query = parsed_df.writeStream \\\n", " .foreachBatch(self.process_batch) \\\n", " .outputMode(\"append\") \\\n", " .trigger(processingTime='10 seconds') \\\n", " .start()\n", " self.query.awaitTermination(self.await_termination_seconds)\n", "\n", "# Create an instance of StreamProcessor and start the stream\n", "processor = StreamProcessor(await_termination_seconds=60)\n", "processor.start()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "3.10.14-venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.14" } }, "nbformat": 4, "nbformat_minor": 2 }