Python SDK - Import NetCDF#
Introduction: Streamlining NetCDF Data with VAST Database#
NetCDF (Network Common Data Form) is a standard format for storing scientific data, particularly large, multi-dimensional datasets such as climate and weather information. While effective as a storage format, managing and querying data directly from NetCDF files can be cumbersome, slow, and resource-intensive. This is where VAST Database offers a significant advantage.
Benefits of using VAST Database over direct NetCDF file handling:
Scalability and Performance: VAST is designed for high-performance analytics on massive datasets. It allows for efficient querying and analysis of NetCDF data at scale, significantly outperforming traditional file-based approaches.
Efficient Data Management: VAST provides a structured, database-driven approach to data management. This simplifies data organization, versioning, and access control, reducing the complexity associated with managing numerous NetCDF files.
Optimized Querying: VAST enables complex queries using SQL-like syntax, allowing for fast and flexible data retrieval. This is particularly beneficial for tasks like finding minimum/maximum values, filtering data based on conditions, and performing aggregations.
Parallel Processing: VAST leverages parallel processing to accelerate data ingestion and querying. This is crucial for handling the large volumes of data typically found in NetCDF files.
Simplified Data Access: Instead of dealing with individual NetCDF files and their specific structures, VAST provides a unified interface for accessing and analyzing the data.
Integration with Python and other tools: The VAST DB Python SDK integrates easily with Python and with tools like pandas and PyArrow for data manipulation and analysis (see the sketch after this list).
Reduced I/O Overhead: By storing data in a columnar format, VAST minimizes I/O operations, leading to faster query execution compared to row-based file access.
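For a flavor of what this looks like in practice, here is a minimal sketch of a filtered read through the SDK. The endpoint, credentials, and bucket/schema/table names are placeholders; the full, working scripts follow below.

import vastdb
from ibis import _  # deferred expressions are used to build pushdown predicates

# Placeholder connection details - substitute your own endpoint and keys.
session = vastdb.connect(endpoint="http://vip-pool.example:80", access="ACCESS_KEY", secret="SECRET_KEY")

with session.transaction() as tx:
    table = tx.bucket("my-bucket").schema("my-schema").table("weather")
    # The predicate is pushed down to VAST, so only matching rows are returned.
    reader = table.select(columns=["time", "prcp"], predicate=(_.prcp > 100.0))
    df = reader.read_all().to_pandas()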
The Python scripts below demonstrate how to:
Ingest NetCDF data into a VAST Database: It reads NetCDF files, converts them to Pandas DataFrames, and then loads them into a VAST table using PyArrow for efficient data transfer.
Query and Analyze data within VAST: It utilizes the VAST Python SDK to perform analytical queries, such as finding minimum and maximum values for a specified column and retrieving rows containing the maximum value.
By leveraging VAST Database, users can unlock the full potential of their NetCDF data, enabling faster, more efficient, and scalable analysis.
Setup#
Install the VAST DB SDK#
!pip install vastdb
Install NetCDF and xarray Dependencies#
!conda install -y netcdf4 h5netcdf
!conda install xarray netCDF4 --yes
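To confirm the NetCDF stack is working, you can open a single file with xarray before running the full import (the path below is a placeholder):

import xarray as xr

# Placeholder path - point this at one of your .nc files.
ds = xr.open_dataset("/home/jovyan/datastore/weather_data/netcdf/sample.nc")
print(ds)  # prints dimensions, coordinates, and data variables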
Import NetCDF Files into the VAST Database#
Adjust the file location and database connection details before executing the script.
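The script reads its connection details from environment variables. If you are running it from a notebook, one option is to set them in-process first (the values below are placeholders):

import os

# Placeholder values - replace with your cluster's endpoint and credentials.
os.environ["VASTDB_ENDPOINT"] = "http://vip-pool.example:80"
os.environ["VASTDB_ACCESS_KEY"] = "YOUR_ACCESS_KEY"
os.environ["VASTDB_SECRET_KEY"] = "YOUR_SECRET_KEY"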
import os
import time
import glob
import multiprocessing
import xarray as xr
import pandas as pd
import math
import logging

# Import VAST SDK and PyArrow
import vastdb
import pyarrow as pa
from vastdb import errors as vast_errors  # Import specific errors module

# --- Configuration ---
# VAST Cluster Connection Details from Environment Variables
VAST_ENDPOINT = os.getenv("VASTDB_ENDPOINT")
ACCESS_KEY = os.getenv("VASTDB_ACCESS_KEY")
SECRET_KEY = os.getenv("VASTDB_SECRET_KEY")

# Data and Table Details
SOURCE_DIRECTORY = '/home/jovyan/datastore/weather_data/netcdf'  # Directory containing .nc files (adjust as needed)
TARGET_BUCKET = 'christian-db'
TARGET_SCHEMA = 'netcdfdemo'
TARGET_TABLE_NAME = 'weather'

# Performance Tuning
MAX_PROCESSES = 18  # Adjust based on your client machine's resources

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(process)d - %(levelname)s - %(message)s')

# --- Validation for Environment Variables ---
if not VAST_ENDPOINT:
    logging.error("Environment variable VASTDB_ENDPOINT is not set.")
    exit("Error: VASTDB_ENDPOINT environment variable missing.")
if not ACCESS_KEY:
    logging.error("Environment variable VASTDB_ACCESS_KEY is not set.")
    exit("Error: VASTDB_ACCESS_KEY environment variable missing.")
if not SECRET_KEY:
    logging.error("Environment variable VASTDB_SECRET_KEY is not set.")
    exit("Error: VASTDB_SECRET_KEY environment variable missing.")

logging.info(f"Using VAST Endpoint: {VAST_ENDPOINT}")
logging.info(f"Using VAST Bucket: {TARGET_BUCKET}")
logging.info(f"Using Target Schema: {TARGET_SCHEMA}")
logging.info(f"Using Target Table: {TARGET_TABLE_NAME}")

# --- Helper Functions ---
def create_vast_session(endpoint, access_key, secret_key):
    """Creates a VAST SDK session instance."""
    try:
        session = vastdb.connect(
            endpoint=endpoint,
            access=access_key,
            secret=secret_key,
            # verify_ssl=False  # Add if needed
        )
        logging.info(f"Successfully connected VAST session to endpoint: {endpoint}")
        return session
    except Exception as e:
        logging.error(f"Failed to create VAST session: {e}")
        raise

def get_pyarrow_schema_from_dataframe(df):
    """Generates a pyarrow.Schema from a Pandas DataFrame."""
    try:
        arrow_schema = pa.Schema.from_pandas(df, preserve_index=False)
        logging.info("Inferred PyArrow schema from DataFrame.")
        return arrow_schema
    except Exception as e:
        logging.error(f"Failed to create PyArrow schema from DataFrame: {e}")
        raise

def create_vast_schema_and_table(session, bucket_name, schema_name, table_name, pa_schema):
    """Creates the schema and table if they don't exist, within a transaction."""
    try:
        with session.transaction() as tx:
            bucket = tx.bucket(bucket_name)
            try:
                schema = bucket.schema(schema_name)
                logging.info(f"Schema '{schema_name}' already exists in bucket '{bucket_name}'.")
            except vast_errors.MissingSchema:
                logging.info(f"Schema '{schema_name}' not found. Creating...")
                schema = bucket.create_schema(schema_name)
                logging.info(f"Schema '{schema_name}' created successfully.")
            try:
                table = schema.table(table_name)
                logging.info(f"Table '{schema_name}.{table_name}' already exists.")
            except vast_errors.MissingTable:
                logging.info(f"Table '{schema_name}.{table_name}' not found. Creating...")
                table = schema.create_table(table_name, columns=pa_schema)
                logging.info(f"Table '{schema_name}.{table_name}' created successfully.")
    except (vast_errors.SchemaExists, vast_errors.TableExists) as e:
        logging.warning(f"Schema or Table creation conflict (likely already exists): {e}")
    except vast_errors.HttpError as e:
        logging.error(f"VAST HTTP Error during schema/table setup: {e.status} - {e.message}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error during schema/table setup: {e}")
        raise

def netcdf_to_dataframe(file_path):
    """Converts a NetCDF file to a Pandas DataFrame."""
    start_time = time.time()
    try:
        with xr.open_dataset(file_path) as ds:
            df = ds.to_dataframe().reset_index()
        elapsed = time.time() - start_time
        logging.info(f"NetCDF file '{os.path.basename(file_path)}' converted to DataFrame in {elapsed:.2f}s. Shape: {df.shape}")
        return df
    except Exception as e:
        logging.error(f"Error converting NetCDF file '{file_path}': {e}")
        return None

def insert_pyarrow_table_to_vast(session, bucket_name, schema_name, table_name, arrow_table):
    """Inserts a PyArrow Table into a VAST Data table within a transaction."""
    if arrow_table is None or arrow_table.num_rows == 0:
        logging.warning(f"Skipping insertion into '{schema_name}.{table_name}' due to empty or invalid Arrow Table.")
        return False
    logging.info(f"Inserting {arrow_table.num_rows} rows into table '{schema_name}.{table_name}'...")
    start_time = time.time()
    inserted = False
    try:
        with session.transaction() as tx:
            table = tx.bucket(bucket_name).schema(schema_name).table(table_name)
            table.insert(arrow_table)
        elapsed = time.time() - start_time
        logging.info(f"{arrow_table.num_rows} rows inserted into table '{schema_name}.{table_name}' in {elapsed:.2f}s.")
        inserted = True
    except vast_errors.HttpError as e:
        logging.error(f"VAST HTTP Error inserting data into '{schema_name}.{table_name}': {e.status} - {e.message}")
    except Exception as e:
        logging.error(f"Unexpected error inserting data into '{schema_name}.{table_name}': {e}")
    return inserted

# --- Multiprocessing Worker Function ---
def process_file_batch(file_batch, bucket, schema_name, table_name, vast_endpoint, access_key, secret_key, pa_schema_for_conversion):
    """
    Worker function for multiprocessing.
    Connects to VAST, processes a batch of files, converts each to a PyArrow Table, and inserts the data.
    """
    worker_session = None
    files_processed = 0
    files_failed = 0
    logging.info(f"Worker started processing batch of {len(file_batch)} files.")
    try:
        worker_session = create_vast_session(vast_endpoint, access_key, secret_key)
        for file_path in file_batch:
            logging.info(f"Processing file: {os.path.basename(file_path)}")
            df = None
            arrow_table = None
            try:
                df = netcdf_to_dataframe(file_path)
                if df is not None and not df.empty:
                    arrow_table = pa.Table.from_pandas(df, schema=pa_schema_for_conversion, preserve_index=False)
                    logging.info(f"Converted DataFrame to Arrow Table for {os.path.basename(file_path)}. Shape: {arrow_table.shape}")
                    success = insert_pyarrow_table_to_vast(worker_session, bucket, schema_name, table_name, arrow_table)
                    if success:
                        files_processed += 1
                    else:
                        files_failed += 1
                elif df is not None and df.empty:
                    logging.warning(f"Skipping empty DataFrame from file {file_path}")
                    files_processed += 1
                else:
                    files_failed += 1  # Failure during NetCDF conversion
            except Exception as e:
                logging.error(f"Error processing/inserting file {file_path}: {e}")
                files_failed += 1
            finally:
                del df
                del arrow_table
    except Exception as e:
        logging.error(f"Worker process failed unexpectedly: {e}")
        files_failed = len(file_batch) - files_processed
    finally:
        logging.info(f"Worker finished. Processed: {files_processed}, Failed: {files_failed}")
    return files_processed, files_failed

# --- Main Execution ---
def main():
    """Main function to orchestrate the NetCDF to VAST Data process."""
    logging.info("Starting NetCDF to VAST Data ingestion process using vastdb_sdk...")
    script_start_time = time.time()

    # 1. Find NetCDF files
    logging.info(f"Searching for NetCDF files (*ncdd*.nc) in: {SOURCE_DIRECTORY}")
    nc_files = glob.glob(os.path.join(SOURCE_DIRECTORY, '**', '*ncdd*.nc'), recursive=True)
    num_files = len(nc_files)
    logging.info(f"Found {num_files} NetCDF files.")
    if num_files == 0:
        logging.warning("No NetCDF files found matching the pattern. Exiting.")
        return

    # 2. Create main VAST session (using config from env vars)
    try:
        main_session = create_vast_session(VAST_ENDPOINT, ACCESS_KEY, SECRET_KEY)
    except Exception:
        logging.error("Could not establish initial VAST session. Exiting.")
        return

    # 3. Determine PyArrow schema and create schema/table (if needed)
    pa_schema = None
    try:
        logging.info(f"Reading first file to determine schema: {nc_files[0]}")
        first_df = netcdf_to_dataframe(nc_files[0])
        if first_df is None:
            logging.error("Failed to read the first NetCDF file to determine schema. Exiting.")
            return
        pa_schema = get_pyarrow_schema_from_dataframe(first_df)
        if not pa_schema:
            logging.error("Could not determine PyArrow schema from the first DataFrame. Exiting.")
            return
        logging.info(f"Derived PyArrow Schema: {pa_schema}")
        del first_df
        # Create schema and table using the derived PyArrow schema
        create_vast_schema_and_table(main_session, TARGET_BUCKET, TARGET_SCHEMA, TARGET_TABLE_NAME, pa_schema)
    except Exception as e:
        logging.error(f"Failed during schema derivation or table creation: {e}")
        return

    # 4. Prepare for Multiprocessing
    num_processes = min(num_files, MAX_PROCESSES)
    if num_processes <= 0:
        logging.error("Invalid number of processes calculated. Exiting.")
        return
    files_per_process = math.ceil(num_files / num_processes)
    file_batches = [nc_files[i:i + files_per_process] for i in range(0, num_files, files_per_process)]
    logging.info(f"Distributing {num_files} files across {num_processes} worker processes.")

    # 5. Run Multiprocessing Pool
    total_processed = 0
    total_failed = 0
    # Pass the config values to the worker processes
    pool_args = [(batch, TARGET_BUCKET, TARGET_SCHEMA, TARGET_TABLE_NAME, VAST_ENDPOINT, ACCESS_KEY, SECRET_KEY, pa_schema) for batch in file_batches]
    with multiprocessing.Pool(processes=num_processes) as pool:
        results_async = pool.starmap_async(process_file_batch, pool_args)
        try:
            worker_outputs = results_async.get()
            for processed_count, failed_count in worker_outputs:
                total_processed += processed_count
                total_failed += failed_count
        except KeyboardInterrupt:
            logging.warning("Process interrupted by user. Terminating pool.")
            pool.terminate()
            pool.join()
            total_failed = num_files - total_processed
        except Exception as e:
            logging.error(f"Error occurred during multiprocessing execution: {e}")
            total_failed = num_files - total_processed

    # 6. Log Summary
    script_elapsed_time = time.time() - script_start_time
    logging.info("--- Ingestion Summary ---")
    logging.info(f"Target Bucket: {TARGET_BUCKET}")
    logging.info(f"Target Schema.Table: {TARGET_SCHEMA}.{TARGET_TABLE_NAME}")
    logging.info(f"Total NetCDF files found: {num_files}")
    logging.info(f"Successfully processed files: {total_processed}")
    logging.info(f"Failed files: {total_failed}")
    logging.info(f"Total script execution time: {script_elapsed_time:.2f} seconds.")
    logging.info("-------------------------")

if __name__ == "__main__":
    # multiprocessing.set_start_method('spawn')  # Consider uncommenting for macOS/Windows
    main()
2025-03-26 11:14:43,927 - 4090262 - INFO - Using VAST Endpoint: http://172.200.203.1:80
2025-03-26 11:14:43,928 - 4090262 - INFO - Using VAST Bucket: christian-db
2025-03-26 11:14:43,929 - 4090262 - INFO - Using Target Schema: netcdfdemo
2025-03-26 11:14:43,929 - 4090262 - INFO - Using Target Table: weather
2025-03-26 11:14:43,933 - 4090262 - INFO - Starting NetCDF to VAST Data ingestion process using vastdb_sdk...
2025-03-26 11:14:43,933 - 4090262 - INFO - Searching for NetCDF files (*ncdd*.nc) in: /home/jovyan/datastore/weather_data/netcdf
2025-03-26 11:14:43,938 - 4090262 - INFO - Found 1 NetCDF files.
2025-03-26 11:14:43,944 - 4090262 - INFO - Successfully connected VAST session to endpoint: http://172.200.203.1:80
2025-03-26 11:14:43,945 - 4090262 - INFO - Reading first file to determine schema: /home/jovyan/datastore/weather_data/netcdf/netcdf_files/ncdd-202211-grd-scaled.nc
2025-03-26 11:14:58,435 - 4090262 - INFO - NetCDF file 'ncdd-202211-grd-scaled.nc' converted to DataFrame in 14.49s. Shape: (24763800, 7)
2025-03-26 11:14:58,453 - 4090262 - INFO - Inferred PyArrow schema from DataFrame.
2025-03-26 11:14:58,456 - 4090262 - INFO - Derived PyArrow Schema: time: timestamp[ns]
lat: float
lon: float
tmax: double
tmin: double
prcp: double
tavg: double
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 842
2025-03-26 11:14:58,501 - 4090262 - INFO - Schema 'netcdfdemo' not found. Creating...
2025-03-26 11:14:58,512 - 4090262 - INFO - Created schema: netcdfdemo
2025-03-26 11:14:58,522 - 4090262 - INFO - Schema 'netcdfdemo' created successfully.
2025-03-26 11:14:58,529 - 4090262 - INFO - Table 'netcdfdemo.weather' not found. Creating...
2025-03-26 11:14:58,578 - 4090262 - INFO - Created table: weather
2025-03-26 11:14:58,626 - 4090262 - INFO - Table 'netcdfdemo.weather' created successfully.
2025-03-26 11:14:58,638 - 4090262 - INFO - Distributing 1 files across 1 worker processes.
2025-03-26 11:14:58,653 - 4095577 - INFO - Worker started processing batch of 1 files.
2025-03-26 11:14:58,698 - 4095577 - INFO - Successfully connected VAST session to endpoint: http://172.200.203.1:80
2025-03-26 11:14:58,701 - 4095577 - INFO - Processing file: ncdd-202211-grd-scaled.nc
2025-03-26 11:15:11,279 - 4095577 - INFO - NetCDF file 'ncdd-202211-grd-scaled.nc' converted to DataFrame in 12.58s. Shape: (24763800, 7)
2025-03-26 11:15:11,549 - 4095577 - INFO - Converted DataFrame to Arrow Table for ncdd-202211-grd-scaled.nc. Shape: (24763800, 7)
2025-03-26 11:15:11,551 - 4095577 - INFO - Inserting 24763800 rows into table 'netcdfdemo.weather'...
2025-03-26 11:15:28,721 - 4095577 - INFO - 24763800 rows inserted into table 'netcdfdemo.weather' in 17.17s.
2025-03-26 11:15:28,745 - 4095577 - INFO - Worker finished. Processed: 1, Failed: 0
2025-03-26 11:15:28,769 - 4090262 - INFO - --- Ingestion Summary ---
2025-03-26 11:15:28,770 - 4090262 - INFO - Target Bucket: christian-db
2025-03-26 11:15:28,770 - 4090262 - INFO - Target Schema.Table: netcdfdemo.weather
2025-03-26 11:15:28,771 - 4090262 - INFO - Total NetCDF files found: 1
2025-03-26 11:15:28,772 - 4090262 - INFO - Successfully processed files: 1
2025-03-26 11:15:28,772 - 4090262 - INFO - Failed files: 0
2025-03-26 11:15:28,773 - 4090262 - INFO - Total script execution time: 44.84 seconds.
2025-03-26 11:15:28,773 - 4090262 - INFO - -------------------------
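As a quick sanity check of the ingest, you can count the rows that landed in the table. This is a minimal sketch reusing the environment variables above; reading a single narrow column keeps the scan small.

import os
import vastdb

session = vastdb.connect(
    endpoint=os.environ["VASTDB_ENDPOINT"],
    access=os.environ["VASTDB_ACCESS_KEY"],
    secret=os.environ["VASTDB_SECRET_KEY"],
)

with session.transaction() as tx:
    table = tx.bucket('christian-db').schema('netcdfdemo').table('weather')
    # num_rows on the materialized Arrow table gives the row count.
    count = table.select(columns=['time']).read_all().num_rows

print(f"Rows in netcdfdemo.weather: {count}")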
Read Data from the VAST Database Using the Python API#
import os
import sys
import logging
from datetime import datetime  # To timestamp the summary

import vastdb
import pyarrow as pa
import pandas as pd
from vastdb import errors as vast_errors

# ibis is needed for creating the predicate (_.prcp == max_val)
try:
    from ibis import _
    IBIS_AVAILABLE = True
except ImportError:
    logging.warning("`ibis-framework` not found. Install it (`pip install ibis-framework`) to filter for max value rows.")
    IBIS_AVAILABLE = False

# --- Configuration ---
# VAST Cluster Connection Details from Environment Variables
VAST_ENDPOINT = os.getenv("VASTDB_ENDPOINT")
ACCESS_KEY = os.getenv("VASTDB_ACCESS_KEY")
SECRET_KEY = os.getenv("VASTDB_SECRET_KEY")

# Data and Table Details
TARGET_BUCKET = 'christian-db'
TARGET_SCHEMA = 'netcdfdemo'
TARGET_TABLE_NAME = 'weather'
TARGET_COLUMN = 'prcp'  # Column to query for min/max

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Validation for Environment Variables ---
if not VAST_ENDPOINT:
    logging.error("Environment variable VASTDB_ENDPOINT is not set.")
    sys.exit("Error: VASTDB_ENDPOINT environment variable missing.")
if not ACCESS_KEY:
    logging.error("Environment variable VASTDB_ACCESS_KEY is not set.")
    sys.exit("Error: VASTDB_ACCESS_KEY environment variable missing.")
if not SECRET_KEY:
    logging.error("Environment variable VASTDB_SECRET_KEY is not set.")
    sys.exit("Error: VASTDB_SECRET_KEY environment variable missing.")

logging.info(f"Using VAST Endpoint: {VAST_ENDPOINT}")
logging.info(f"Querying Bucket: {TARGET_BUCKET}")
logging.info(f"Querying Schema: {TARGET_SCHEMA}")
logging.info(f"Querying Table: {TARGET_TABLE_NAME}")
logging.info(f"Analyzing Column: {TARGET_COLUMN}")

def get_stats_and_max_rows(session, bucket_name, schema_name, table_name, column_name):
    """
    Connects to VAST, finds the min/max of a column, and retrieves the row(s) with the max value.
    Returns: min_val, max_val, record_count, max_rows_df (Pandas DataFrame, possibly empty)
    """
    min_val = None
    max_val = None
    record_count = 0
    max_rows_df = pd.DataFrame()  # Initialize empty DataFrame

    if not IBIS_AVAILABLE:
        # We can still compute min/max below; we just won't be able to
        # filter for the rows containing the max value.
        logging.error("Cannot fetch max rows because `ibis-framework` is not installed.")

    logging.info(f"Starting analysis for '{column_name}' in {schema_name}.{table_name}...")
    try:
        # Use a single transaction for both steps
        with session.transaction() as tx:
            try:
                table = tx.bucket(bucket_name).schema(schema_name).table(table_name)
                logging.info(f"Accessed table {schema_name}.{table_name}")

                # --- Step 1: Find Min/Max Value ---
                logging.info(f"Executing select for column: {column_name} to find min/max")
                reader = table.select(columns=[column_name])
                logging.info("Reading data into PyArrow table...")
                arrow_table = reader.read_all()
                record_count = arrow_table.num_rows
                logging.info(f"Read {record_count} records for min/max calculation.")
                if record_count > 0:
                    logging.info("Converting to Pandas DataFrame for min/max...")
                    df_col = arrow_table.to_pandas()
                    if column_name in df_col.columns and pd.api.types.is_numeric_dtype(df_col[column_name]):
                        # Drop NaNs before calculating min/max so they are not considered
                        valid_series = df_col[column_name].dropna()
                        if not valid_series.empty:
                            min_val = valid_series.min()
                            max_val = valid_series.max()
                            logging.info(f"Calculated MIN={min_val}, MAX={max_val}.")
                        else:
                            logging.warning(f"Column '{column_name}' contains only NaN/Null values after dropping.")
                    else:
                        logging.warning(f"Column '{column_name}' not found or is not numeric in the result.")
                    del df_col  # Free memory
                    del arrow_table  # Free memory
                else:
                    logging.warning("Table appears to be empty or column has no data.")
                    return min_val, max_val, record_count, max_rows_df  # Return early

                # --- Step 2: Find Row(s) with Max Value ---
                if max_val is not None and IBIS_AVAILABLE:
                    logging.info(f"Executing select for full rows where {column_name} == {max_val}")
                    try:
                        # Create the predicate using the ibis placeholder
                        predicate = (_[column_name] == max_val)
                        # Select all columns where the predicate matches
                        max_rows_reader = table.select(predicate=predicate)
                        logging.info("Reading max value rows into PyArrow table...")
                        max_rows_arrow_table = max_rows_reader.read_all()
                        if max_rows_arrow_table.num_rows > 0:
                            logging.info(f"Found {max_rows_arrow_table.num_rows} row(s) with the max value.")
                            logging.info("Converting max rows to Pandas DataFrame...")
                            max_rows_df = max_rows_arrow_table.to_pandas()
                        else:
                            # This can happen with floating-point inaccuracies, though it is unlikely with ==
                            logging.warning(f"Could not find rows matching the calculated max value {max_val}. Potentially a float precision issue?")
                    except vast_errors.InvalidArgument as e:
                        logging.error(f"Error during filtering query (maybe a column name issue?): {e}")
                    except Exception as e:
                        logging.error(f"An unexpected error occurred fetching max rows: {e}")
                elif max_val is not None and not IBIS_AVAILABLE:
                    logging.warning("Cannot query for max rows because `ibis-framework` is not installed.")
            except vast_errors.MissingSchema:
                logging.error(f"Schema '{schema_name}' not found in bucket '{bucket_name}'.")
            except vast_errors.MissingTable:
                logging.error(f"Table '{table_name}' not found in schema '{schema_name}'.")
            except vast_errors.InvalidArgument as e:
                if column_name in str(e):
                    logging.error(f"Column '{column_name}' likely not found in table '{table_name}'. Error: {e}")
                else:
                    logging.error(f"Invalid argument during initial query: {e}")
            except vast_errors.HttpError as e:
                logging.error(f"VAST HTTP Error during query: {e.status} - {e.message}")
            except Exception as e:
                logging.error(f"An unexpected error occurred during query processing: {e}")
    except vast_errors.ConnectionError as e:
        logging.error(f"VAST Connection Error: {e}")
    except Exception as e:
        logging.error(f"Failed to establish VAST transaction context: {e}")
    return min_val, max_val, record_count, max_rows_df

def main():
    """Main function to connect to VAST and run the analysis."""
    session = None
    start_time = datetime.now()
    logging.info(f"Script started at {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    try:
        session = vastdb.connect(
            endpoint=VAST_ENDPOINT,
            access=ACCESS_KEY,
            secret=SECRET_KEY,
            # verify_ssl=False  # Add if needed
        )
        logging.info("Successfully connected VAST session.")
        min_prcp, max_prcp, count, max_rows_dataframe = get_stats_and_max_rows(
            session,
            TARGET_BUCKET,
            TARGET_SCHEMA,
            TARGET_TABLE_NAME,
            TARGET_COLUMN
        )
        end_time = datetime.now()
        duration = end_time - start_time

        # --- Summary ---
        print("\n" + "=" * 60)
        print(f" VAST Data Query Summary - {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)
        print(f" Target: {TARGET_BUCKET}.{TARGET_SCHEMA}.{TARGET_TABLE_NAME}")
        print(f" Analyzed Column: {TARGET_COLUMN}")
        print(f" Query Duration: {duration}")
        print("-" * 60)
        print(f" Statistics for '{TARGET_COLUMN}':")
        print(f" Total Records Read (for {TARGET_COLUMN}): {count}")
        if min_prcp is not None:
            print(f" Minimum Value: {min_prcp}")
        else:
            print(" Minimum Value: N/A (No valid data found)")
        if max_prcp is not None:
            print(f" Maximum Value: {max_prcp}")
        else:
            print(" Maximum Value: N/A (No valid data found)")
        print("-" * 60)
        if not max_rows_dataframe.empty:
            print(f" Full Row(s) Containing Maximum '{TARGET_COLUMN}' Value ({max_prcp}):")
            # Configure pandas display options if needed (e.g., for many columns)
            # pd.set_option('display.max_rows', None)
            # pd.set_option('display.max_columns', None)
            # pd.set_option('display.width', 1000)
            print(max_rows_dataframe.to_string(index=False))  # to_string avoids truncation
        elif max_prcp is not None and not IBIS_AVAILABLE:
            print(" Row(s) for maximum value could not be retrieved.")
            print(" Please install `ibis-framework` (`pip install ibis-framework`).")
        elif max_prcp is not None:
            print(f" No rows found matching the calculated maximum value ({max_prcp}).")
        else:
            print(" No maximum value found, cannot display corresponding rows.")
        print("=" * 60 + "\n")
    except Exception as e:
        logging.error(f"An error occurred in main execution: {e}")
        print(f"\nAn error occurred: {e}", file=sys.stderr)
    # Note: the session object returned by vastdb.connect has no explicit
    # close() in the SDK examples, so no cleanup is required here.

if __name__ == "__main__":
    # Ensure necessary libraries are installed:
    #   pip install vastdb pandas pyarrow ibis-framework
    main()
2025-03-26 11:48:49,679 - 4090262 - INFO - Using VAST Endpoint: http://172.200.203.1:80
2025-03-26 11:48:49,681 - 4090262 - INFO - Querying Bucket: christian-db
2025-03-26 11:48:49,682 - 4090262 - INFO - Querying Schema: netcdfdemo
2025-03-26 11:48:49,683 - 4090262 - INFO - Querying Table: weather
2025-03-26 11:48:49,684 - 4090262 - INFO - Analyzing Column: prcp
2025-03-26 11:48:49,686 - 4090262 - INFO - Script started at 2025-03-26 11:48:49
2025-03-26 11:48:49,695 - 4090262 - INFO - Successfully connected VAST session.
2025-03-26 11:48:49,697 - 4090262 - INFO - Starting analysis for 'prcp' in netcdfdemo.weather...
2025-03-26 11:48:49,727 - 4090262 - INFO - Accessed table netcdfdemo.weather
2025-03-26 11:48:49,728 - 4090262 - INFO - Executing select for column: prcp to find min/max
2025-03-26 11:48:49,738 - 4090262 - INFO - Reading data into PyArrow table...
2025-03-26 11:48:50,560 - 4090262 - INFO - Read 24763800 records for min/max calculation.
2025-03-26 11:48:50,561 - 4090262 - INFO - Converting to Pandas DataFrame for min/max...
2025-03-26 11:48:50,942 - 4090262 - INFO - Calculated MIN=0.0, MAX=302.3125.
2025-03-26 11:48:50,944 - 4090262 - INFO - Executing select for full rows where prcp == 302.3125
2025-03-26 11:48:50,957 - 4090262 - INFO - Reading max value rows into PyArrow table...
2025-03-26 11:48:51,046 - 4090262 - INFO - Found 1 row(s) with the max value.
2025-03-26 11:48:51,047 - 4090262 - INFO - Converting max rows to Pandas DataFrame...
============================================================
VAST Data Query Summary - 2025-03-26 11:48:51
============================================================
Target: christian-db.netcdfdemo.weather
Analyzed Column: prcp
Query Duration: 0:00:01.368626
------------------------------------------------------------
Statistics for 'prcp':
Total Records Read (for prcp): 24763800
Minimum Value: 0.0
Maximum Value: 302.3125
------------------------------------------------------------
Full Row(s) Containing Maximum 'prcp' Value (302.3125):
time lat lon tmax tmin prcp tavg
2022-11-05 46.854168 -121.770836 -4.828125 -9.726562 302.3125 -7.28125
============================================================
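One side note on the min/max step: the script above materializes the column as a pandas Series before aggregating. If only the aggregates are needed, pyarrow.compute can work on the Arrow data directly and skip the pandas conversion entirely. Here is a sketch, assuming the session and configuration names from the script above:

import pyarrow.compute as pc

with session.transaction() as tx:
    table = tx.bucket(TARGET_BUCKET).schema(TARGET_SCHEMA).table(TARGET_TABLE_NAME)
    arrow_column = table.select(columns=[TARGET_COLUMN]).read_all()[TARGET_COLUMN]

# min_max ignores nulls by default and returns a struct scalar with 'min' and 'max' fields.
stats = pc.min_max(arrow_column)
print(f"MIN={stats['min'].as_py()}, MAX={stats['max'].as_py()}")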
Python API Examples and API Documentation links#
Python SDK Examples#
Python SDK Documentation#
VAST DB Field Engineering Documentation#
https://vast-data.github.io/data-platform-field-docs/intro.html