Python SDK - import JSON#
! pip3 install --quiet --upgrade vastdb numpy pyarrow pandas
Define functions#
import io
import os
import pyarrow as pa
from pyarrow import json as pa_json
import numpy as np
import pandas as pd
import vastdb
def read_json(file_path):
    """Parse a JSON file into a PyArrow table.

    Args:
        file_path: Path of the JSON file to load.

    Returns:
        The object produced by ``pyarrow.json.read_json`` for the file
        (presumably a ``pyarrow.Table`` -- confirm against the pyarrow docs).

    Raises:
        RuntimeError: If the file cannot be opened or parsed. The original
            exception is chained as ``__cause__``.
    """
    try:
        with open(file_path, 'rb') as f:
            # Hand the open binary handle straight to the parser instead of
            # slurping the file and copying it into a BytesIO first.
            return pa_json.read_json(f)
    except Exception as e:
        raise RuntimeError(f"Error reading JSON file: {e}") from e
def connect_to_vastdb(endpoint, access_key, secret_key):
    """Open a VastDB session and announce success on stdout.

    Args:
        endpoint: Value forwarded to ``vastdb.connect`` as ``endpoint``.
        access_key: Value forwarded to ``vastdb.connect`` as ``access``.
        secret_key: Value forwarded to ``vastdb.connect`` as ``secret``.

    Returns:
        The session object returned by ``vastdb.connect``.

    Raises:
        RuntimeError: If connecting (or reporting success) fails. The
            original exception is chained as ``__cause__``.
    """
    credentials = {
        "endpoint": endpoint,
        "access": access_key,
        "secret": secret_key,
    }
    try:
        vast_session = vastdb.connect(**credentials)
        print("Connected to VastDB")
        return vast_session
    except Exception as err:
        raise RuntimeError(f"Failed to connect to VastDB: {err}") from err
def write_to_vastdb(session, bucket_name, schema_name, table_name, pa_table):
    """Insert a PyArrow table into VastDB, creating or widening the target.

    All work happens inside a single ``session.transaction()``:

    1. Look up the schema in the bucket and create it if the lookup
       returns nothing.
    2. Look up the table and create it from ``pa_table.schema`` if the
       lookup returns nothing.
    3. Add every column that ``pa_table`` has but the table lacks.
    4. Insert ``pa_table``.

    Args:
        session: Object exposing ``transaction()`` as a context manager
            (here, the value returned by ``connect_to_vastdb``).
        bucket_name: Name of the bucket holding the schema.
        schema_name: Name of the schema holding the table.
        table_name: Name of the destination table.
        pa_table: Data to insert; its ``schema`` drives table creation and
            column additions.
    """
    with session.transaction() as tx:
        target_bucket = tx.bucket(bucket_name)

        # fail_if_missing=False makes the lookup return a falsy value
        # instead of raising, so creation can be done on demand.
        target_schema = target_bucket.schema(schema_name, fail_if_missing=False)
        if not target_schema:
            target_schema = target_bucket.create_schema(schema_name)

        target_table = target_schema.table(table_name, fail_if_missing=False)
        if not target_table:
            target_table = target_schema.create_table(table_name, pa_table.schema)

        # Widen the table first so the insert below finds every column.
        missing_columns = get_columns_to_add(target_table.arrow_schema, pa_table.schema)
        for new_column in missing_columns:
            target_table.add_column(new_column)

        target_table.insert(pa_table)
def get_columns_to_add(existing_schema, desired_schema):
    """List the columns present in ``desired_schema`` but not in ``existing_schema``.

    Columns are matched by name only; a type difference on a column that
    exists in both schemas is not reported.

    Args:
        existing_schema: Schema of the table as it currently exists
            (must expose ``names``).
        desired_schema: Schema of the incoming data
            (must expose ``names`` and ``types``).

    Returns:
        A list of single-field ``pa.Schema`` objects, one per missing
        column, in the order the columns appear in ``desired_schema``.
    """
    existing_names = set(existing_schema.names)
    # Walk the desired schema in declared order rather than iterating a set
    # difference: set order depends on string hashing, which would make the
    # order in which new columns are added vary from run to run.
    return [
        pa.schema([pa.field(name, field_type)])
        for name, field_type in zip(desired_schema.names, desired_schema.types)
        if name not in existing_names
    ]
def query_vastdb(session, bucket_name, schema_name, table_name):
    """Read every row of a VastDB table.

    This is a read-only operation: unlike the write path it never creates
    the schema or the table, and it does not depend on any module-level
    variable.

    Args:
        session: Object exposing ``transaction()`` as a context manager
            (here, the value returned by ``connect_to_vastdb``).
        bucket_name: Name of the bucket holding the schema.
        schema_name: Name of the schema holding the table.
        table_name: Name of the table to read.

    Returns:
        The result of ``table.select().read_all()`` (presumably a
        ``pyarrow.Table``; callers in this file call ``to_pandas()`` on it).

    Raises:
        RuntimeError: If the schema or the table does not exist.
    """
    with session.transaction() as tx:
        bucket = tx.bucket(bucket_name)

        # fail_if_missing=False turns a missing object into None so a
        # message naming exactly what was not found can be raised.
        schema = bucket.schema(schema_name, fail_if_missing=False)
        if schema is None:
            raise RuntimeError(
                f"Schema '{schema_name}' not found in bucket '{bucket_name}'"
            )

        table = schema.table(table_name, fail_if_missing=False)
        if table is None:
            raise RuntimeError(
                f"Table '{table_name}' not found in schema '{schema_name}'"
            )

        # Materialise the rows before the transaction context exits.
        return table.select().read_all()
