{
"cells": [
{
"cell_type": "markdown",
"id": "87e6e872-be46-45b2-8185-6159214f0dc0",
"metadata": {},
"source": [
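"# VastDB Python SDK: import JSON\n",
"\n",
"Read a newline-delimited JSON file into a PyArrow table, write it to a VastDB table, and read the table back. The second example shows schema evolution: a field that is new in the input is added as a column to the existing table."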
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "6c6ab7f7-86d7-45e0-850e-66b0274e4de1",
"metadata": {},
"outputs": [],
"source": [
"# %pip (unlike !pip3) installs into the environment of the running kernel.\n",
"%pip install --quiet --upgrade vastdb numpy pyarrow pandas"
]
},
{
"cell_type": "markdown",
"id": "7c93b098-47ea-4278-aaf7-4a128a15b0da",
"metadata": {},
"source": [
"## Define functions"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "6b498ddf-9c14-40bc-94f4-687f355a9ea4",
"metadata": {},
"outputs": [],
"source": [
"import io\n",
"import os\n",
"import pyarrow as pa\n",
"from pyarrow import json as pa_json\n",
"import numpy as np\n",
"import pandas as pd\n",
"import vastdb\n",
"\n",
"\n",
"def read_json(file_path):\n",
"    \"\"\"Read a newline-delimited JSON file into a PyArrow table.\n",
"\n",
"    Args:\n",
"        file_path: Path of the JSON file to read.\n",
"\n",
"    Returns:\n",
"        pyarrow.Table whose columns and types are inferred by\n",
"        ``pyarrow.json.read_json``.\n",
"\n",
"    Raises:\n",
"        RuntimeError: If the file cannot be opened or parsed; the original\n",
"            exception is chained as ``__cause__``.\n",
"    \"\"\"\n",
"    try:\n",
"        # pyarrow opens and reads the path itself, so there is no need to\n",
"        # load the whole file into a Python bytes object first.\n",
"        return pa_json.read_json(file_path)\n",
"    except Exception as e:\n",
"        raise RuntimeError(f\"Error reading JSON file {file_path!r}: {e}\") from e\n",
"\n",
"def connect_to_vastdb(endpoint, access_key, secret_key):\n",
"    \"\"\"Create a VastDB session via ``vastdb.connect``.\n",
"\n",
"    Args:\n",
"        endpoint: VastDB endpoint, passed to ``vastdb.connect`` as ``endpoint``.\n",
"        access_key: Access key, passed as ``access``.\n",
"        secret_key: Secret key, passed as ``secret``.\n",
"\n",
"    Returns:\n",
"        The session object returned by ``vastdb.connect``.\n",
"\n",
"    Raises:\n",
"        RuntimeError: If any argument is empty/None (for example because the\n",
"            environment variable it was read from is unset), or if\n",
"            ``vastdb.connect`` raises.\n",
"    \"\"\"\n",
"    # os.getenv() returns None for unset variables; report that clearly here\n",
"    # instead of letting the SDK fail later with an unrelated-looking error.\n",
"    provided = {\"endpoint\": endpoint, \"access_key\": access_key, \"secret_key\": secret_key}\n",
"    missing = [name for name, value in provided.items() if not value]\n",
"    if missing:\n",
"        raise RuntimeError(f\"Failed to connect to VastDB: missing {', '.join(missing)}\")\n",
"\n",
"    try:\n",
"        session = vastdb.connect(endpoint=endpoint, access=access_key, secret=secret_key)\n",
"    except Exception as e:\n",
"        raise RuntimeError(f\"Failed to connect to VastDB: {e}\") from e\n",
"    print(\"Connected to VastDB\")\n",
"    return session\n",
"\n",
"def write_to_vastdb(session, bucket_name, schema_name, table_name, pa_table):\n",
"    \"\"\"Insert ``pa_table`` into a VastDB table within one transaction.\n",
"\n",
"    The schema and the table are created when missing; a new table takes its\n",
"    columns from ``pa_table.schema``. For an existing table, any column that\n",
"    ``pa_table`` has but the table lacks is added before the insert.\n",
"\n",
"    Args:\n",
"        session: VastDB session (e.g. from ``connect_to_vastdb``).\n",
"        bucket_name: Bucket that holds the schema.\n",
"        schema_name: Schema that holds the table.\n",
"        table_name: Target table.\n",
"        pa_table: pyarrow.Table with the rows to insert.\n",
"    \"\"\"\n",
"    with session.transaction() as tx:\n",
"        bucket = tx.bucket(bucket_name)\n",
"\n",
"        target_schema = bucket.schema(schema_name, fail_if_missing=False)\n",
"        if not target_schema:\n",
"            target_schema = bucket.create_schema(schema_name)\n",
"\n",
"        target_table = target_schema.table(table_name, fail_if_missing=False)\n",
"        if not target_table:\n",
"            target_table = target_schema.create_table(table_name, pa_table.schema)\n",
"\n",
"        # Schema evolution: widen the existing table before inserting.\n",
"        for new_column in get_columns_to_add(target_table.arrow_schema, pa_table.schema):\n",
"            target_table.add_column(new_column)\n",
"\n",
"        target_table.insert(pa_table)\n",
"\n",
"def get_columns_to_add(existing_schema, desired_schema):\n",
"    \"\"\"Return the columns of ``desired_schema`` missing from ``existing_schema``.\n",
"\n",
"    Args:\n",
"        existing_schema: pyarrow.Schema of the table as it exists now.\n",
"        desired_schema: pyarrow.Schema of the incoming data.\n",
"\n",
"    Returns:\n",
"        A list of single-field pyarrow schemas, one per missing column, in the\n",
"        order the columns appear in ``desired_schema``. This makes the order\n",
"        of ``add_column`` calls deterministic rather than set order.\n",
"    \"\"\"\n",
"    existing_names = set(existing_schema.names)\n",
"    return [\n",
"        # Nullable: rows already in the table have no value for the new column.\n",
"        pa.schema([pa.field(field.name, field.type, nullable=True)])\n",
"        for field in desired_schema\n",
"        if field.name not in existing_names\n",
"    ]\n",
"\n",
"\n",
"def query_vastdb(session, bucket_name, schema_name, table_name):\n",
"    \"\"\"Read all rows of an existing VastDB table.\n",
"\n",
"    This function is read-only: it never creates a schema or table.\n",
"\n",
"    Args:\n",
"        session: VastDB session (e.g. from ``connect_to_vastdb``).\n",
"        bucket_name: Bucket that holds the schema.\n",
"        schema_name: Schema that holds the table.\n",
"        table_name: Table to read.\n",
"\n",
"    Returns:\n",
"        All rows, as returned by ``table.select().read_all()``.\n",
"\n",
"    Raises:\n",
"        RuntimeError: If the schema or the table does not exist.\n",
"    \"\"\"\n",
"    with session.transaction() as tx:\n",
"        schema = tx.bucket(bucket_name).schema(schema_name, fail_if_missing=False)\n",
"        table = schema.table(table_name, fail_if_missing=False) if schema else None\n",
"        if not table:\n",
"            raise RuntimeError(f\"Table {bucket_name}/{schema_name}/{table_name} not found in VastDB\")\n",
"        # read_all() runs inside the `with` block, i.e. while the transaction is open.\n",
"        return table.select().read_all()\n"
]
},
{
"cell_type": "markdown",
"id": "c7685f00-eb33-4f75-a99a-08f94573903d",
"metadata": {},
"source": [
"## JSON Example"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "9148c4e2-da32-4464-a46a-deda18c97c7b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Connected to VastDB\n"
]
},
{
"data": {
"text/html": [
"
\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" tstamp | \n",
" v1 | \n",
" v2 | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 1 | \n",
" 2014-01-01 00:15:00.00000 | \n",
" 2.0 | \n",
" 17.4 | \n",
"
\n",
" \n",
" 1 | \n",
" 1 | \n",
" 2014-01-01 00:30:00.00000 | \n",
" 1.9 | \n",
" 20.2 | \n",
"
\n",
" \n",
" 2 | \n",
" 1 | \n",
" 2014-01-01 00:45:00.00000 | \n",
" 1.8 | \n",
" 19.7 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id tstamp v1 v2\n",
"0 1 2014-01-01 00:15:00.00000 2.0 17.4\n",
"1 1 2014-01-01 00:30:00.00000 1.9 20.2\n",
"2 1 2014-01-01 00:45:00.00000 1.8 19.7"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"VASTDB_ENDPOINT = os.getenv(\"VASTDB_ENDPOINT\")\n",
"VASTDB_ACCESS_KEY = os.getenv(\"VASTDB_ACCESS_KEY\")\n",
"VASTDB_SECRET_KEY = os.getenv(\"VASTDB_SECRET_KEY\")\n",
"\n",
"VASTDB_TWITTER_INGEST_BUCKET = os.getenv(\"VASTDB_TWITTER_INGEST_BUCKET\")\n",
"VASTDB_TWITTER_INGEST_SCHEMA = os.getenv(\"VASTDB_TWITTER_INGEST_SCHEMA\")\n",
"VASTDB_TWITTER_INGEST_TABLE = 'json_import'\n",
"\n",
"# Fail fast with a readable message if the configuration is incomplete.\n",
"_missing_env = [\n",
"    name\n",
"    for name in (\"VASTDB_ENDPOINT\", \"VASTDB_ACCESS_KEY\", \"VASTDB_SECRET_KEY\",\n",
"                 \"VASTDB_TWITTER_INGEST_BUCKET\", \"VASTDB_TWITTER_INGEST_SCHEMA\")\n",
"    if not os.getenv(name)\n",
"]\n",
"if _missing_env:\n",
"    raise RuntimeError(f\"Set these environment variables first: {', '.join(_missing_env)}\")\n",
"\n",
"FILE_PATH = \"example.json\"\n",
"\n",
"# Newline-delimited JSON: one record per line.\n",
"json_data = \"\"\"\n",
"{\"id\":1, \"tstamp\":\"2014-01-01 00:15:00.00000\", \"v1\":2.0, \"v2\":17.4}\n",
"{\"id\":1, \"tstamp\":\"2014-01-01 00:30:00.00000\", \"v1\":1.9, \"v2\":20.2}\n",
"{\"id\":1, \"tstamp\":\"2014-01-01 00:45:00.00000\", \"v1\":1.8, \"v2\":19.7}\n",
"\"\"\"\n",
"\n",
"with open(FILE_PATH, 'w', encoding='utf-8') as f:\n",
"    f.write(json_data)\n",
"\n",
"imported_table = read_json(FILE_PATH)\n",
"\n",
"# Connect to VastDB\n",
"session = connect_to_vastdb(VASTDB_ENDPOINT, VASTDB_ACCESS_KEY, VASTDB_SECRET_KEY)\n",
"\n",
"# Write data to VastDB (the schema and table are created if they do not exist)\n",
"write_to_vastdb(session, VASTDB_TWITTER_INGEST_BUCKET, VASTDB_TWITTER_INGEST_SCHEMA, VASTDB_TWITTER_INGEST_TABLE, imported_table)\n",
"\n",
"# Query data back. Every run inserts again, so re-running this cell adds duplicate rows.\n",
"queried_table = query_vastdb(session, VASTDB_TWITTER_INGEST_BUCKET, VASTDB_TWITTER_INGEST_SCHEMA, VASTDB_TWITTER_INGEST_TABLE)\n",
"queried_table.to_pandas()\n"
]
},
{
"cell_type": "markdown",
"id": "c605dcef-fecd-4aa9-9eaa-4fdf4972323e",
"metadata": {},
"source": [
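"### Schema evolution\n",
"\n",
"The second file adds a `v3` field. `write_to_vastdb` adds the missing column to the existing table before inserting, so rows written earlier read back with `v3` as `NaN`."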
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "b795a2a1-47ba-436c-907d-3f94ae4c74eb",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Connected to VastDB\n"
]
},
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" tstamp | \n",
" v1 | \n",
" v2 | \n",
" v3 | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 1 | \n",
" 2014-01-01 00:15:00.00000 | \n",
" 2.0 | \n",
" 17.4 | \n",
" NaN | \n",
"
\n",
" \n",
" 1 | \n",
" 1 | \n",
" 2014-01-01 00:30:00.00000 | \n",
" 1.9 | \n",
" 20.2 | \n",
" NaN | \n",
"
\n",
" \n",
" 2 | \n",
" 1 | \n",
" 2014-01-01 00:45:00.00000 | \n",
" 1.8 | \n",
" 19.7 | \n",
" NaN | \n",
"
\n",
" \n",
" 3 | \n",
" 1 | \n",
" 2014-01-01 00:15:00.00000 | \n",
" 2.0 | \n",
" 17.4 | \n",
" 1.0 | \n",
"
\n",
" \n",
" 4 | \n",
" 1 | \n",
" 2014-01-01 00:30:00.00000 | \n",
" 1.9 | \n",
" 20.2 | \n",
" 2.0 | \n",
"
\n",
" \n",
" 5 | \n",
" 1 | \n",
" 2014-01-01 00:45:00.00000 | \n",
" 1.8 | \n",
" 19.7 | \n",
" 3.0 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id tstamp v1 v2 v3\n",
"0 1 2014-01-01 00:15:00.00000 2.0 17.4 NaN\n",
"1 1 2014-01-01 00:30:00.00000 1.9 20.2 NaN\n",
"2 1 2014-01-01 00:45:00.00000 1.8 19.7 NaN\n",
"3 1 2014-01-01 00:15:00.00000 2.0 17.4 1.0\n",
"4 1 2014-01-01 00:30:00.00000 1.9 20.2 2.0\n",
"5 1 2014-01-01 00:45:00.00000 1.8 19.7 3.0"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"FILE_PATH = \"example2.json\"\n",
"\n",
"# Same records as before plus a new field `v3`, which the existing table lacks.\n",
"json_data = \"\"\"\n",
"{\"id\":1, \"tstamp\":\"2014-01-01 00:15:00.00000\", \"v1\":2.0, \"v2\":17.4, \"v3\":1}\n",
"{\"id\":1, \"tstamp\":\"2014-01-01 00:30:00.00000\", \"v1\":1.9, \"v2\":20.2, \"v3\":2}\n",
"{\"id\":1, \"tstamp\":\"2014-01-01 00:45:00.00000\", \"v1\":1.8, \"v2\":19.7, \"v3\":3}\n",
"\"\"\"\n",
"\n",
"with open(FILE_PATH, 'w', encoding='utf-8') as f:\n",
"    f.write(json_data)\n",
"\n",
"evolved_table = read_json(FILE_PATH)\n",
"\n",
"# Connect to VastDB (a fresh session, independent of the previous cell's)\n",
"session = connect_to_vastdb(VASTDB_ENDPOINT, VASTDB_ACCESS_KEY, VASTDB_SECRET_KEY)\n",
"\n",
"# Write data to VastDB; write_to_vastdb adds the missing `v3` column first\n",
"write_to_vastdb(session, VASTDB_TWITTER_INGEST_BUCKET, VASTDB_TWITTER_INGEST_SCHEMA, VASTDB_TWITTER_INGEST_TABLE, evolved_table)\n",
"\n",
"# Query data: rows inserted before `v3` existed read back with v3 = NaN\n",
"queried_table = query_vastdb(session, VASTDB_TWITTER_INGEST_BUCKET, VASTDB_TWITTER_INGEST_SCHEMA, VASTDB_TWITTER_INGEST_TABLE)\n",
"queried_table.to_pandas()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3c1bb6ad-def2-45e0-9e1d-7de6298883cf",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}