Python SDK - Import GRIB2#

Download sample data#

! wget -c https://dd.weather.gc.ca/model_gem_global/15km/grib2/lat_lon/00/000/CMC_glb_ABSV_ISBL_200_latlon.15x.15_2024121100_P000.grib2
--2024-12-11 23:31:45--  https://dd.weather.gc.ca/model_gem_global/15km/grib2/lat_lon/00/000/CMC_glb_ABSV_ISBL_200_latlon.15x.15_2024121100_P000.grib2
Resolving dd.weather.gc.ca (dd.weather.gc.ca)... 205.189.10.47
Connecting to dd.weather.gc.ca (dd.weather.gc.ca)|205.189.10.47|:443... connected.
HTTP request sent, awaiting response... 416 Requested Range Not Satisfiable

    The file is already fully retrieved; nothing to do.
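If wget is unavailable, the same file can be fetched from Python with only the standard library. This is a minimal sketch, not part of the original notebook; note that the datamart rotates files, so an old run date may eventually disappear.

# Fetch the sample GRIB2 file without wget, skipping the download when a
# complete local copy already exists (roughly what `wget -c` achieved above).
import urllib.request
from pathlib import Path

url = ("https://dd.weather.gc.ca/model_gem_global/15km/grib2/lat_lon/00/000/"
       "CMC_glb_ABSV_ISBL_200_latlon.15x.15_2024121100_P000.grib2")
dest = Path(url.rsplit("/", 1)[-1])
if not dest.exists():
    urllib.request.urlretrieve(url, dest)
print(dest, dest.stat().st_size, "bytes")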

Convert to Parquet#

! pip install --quiet cfgrib xarray pandas pyarrow
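Note: cfgrib decodes GRIB messages via the ecCodes library; if opening the file below with engine="cfgrib" fails, ecCodes may need to be installed separately (for example from conda-forge or the system package manager).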
import cfgrib
import xarray as xr
import pandas as pd

# Path to the GRIB2 file
grib2_file = "CMC_glb_ABSV_ISBL_200_latlon.15x.15_2024121100_P000.grib2"

# Read GRIB2 file using xarray and cfgrib
ds = xr.open_dataset(grib2_file, engine="cfgrib")

# Convert to a Pandas DataFrame
df = ds.to_dataframe().reset_index()

# Save DataFrame to Parquet
parquet_file = "grib2.parquet"
df.to_parquet(parquet_file, engine="pyarrow", index=False)

print(f"Saved to {parquet_file}")
Saved to grib2.parquet
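As a sanity check (not in the original notebook): the flattened DataFrame should hold one row per grid point, and a global 0.15° lat/lon grid has 1201 latitudes × 2400 longitudes = 2,882,400 points.

# One DataFrame row per grid point on the 0.15 degree global grid.
print(dict(ds.sizes))        # expect {'latitude': 1201, 'longitude': 2400}
print(len(df), 1201 * 2400)  # both should print 2882400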

Check for fields incompatible with Vast DB#

See: https://vast-data.github.io/data-platform-field-docs/vast_database/sdk_ref/verify_parquet.html

! pip install --upgrade --quiet git+https://github.com/snowch/vastdb_parq_schema_file.git --use-pep517
! parquet_checker grib2.parquet
Parquet schema:
latitude: double
longitude: double
time: timestamp[ns]
step: duration[ns]
isobaricInhPa: double
valid_time: timestamp[ns]
absv: float
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 909
Checking column types...
Column 'time' has a non-matching type: TIMESTAMP[NS]
Column 'step' has a non-matching type: DURATION[NS]
Column 'valid_time' has a non-matching type: TIMESTAMP[NS]
Column type check complete.

TIMESTAMP[NS] should be supported, but DURATION[NS] is not; see vast-data/vastdb_sdk.
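The same check can also be done programmatically with pyarrow, which is handy in a pipeline. A small sketch that lists the DURATION columns the checker flagged:

import pyarrow as pa
import pyarrow.parquet as pq

# List columns whose Arrow type is a duration (the type VastDB rejects).
schema = pq.read_schema("grib2.parquet")
duration_cols = [f.name for f in schema if pa.types.is_duration(f.type)]
print(duration_cols)  # expect ['step']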

Convert incompatible fields#

For now, convert the incompatible DURATION[NS] column to string.

import pyarrow as pa
import pyarrow.parquet as pq

def convert_parquet_fields_to_string(input_file, output_file, chunksize=1024):
    """
    Convert DURATION[NS] fields to strings in a Parquet file.

    Args:
        input_file (str): Path to the input Parquet file.
        output_file (str): Path to the output Parquet file.
        chunksize (int): Number of rows per batch to process.
    """
    print(f"Converting DURATION[NS] to string in {input_file}")

    # Open the input Parquet file
    parquet_file_obj = pq.ParquetFile(input_file)
    schema = parquet_file_obj.schema_arrow

    # Modify schema to change relevant fields to string
    new_fields = [
        pa.field(
            field.name,
            pa.string() if field.type == pa.duration("ns") else field.type
            # pa.string() if field.type in (pa.timestamp("ns"), pa.duration("ns")) else field.type
        )
        for field in schema
    ]
    new_schema = pa.schema(new_fields)

    # Open a ParquetWriter for the output file
    with pq.ParquetWriter(output_file, new_schema) as writer:
        # Process the file in chunks
        for batch in parquet_file_obj.iter_batches(batch_size=chunksize):
            table = pa.Table.from_batches([batch], schema=schema)

            # Cast DURATION[NS] columns to string
            for col_name in table.column_names:
                # if table.schema.field(col_name).type in (pa.timestamp("ns"), pa.duration("ns")):
                if table.schema.field(col_name).type == pa.duration("ns"):
                    table = table.set_column(
                        table.schema.get_field_index(col_name),
                        col_name,
                        table[col_name].cast(pa.string())
                    )

            # Write the modified table to the output file
            writer.write_table(table)

    print(f"Converted file saved to {output_file}")
convert_parquet_fields_to_string('grib2.parquet', 'vastdb_compat.parquet', chunksize=1024)
Converting DURATION[NS] to string in grib2.parquet
Converted file saved to vastdb_compat.parquet

Re-check for incompatible fields#

! parquet_checker vastdb_compat.parquet
Parquet schema:
latitude: double
longitude: double
time: timestamp[ns]
step: string
isobaricInhPa: double
valid_time: timestamp[ns]
absv: float
Checking column types...
Column 'time' has a non-matching type: TIMESTAMP[NS]
Column 'valid_time' has a non-matching type: TIMESTAMP[NS]
Column type check complete.
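The remaining TIMESTAMP[NS] messages are expected: as noted above, TIMESTAMP[NS] should be supported by the SDK, and only the DURATION[NS] column needed converting.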

Read parquet and write to Vast DB#

Code is from https://vast-data.github.io/data-platform-field-docs/vast_database/ingestion/python_sdk_parquet_import.html

! pip install --quiet --upgrade vastdb numpy pyarrow pandas
import io
import os
import pyarrow as pa
from pyarrow import csv as pa_csv
import pyarrow.parquet as pq
from io import StringIO
import numpy as np
import pandas as pd
import vastdb
from vastdb.config import QueryConfig

def read_parquet(file_path):
    """Reads Parquet data from a file."""
    try:
        return pq.read_table(file_path)
    except Exception as e:
        raise RuntimeError(f"Error reading Parquet file: {e}") from e

def connect_to_vastdb(endpoint, access_key, secret_key):
    """Connects to VastDB."""
    try:
        session = vastdb.connect(endpoint=endpoint, access=access_key, secret=secret_key)
        print("Connected to VastDB")
        return session
    except Exception as e:
        raise RuntimeError(f"Failed to connect to VastDB: {e}") from e

def write_to_vastdb(session, bucket_name, schema_name, table_name, pa_table):
    """Writes data to VastDB."""
    with session.transaction() as tx:
        bucket = tx.bucket(bucket_name)
        schema = bucket.schema(schema_name, fail_if_missing=False) or bucket.create_schema(schema_name)

        table = schema.table(table_name, fail_if_missing=False) or schema.create_table(table_name, pa_table.schema)

        columns_to_add = get_columns_to_add(table.arrow_schema, pa_table.schema)
        for column in columns_to_add:
            table.add_column(column)

        table.insert(pa_table)

def get_columns_to_add(existing_schema, desired_schema):
    """Identifies columns to add to an existing schema."""
    existing_fields = set(existing_schema.names)
    desired_fields = set(desired_schema.names)
    return [pa.schema([pa.field(name, desired_schema.field(name).type)]) for name in desired_fields - existing_fields]


def query_vastdb(session, bucket_name, schema_name, table_name, limit=None):
    """Writes data to VastDB."""
    with session.transaction() as tx:
        bucket = tx.bucket(bucket_name)
        schema = bucket.schema(schema_name, fail_if_missing=True)
        table = schema.table(table_name, fail_if_missing=True)

        if limit:
            # See: https://vast-data.github.io/data-platform-field-docs/vast_database/sdk_ref/limit_n.html
            config = QueryConfig(
                num_splits=1,                    # Manually specify 1 split
                num_sub_splits=1,                # Each split will be divided into 1 subsplit
                limit_rows_per_sub_split=limit,  # Each subsplit returns at most `limit` rows per batch
            )
            batches = table.select(config=config)
            first_batch = next(batches)
            return first_batch.to_pandas()
        else:
            return table.select().read_all().to_pandas()

def drop_vastdb_table(session, bucket_name, schema_name, table_name):
    """Writes data to VastDB."""
    with session.transaction() as tx:
        bucket = tx.bucket(bucket_name)
        schema = bucket.schema(schema_name, fail_if_missing=False) or bucket.create_schema(schema_name)

        table = schema.table(table_name, fail_if_missing=False)
        if table:
            table.drop()
import os

VASTDB_ENDPOINT = os.getenv("VASTDB_ENDPOINT")
VASTDB_ACCESS_KEY = os.getenv("VASTDB_ACCESS_KEY")
VASTDB_SECRET_KEY = os.getenv("VASTDB_SECRET_KEY")

# Use the NYT bucket (database) for now
VASTDB_NYT_BUCKET = os.getenv("VASTDB_NYT_BUCKET")
schema_name = 'grib2'
table_name = 'grib2'

Write to Vast DB#

session = connect_to_vastdb(VASTDB_ENDPOINT, VASTDB_ACCESS_KEY, VASTDB_SECRET_KEY)
Connected to VastDB
pa_table = read_parquet('vastdb_compat.parquet')

# Drop any existing table so the import starts from a clean slate
drop_vastdb_table(session, VASTDB_NYT_BUCKET, schema_name, table_name)

write_to_vastdb(session, VASTDB_NYT_BUCKET, schema_name, table_name, pa_table)
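Before pulling back all 2.8 million rows, the limit branch of the query_vastdb helper defined above can be used for a quick spot check; a minimal sketch:

# Fetch a handful of rows (returns just the first record batch).
sample_df = query_vastdb(session, VASTDB_NYT_BUCKET, schema_name, table_name, limit=10)
print(sample_df.shape)  # expect at most 10 rows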

Query grib2 data in Vast DB#

# Read the full table back via the query_vastdb helper defined above
df = query_vastdb(session, VASTDB_NYT_BUCKET, schema_name, table_name)

df
         latitude  longitude       time step  isobaricInhPa valid_time      absv
0           -57.3     -16.80 2024-12-11    0          200.0 2024-12-11 -0.000118
1           -57.3     -16.65 2024-12-11    0          200.0 2024-12-11 -0.000119
2           -57.3     -16.50 2024-12-11    0          200.0 2024-12-11 -0.000119
3           -57.3     -16.35 2024-12-11    0          200.0 2024-12-11 -0.000119
4           -57.3     -16.20 2024-12-11    0          200.0 2024-12-11 -0.000118
...           ...        ...        ...  ...            ...        ...       ...
2882395      73.8     -84.75 2024-12-11    0          200.0 2024-12-11  0.000173
2882396      73.8     -84.60 2024-12-11    0          200.0 2024-12-11  0.000174
2882397      73.8     -84.45 2024-12-11    0          200.0 2024-12-11  0.000174
2882398      73.8     -84.30 2024-12-11    0          200.0 2024-12-11  0.000174
2882399      73.8     -84.15 2024-12-11    0          200.0 2024-12-11  0.000173

2882400 rows × 7 columns

Statistical Summary#

df.describe()
       latitude       longitude      time                 isobaricInhPa  valid_time           absv
count  2.882400e+06   2.882400e+06   2882400              2882400.0      2882400              2.882400e+06
mean   5.627306e-13   -7.500000e-02  2024-12-11 00:00:00  200.0          2024-12-11 00:00:00  -1.044999e-06
min    -9.000000e+01  -1.800000e+02  2024-12-11 00:00:00  200.0          2024-12-11 00:00:00  -5.594596e-04
25%    -4.500000e+01  -9.003750e+01  2024-12-11 00:00:00  200.0          2024-12-11 00:00:00  -1.144595e-04
50%    5.553891e-13   -7.500000e-02  2024-12-11 00:00:00  200.0          2024-12-11 00:00:00  -4.595337e-07
75%    4.500000e+01   8.988750e+01   2024-12-11 00:00:00  200.0          2024-12-11 00:00:00  1.150405e-04
max    9.000000e+01   1.798500e+02   2024-12-11 00:00:00  200.0          2024-12-11 00:00:00  5.095405e-04
std    5.200482e+01   1.039231e+02   NaN                  0.0            NaN                  1.168390e-04
list(df['step'].drop_duplicates())
['0']

The ‘step’ column contains only the value '0', so it appears it could be converted to an integer field type instead of a string.
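As a follow-up, here is a minimal sketch (not from the original notebook) of casting the DURATION[NS] column to int64 instead of string; Arrow casts duration[ns] to its raw nanosecond count, and the output filename here is hypothetical.

import pyarrow as pa
import pyarrow.parquet as pq

# Variation on convert_parquet_fields_to_string: cast DURATION[NS]
# columns to int64 (raw nanosecond counts) rather than string.
table = pq.read_table("grib2.parquet")
for i, field in enumerate(table.schema):
    if pa.types.is_duration(field.type):
        table = table.set_column(i, field.name, table[field.name].cast(pa.int64()))
pq.write_table(table, "vastdb_compat_int64.parquet")  # hypothetical output name

For this file the cast is trivial (every step is 0), but for non-zero forecast steps the nanosecond counts may be worth scaling to seconds or hours before writing.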