{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Kafka via Spark Streaming\n", "\n", "This project provides and example of importing from Kafka to VastDB using Spark Streaming and the Vast DB Python SDK.\n", "\n", "The main benefits of this approach is that it can use any version of Spark (no dependency on VastDB Spark Library)." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Note: you may need to restart the kernel to use updated packages.\n" ] } ], "source": [ "!pip install --quiet pyspark kafka vastdb pyarrow" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "# Configure Kafka consumer parameters (replace with your details)\n", "brokers = \"\"\n", "topic = \"\"\n", "\n", "# Configure VastDB connection details (replace with your credentials)\n", "vastdb_endpoint = \"\"\n", "access_key_id = \"\"\n", "secret_access_key = \"\"\n", "bucket_name = \"\"\n", "schema_name = \"\"\n", "table_name = \"\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Verify that we can connect to Kafka" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "SparkSession created successfully\n", "Loaded packages: None\n" ] }, { "data": { "text/html": [ "\n", "
SparkSession - in-memory\n", "SparkContext\n", "Version: v3.5.2\n", "Master: local[*]\n", "AppName: KafkaToVastDB
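{ "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, the brokers can be probed directly before starting any stream. The next cell is a minimal sketch, assuming the `kafka-python` client installed above and the `brokers` and `topic` values from the configuration cell; it only lists topic metadata and does not consume any messages." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: probe the Kafka cluster with the kafka-python client before streaming.\n", "from kafka import KafkaConsumer\n", "\n", "# No topic subscription; the consumer is only used to fetch cluster metadata.\n", "consumer = KafkaConsumer(bootstrap_servers=brokers.split(\",\"))\n", "try:\n", "    available = consumer.topics()  # set of topic names visible to this client\n", "    print(f\"Connected to {brokers}; {len(available)} topics visible\")\n", "    if topic in available:\n", "        print(f\"Topic '{topic}' partitions: {consumer.partitions_for_topic(topic)}\")\n", "    else:\n", "        print(f\"Warning: topic '{topic}' was not found\")\n", "finally:\n", "    consumer.close()" ] },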
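{ "cell_type": "markdown", "metadata": {}, "source": [ "The next cell is a minimal end-to-end sketch of the pattern this notebook describes: read the topic with Spark Structured Streaming and insert each micro-batch into VastDB with the `vastdb` SDK inside `foreachBatch`. It assumes the configuration variables above are filled in, that the target VastDB schema and table already exist with a layout matching the selected Kafka columns, that `pandas` is available for `toPandas()`, and that the connector coordinates and the `/tmp/kafka_to_vastdb_checkpoint` path are placeholders to adjust for your environment. Using `foreachBatch` keeps the VastDB write on the driver, which keeps the sketch simple but limits throughput to a single writer." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: stream the Kafka topic into VastDB with foreachBatch and the vastdb SDK.\n", "# Assumes the configuration variables above are set and the target schema/table exist.\n", "import pyarrow as pa\n", "import vastdb\n", "from pyspark.sql import SparkSession\n", "\n", "# The Kafka connector must be on the classpath when the JVM starts; restart the\n", "# kernel (or stop an existing session) if Spark was started without it.\n", "spark = (\n", "    SparkSession.builder\n", "    .appName(\"KafkaToVastDB\")\n", "    .config(\"spark.jars.packages\", \"org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2\")\n", "    .getOrCreate()\n", ")\n", "\n", "# Read the topic as a streaming DataFrame; key and value arrive as binary columns.\n", "raw = (\n", "    spark.readStream\n", "    .format(\"kafka\")\n", "    .option(\"kafka.bootstrap.servers\", brokers)\n", "    .option(\"subscribe\", topic)\n", "    .option(\"startingOffsets\", \"earliest\")\n", "    .load()\n", ")\n", "\n", "records = raw.selectExpr(\n", "    \"CAST(key AS STRING) AS key\",\n", "    \"CAST(value AS STRING) AS value\",\n", "    \"topic\", \"partition\", \"offset\", \"timestamp\",\n", ")\n", "\n", "def write_batch_to_vastdb(batch_df, batch_id):\n", "    # Called once per micro-batch: collect the batch to the driver as Arrow and\n", "    # insert it into VastDB within a single transaction.\n", "    arrow_batch = pa.Table.from_pandas(batch_df.toPandas(), preserve_index=False)\n", "    if arrow_batch.num_rows == 0:\n", "        return\n", "    session = vastdb.connect(endpoint=vastdb_endpoint, access=access_key_id, secret=secret_access_key)\n", "    with session.transaction() as tx:\n", "        table = tx.bucket(bucket_name).schema(schema_name).table(table_name)\n", "        table.insert(arrow_batch)\n", "\n", "query = (\n", "    records.writeStream\n", "    .foreachBatch(write_batch_to_vastdb)\n", "    .option(\"checkpointLocation\", \"/tmp/kafka_to_vastdb_checkpoint\")  # placeholder path\n", "    .start()\n", ")\n", "# query.awaitTermination()  # uncomment to block the notebook while the stream runs" ] },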