JSON Example#
# Connection details and the target bucket/schema are taken from the
# environment; any variable that is unset yields None here.
VASTDB_ENDPOINT = os.environ.get("VASTDB_ENDPOINT")
VASTDB_ACCESS_KEY = os.environ.get("VASTDB_ACCESS_KEY")
VASTDB_SECRET_KEY = os.environ.get("VASTDB_SECRET_KEY")
VASTDB_TWITTER_INGEST_BUCKET = os.environ.get("VASTDB_TWITTER_INGEST_BUCKET")
VASTDB_TWITTER_INGEST_SCHEMA = os.environ.get("VASTDB_TWITTER_INGEST_SCHEMA")
VASTDB_TWITTER_INGEST_TABLE = 'json_import'

# Sample input: one JSON object per line, written to a local file.
FILE_PATH = "example.json"
json_data = """
{"id":1, "tstamp":"2014-01-01 00:15:00.00000", "v1":2.0, "v2":17.4}
{"id":1, "tstamp":"2014-01-01 00:30:00.00000", "v1":1.9, "v2":20.2}
{"id":1, "tstamp":"2014-01-01 00:45:00.00000", "v1":1.8, "v2":19.7}
"""
with open(FILE_PATH, 'w') as f:
    f.write(json_data)

# Load the file back as a PyArrow table.
pa_table = read_json(FILE_PATH)

# Connect to VastDB
session = connect_to_vastdb(
    VASTDB_ENDPOINT,
    VASTDB_ACCESS_KEY,
    VASTDB_SECRET_KEY,
)

# Write data to VastDB
write_to_vastdb(
    session,
    VASTDB_TWITTER_INGEST_BUCKET,
    VASTDB_TWITTER_INGEST_SCHEMA,
    VASTDB_TWITTER_INGEST_TABLE,
    pa_table,
)

# Query data (pa_table is rebound to the rows read back from the table)
pa_table = query_vastdb(
    session,
    VASTDB_TWITTER_INGEST_BUCKET,
    VASTDB_TWITTER_INGEST_SCHEMA,
    VASTDB_TWITTER_INGEST_TABLE,
)

# Last expression of the cell: the notebook renders it as a DataFrame.
pa_table.to_pandas()
Connected to VastDB
| | id | tstamp | v1 | v2 |
---|---|---|---|---|
0 | 1 | 2014-01-01 00:15:00.00000 | 2.0 | 17.4 |
1 | 1 | 2014-01-01 00:30:00.00000 | 1.9 | 20.2 |
2 | 1 | 2014-01-01 00:45:00.00000 | 1.8 | 19.7 |
Schema evolution#
# Second sample file: same records as before plus a new "v3" field, so the
# incoming schema has one column the existing table lacks.
FILE_PATH = "example2.json"
json_data = """
{"id":1, "tstamp":"2014-01-01 00:15:00.00000", "v1":2.0, "v2":17.4, "v3":1}
{"id":1, "tstamp":"2014-01-01 00:30:00.00000", "v1":1.9, "v2":20.2, "v3":2}
{"id":1, "tstamp":"2014-01-01 00:45:00.00000", "v1":1.8, "v2":19.7, "v3":3}
"""
with open(FILE_PATH, 'w') as f:
    f.write(json_data)

# Load the file back as a PyArrow table.
pa_table = read_json(FILE_PATH)

# Connect to VastDB
session = connect_to_vastdb(
    VASTDB_ENDPOINT,
    VASTDB_ACCESS_KEY,
    VASTDB_SECRET_KEY,
)

# Write data to VastDB (write_to_vastdb adds the missing column before inserting)
write_to_vastdb(
    session,
    VASTDB_TWITTER_INGEST_BUCKET,
    VASTDB_TWITTER_INGEST_SCHEMA,
    VASTDB_TWITTER_INGEST_TABLE,
    pa_table,
)

# Query data (pa_table is rebound to the rows read back from the table)
pa_table = query_vastdb(
    session,
    VASTDB_TWITTER_INGEST_BUCKET,
    VASTDB_TWITTER_INGEST_SCHEMA,
    VASTDB_TWITTER_INGEST_TABLE,
)

# Last expression of the cell: the notebook renders it as a DataFrame.
pa_table.to_pandas()
Connected to VastDB
| | id | tstamp | v1 | v2 | v3 |
---|---|---|---|---|---|
0 | 1 | 2014-01-01 00:15:00.00000 | 2.0 | 17.4 | NaN |
1 | 1 | 2014-01-01 00:30:00.00000 | 1.9 | 20.2 | NaN |
2 | 1 | 2014-01-01 00:45:00.00000 | 1.8 | 19.7 | NaN |
3 | 1 | 2014-01-01 00:15:00.00000 | 2.0 | 17.4 | 1.0 |
4 | 1 | 2014-01-01 00:30:00.00000 | 1.9 | 20.2 | 2.0 |
5 | 1 | 2014-01-01 00:45:00.00000 | 1.8 | 19.7 | 3.0 |