Python SDK - import CSV

Python SDK - import CSV

! pip3 install --quiet --upgrade vastdb numpy pyarrow pandas

Define functions

import io
import os
import pyarrow as pa
from pyarrow import csv as pa_csv
import numpy as np
import pandas as pd
import vastdb

def read_csv(file_path):
    """Load the CSV file at ``file_path`` with ``pa_csv.read_csv``.

    Returns whatever ``pa_csv.read_csv`` returns for that path.

    Raises:
        RuntimeError: if reading fails for any reason; the original
            exception is attached as the cause.
    """
    try:
        loaded = pa_csv.read_csv(file_path)
    except Exception as err:
        # Re-raise under a single error type while keeping the root cause chained.
        message = f"Error reading CSV file: {err}"
        raise RuntimeError(message) from err
    return loaded

def connect_to_vastdb(endpoint, access_key, secret_key):
    """Open a VastDB session via ``vastdb.connect`` and return it.

    Prints "Connected to VastDB" once ``vastdb.connect`` has returned.

    Raises:
        RuntimeError: if connecting fails for any reason; the original
            exception is attached as the cause.
    """
    try:
        credentials = {"access": access_key, "secret": secret_key}
        connection = vastdb.connect(endpoint=endpoint, **credentials)
        print("Connected to VastDB")
    except Exception as err:
        message = f"Failed to connect to VastDB: {err}"
        raise RuntimeError(message) from err
    return connection

def write_to_vastdb(session, bucket_name, schema_name, table_name, pa_table):
    """Insert ``pa_table`` into a VastDB table, creating what is missing.

    All steps run inside one ``session.transaction()``:

    1. Look up the schema in the bucket; create it if the lookup is falsy.
    2. Look up the table in the schema; create it from ``pa_table.schema``
       if the lookup is falsy.
    3. Add every column reported by ``get_columns_to_add`` to the table.
    4. Insert ``pa_table``.
    """
    with session.transaction() as tx:
        bucket = tx.bucket(bucket_name)

        # fail_if_missing=False lets the lookup come back empty instead of raising.
        schema = bucket.schema(schema_name, fail_if_missing=False)
        if not schema:
            schema = bucket.create_schema(schema_name)

        table = schema.table(table_name, fail_if_missing=False)
        if not table:
            table = schema.create_table(table_name, pa_table.schema)

        # Extend the table with columns present in the data but not yet in the table.
        for new_column in get_columns_to_add(table.arrow_schema, pa_table.schema):
            table.add_column(new_column)

        table.insert(pa_table)

def get_columns_to_add(existing_schema, desired_schema):
    """Identifies columns to add to an existing schema.

    Args:
        existing_schema: schema object exposing ``names`` (the current columns).
        desired_schema: schema object exposing ``names`` and ``field(name)``
            (the columns the incoming data has).

    Returns:
        A list of single-field ``pa.Schema`` objects, one for each column
        name found in ``desired_schema`` but not in ``existing_schema``.
        The list follows the column order of ``desired_schema`` and is
        empty when nothing is missing.
    """
    existing_names = set(existing_schema.names)
    # Walk the desired names in their declared order (de-duplicated) rather
    # than iterating a set difference: set iteration order is arbitrary, which
    # would make the order of added columns vary from run to run.
    missing_names = [
        name
        for name in dict.fromkeys(desired_schema.names)
        if name not in existing_names
    ]
    return [
        pa.schema([pa.field(name, desired_schema.field(name).type)])
        for name in missing_names
    ]


def query_vastdb(session, bucket_name, schema_name, table_name):
    """Reads all rows of a VastDB table.

    Args:
        session: VastDB session, as returned by ``connect_to_vastdb``.
        bucket_name: name of the bucket holding the schema.
        schema_name: name of the schema holding the table.
        table_name: name of the table to read.

    Returns:
        The result of ``table.select().read_all()`` for the requested table.

    Raises:
        Whatever the SDK lookups raise when the bucket, schema or table
        cannot be found. NOTE(review): this relies on the SDK failing by
        default when ``fail_if_missing`` is not passed -- confirm against
        the vastdb SDK documentation.
    """
    with session.transaction() as tx:
        # Plain lookups only: a read must not create schemas or tables, and
        # must not depend on a module-level ``pa_table`` to supply a schema.
        table = tx.bucket(bucket_name).schema(schema_name).table(table_name)

        # read_all() is called before the transaction context exits.
        return table.select().read_all()

