Python SDK - Import GRIB2#
Download sample data#
! wget -c https://dd.weather.gc.ca/model_gem_global/15km/grib2/lat_lon/00/000/CMC_glb_ABSV_ISBL_200_latlon.15x.15_2024121100_P000.grib2
--2024-12-11 23:31:45-- https://dd.weather.gc.ca/model_gem_global/15km/grib2/lat_lon/00/000/CMC_glb_ABSV_ISBL_200_latlon.15x.15_2024121100_P000.grib2
Resolving dd.weather.gc.ca (dd.weather.gc.ca)... 205.189.10.47
Connecting to dd.weather.gc.ca (dd.weather.gc.ca)|205.189.10.47|:443... connected.
HTTP request sent, awaiting response... 416 Requested Range Not Satisfiable
The file is already fully retrieved; nothing to do.
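If wget is not available in the notebook environment, the same file can be fetched with Python's standard library. A minimal sketch, using the URL and filename from the command above (note that MSC Datamart only keeps recent model runs, so the dated filename may need updating):

import urllib.request
from pathlib import Path

url = ("https://dd.weather.gc.ca/model_gem_global/15km/grib2/lat_lon/00/000/"
       "CMC_glb_ABSV_ISBL_200_latlon.15x.15_2024121100_P000.grib2")
dest = Path(url.rsplit("/", 1)[-1])

# Only download if the file is not already present (a loose analogue of wget -c)
if not dest.exists():
    urllib.request.urlretrieve(url, dest)
print(f"{dest}: {dest.stat().st_size} bytes")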
Convert to Parquet#
! pip install --quiet cfgrib xarray pandas pyarrow
import cfgrib
import xarray as xr
import pandas as pd
# Path to the GRIB2 file
grib2_file = "CMC_glb_ABSV_ISBL_200_latlon.15x.15_2024121100_P000.grib2"
# Read GRIB2 file using xarray and cfgrib
ds = xr.open_dataset(grib2_file, engine="cfgrib")
# Convert to a Pandas DataFrame
df = ds.to_dataframe().reset_index()
# Save DataFrame to Parquet
parquet_file = "grib2.parquet"
df.to_parquet(parquet_file, engine="pyarrow", index=False)
print(f"Saved to {parquet_file}")
Saved to grib2.parquet
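Before moving on, it is worth sanity-checking the Parquet output. A quick inspection with pyarrow, assuming only the grib2.parquet file written above:

import pyarrow.parquet as pq

pf = pq.ParquetFile("grib2.parquet")
print(pf.schema_arrow)                 # column names and Arrow types
print(f"{pf.metadata.num_rows} rows")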
Check for fields incompatible with Vast DB#
See: https://vast-data.github.io/data-platform-field-docs/vast_database/sdk_ref/verify_parquet.html
! pip install --upgrade --quiet git+https://github.com/snowch/vastdb_parq_schema_file.git --use-pep517
! parquet_checker grib2.parquet
Parquet schema:
latitude: double
longitude: double
time: timestamp[ns]
step: duration[ns]
isobaricInhPa: double
valid_time: timestamp[ns]
absv: float
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 909
Checking column types...
Column 'time' has a non-matching type: TIMESTAMP[NS]
Column 'step' has a non-matching type: DURATION[NS]
Column 'valid_time' has a non-matching type: TIMESTAMP[NS]
Column type check complete.
TIMESTAMP[NS] should be supported, but DURATION[NS] is not (see vast-data/vastdb_sdk).
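The same check can be done programmatically with pyarrow, which is useful in a pipeline. A minimal sketch that lists the duration-typed columns Vast DB will reject:

import pyarrow as pa
import pyarrow.parquet as pq

schema = pq.read_schema("grib2.parquet")
# Collect the columns whose Arrow type is a duration
incompatible = [field.name for field in schema if pa.types.is_duration(field.type)]
print(incompatible)  # expected: ['step']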
Convert incompatible fields#
For now, we convert the incompatible DURATION[NS] column to a string.
import pyarrow as pa
import pyarrow.parquet as pq
def convert_parquet_fields_to_string(input_file, output_file, chunksize=1024):
    """
    Convert DURATION[NS] fields to strings in a Parquet file.

    Args:
        input_file (str): Path to the input Parquet file.
        output_file (str): Path to the output Parquet file.
        chunksize (int): Number of rows per batch to process.
    """
    print(f"Converting DURATION[NS] to string in {input_file}")

    # Open the input Parquet file
    parquet_file_obj = pq.ParquetFile(input_file)
    schema = parquet_file_obj.schema_arrow

    # Build a new schema with DURATION[NS] fields replaced by string
    # (timestamp fields could be handled the same way if needed)
    new_fields = [
        pa.field(
            field.name,
            pa.string() if field.type == pa.duration("ns") else field.type,
        )
        for field in schema
    ]
    new_schema = pa.schema(new_fields)

    # Open a ParquetWriter for the output file
    with pq.ParquetWriter(output_file, new_schema) as writer:
        # Process the file in chunks
        for batch in parquet_file_obj.iter_batches(batch_size=chunksize):
            table = pa.Table.from_batches([batch], schema=schema)

            # Cast DURATION[NS] columns to string
            for col_name in table.column_names:
                if table.schema.field(col_name).type == pa.duration("ns"):
                    table = table.set_column(
                        table.schema.get_field_index(col_name),
                        col_name,
                        table[col_name].cast(pa.string()),
                    )

            # Write the modified table to the output file
            writer.write_table(table)

    print(f"Converted file saved to {output_file}")
convert_parquet_fields_to_string('grib2.parquet', 'vastdb_compat.parquet', chunksize=1024)
Converting DURATION[NS] to string in grib2.parquet
Converted file saved to vastdb_compat.parquet
Re-check for incompatible fields#
! parquet_checker vastdb_compat.parquet
Parquet schema:
latitude: double
longitude: double
time: timestamp[ns]
step: string
isobaricInhPa: double
valid_time: timestamp[ns]
absv: float
Checking column types...
Column 'time' has a non-matching type: TIMESTAMP[NS]
Column 'valid_time' has a non-matching type: TIMESTAMP[NS]
Column type check complete.
Read Parquet and write to Vast DB#
The following code is adapted from https://vast-data.github.io/data-platform-field-docs/vast_database/ingestion/python_sdk_parquet_import.html
! pip install --quiet --upgrade vastdb numpy pyarrow pandas
import os

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

import vastdb
from vastdb.config import QueryConfig
def read_parquet(file_path):
    """Reads Parquet data from a file."""
    try:
        return pq.read_table(file_path)
    except Exception as e:
        raise RuntimeError(f"Error reading Parquet file: {e}") from e

def connect_to_vastdb(endpoint, access_key, secret_key):
    """Connects to VastDB."""
    try:
        session = vastdb.connect(endpoint=endpoint, access=access_key, secret=secret_key)
        print("Connected to VastDB")
        return session
    except Exception as e:
        raise RuntimeError(f"Failed to connect to VastDB: {e}") from e

def write_to_vastdb(session, bucket_name, schema_name, table_name, pa_table):
    """Writes data to VastDB, creating the schema and table if necessary."""
    with session.transaction() as tx:
        bucket = tx.bucket(bucket_name)
        schema = bucket.schema(schema_name, fail_if_missing=False) or bucket.create_schema(schema_name)
        table = schema.table(table_name, fail_if_missing=False) or schema.create_table(table_name, pa_table.schema)

        columns_to_add = get_columns_to_add(table.arrow_schema, pa_table.schema)
        for column in columns_to_add:
            table.add_column(column)

        table.insert(pa_table)

def get_columns_to_add(existing_schema, desired_schema):
    """Identifies columns to add to an existing schema."""
    existing_fields = set(existing_schema.names)
    desired_fields = set(desired_schema.names)
    return [pa.schema([pa.field(name, desired_schema.field(name).type)]) for name in desired_fields - existing_fields]

def query_vastdb(session, bucket_name, schema_name, table_name, limit=None):
    """Queries data from VastDB, optionally limiting the number of rows returned."""
    with session.transaction() as tx:
        bucket = tx.bucket(bucket_name)
        schema = bucket.schema(schema_name, fail_if_missing=True)
        table = schema.table(table_name, fail_if_missing=True)

        if limit:
            # See: https://vast-data.github.io/data-platform-field-docs/vast_database/sdk_ref/limit_n.html
            config = QueryConfig(
                num_splits=1,                    # Manually specify 1 split
                num_sub_splits=1,                # Each split will be divided into 1 sub-split
                limit_rows_per_sub_split=limit,  # Each sub-split returns at most `limit` rows
            )
            batches = table.select(config=config)
            first_batch = next(batches)
            return first_batch.to_pandas()
        else:
            return table.select().read_all().to_pandas()

def drop_vastdb_table(session, bucket_name, schema_name, table_name):
    """Drops a table in VastDB if it exists."""
    with session.transaction() as tx:
        bucket = tx.bucket(bucket_name)
        schema = bucket.schema(schema_name, fail_if_missing=False)
        table = schema.table(table_name, fail_if_missing=False) if schema else None
        if table:
            table.drop()
VASTDB_ENDPOINT = os.getenv("VASTDB_ENDPOINT")
VASTDB_ACCESS_KEY = os.getenv("VASTDB_ACCESS_KEY")
VASTDB_SECRET_KEY = os.getenv("VASTDB_SECRET_KEY")

# Reuse the NYT bucket (database) for now
VASTDB_NYT_BUCKET = os.getenv("VASTDB_NYT_BUCKET")
schema_name = 'grib2'
table_name = 'grib2'
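Since the connection details come from the environment, a small guard before connecting avoids a confusing failure later. A minimal sketch:

# Fail fast if any required environment variable is missing
for name in ("VASTDB_ENDPOINT", "VASTDB_ACCESS_KEY", "VASTDB_SECRET_KEY", "VASTDB_NYT_BUCKET"):
    assert os.getenv(name), f"Environment variable {name} is not set"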
Write to Vast DB#
session = connect_to_vastdb(VASTDB_ENDPOINT, VASTDB_ACCESS_KEY, VASTDB_SECRET_KEY)
Connected to VastDB
pa_table = read_parquet('vastdb_compat.parquet')
# Drop any previous copy of the table so the import starts clean
drop_vastdb_table(session, VASTDB_NYT_BUCKET, schema_name, table_name)
write_to_vastdb(session, VASTDB_NYT_BUCKET, schema_name, table_name, pa_table)
Query grib2 data in Vast DB#
df = query_vastdb(session, VASTDB_NYT_BUCKET, schema_name, table_name)
df
|         | latitude | longitude | time       | step | isobaricInhPa | valid_time | absv      |
|---------|----------|-----------|------------|------|---------------|------------|-----------|
| 0       | -57.3    | -16.80    | 2024-12-11 | 0    | 200.0         | 2024-12-11 | -0.000118 |
| 1       | -57.3    | -16.65    | 2024-12-11 | 0    | 200.0         | 2024-12-11 | -0.000119 |
| 2       | -57.3    | -16.50    | 2024-12-11 | 0    | 200.0         | 2024-12-11 | -0.000119 |
| 3       | -57.3    | -16.35    | 2024-12-11 | 0    | 200.0         | 2024-12-11 | -0.000119 |
| 4       | -57.3    | -16.20    | 2024-12-11 | 0    | 200.0         | 2024-12-11 | -0.000118 |
| ...     | ...      | ...       | ...        | ...  | ...           | ...        | ...       |
| 2882395 | 73.8     | -84.75    | 2024-12-11 | 0    | 200.0         | 2024-12-11 | 0.000173  |
| 2882396 | 73.8     | -84.60    | 2024-12-11 | 0    | 200.0         | 2024-12-11 | 0.000174  |
| 2882397 | 73.8     | -84.45    | 2024-12-11 | 0    | 200.0         | 2024-12-11 | 0.000174  |
| 2882398 | 73.8     | -84.30    | 2024-12-11 | 0    | 200.0         | 2024-12-11 | 0.000174  |
| 2882399 | 73.8     | -84.15    | 2024-12-11 | 0    | 200.0         | 2024-12-11 | 0.000173  |
2882400 rows × 7 columns
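Pulling back all 2,882,400 rows is expensive. For a quick peek, the limit path of the query_vastdb helper defined above can be used instead, e.g.:

# Fetch only the first 10 rows via QueryConfig's limit_rows_per_sub_split
sample_df = query_vastdb(session, VASTDB_NYT_BUCKET, schema_name, table_name, limit=10)
sample_df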
Statistical Summary#
df.describe()
|       | latitude      | longitude     | time                | isobaricInhPa | valid_time          | absv          |
|-------|---------------|---------------|---------------------|---------------|---------------------|---------------|
| count | 2.882400e+06  | 2.882400e+06  | 2882400             | 2882400.0     | 2882400             | 2.882400e+06  |
| mean  | 5.627306e-13  | -7.500000e-02 | 2024-12-11 00:00:00 | 200.0         | 2024-12-11 00:00:00 | -1.044999e-06 |
| min   | -9.000000e+01 | -1.800000e+02 | 2024-12-11 00:00:00 | 200.0         | 2024-12-11 00:00:00 | -5.594596e-04 |
| 25%   | -4.500000e+01 | -9.003750e+01 | 2024-12-11 00:00:00 | 200.0         | 2024-12-11 00:00:00 | -1.144595e-04 |
| 50%   | 5.553891e-13  | -7.500000e-02 | 2024-12-11 00:00:00 | 200.0         | 2024-12-11 00:00:00 | -4.595337e-07 |
| 75%   | 4.500000e+01  | 8.988750e+01  | 2024-12-11 00:00:00 | 200.0         | 2024-12-11 00:00:00 | 1.150405e-04  |
| max   | 9.000000e+01  | 1.798500e+02  | 2024-12-11 00:00:00 | 200.0         | 2024-12-11 00:00:00 | 5.095405e-04  |
| std   | 5.200482e+01  | 1.039231e+02  | NaN                 | 0.0           | NaN                 | 1.168390e-04  |
list(df['step'].drop_duplicates())
['0']
The 'step' column appears to be convertible to an integer field type.
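If so, the column could be cast after reading the data back (or while rewriting the Parquet file). A minimal sketch on the pandas side, assuming every value parses cleanly as an integer:

# Cast the string 'step' column to a 64-bit integer; errors="raise" surfaces any bad value
df["step"] = pd.to_numeric(df["step"], errors="raise").astype("int64")
df["step"].dtype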