Importing Files using Pandas#
Important
This notebook is in the process of being migrated to Vast Data Platform Field Docs. It will probably not run yet.
See also
The Vast DB SDK API Documentation is available here.
Install and Import the Vast DB SDK#
Before doing anything else, we need to install and import the vastdb API library.
Note that we also import annotations from __future__. This postpones the evaluation of type annotations (PEP 563): annotations are stored as strings rather than evaluated at import time, which allows newer typing syntax on older Python versions.
!pip install vastdb | tail -5
Downloading markdown_it_py-3.0.0-py3-none-any.whl (87 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 87.5/87.5 kB 6.8 MB/s eta 0:00:00
Downloading mdurl-0.1.2-py3-none-any.whl (10.0 kB)
Installing collected packages: flatbuffers, xmltodict, sqlglot, parsy, mdurl, jmespath, bidict, backoff, atpublic, markdown-it-py, botocore, aws-requests-auth, s3transfer, rich, ibis-framework, boto3, vastdb
Successfully installed atpublic-4.1.0 aws-requests-auth-0.4.3 backoff-2.2.1 bidict-0.23.1 boto3-1.34.126 botocore-1.34.126 flatbuffers-24.3.25 ibis-framework-9.0.0 jmespath-1.0.1 markdown-it-py-3.0.0 mdurl-0.1.2 parsy-2.1 rich-13.7.1 s3transfer-0.10.1 sqlglot-23.12.2 vastdb-0.1.7 xmltodict-0.13.0
from __future__ import annotations  # Postpone evaluation of type annotations (PEP 563); must come before other imports

import vastdb
Creating the initial session#
In the code below, we read the connection details from environment variables to enable automated testing of the notebook.
Change these to reflect your environment, e.g.
ENDPOINT = 'http://your_vast_endpoint:12345'
DATABASE_NAME = ...
import os
ENDPOINT = os.environ['ENDPOINT']
ACCESS_KEY = os.environ['ACCESS_KEY']
SECRET_KEY = os.environ['SECRET_KEY']
DATABASE_NAME = os.environ['DATABASE_NAME']
# Schema and table will be created if they don't exist
DATABASE_SCHEMA = os.environ['DATABASE_SCHEMA']
TABLE_NAME = os.environ['TABLE_NAME']
# S3 File Details
BUCKET_NAME = os.environ['BUCKET_NAME']
LOCAL_FILE_PATH = 'flights.parquet'
S3_FILE_KEY = 'pythonsdk/import/flights.parquet'
print(f"""
{ENDPOINT=}
{ACCESS_KEY=}
---
{DATABASE_NAME=}
{DATABASE_SCHEMA=}
{TABLE_NAME=}
---
{BUCKET_NAME=}
{LOCAL_FILE_PATH=}
{S3_FILE_KEY=}
""")
import pyarrow as pa
import vastdb
import os
session = vastdb.connect(
    endpoint=ENDPOINT,
    access=ACCESS_KEY,
    secret=SECRET_KEY)
print("Vast Cluster version: ", session.api.vast_version)
Vast Cluster version: (5, 1, 0, 131)
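With the session established, a quick way to confirm the database bucket is reachable is to open a transaction and list its schemas. A minimal sketch using the SDK's bucket and schema accessors:

# Optional sanity check: list the schemas in the database bucket
with session.transaction() as tx:
    bucket = tx.bucket(DATABASE_NAME)
    print([s.name for s in bucket.schemas()])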
Import Parquet Files#
The following example uses Pandas to load a local Parquet file and insert the data into a VAST DB table.
# Override the table name for this example
TABLE_NAME = 'flights'
# Print the local Parquet file name
print(f"{LOCAL_FILE_PATH=}")
LOCAL_FILE_PATH='flights.parquet'
import pandas as pd
import pyarrow.parquet as pq

# Load the Parquet file into a Pandas DataFrame
DF = pd.read_parquet(LOCAL_FILE_PATH)
# Read Parquet file directly into PyArrow table
ARROW_TABLE = pq.read_table(LOCAL_FILE_PATH)
ARROW_SCHEMA = ARROW_TABLE.schema
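Note that this reads the file twice: once with Pandas for the data and once with PyArrow for the schema. If you'd rather avoid the second read, the Arrow schema can be derived from the DataFrame that is already in memory:

# Alternative: derive the Arrow schema from the loaded DataFrame
ARROW_SCHEMA = pa.Schema.from_pandas(DF)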
from vastdb.errors import TableExists

def create_table(database_name, database_schema, table_name, arrow_schema):
    with session.transaction() as tx:
        bucket = tx.bucket(database_name)
        # First retrieve the schema; fail_if_missing=False returns None
        # instead of raising if the schema doesn't exist
        try:
            schema = bucket.schema(name=database_schema, fail_if_missing=False)
            print(schema)
        except Exception as e:
            schema = None
            print("Couldn't retrieve schema:", e)
        if schema:
            try:
                table = schema.create_table(table_name=table_name, columns=arrow_schema)
                print(f"Table created: {table.name}")
            except TableExists as e:
                print("Couldn't create table because it already exists:", e)
            except Exception as e:
                print("Couldn't create table:", e)
        else:
            print(f"Schema '{database_schema}' doesn't exist.")
create_table(DATABASE_NAME, DATABASE_SCHEMA, TABLE_NAME, ARROW_SCHEMA)
Schema(name='python-sdk-schema', bucket=Bucket(name='demo-database', tx=Transaction(id=0x0000300000000036)))
Table created: flights
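create_table assumes the schema already exists in the bucket; if it doesn't, the function only prints a message. A minimal sketch for creating the schema first, using the SDK's create_schema call:

# Sketch: create the schema if it is missing, before calling create_table
with session.transaction() as tx:
    bucket = tx.bucket(DATABASE_NAME)
    if bucket.schema(name=DATABASE_SCHEMA, fail_if_missing=False) is None:
        bucket.create_schema(DATABASE_SCHEMA)
        print(f"Schema created: {DATABASE_SCHEMA}")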
def insert_dataframe_to_database(database_name, database_schema, table_name, record_batch):
    with session.transaction() as tx:
        try:
            schema = tx.bucket(database_name).schema(name=database_schema, fail_if_missing=False)
            if schema:
                try:
                    table = schema.table(name=table_name)
                    table.insert(record_batch)
                    print("Data inserted.")
                except Exception as e:
                    print("Couldn't insert data:", e)
        except Exception as e:
            print("Schema doesn't exist:", e)
# Convert the DataFrame into a PyArrow RecordBatch for insertion
RECORD_BATCH = pa.RecordBatch.from_pandas(DF)
insert_dataframe_to_database(DATABASE_NAME, DATABASE_SCHEMA, TABLE_NAME, RECORD_BATCH)
Data inserted.
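For Parquet files that are large relative to available memory, inserting everything as a single RecordBatch can be wasteful. The same insert can be performed in smaller Arrow batches; a sketch (the 100,000-row chunk size is an arbitrary example value):

# Sketch: insert the DataFrame in smaller Arrow batches
arrow_table = pa.Table.from_pandas(DF)
with session.transaction() as tx:
    table = tx.bucket(DATABASE_NAME).schema(DATABASE_SCHEMA).table(TABLE_NAME)
    for batch in arrow_table.to_batches(max_chunksize=100_000):  # example chunk size
        table.insert(batch)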
with session.transaction() as tx:
    try:
        schema = tx.bucket(DATABASE_NAME).schema(name=DATABASE_SCHEMA, fail_if_missing=False)
        if schema:
            try:
                table = schema.table(name=TABLE_NAME)
                reader = table.select()
                pyarrow_table = pa.Table.from_batches(reader)
                df = pyarrow_table.to_pandas()
                print(f"Listing rows in {TABLE_NAME}")
                display(df)
            except Exception as e:
                print("Couldn't select data:", e)
    except Exception as e:
        print("Schema doesn't exist:", e)
Listing rows in flights
|  | FL_DATE | DEP_DELAY | ARR_DELAY | AIR_TIME | DISTANCE | DEP_TIME | ARR_TIME |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 2006-01-01 | 5 | 19 | 350 | 2475 | 9.083333 | 12.483334 |
| 1 | 2006-01-02 | 167 | 216 | 343 | 2475 | 11.783334 | 15.766666 |
| 2 | 2006-01-03 | -7 | -2 | 344 | 2475 | 8.883333 | 12.133333 |
| 3 | 2006-01-04 | -5 | -13 | 331 | 2475 | 8.916667 | 11.950000 |
| 4 | 2006-01-05 | -3 | -17 | 321 | 2475 | 8.950000 | 11.883333 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 999995 | 2006-01-19 | 5 | 4 | 244 | 1781 | 15.000000 | 17.350000 |
| 999996 | 2006-01-20 | 14 | 12 | 240 | 1781 | 15.150000 | 17.483334 |
| 999997 | 2006-01-21 | 9 | 12 | 241 | 1781 | 15.066667 | 17.483334 |
| 999998 | 2006-01-22 | -2 | 8 | 242 | 1781 | 14.883333 | 17.416666 |
| 999999 | 2006-01-23 | 1 | -12 | 232 | 1781 | 14.933333 | 17.083334 |
1000000 rows × 7 columns
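Finally, select() is not limited to full scans: it accepts a column list and a predicate that VAST DB evaluates server-side, so filtering doesn't have to happen in Pandas. A minimal sketch, assuming the ibis-style deferred predicate form used in the SDK examples:

from ibis import _

# Sketch: push a column projection and a filter down to VAST DB
with session.transaction() as tx:
    table = tx.bucket(DATABASE_NAME).schema(DATABASE_SCHEMA).table(TABLE_NAME)
    reader = table.select(columns=['FL_DATE', 'DEP_DELAY'],
                          predicate=(_.DEP_DELAY > 60))
    df_delayed = pa.Table.from_batches(reader).to_pandas()
    print(df_delayed.head())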