Importing Files using the API#
Important
This notebook is in the process of being migrated to Vast Data Platform Field Docs. It will probably not run yet.
See also
The Vast DB SDK API Documentation is available here.
Overview#
Bulk import of data into VAST-DB can be done with any method supported by the analytics/query engine: inserts, CTAS commands (CREATE TABLE AS [QUERY]), Python scripts processing data frames, and so on. Tools like Spark make this easy, but this kind of processing is slow compared to the LOAD operations supported by many databases. VAST-DB includes LOAD capabilities.
Ingesting Serialized Data Objects (LOAD)#
Parquet objects housed on VAST can be ingested directly into a VAST-DB table using an RPC call issued via the VAST-DB APIs. Issuing these calls is supported from Trino and Spark, and that is usually how they are made. The import process looks like this:
The target table with schema is prepared in VAST-DB
Data is placed in an S3 bucket on VAST storage
An RPC call is made to a VAST-DB CNode directing it to the object location(s) in VAST
The VAST-DB CNode cluster then divides the workload and imports the data directly from the files into the prepared table.
Currently, Parquet is the only format supported for this operation; however, creating additional filters for ORC, delimited text, CSV, JSON, etc. is a simple development task (if you want it, just ask).
Method: table.import_files
Usage: Imports a list of Parquet files into this table. The files must be on the VAST S3 server and be accessible using the current credentials.
Args:
files_to_import (Iterable[str]): An iterable of file paths to import.
config (Optional[ImportConfig], optional): Configuration for the import operation. Defaults to None.
Raises:
errors.NotSupportedCommand: If the operation is not supported on the current table.
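For example, here is a minimal sketch of calling import_files inside a transaction, assuming a session has already been created (as shown later in this notebook); the bucket, schema, table and file paths are illustrative placeholders, not values from this tutorial:
with session.transaction() as tx:
    # The target table must already exist, and the Parquet files must already be on the VAST S3 server
    table = tx.bucket('demo-database').schema('python-sdk-schema').table('flights')
    table.import_files(files_to_import=[
        '/demo-database/staging/part-0001.parquet',
        '/demo-database/staging/part-0002.parquet',
    ])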
Method: table.import_partitioned_files
Usage: Imports a list of Parquet files into this table, with each file having its own partition values. The files must be on the VAST S3 server and be accessible using the current credentials. Each file must have its own partition values defined as an Arrow RecordBatch.
Args:
files_and_partitions (Dict[str, pa.RecordBatch]): A dictionary mapping file paths to their corresponding partition values.
config (Optional[ImportConfig], optional): Configuration for the import operation. Defaults to None.
Raises:
errors.NotSupportedCommand: If the operation is not supported on the current table.
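As an illustration, here is a minimal sketch of import_partitioned_files where each file contributes rows for a different year; the file paths and the YEAR partition column are assumptions for the example, not part of this tutorial's table:
import pyarrow as pa

# One single-row RecordBatch of partition values per file (column name is assumed for this sketch)
partition_2006 = pa.RecordBatch.from_pydict({'YEAR': [2006]})
partition_2007 = pa.RecordBatch.from_pydict({'YEAR': [2007]})

with session.transaction() as tx:
    table = tx.bucket('demo-database').schema('python-sdk-schema').table('flights')
    table.import_partitioned_files(files_and_partitions={
        '/demo-database/staging/flights_2006.parquet': partition_2006,
        '/demo-database/staging/flights_2007.parquet': partition_2007,
    })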
Create a database session#
This section creates a session as detailed here
!pip install --quiet vastdb
import os
# Change these variables to reflect your environment, e.g.
#
# ENDPOINT = 'http://your_vast_endpoint:12345'
# DATABASE_NAME = ...
ENDPOINT = os.environ['ENDPOINT']
ACCESS_KEY = os.environ['ACCESS_KEY']
SECRET_KEY = os.environ['SECRET_KEY']
DATABASE_NAME = os.environ['DATABASE_NAME']
# Schema and Table will get created if they don't exist
DATABASE_SCHEMA = os.environ['DATABASE_SCHEMA']
TABLE_NAME = os.environ['TABLE_NAME']
# S3 File Details
BUCKET_NAME = os.environ['BUCKET_NAME']
LOCAL_FILE_PATH = 'flights.parquet'
S3_FILE_KEY = 'pythonsdk/import/flights.parquet'
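If these environment variables are not already set where the notebook runs, they can be set from Python first (run something like the sketch below before the cell above); all values are placeholders to replace with your own details:
import os

# Placeholders only: substitute your own cluster endpoint, credentials and names
os.environ.setdefault('ENDPOINT', 'http://your_vast_endpoint:12345')
os.environ.setdefault('ACCESS_KEY', 'your_access_key')
os.environ.setdefault('SECRET_KEY', 'your_secret_key')
os.environ.setdefault('DATABASE_NAME', 'your-database-bucket')
os.environ.setdefault('DATABASE_SCHEMA', 'python-sdk-schema')
os.environ.setdefault('TABLE_NAME', 'flights')
os.environ.setdefault('BUCKET_NAME', 'your-s3-bucket')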
The following cell contains code you will need to run to establish a session.
The cell is hidden because creating a connection is not the key focus of this tutorial.
print(f"""
{ENDPOINT=}
{ACCESS_KEY=}
---
{DATABASE_NAME=}
{DATABASE_SCHEMA=}
{TABLE_NAME=}
---
{BUCKET_NAME=}
{LOCAL_FILE_PATH=}
{S3_FILE_KEY=}
""")
import vastdb
session = vastdb.connect(
endpoint=ENDPOINT,
access=ACCESS_KEY,
secret=SECRET_KEY)
print("Vast Cluster version: ", session.api.vast_version)
Import using the importer on CNodes#
The importer expects the files to be available in an S3 bucket on VAST. For this tutorial we will upload an example airline Parquet file to a VAST S3 bucket.
Import our flight data Parquet file into the S3 bucket#
The next cell creates some utility functions, upload_to_s3 and list_objects_in_bucket. The cell is hidden because it is boilerplate code.
import os
import boto3
from botocore.exceptions import NoCredentialsError
def upload_to_s3(local_file, bucket_name, s3_file, aws_access_key_id, aws_secret_access_key, s3_endpoint):
# Create an S3 client with custom endpoint
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, endpoint_url=s3_endpoint)
try:
# Upload the file
s3.upload_file(local_file, bucket_name, s3_file)
print(f"File {local_file} uploaded to {bucket_name}/{s3_file} successfully.")
except FileNotFoundError:
print(f"The file {local_file} was not found.")
except NoCredentialsError:
print("Credentials not available.")
def list_objects_in_bucket(bucket_name, aws_access_key_id, aws_secret_access_key, s3_endpoint, prefix=None):
# Create an S3 client with custom endpoint
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, endpoint_url=s3_endpoint)
try:
# List objects in the bucket with optional prefix
kwargs = {'Bucket': bucket_name}
if prefix:
kwargs['Prefix'] = prefix
response = s3.list_objects_v2(**kwargs)
if 'Contents' in response:
print(f"Objects in bucket {bucket_name} with prefix '{prefix}':")
for obj in response['Contents']:
print(f">>> obj['Key']")
else:
print(f"No objects found in bucket {bucket_name} with prefix '{prefix}'.")
except NoCredentialsError:
print("Credentials not available.")
upload_to_s3(
LOCAL_FILE_PATH,
BUCKET_NAME,
S3_FILE_KEY,
ACCESS_KEY,
SECRET_KEY,
ENDPOINT)
list_objects_in_bucket(
BUCKET_NAME,
ACCESS_KEY,
SECRET_KEY,
ENDPOINT,
S3_FILE_KEY)
Create the table schema and use the API to start the import on the CNodes#
import pyarrow as pa
from vastdb.errors import TableExists
# create table schema before using the importer API
ARROW_SCHEMA = pa.schema([
('FL_DATE', pa.date32()),
('DEP_DELAY', pa.int16()),
('ARR_DELAY', pa.int16()),
('AIR_TIME', pa.int16()),
('DISTANCE', pa.int16()),
('DEP_TIME', pa.float32()),
('ARR_TIME', pa.float32())
])
with session.transaction() as tx:
bucket = tx.bucket(DATABASE_NAME)
# first retrieve the schema
try:
schema = bucket.schema(name=DATABASE_SCHEMA, fail_if_missing=False)
print(schema)
except Exception as e:
print("Schema doesn't exist:", e)
if schema:
try:
table = schema.create_table(table_name=TABLE_NAME, columns=ARROW_SCHEMA)
print(f"Table created: {table.name}")
except TableExists as e:
print("Couldn't create table because it already exists:", e)
except Exception as e:
print("Couldn't create table:", e)
Schema(name='python-sdk-schema', bucket=Bucket(name='demo-database', tx=Transaction(id=0x0000300000000033)))
Table created: flights_cnode_importer
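As an optional alternative to declaring ARROW_SCHEMA by hand, the Arrow schema could be read straight from the local Parquet file so the table definition always matches the file. A sketch using pyarrow.parquet (not part of the flow above):
import pyarrow.parquet as pq

# Read the Arrow schema embedded in the Parquet file's metadata
file_schema = pq.read_schema(LOCAL_FILE_PATH)
print(file_schema)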
FILES_TO_IMPORT = [f'/{BUCKET_NAME}/{S3_FILE_KEY}']
print(f'Importing {FILES_TO_IMPORT}')
with session.transaction() as tx:
bucket = tx.bucket(DATABASE_NAME)
# first retrieve the schema
try:
schema = bucket.schema(name=DATABASE_SCHEMA, fail_if_missing=False)
table = schema.table(TABLE_NAME)
print(schema)
except Exception as e:
print("Schema doesn't exist:", e)
if table:
try:
table.import_files(files_to_import=FILES_TO_IMPORT)
except Exception as e:
import sys, traceback
traceback.print_exc(file=sys.stdout)
print("Couldn't import files:", e)
Importing ['/demo-database/pythonsdk/import/flights.parquet']
Schema(name='python-sdk-schema', bucket=Bucket(name='demo-database', tx=Transaction(id=0x0000300000000034)))
Let’s verify the import…
with session.transaction() as tx:
try:
schema = tx.bucket(DATABASE_NAME).schema(name=DATABASE_SCHEMA, fail_if_missing=False)
if schema:
try:
table = schema.table(name=TABLE_NAME)
reader = table.select()
pyarrow_table = pa.Table.from_batches(reader)
df = pyarrow_table.to_pandas()
print(f"Listing rows in {TABLE_NAME}")
display(df)
except Exception as e:
print("Couldn't select data:", e)
except Exception as e:
print("Schema doesn't exist:", e)
Listing rows in flights_cnode_importer
|        | FL_DATE    | DEP_DELAY | ARR_DELAY | AIR_TIME | DISTANCE | DEP_TIME  | ARR_TIME  |
|--------|------------|-----------|-----------|----------|----------|-----------|-----------|
| 0      | 2006-01-01 | 5         | 19        | 350      | 2475     | 9.083333  | 12.483334 |
| 1      | 2006-01-02 | 167       | 216       | 343      | 2475     | 11.783334 | 15.766666 |
| 2      | 2006-01-03 | -7        | -2        | 344      | 2475     | 8.883333  | 12.133333 |
| 3      | 2006-01-04 | -5        | -13       | 331      | 2475     | 8.916667  | 11.950000 |
| 4      | 2006-01-05 | -3        | -17       | 321      | 2475     | 8.950000  | 11.883333 |
| ...    | ...        | ...       | ...       | ...      | ...      | ...       | ...       |
| 999995 | 2006-01-19 | 5         | 4         | 244      | 1781     | 15.000000 | 17.350000 |
| 999996 | 2006-01-20 | 14        | 12        | 240      | 1781     | 15.150000 | 17.483334 |
| 999997 | 2006-01-21 | 9         | 12        | 241      | 1781     | 15.066667 | 17.483334 |
| 999998 | 2006-01-22 | -2        | 8         | 242      | 1781     | 14.883333 | 17.416666 |
| 999999 | 2006-01-23 | 1         | -12       | 232      | 1781     | 14.933333 | 17.083334 |
1000000 rows × 7 columns
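As a final sanity check, the row count can also be computed from the select() reader without materialising the whole table in pandas. A sketch, assuming the session and the names defined earlier in this notebook:
with session.transaction() as tx:
    table = tx.bucket(DATABASE_NAME).schema(DATABASE_SCHEMA).table(TABLE_NAME)
    # Sum the row counts of the streamed RecordBatches instead of building a DataFrame
    total_rows = sum(batch.num_rows for batch in table.select())
    print(f"{TABLE_NAME} now holds {total_rows} rows")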