Importing Files using Pandas#

Important

This notebook is in the process of being migrated to Vast Data Platform Field Docs. It will probably not run yet.

See also

The Vast DB SDK API Documentation is available here.

Install and Import the Vast DB SDK#

Before doing anything else, we need to import the vastdb API library.

Note that we also import annotations from __future__. This enables postponed evaluation of type annotations (PEP 563): annotations are stored as strings rather than evaluated at definition time, which allows newer type-hint syntax to be used on older Python versions. Because __future__ imports must appear before any other import, it goes first in the cell below.

!pip install vastdb | tail -5
Downloading markdown_it_py-3.0.0-py3-none-any.whl (87 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 87.5/87.5 kB 6.8 MB/s eta 0:00:00
Downloading mdurl-0.1.2-py3-none-any.whl (10.0 kB)
Installing collected packages: flatbuffers, xmltodict, sqlglot, parsy, mdurl, jmespath, bidict, backoff, atpublic, markdown-it-py, botocore, aws-requests-auth, s3transfer, rich, ibis-framework, boto3, vastdb
Successfully installed atpublic-4.1.0 aws-requests-auth-0.4.3 backoff-2.2.1 bidict-0.23.1 boto3-1.34.126 botocore-1.34.126 flatbuffers-24.3.25 ibis-framework-9.0.0 jmespath-1.0.1 markdown-it-py-3.0.0 mdurl-0.1.2 parsy-2.1 rich-13.7.1 s3transfer-0.10.1 sqlglot-23.12.2 vastdb-0.1.7 xmltodict-0.13.0
from __future__ import annotations  # Must precede all other imports; enables postponed evaluation of annotations

import vastdb

Creating the initial session#

In the code below, we read the connection details from environment variables to enable automated testing of the notebook.

Change these to reflect your environment, e.g.

ENDPOINT = 'http://your_vast_endpoint:12345'
DATABASE_NAME = ...
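If you are experimenting locally rather than running under automation, you can set these environment variables from the notebook itself before the cell below runs. This is just a sketch; the values are hypothetical placeholders and should be replaced with your own.

import os

# Hypothetical placeholder values -- replace with your own endpoint and credentials
os.environ.setdefault('ENDPOINT', 'http://your_vast_endpoint:12345')
os.environ.setdefault('ACCESS_KEY', 'your-access-key')
os.environ.setdefault('SECRET_KEY', 'your-secret-key')
os.environ.setdefault('DATABASE_NAME', 'demo-database')
os.environ.setdefault('DATABASE_SCHEMA', 'python-sdk-schema')
os.environ.setdefault('TABLE_NAME', 'flights')
os.environ.setdefault('BUCKET_NAME', 'demo-database')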
import os

ENDPOINT = os.environ['ENDPOINT']
ACCESS_KEY = os.environ['ACCESS_KEY']
SECRET_KEY = os.environ['SECRET_KEY']

DATABASE_NAME = os.environ['DATABASE_NAME']

# Schema and Table will get created if they don't exist
DATABASE_SCHEMA = os.environ['DATABASE_SCHEMA']
TABLE_NAME = os.environ['TABLE_NAME']

# S3 File Details
BUCKET_NAME = os.environ['BUCKET_NAME']
LOCAL_FILE_PATH = 'flights.parquet'
S3_FILE_KEY = 'pythonsdk/import/flights.parquet'
print(f"""
{ENDPOINT=} 
{ACCESS_KEY=}
---
{DATABASE_NAME=}
{DATABASE_SCHEMA=}
{TABLE_NAME=}
---
{BUCKET_NAME=}
{LOCAL_FILE_PATH=}
{S3_FILE_KEY=}
""")
import pyarrow as pa
import vastdb
import os

session = vastdb.connect(
    endpoint=ENDPOINT,
    access=ACCESS_KEY,
    secret=SECRET_KEY)
print("Vast Cluster version: ", session.api.vast_version)
Vast Cluster version:  (5, 1, 0, 131)

Import Parquet Files#

The following example uses Pandas to load a local Parquet file and insert the data into a VAST DB table.

# Set the destination table name

TABLE_NAME = 'flights'
# Print the local Parquet file path
print(f"{LOCAL_FILE_PATH=}")
LOCAL_FILE_PATH='flights.parquet'
import pandas as pd
import pyarrow.parquet as pq

DF = pd.read_parquet(LOCAL_FILE_PATH)

# Read Parquet file directly into PyArrow table
ARROW_TABLE = pq.read_table(LOCAL_FILE_PATH)
ARROW_SCHEMA = ARROW_TABLE.schema
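Before creating the table it is worth sanity-checking what was read: the Arrow schema is what the table's columns will be created from. A quick look:

# Inspect the Arrow schema that the VAST DB table will be created from
print(ARROW_SCHEMA)

# Preview the first few rows of the DataFrame
print(DF.head())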
from vastdb.errors import TableExists  # assumed import path for the SDK's "table already exists" error

def create_table(database_name, database_schema, table_name, arrow_schema):

    with session.transaction() as tx:
        bucket = tx.bucket(database_name)

        # First retrieve the schema; with fail_if_missing=False this returns None if it doesn't exist
        schema = None
        try:
            schema = bucket.schema(name=database_schema, fail_if_missing=False)
            print(schema)
        except Exception as e:
            print("Schema doesn't exist:", e)

        if schema:
            try:
                table = schema.create_table(table_name=table_name, columns=arrow_schema)
                print(f"Table created: {table.name}")
            except TableExists as e:
                print("Couldn't create table because it already exists:", e)
            except Exception as e:
                print("Couldn't create table:", e)
create_table(DATABASE_NAME, DATABASE_SCHEMA, TABLE_NAME, ARROW_SCHEMA)
Schema(name='python-sdk-schema', bucket=Bucket(name='demo-database', tx=Transaction(id=0x0000300000000036)))
Table created: flights
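You can confirm the table is now visible inside the schema before loading any data. This is a minimal sketch, and it assumes the SDK exposes a Schema.tables() listing method:

with session.transaction() as tx:
    schema = tx.bucket(DATABASE_NAME).schema(name=DATABASE_SCHEMA, fail_if_missing=False)
    if schema:
        # List the table names in the schema to confirm 'flights' was created
        print([t.name for t in schema.tables()])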
def insert_dataframe_to_database(database_name, database_schema, table_name, record_batch):
    with session.transaction() as tx:
        try:
            schema = tx.bucket(database_name).schema(name=database_schema, fail_if_missing=False)
            if schema:
                try:
                    # Look up the target table and insert the Arrow record batch
                    table = schema.table(name=table_name)
                    table.insert(record_batch)
                    print("Data inserted.")
                except Exception as e:
                    print("Couldn't insert data:", e)
        except Exception as e:
            print("Schema doesn't exist:", e)
RECORD_BATCH = pa.RecordBatch.from_pandas(DF)

insert_dataframe_to_database(DATABASE_NAME, DATABASE_SCHEMA, TABLE_NAME, RECORD_BATCH)
Data inserted.
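Converting the whole DataFrame into a single RecordBatch keeps the entire file in memory at once. For larger Parquet files you could instead stream record batches directly from the file and insert them one at a time, reusing the same table.insert() pattern. A sketch (the helper name and batch size are arbitrary choices, not part of the SDK):

import pyarrow.parquet as pq

def insert_parquet_in_batches(database_name, database_schema, table_name, file_path, batch_size=100_000):
    with session.transaction() as tx:
        schema = tx.bucket(database_name).schema(name=database_schema, fail_if_missing=False)
        if schema:
            table = schema.table(name=table_name)
            # Stream the Parquet file in chunks instead of materialising it all in memory
            parquet_file = pq.ParquetFile(file_path)
            for batch in parquet_file.iter_batches(batch_size=batch_size):
                table.insert(batch)
            print("Data inserted in batches.")

# Note: running this after the single-batch insert above would load the same rows a second time.
# insert_parquet_in_batches(DATABASE_NAME, DATABASE_SCHEMA, TABLE_NAME, LOCAL_FILE_PATH)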
with session.transaction() as tx:
    try:
        schema = tx.bucket(DATABASE_NAME).schema(name=DATABASE_SCHEMA, fail_if_missing=False)
        if schema:
            try:
                table = schema.table(name=TABLE_NAME)
                # select() returns a reader of record batches; collect them into a PyArrow table,
                # then convert to a Pandas DataFrame for display
                reader = table.select()
                pyarrow_table = pa.Table.from_batches(reader)
                df = pyarrow_table.to_pandas()
                print(f"Listing rows in {TABLE_NAME}")
                display(df)
            except Exception as e:
                print("Couldn't select data:", e)
    except Exception as e:
        print("Schema doesn't exist:", e)
Listing rows in flights
           FL_DATE  DEP_DELAY  ARR_DELAY  AIR_TIME  DISTANCE   DEP_TIME   ARR_TIME
0       2006-01-01          5         19       350      2475   9.083333  12.483334
1       2006-01-02        167        216       343      2475  11.783334  15.766666
2       2006-01-03         -7         -2       344      2475   8.883333  12.133333
3       2006-01-04         -5        -13       331      2475   8.916667  11.950000
4       2006-01-05         -3        -17       321      2475   8.950000  11.883333
...            ...        ...        ...       ...       ...        ...        ...
999995  2006-01-19          5          4       244      1781  15.000000  17.350000
999996  2006-01-20         14         12       240      1781  15.150000  17.483334
999997  2006-01-21          9         12       241      1781  15.066667  17.483334
999998  2006-01-22         -2          8       242      1781  14.883333  17.416666
999999  2006-01-23          1        -12       232      1781  14.933333  17.083334

1000000 rows × 7 columns
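Reading the full table back just to verify the import can be expensive for large tables. A lighter check is to project only the columns you need; the sketch below assumes the select() call's columns parameter for column projection (the SDK also ships with ibis for predicate pushdown, not shown here):

with session.transaction() as tx:
    schema = tx.bucket(DATABASE_NAME).schema(name=DATABASE_SCHEMA, fail_if_missing=False)
    if schema:
        table = schema.table(name=TABLE_NAME)
        # Project only two columns instead of reading the whole table back
        reader = table.select(columns=['FL_DATE', 'DEP_DELAY'])
        subset = pa.Table.from_batches(reader).to_pandas()
        print(subset.head())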