{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "87e6e872-be46-45b2-8185-6159214f0dc0",
   "metadata": {},
   "source": [
    "# Python SDK - import JSON\n",
    "\n",
    "This notebook loads newline-delimited JSON into a VastDB table with the `vastdb` Python SDK, then loads a second file with an extra field to show that new fields are added to the table as new columns (schema evolution).\n",
    "\n",
    "Connection settings are read from the environment variables `VASTDB_ENDPOINT`, `VASTDB_ACCESS_KEY`, `VASTDB_SECRET_KEY`, `VASTDB_TWITTER_INGEST_BUCKET` and `VASTDB_TWITTER_INGEST_SCHEMA`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "6c6ab7f7-86d7-45e0-850e-66b0274e4de1",
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install --quiet --upgrade vastdb numpy pyarrow pandas"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7c93b098-47ea-4278-aaf7-4a128a15b0da",
   "metadata": {},
   "source": [
    "## Define functions\n",
    "\n",
    "Helpers that read a JSON file into a PyArrow table, connect to VastDB, write a table (creating the schema, table and any missing columns), and read a table back."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "6b498ddf-9c14-40bc-94f4-687f355a9ea4",
   "metadata": {},
   "outputs": [],
   "source": [
    "import io\n",
    "import os\n",
    "import pyarrow as pa\n",
    "from pyarrow import json as pa_json\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import vastdb\n",
    "\n",
    "\n",
    "def read_json(file_path):\n",
    "    \"\"\"Read a newline-delimited JSON file into a PyArrow table.\n",
    "\n",
    "    Args:\n",
    "        file_path: Path of the JSON file (one JSON object per line).\n",
    "\n",
    "    Returns:\n",
    "        pyarrow.Table with one row per JSON object.\n",
    "\n",
    "    Raises:\n",
    "        RuntimeError: If the file cannot be opened or parsed.\n",
    "    \"\"\"\n",
    "    try:\n",
    "        # pyarrow.json.read_json accepts a path directly, so the file does not\n",
    "        # need to be copied into an in-memory buffer first.\n",
    "        return pa_json.read_json(file_path)\n",
    "    except Exception as e:\n",
    "        raise RuntimeError(f\"Error reading JSON file: {e}\") from e\n",
    "\n",
    "\n",
    "def connect_to_vastdb(endpoint, access_key, secret_key):\n",
    "    \"\"\"Open a VastDB session.\n",
    "\n",
    "    Args:\n",
    "        endpoint: VastDB endpoint.\n",
    "        access_key: Access key for the endpoint.\n",
    "        secret_key: Secret key for the endpoint.\n",
    "\n",
    "    Returns:\n",
    "        The session returned by `vastdb.connect`.\n",
    "\n",
    "    Raises:\n",
    "        RuntimeError: If a setting is empty or None (e.g. an unset environment\n",
    "            variable read with `os.getenv`), or if `vastdb.connect` fails.\n",
    "    \"\"\"\n",
    "    missing = [\n",
    "        name\n",
    "        for name, value in ((\"endpoint\", endpoint), (\"access_key\", access_key), (\"secret_key\", secret_key))\n",
    "        if not value\n",
    "    ]\n",
    "    if missing:\n",
    "        # Report exactly which setting is absent rather than passing None to the SDK.\n",
    "        raise RuntimeError(f\"Missing VastDB connection setting(s): {', '.join(missing)}\")\n",
    "    try:\n",
    "        session = vastdb.connect(endpoint=endpoint, access=access_key, secret=secret_key)\n",
    "    except Exception as e:\n",
    "        raise RuntimeError(f\"Failed to connect to VastDB: {e}\") from e\n",
    "    print(\"Connected to VastDB\")\n",
    "    return session\n",
    "\n",
    "\n",
    "def write_to_vastdb(session, bucket_name, schema_name, table_name, pa_table):\n",
    "    \"\"\"Insert a PyArrow table into a VastDB table, creating whatever is missing.\n",
    "\n",
    "    All steps run inside one `session.transaction()`. The schema and the table\n",
    "    are created if they do not exist; a new table takes `pa_table.schema`.\n",
    "    Columns that `pa_table` has but an existing table lacks are added before\n",
    "    the insert, so new JSON fields are stored as new columns.\n",
    "\n",
    "    Args:\n",
    "        session: Session returned by `connect_to_vastdb`.\n",
    "        bucket_name: Name of the bucket that holds the schema.\n",
    "        schema_name: Name of the database schema (created if missing).\n",
    "        table_name: Name of the table (created if missing).\n",
    "        pa_table: pyarrow.Table with the rows to insert.\n",
    "    \"\"\"\n",
    "    with session.transaction() as tx:\n",
    "        bucket = tx.bucket(bucket_name)\n",
    "        schema = bucket.schema(schema_name, fail_if_missing=False) or bucket.create_schema(schema_name)\n",
    "        table = schema.table(table_name, fail_if_missing=False) or schema.create_table(table_name, pa_table.schema)\n",
    "\n",
    "        for column in get_columns_to_add(table.arrow_schema, pa_table.schema):\n",
    "            table.add_column(column)\n",
    "\n",
    "        table.insert(pa_table)\n",
    "\n",
    "\n",
    "def get_columns_to_add(existing_schema, desired_schema):\n",
    "    \"\"\"Return the columns of `desired_schema` that `existing_schema` lacks.\n",
    "\n",
    "    Args:\n",
    "        existing_schema: pyarrow.Schema of the table as it exists now.\n",
    "        desired_schema: pyarrow.Schema of the data about to be inserted.\n",
    "\n",
    "    Returns:\n",
    "        A list of single-field pyarrow.Schema objects (the form passed to\n",
    "        `table.add_column`). They follow the field order of\n",
    "        `desired_schema`, so columns are always added in the same order.\n",
    "    \"\"\"\n",
    "    existing_names = set(existing_schema.names)\n",
    "    # Only the name and type are copied, so the new field keeps the pa.field\n",
    "    # default of nullable=True; rows inserted earlier have no value for it.\n",
    "    return [\n",
    "        pa.schema([pa.field(field.name, field.type)])\n",
    "        for field in desired_schema\n",
    "        if field.name not in existing_names\n",
    "    ]\n",
    "\n",
    "\n",
    "def query_vastdb(session, bucket_name, schema_name, table_name):\n",
    "    \"\"\"Read every row of an existing VastDB table.\n",
    "\n",
    "    Nothing is created here. The lookups use the SDK default\n",
    "    `fail_if_missing=True`, so a missing schema or table raises an error\n",
    "    instead of being created.\n",
    "\n",
    "    Returns:\n",
    "        pyarrow.Table with the table's contents.\n",
    "    \"\"\"\n",
    "    with session.transaction() as tx:\n",
    "        table = tx.bucket(bucket_name).schema(schema_name).table(table_name)\n",
    "        return table.select().read_all()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c7685f00-eb33-4f75-a99a-08f94573903d",
   "metadata": {},
   "source": [
    "## JSON Example\n",
    "\n",
    "Write three newline-delimited JSON records to `example.json`, insert them into the `json_import` table, and read the table back."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "9148c4e2-da32-4464-a46a-deda18c97c7b",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Connected to VastDB\n"
     ]
    },
    {
     "data": {
      "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idtstampv1v2
012014-01-01 00:15:00.000002.017.4
112014-01-01 00:30:00.000001.920.2
212014-01-01 00:45:00.000001.819.7
\n", "
" ],
      "text/plain": [
       " id tstamp v1 v2\n",
       "0 1 2014-01-01 00:15:00.00000 2.0 17.4\n",
       "1 1 2014-01-01 00:30:00.00000 1.9 20.2\n",
       "2 1 2014-01-01 00:45:00.00000 1.8 19.7"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "VASTDB_ENDPOINT = os.getenv(\"VASTDB_ENDPOINT\")\n",
    "VASTDB_ACCESS_KEY = os.getenv(\"VASTDB_ACCESS_KEY\")\n",
    "VASTDB_SECRET_KEY = os.getenv(\"VASTDB_SECRET_KEY\")\n",
    "\n",
    "VASTDB_TWITTER_INGEST_BUCKET = os.getenv(\"VASTDB_TWITTER_INGEST_BUCKET\")\n",
    "VASTDB_TWITTER_INGEST_SCHEMA = os.getenv(\"VASTDB_TWITTER_INGEST_SCHEMA\")\n",
    "VASTDB_TWITTER_INGEST_TABLE = 'json_import'\n",
    "\n",
    "# os.getenv returns None for unset variables; stop here with a clear message\n",
    "# instead of failing later inside the SDK.\n",
    "REQUIRED_ENV_VARS = (\n",
    "    \"VASTDB_ENDPOINT\", \"VASTDB_ACCESS_KEY\", \"VASTDB_SECRET_KEY\",\n",
    "    \"VASTDB_TWITTER_INGEST_BUCKET\", \"VASTDB_TWITTER_INGEST_SCHEMA\",\n",
    ")\n",
    "missing_env_vars = [name for name in REQUIRED_ENV_VARS if not os.getenv(name)]\n",
    "if missing_env_vars:\n",
    "    raise RuntimeError(f\"Set these environment variables first: {', '.join(missing_env_vars)}\")\n",
    "\n",
    "FILE_PATH = \"example.json\"\n",
    "\n",
    "# Newline-delimited JSON: one object per line.\n",
    "json_data = \"\"\"\n",
    "{\"id\":1, \"tstamp\":\"2014-01-01 00:15:00.00000\", \"v1\":2.0, \"v2\":17.4}\n",
    "{\"id\":1, \"tstamp\":\"2014-01-01 00:30:00.00000\", \"v1\":1.9, \"v2\":20.2}\n",
    "{\"id\":1, \"tstamp\":\"2014-01-01 00:45:00.00000\", \"v1\":1.8, \"v2\":19.7}\n",
    "\"\"\"\n",
    "\n",
    "with open(FILE_PATH, 'w', encoding='utf-8') as f:\n",
    "    f.write(json_data)\n",
    "\n",
    "pa_table = read_json(FILE_PATH)\n",
    "\n",
    "# Connect to VastDB\n",
    "session = connect_to_vastdb(VASTDB_ENDPOINT, VASTDB_ACCESS_KEY, VASTDB_SECRET_KEY)\n",
    "\n",
    "# Write data to VastDB\n",
    "write_to_vastdb(session, VASTDB_TWITTER_INGEST_BUCKET, VASTDB_TWITTER_INGEST_SCHEMA, VASTDB_TWITTER_INGEST_TABLE, pa_table)\n",
    "\n",
    "# Query data into its own variable so `pa_table` still holds the rows read from the file\n",
    "result_table = query_vastdb(session, VASTDB_TWITTER_INGEST_BUCKET, VASTDB_TWITTER_INGEST_SCHEMA, VASTDB_TWITTER_INGEST_TABLE)\n",
    "result_table.to_pandas()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c605dcef-fecd-4aa9-9eaa-4fdf4972323e",
   "metadata": {},
   "source": [
    "### Schema evolution\n",
    "\n",
    "The second file repeats the same records and adds a new numeric field, `v3`. `write_to_vastdb` adds the missing `v3` column before inserting, so the rows inserted by the previous cell show `NaN` for `v3`.\n",
    "\n",
    "**Note:** every run appends rows. Re-running the notebook against the same table therefore accumulates duplicate rows."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "b795a2a1-47ba-436c-907d-3f94ae4c74eb",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Connected to VastDB\n"
     ]
    },
    {
     "data": {
      "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idtstampv1v2v3
012014-01-01 00:15:00.000002.017.4NaN
112014-01-01 00:30:00.000001.920.2NaN
212014-01-01 00:45:00.000001.819.7NaN
312014-01-01 00:15:00.000002.017.41.0
412014-01-01 00:30:00.000001.920.22.0
512014-01-01 00:45:00.000001.819.73.0
\n", "
" ],
      "text/plain": [
       " id tstamp v1 v2 v3\n",
       "0 1 2014-01-01 00:15:00.00000 2.0 17.4 NaN\n",
       "1 1 2014-01-01 00:30:00.00000 1.9 20.2 NaN\n",
       "2 1 2014-01-01 00:45:00.00000 1.8 19.7 NaN\n",
       "3 1 2014-01-01 00:15:00.00000 2.0 17.4 1.0\n",
       "4 1 2014-01-01 00:30:00.00000 1.9 20.2 2.0\n",
       "5 1 2014-01-01 00:45:00.00000 1.8 19.7 3.0"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "FILE_PATH = \"example2.json\"\n",
    "\n",
    "# Same records as the first file, plus a new field `v3`.\n",
    "json_data = \"\"\"\n",
    "{\"id\":1, \"tstamp\":\"2014-01-01 00:15:00.00000\", \"v1\":2.0, \"v2\":17.4, \"v3\":1}\n",
    "{\"id\":1, \"tstamp\":\"2014-01-01 00:30:00.00000\", \"v1\":1.9, \"v2\":20.2, \"v3\":2}\n",
    "{\"id\":1, \"tstamp\":\"2014-01-01 00:45:00.00000\", \"v1\":1.8, \"v2\":19.7, \"v3\":3}\n",
    "\"\"\"\n",
    "\n",
    "with open(FILE_PATH, 'w', encoding='utf-8') as f:\n",
    "    f.write(json_data)\n",
    "\n",
    "pa_table = read_json(FILE_PATH)\n",
    "\n",
    "# Connect to VastDB\n",
    "session = connect_to_vastdb(VASTDB_ENDPOINT, VASTDB_ACCESS_KEY, VASTDB_SECRET_KEY)\n",
    "\n",
    "# Write data to VastDB; write_to_vastdb adds the missing `v3` column before the insert\n",
    "write_to_vastdb(session, VASTDB_TWITTER_INGEST_BUCKET, VASTDB_TWITTER_INGEST_SCHEMA, VASTDB_TWITTER_INGEST_TABLE, pa_table)\n",
    "\n",
    "# Query data into its own variable so `pa_table` still holds the rows read from the file\n",
    "result_table = query_vastdb(session, VASTDB_TWITTER_INGEST_BUCKET, VASTDB_TWITTER_INGEST_SCHEMA, VASTDB_TWITTER_INGEST_TABLE)\n",
    "result_table.to_pandas()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}