CSV Example

# Connection settings are taken from the environment.
VASTDB_ENDPOINT = os.environ.get("VASTDB_ENDPOINT")
VASTDB_ACCESS_KEY = os.environ.get("VASTDB_ACCESS_KEY")
VASTDB_SECRET_KEY = os.environ.get("VASTDB_SECRET_KEY")

# Target bucket and schema come from the environment; the table name is fixed.
VASTDB_TWITTER_INGEST_BUCKET = os.environ.get("VASTDB_TWITTER_INGEST_BUCKET")
VASTDB_TWITTER_INGEST_SCHEMA = os.environ.get("VASTDB_TWITTER_INGEST_SCHEMA")
VASTDB_TWITTER_INGEST_TABLE = 'csv_import'

FILE_PATH = "example.csv"

# Sample data: four columns, three rows.
csv_data = """
id,tstamp,v1,v2
1,2014-01-01 00:15:00.00000,2,17.4
1,2014-01-01 00:30:00.00000,1.9,20.2
1,2014-01-01 00:45:00.00000,1.8,19.7
"""

# Put the sample on disk, then load it back through read_csv.
with open(FILE_PATH, mode='w') as f:
    f.write(csv_data)
pa_table = read_csv(FILE_PATH)

# Connect to VastDB
session = connect_to_vastdb(
    endpoint=VASTDB_ENDPOINT,
    access_key=VASTDB_ACCESS_KEY,
    secret_key=VASTDB_SECRET_KEY,
)

# Bucket / schema / table triple shared by the write and the query below.
ingest_target = (
    VASTDB_TWITTER_INGEST_BUCKET,
    VASTDB_TWITTER_INGEST_SCHEMA,
    VASTDB_TWITTER_INGEST_TABLE,
)

# Write data to VastDB
write_to_vastdb(session, *ingest_target, pa_table)

# Query data (pa_table is rebound to the query result)
pa_table = query_vastdb(session, *ingest_target)
pa_table.to_pandas()
Connected to VastDB
id tstamp v1 v2
0 1 2014-01-01 00:15:00 2.0 17.4
1 1 2014-01-01 00:30:00 1.9 20.2
2 1 2014-01-01 00:45:00 1.8 19.7

Schema evolution

FILE_PATH = "example2.csv"

# Same sample as before plus an extra column, v3.
csv_data = """
id,tstamp,v1,v2,v3
1,2014-01-01 00:15:00.00000,2,17.4,1
1,2014-01-01 00:30:00.00000,1.9,20.2,2
1,2014-01-01 00:45:00.00000,1.8,19.7,3
"""

# Put the sample on disk, then load it back through read_csv.
with open(FILE_PATH, mode='w') as f:
    f.write(csv_data)
pa_table = read_csv(FILE_PATH)

# Connect to VastDB
session = connect_to_vastdb(
    endpoint=VASTDB_ENDPOINT,
    access_key=VASTDB_ACCESS_KEY,
    secret_key=VASTDB_SECRET_KEY,
)

# Bucket / schema / table triple shared by the write and the query below.
ingest_target = (
    VASTDB_TWITTER_INGEST_BUCKET,
    VASTDB_TWITTER_INGEST_SCHEMA,
    VASTDB_TWITTER_INGEST_TABLE,
)

# Write data to VastDB (write_to_vastdb adds columns the table lacks before inserting)
write_to_vastdb(session, *ingest_target, pa_table)

# Query data (pa_table is rebound to the query result)
pa_table = query_vastdb(session, *ingest_target)
pa_table.to_pandas()
Connected to VastDB
id tstamp v1 v2 v3
0 1 2014-01-01 00:15:00 2.0 17.4 NaN
1 1 2014-01-01 00:30:00 1.9 20.2 NaN
2 1 2014-01-01 00:45:00 1.8 19.7 NaN
3 1 2014-01-01 00:15:00 2.0 17.4 1.0
4 1 2014-01-01 00:30:00 1.9 20.2 2.0
5 1 2014-01-01 00:45:00 1.8 19.7 3.0