Importing Files using the API#

Important

This notebook is in the process of being migrated to Vast Data Platform Field Docs. It will probably not run yet.

See also

The Vast DB SDK API Documentation is available here.

Overview#

Bulk import of data into VAST-DB can be done with any method supported by the analytics/query engine: inserts, CTAS commands (CREATE TABLE AS [QUERY]), Python scripts processing data frames, and so on. Tools like Spark make this easy, but this type of processing is slow compared to the LOAD operations supported by various databases. VAST-DB includes LOAD capabilities.

Ingesting Serialized Data Objects (LOAD)#

Parquet objects housed on VAST can be ingested directly into a VAST-DB table using an RPC call issued via the VAST-DB APIs. Issuing these calls is supported from Trino and Spark, as well as from the Python SDK used in this notebook. The import process looks like this:

  • The target table with schema is prepared in VAST-DB

  • Data is placed in an S3 bucket on VAST storage

  • An RPC call is made to a VAST-DB CNode directing it to the object location(s) in VAST

  • The VAST-DB CNode cluster then divides the workload and imports the data directly from the files into the prepared table.

Currently, Parquet is the only format supported for this operation; however, creating additional filters for ORC, delimited text, CSV, JSON, etc. is a simple development task (if you want it, just ask).


  • Method: table.import_files

  • Usage: Imports a list of Parquet files into this table (see the sketch after this list). The files must be on the VAST S3 server and be accessible using the current credentials.

  • Args:

    • files_to_import (Iterable[str]): An iterable of file paths to import.

    • config (Optional[ImportConfig], optional): Configuration for the import operation. Defaults to None.

  • Raises:

    • errors.NotSupportedCommand: If the operation is not supported on the current table.

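A minimal usage sketch, assuming a vastdb session has already been created (as shown later in this notebook) and the target table already exists; the bucket, schema, table, and file paths below are placeholders:

# Hypothetical names throughout; the table must already exist and the
# Parquet objects must already be present on the VAST S3 server.
with session.transaction() as tx:
    table = tx.bucket('demo-database').schema(name='python-sdk-schema').table('flights')
    table.import_files(files_to_import=[
        '/demo-bucket/imports/flights_2006.parquet',
        '/demo-bucket/imports/flights_2007.parquet',
    ])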

  • Method: table.import_partitioned_files

  • Usage: Imports a list of Parquet files into this table, with each file having its own partition values defined as an Arrow RecordBatch (see the sketch after this list). The files must be on the VAST S3 server and be accessible using the current credentials.

  • Args:

    • files_and_partitions (Dict[str, pa.RecordBatch]): A dictionary mapping file paths to their corresponding partition values.

    • config (Optional[ImportConfig], optional): Configuration for the import operation. Defaults to None.

  • Raises:

    • errors.NotSupportedCommand: If the operation is not supported on the current table.

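A minimal sketch of the partitioned variant, assuming the target table already includes the partition columns; all names, paths, and the `year` partition column below are hypothetical:

import pyarrow as pa

# Each file carries its own partition values as a single-row Arrow
# RecordBatch whose columns are the partition columns of the table.
files_and_partitions = {
    '/demo-bucket/imports/flights_2006.parquet':
        pa.RecordBatch.from_pylist([{'year': 2006}]),
    '/demo-bucket/imports/flights_2007.parquet':
        pa.RecordBatch.from_pylist([{'year': 2007}]),
}

with session.transaction() as tx:
    table = tx.bucket('demo-database').schema(name='python-sdk-schema').table('flights')
    table.import_partitioned_files(files_and_partitions=files_and_partitions)
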
Create a database session#

This section creates a session as detailed here.

!pip install --quiet vastdb
import os

# Change these variables to reflect your environment, e.g.
#
# ENDPOINT = 'http://your_vast_endpoint:12345'
# DATABASE_NAME = ...

ENDPOINT = os.environ['ENDPOINT']
ACCESS_KEY = os.environ['ACCESS_KEY']
SECRET_KEY = os.environ['SECRET_KEY']

DATABASE_NAME = os.environ['DATABASE_NAME']

# The table will be created if it doesn't exist; the schema must already exist
DATABASE_SCHEMA = os.environ['DATABASE_SCHEMA']
TABLE_NAME = os.environ['TABLE_NAME']

# S3 File Details
BUCKET_NAME = os.environ['BUCKET_NAME']
LOCAL_FILE_PATH = 'flights.parquet'
S3_FILE_KEY = 'pythonsdk/import/flights.parquet'

The following cell contains code you will need to run to establish a session.

The cell is hidden because creating a connection is not the key focus of this tutorial.

print(f"""
{ENDPOINT=} 
{ACCESS_KEY=}
---
{DATABASE_NAME=}
{DATABASE_SCHEMA=}
{TABLE_NAME=}
---
{BUCKET_NAME=}
{LOCAL_FILE_PATH=}
{S3_FILE_KEY=}
""")

import vastdb

session = vastdb.connect(
    endpoint=ENDPOINT,
    access=ACCESS_KEY,
    secret=SECRET_KEY)

print("Vast Cluster version: ", session.api.vast_version)

Import using the importer on CNodes#

The importer expects the source files to be available in an S3 bucket on VAST. For this tutorial we will upload an example airline Parquet file to a VAST S3 bucket.

Upload our flight data Parquet file to the S3 bucket#

The next cell creates some utility functions, upload_to_s3 and list_objects_in_bucket. The cell is hidden because it is boilerplate code.

import os
import boto3
from botocore.exceptions import NoCredentialsError

def upload_to_s3(local_file, bucket_name, s3_file, aws_access_key_id, aws_secret_access_key, s3_endpoint):
    # Create an S3 client with custom endpoint
    s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, endpoint_url=s3_endpoint)

    try:
        # Upload the file
        s3.upload_file(local_file, bucket_name, s3_file)
        print(f"File {local_file} uploaded to {bucket_name}/{s3_file} successfully.")
    except FileNotFoundError:
        print(f"The file {local_file} was not found.")
    except NoCredentialsError:
        print("Credentials not available.")
        
def list_objects_in_bucket(bucket_name, aws_access_key_id, aws_secret_access_key, s3_endpoint, prefix=None):
    # Create an S3 client with custom endpoint
    s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, endpoint_url=s3_endpoint)

    try:
        # List objects in the bucket with optional prefix
        kwargs = {'Bucket': bucket_name}
        if prefix:
            kwargs['Prefix'] = prefix

        response = s3.list_objects_v2(**kwargs)

        if 'Contents' in response:
            print(f"Objects in bucket {bucket_name} with prefix '{prefix}':")
            for obj in response['Contents']:
                print(f">>> obj['Key']")
        else:
            print(f"No objects found in bucket {bucket_name} with prefix '{prefix}'.")
    except NoCredentialsError:
        print("Credentials not available.")
upload_to_s3(
  LOCAL_FILE_PATH, 
  BUCKET_NAME, 
  S3_FILE_KEY, 
  ACCESS_KEY, 
  SECRET_KEY, 
  ENDPOINT)

list_objects_in_bucket(
  BUCKET_NAME, 
  ACCESS_KEY, 
  SECRET_KEY, 
  ENDPOINT, 
  S3_FILE_KEY)
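
If you want a stronger check than the listing above, a small helper (hypothetical, not part of the tutorial's utilities) using boto3's head_object can confirm the object exists and report its size before starting the import:

import boto3

def check_uploaded_object(bucket_name, s3_file, aws_access_key_id, aws_secret_access_key, s3_endpoint):
    # Hypothetical helper: fetch the object's metadata to confirm it exists
    # and report its size in bytes.
    s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id,
                      aws_secret_access_key=aws_secret_access_key,
                      endpoint_url=s3_endpoint)
    head = s3.head_object(Bucket=bucket_name, Key=s3_file)
    print(f"{s3_file}: {head['ContentLength']} bytes")

check_uploaded_object(BUCKET_NAME, S3_FILE_KEY, ACCESS_KEY, SECRET_KEY, ENDPOINT)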

Create the table schema and use the API to start the import on the CNodes#

import pyarrow as pa
from vastdb.errors import TableExists

# create table schema before using the importer API

ARROW_SCHEMA = pa.schema([
    ('FL_DATE', pa.date32()), 
    ('DEP_DELAY', pa.int16()),
    ('ARR_DELAY', pa.int16()),
    ('AIR_TIME', pa.int16()),
    ('DISTANCE', pa.int16()),
    ('DEP_TIME', pa.float32()),
    ('ARR_TIME', pa.float32())
])

with session.transaction() as tx:
    bucket = tx.bucket(DATABASE_NAME)

    # first retrieve the schema
    try:
        schema = bucket.schema(name=DATABASE_SCHEMA, fail_if_missing=False)
        print(schema)
    except Exception as e:
        print("Schema doesn't exist:", e)

    if schema:
        try:
            table = schema.create_table(table_name=TABLE_NAME, columns=ARROW_SCHEMA)
            print(f"Table created: {table.name}")
        except TableExists as e:
            print("Couldn't create table because it already exists:", e)
        except Exception as e:
            print("Couldn't create table:", e)
Schema(name='python-sdk-schema', bucket=Bucket(name='demo-database', tx=Transaction(id=0x0000300000000033)))
Table created: flights_cnode_importer
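
The cell above only looks the database schema up. If DATABASE_SCHEMA does not exist yet, a sketch along the following lines should create it first; this assumes the SDK exposes create_schema on the bucket object and that your credentials permit creating schemas:

with session.transaction() as tx:
    bucket = tx.bucket(DATABASE_NAME)
    schema = bucket.schema(name=DATABASE_SCHEMA, fail_if_missing=False)
    if schema is None:
        # Assumption: create_schema is available and allowed for these credentials
        schema = bucket.create_schema(DATABASE_SCHEMA)
        print(f"Schema created: {schema.name}")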
FILES_TO_IMPORT = [f'/{BUCKET_NAME}/{S3_FILE_KEY}']
print(f'Importing {FILES_TO_IMPORT}')

with session.transaction() as tx:
    bucket = tx.bucket(DATABASE_NAME)

    # first retrieve the schema and table
    table = None
    try:
        schema = bucket.schema(name=DATABASE_SCHEMA, fail_if_missing=False)
        table = schema.table(TABLE_NAME)
        print(schema)
    except Exception as e:
        print("Schema doesn't exist:", e)

    if table:
        try:
            table.import_files(files_to_import=FILES_TO_IMPORT)
        except Exception as e:
            import sys, traceback
            traceback.print_exc(file=sys.stdout)
            print("Couldn't import files:", e)
Importing ['/demo-database/pythonsdk/import/flights.parquet']
Schema(name='python-sdk-schema', bucket=Bucket(name='demo-database', tx=Transaction(id=0x0000300000000034)))

Let’s verify the import…

with session.transaction() as tx:
    try:
        schema = tx.bucket(DATABASE_NAME).schema(name=DATABASE_SCHEMA, fail_if_missing=False)
        if schema:
            try:
                table = schema.table(name=TABLE_NAME)
                reader = table.select()
                pyarrow_table = pa.Table.from_batches(reader)
                df = pyarrow_table.to_pandas()
                print(f"Listing rows in {TABLE_NAME}")
                display(df)
            except Exception as e:
                print("Couldn't select data:", e)
    except Exception as e:
        print("Schema doesn't exist:", e)
Listing rows in flights_cnode_importer
FL_DATE DEP_DELAY ARR_DELAY AIR_TIME DISTANCE DEP_TIME ARR_TIME
0 2006-01-01 5 19 350 2475 9.083333 12.483334
1 2006-01-02 167 216 343 2475 11.783334 15.766666
2 2006-01-03 -7 -2 344 2475 8.883333 12.133333
3 2006-01-04 -5 -13 331 2475 8.916667 11.950000
4 2006-01-05 -3 -17 321 2475 8.950000 11.883333
... ... ... ... ... ... ... ...
999995 2006-01-19 5 4 244 1781 15.000000 17.350000
999996 2006-01-20 14 12 240 1781 15.150000 17.483334
999997 2006-01-21 9 12 241 1781 15.066667 17.483334
999998 2006-01-22 -2 8 242 1781 14.883333 17.416666
999999 2006-01-23 1 -12 232 1781 14.933333 17.083334

1000000 rows × 7 columns
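
As an extra sanity check, the row count of the imported table can be compared with the row count recorded in the local Parquet file's metadata. This is a sketch, assuming LOCAL_FILE_PATH is still present on the machine running the notebook:

import pyarrow.parquet as pq

# Row count recorded in the local Parquet footer
expected_rows = pq.read_metadata(LOCAL_FILE_PATH).num_rows

with session.transaction() as tx:
    table = tx.bucket(DATABASE_NAME).schema(name=DATABASE_SCHEMA).table(name=TABLE_NAME)
    actual_rows = pa.Table.from_batches(table.select()).num_rows

print(f"expected={expected_rows}, imported={actual_rows}, match={expected_rows == actual_rows}")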