{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "87e6e872-be46-45b2-8185-6159214f0dc0",
   "metadata": {},
   "source": [
    "# Python SDK - import CSV"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "6c6ab7f7-86d7-45e0-850e-66b0274e4de1",
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install --quiet --upgrade vastdb numpy pyarrow pandas"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7c93b098-47ea-4278-aaf7-4a128a15b0da",
   "metadata": {},
   "source": [
    "## Define functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "6b498ddf-9c14-40bc-94f4-687f355a9ea4",
   "metadata": {},
   "outputs": [],
   "source": [
    "import io\n",
    "import os\n",
    "import pyarrow as pa\n",
    "from pyarrow import csv as pa_csv\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import vastdb\n",
    "\n",
    "\n",
    "def read_csv(file_path):\n",
    "    \"\"\"Read a CSV file into a PyArrow table.\n",
    "\n",
    "    Args:\n",
    "        file_path: Location of the CSV file, passed to ``pyarrow.csv.read_csv``.\n",
    "\n",
    "    Returns:\n",
    "        The table returned by ``pyarrow.csv.read_csv``.\n",
    "\n",
    "    Raises:\n",
    "        RuntimeError: If reading or parsing fails; the original error is chained.\n",
    "    \"\"\"\n",
    "    try:\n",
    "        return pa_csv.read_csv(file_path)\n",
    "    except Exception as e:\n",
    "        raise RuntimeError(f\"Error reading CSV file: {e}\") from e\n",
    "\n",
    "\n",
    "def connect_to_vastdb(endpoint, access_key, secret_key):\n",
    "    \"\"\"Open a VastDB session and print a confirmation message.\n",
    "\n",
    "    Args:\n",
    "        endpoint: Value forwarded as ``endpoint`` to ``vastdb.connect``.\n",
    "        access_key: Value forwarded as ``access`` to ``vastdb.connect``.\n",
    "        secret_key: Value forwarded as ``secret`` to ``vastdb.connect``.\n",
    "\n",
    "    Returns:\n",
    "        The session object returned by ``vastdb.connect``.\n",
    "\n",
    "    Raises:\n",
    "        RuntimeError: If ``vastdb.connect`` raises; the original error is chained.\n",
    "    \"\"\"\n",
    "    try:\n",
    "        session = vastdb.connect(endpoint=endpoint, access=access_key, secret=secret_key)\n",
    "        print(\"Connected to VastDB\")\n",
    "        return session\n",
    "    except Exception as e:\n",
    "        raise RuntimeError(f\"Failed to connect to VastDB: {e}\") from e\n",
    "\n",
    "\n",
    "def write_to_vastdb(session, bucket_name, schema_name, table_name, pa_table):\n",
    "    \"\"\"Insert a PyArrow table into VastDB, creating or widening the target table.\n",
    "\n",
    "    The schema and the table are created when they do not exist yet. Columns\n",
    "    that ``pa_table`` has but the table lacks are added before the insert.\n",
    "    All steps run inside one ``session.transaction()`` block.\n",
    "\n",
    "    Args:\n",
    "        session: Session returned by ``connect_to_vastdb``.\n",
    "        bucket_name: Name of the bucket holding the schema.\n",
    "        schema_name: Name of the schema; created if missing.\n",
    "        table_name: Name of the table; created from ``pa_table.schema`` if missing.\n",
    "        pa_table: The PyArrow table whose rows are inserted.\n",
    "    \"\"\"\n",
    "    with session.transaction() as tx:\n",
    "        bucket = tx.bucket(bucket_name)\n",
    "        schema = bucket.schema(schema_name, fail_if_missing=False) or bucket.create_schema(schema_name)\n",
    "        table = schema.table(table_name, fail_if_missing=False) or schema.create_table(table_name, pa_table.schema)\n",
    "\n",
    "        # A table created just above already has every column of pa_table, so\n",
    "        # this loop only does work for a pre-existing, narrower table.\n",
    "        for column in get_columns_to_add(table.arrow_schema, pa_table.schema):\n",
    "            table.add_column(column)\n",
    "\n",
    "        table.insert(pa_table)\n",
    "\n",
    "\n",
    "def get_columns_to_add(existing_schema, desired_schema):\n",
    "    \"\"\"Return the columns of ``desired_schema`` that ``existing_schema`` lacks.\n",
    "\n",
    "    Args:\n",
    "        existing_schema: Schema of the table as it currently exists.\n",
    "        desired_schema: Schema of the data about to be inserted.\n",
    "\n",
    "    Returns:\n",
    "        A list of single-field ``pyarrow.Schema`` objects, one per missing\n",
    "        column, in the order the columns appear in ``desired_schema``.\n",
    "    \"\"\"\n",
    "    existing_fields = set(existing_schema.names)\n",
    "    # dict.fromkeys de-duplicates like the former set difference did, but keeps\n",
    "    # the declaration order so columns are always added in the same sequence.\n",
    "    return [\n",
    "        pa.schema([pa.field(name, desired_schema.field(name).type)])\n",
    "        for name in dict.fromkeys(desired_schema.names)\n",
    "        if name not in existing_fields\n",
    "    ]\n",
    "\n",
    "\n",
    "def query_vastdb(session, bucket_name, schema_name, table_name):\n",
    "    \"\"\"Read every row of a VastDB table into a PyArrow table.\n",
    "\n",
    "    This helper only reads: it never creates the schema or the table, and it\n",
    "    does not rely on any notebook-level variable.\n",
    "\n",
    "    Args:\n",
    "        session: Session returned by ``connect_to_vastdb``.\n",
    "        bucket_name: Name of the bucket holding the schema.\n",
    "        schema_name: Name of the schema holding the table.\n",
    "        table_name: Name of the table to read.\n",
    "\n",
    "    Returns:\n",
    "        The result of ``table.select().read_all()``.\n",
    "\n",
    "    Raises:\n",
    "        The SDK's own error when the schema or the table does not exist, since\n",
    "        both lookups are made with ``fail_if_missing=True``.\n",
    "    \"\"\"\n",
    "    with session.transaction() as tx:\n",
    "        bucket = tx.bucket(bucket_name)\n",
    "        schema = bucket.schema(schema_name, fail_if_missing=True)\n",
    "        table = schema.table(table_name, fail_if_missing=True)\n",
    "\n",
    "        return table.select().read_all()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c7685f00-eb33-4f75-a99a-08f94573903d",
   "metadata": {},
   "source": [
    "## CSV Example"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "9148c4e2-da32-4464-a46a-deda18c97c7b",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Connected to VastDB\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idtstampv1v2
012014-01-01 00:15:002.017.4
112014-01-01 00:30:001.920.2
212014-01-01 00:45:001.819.7
\n", "
" ],
      "text/plain": [
       " id tstamp v1 v2\n",
       "0 1 2014-01-01 00:15:00 2.0 17.4\n",
       "1 1 2014-01-01 00:30:00 1.9 20.2\n",
       "2 1 2014-01-01 00:45:00 1.8 19.7"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Connection settings are read from the environment (unset variables yield None).\n",
    "VASTDB_ENDPOINT = os.environ.get(\"VASTDB_ENDPOINT\")\n",
    "VASTDB_ACCESS_KEY = os.environ.get(\"VASTDB_ACCESS_KEY\")\n",
    "VASTDB_SECRET_KEY = os.environ.get(\"VASTDB_SECRET_KEY\")\n",
    "\n",
    "# Import target: bucket and schema come from the environment, the table name is fixed.\n",
    "VASTDB_TWITTER_INGEST_BUCKET = os.environ.get(\"VASTDB_TWITTER_INGEST_BUCKET\")\n",
    "VASTDB_TWITTER_INGEST_SCHEMA = os.environ.get(\"VASTDB_TWITTER_INGEST_SCHEMA\")\n",
    "VASTDB_TWITTER_INGEST_TABLE = 'csv_import'\n",
    "\n",
    "FILE_PATH = \"example.csv\"\n",
    "\n",
    "csv_data = \"\"\"\n",
    "id,tstamp,v1,v2\n",
    "1,2014-01-01 00:15:00.00000,2,17.4\n",
    "1,2014-01-01 00:30:00.00000,1.9,20.2\n",
    "1,2014-01-01 00:45:00.00000,1.8,19.7\n",
    "\"\"\"\n",
    "\n",
    "# Put the sample on disk so it is loaded through read_csv like a real file.\n",
    "with open(FILE_PATH, 'w') as csv_file:\n",
    "    csv_file.write(csv_data)\n",
    "\n",
    "pa_table = read_csv(FILE_PATH)\n",
    "\n",
    "session = connect_to_vastdb(VASTDB_ENDPOINT, VASTDB_ACCESS_KEY, VASTDB_SECRET_KEY)\n",
    "\n",
    "# Insert the CSV rows, then read the whole table back for display.\n",
    "write_to_vastdb(\n",
    "    session,\n",
    "    VASTDB_TWITTER_INGEST_BUCKET,\n",
    "    VASTDB_TWITTER_INGEST_SCHEMA,\n",
    "    VASTDB_TWITTER_INGEST_TABLE,\n",
    "    pa_table,\n",
    ")\n",
    "pa_table = query_vastdb(\n",
    "    session,\n",
    "    VASTDB_TWITTER_INGEST_BUCKET,\n",
    "    VASTDB_TWITTER_INGEST_SCHEMA,\n",
    "    VASTDB_TWITTER_INGEST_TABLE,\n",
    ")\n",
    "pa_table.to_pandas()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c605dcef-fecd-4aa9-9eaa-4fdf4972323e",
   "metadata": {},
   "source": [
    "### Schema evolution"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "b795a2a1-47ba-436c-907d-3f94ae4c74eb",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Connected to VastDB\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idtstampv1v2v3
012014-01-01 00:15:002.017.4NaN
112014-01-01 00:30:001.920.2NaN
212014-01-01 00:45:001.819.7NaN
312014-01-01 00:15:002.017.41.0
412014-01-01 00:30:001.920.22.0
512014-01-01 00:45:001.819.73.0
\n", "
" ],
      "text/plain": [
       " id tstamp v1 v2 v3\n",
       "0 1 2014-01-01 00:15:00 2.0 17.4 NaN\n",
       "1 1 2014-01-01 00:30:00 1.9 20.2 NaN\n",
       "2 1 2014-01-01 00:45:00 1.8 19.7 NaN\n",
       "3 1 2014-01-01 00:15:00 2.0 17.4 1.0\n",
       "4 1 2014-01-01 00:30:00 1.9 20.2 2.0\n",
       "5 1 2014-01-01 00:45:00 1.8 19.7 3.0"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "FILE_PATH = \"example2.csv\"\n",
    "\n",
    "# Same rows as the first example plus a new v3 column.\n",
    "csv_data = \"\"\"\n",
    "id,tstamp,v1,v2,v3\n",
    "1,2014-01-01 00:15:00.00000,2,17.4,1\n",
    "1,2014-01-01 00:30:00.00000,1.9,20.2,2\n",
    "1,2014-01-01 00:45:00.00000,1.8,19.7,3\n",
    "\"\"\"\n",
    "\n",
    "with open(FILE_PATH, 'w') as csv_file:\n",
    "    csv_file.write(csv_data)\n",
    "\n",
    "pa_table = read_csv(FILE_PATH)\n",
    "\n",
    "session = connect_to_vastdb(VASTDB_ENDPOINT, VASTDB_ACCESS_KEY, VASTDB_SECRET_KEY)\n",
    "\n",
    "# Insert into the table written by the previous example, then read everything back.\n",
    "write_to_vastdb(\n",
    "    session,\n",
    "    VASTDB_TWITTER_INGEST_BUCKET,\n",
    "    VASTDB_TWITTER_INGEST_SCHEMA,\n",
    "    VASTDB_TWITTER_INGEST_TABLE,\n",
    "    pa_table,\n",
    ")\n",
    "pa_table = query_vastdb(\n",
    "    session,\n",
    "    VASTDB_TWITTER_INGEST_BUCKET,\n",
    "    VASTDB_TWITTER_INGEST_SCHEMA,\n",
    "    VASTDB_TWITTER_INGEST_TABLE,\n",
    ")\n",
    "pa_table.to_pandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3c1bb6ad-def2-45e0-9e1d-7de6298883cf",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}