Manipulating Data#
See also
The Vast DB SDK API Documentation is available here.
Important
This notebook is in the process of being migrated to Vast Data Platform Field Docs. It will probably not run yet.
CRUD operations in VAST-DB are designed to be low-latency for the user while improving the overall performance of the system: background operations are implemented in such a way that unnecessary I/O is minimized or eliminated.
insert#
Row inserts into VAST-DB first occur into storage-class memory as discrete operations. While some combining of rows may occur when inserts are done very close together in time, it’s easier to think of a single SQL insert command as resulting in data chunks written to SCM.
This insert occurs according to isolation rules: clients that are part of the insert transaction are exposed to the new data immediately. Clients that begin a transaction after the insert transaction is committed are then exposed to the data. As inserts and other ingest transactions occur, asynchronous tasks will trigger, combining all of these discrete objects in SCM, and writing them to read-intensive NVMe along with updates to the associated metadata structures upstream of it.
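The isolation behavior described above can be pictured with a toy versioned store. This is a conceptual sketch in plain Python, not VAST-DB code: each transaction reads from the snapshot that existed when it began, and committed writes become visible only to transactions started after the commit.

```python
# Toy snapshot-isolation model (illustration only, not VAST-DB internals):
# a transaction sees its own pending inserts plus the snapshot taken at
# transaction start; other transactions see new data only after commit.

class ToyStore:
    def __init__(self):
        self.versions = [[]]          # committed snapshots, newest last

    def begin(self):
        return ToyTxn(self, snapshot=list(self.versions[-1]))

class ToyTxn:
    def __init__(self, store, snapshot):
        self.store = store
        self.snapshot = snapshot      # frozen view at transaction start
        self.pending = []

    def insert(self, row):
        self.pending.append(row)

    def read(self):
        # a transaction is exposed to its own uncommitted inserts
        return self.snapshot + self.pending

    def commit(self):
        self.store.versions.append(self.snapshot + self.pending)

store = ToyStore()
t1 = store.begin()
t1.insert({"Citizen_Name": "Alice"})
t2 = store.begin()                    # began before t1 committed
t1.commit()
t3 = store.begin()                    # began after t1 committed
print(len(t2.read()), len(t3.read()))  # 0 1
```

`t2` sees nothing because its snapshot predates the commit, while `t3` sees the inserted row.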
Usage: Inserts a RecordBatch into this table.

Parameters:
- `rows` (pa.RecordBatch): The record batch to be inserted into the table.

Raises:
- `errors.NotSupportedCommand`: If the operation is not supported on the current table.
- `errors.TooWideRow`: If the row to be inserted is too wide.

Returns:
- `pa.ChunkedArray`: An array of internal row IDs of the inserted rows, if the feature is supported by the server.
- `None`: If the feature of returning row IDs is not supported by the server.

Note: If a row is too wide to be inserted, the method falls back to inserting in column batches.
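The column-batch fallback mentioned in the note can be pictured as follows. This is a schematic in plain Python, not the SDK's internal chunking logic: when a row is too wide to send at once, its columns are sent in smaller groups.

```python
# Schematic of a column-batch fallback (illustration only): send the
# columns of a wide row in groups of `batch_size` instead of all at once.

def column_batches(columns, batch_size):
    names = list(columns)
    for i in range(0, len(names), batch_size):
        yield {name: columns[name] for name in names[i:i + batch_size]}

row = {"a": [1], "b": [2], "c": [3], "d": [4], "e": [5]}
batches = list(column_batches(row, batch_size=2))
print([sorted(b) for b in batches])  # [['a', 'b'], ['c', 'd'], ['e']]
```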
Show table#
```python
def list_rows():
    print(f"Listing rows in: Database='{DATABASE_NAME}' Schema='{DATABASE_SCHEMA}' Table='{TABLE_NAME}'")
    with session.transaction() as tx:
        try:
            schema = tx.bucket(DATABASE_NAME).schema(name=DATABASE_SCHEMA, fail_if_missing=False)
            if schema:
                try:
                    table = schema.table(name=TABLE_NAME)
                    reader = table.select()
                    pyarrow_table = pa.Table.from_batches(reader)
                    df = pyarrow_table.to_pandas()
                    display(df)
                except Exception as e:
                    print("Couldn't select data:", e)
        except Exception as e:
            print("Schema doesn't exist:", e)

list_rows()
```
Listing rows in: Database='demo-database' Schema='python-sdk-schema' Table='pythonsdkcitizen'
|   | Citizen_Age | Citizen_Name | Citizen_experience | Is_married |
|---|---|---|---|---|
| 0 | 45 | Alice | 25.5 | True |
| 1 | 38 | Bob | 17.9 | False |
| 2 | 27 | Koko | 5.3 | False |
| 3 | 51 | Menny | 28.2 | True |
Insert the Record#
```python
ROWS = {
    'Citizen_Name': ['Alice', 'Bob'], 'Citizen_Age': [25, 24]
}
PA_RECORD_BATCH = pa.RecordBatch.from_pydict(ROWS)

with session.transaction() as tx:
    try:
        schema = tx.bucket(DATABASE_NAME).schema(name=DATABASE_SCHEMA, fail_if_missing=False)
        if schema:
            try:
                table = schema.table(name=TABLE_NAME)
                table.insert(PA_RECORD_BATCH)
                print("Data inserted.")
            except Exception as e:
                print("Couldn't insert data:", e)
    except Exception as e:
        print("Schema doesn't exist:", e)
```
Data inserted.
Show that the record was inserted#
```python
list_rows()
```
Listing rows in: Database='demo-database' Schema='python-sdk-schema' Table='pythonsdkcitizen'
|   | Citizen_Age | Citizen_Name | Citizen_experience | Is_married |
|---|---|---|---|---|
| 0 | 45 | Alice | 25.5 | True |
| 1 | 38 | Bob | 17.9 | False |
| 2 | 27 | Koko | 5.3 | False |
| 3 | 51 | Menny | 28.2 | True |
| 4 | 25 | Alice | NaN | None |
| 5 | 24 | Bob | NaN | None |
update#
An update to a row results in new data written to SCM, where metadata objects associated with the table are updated to reference the newly written data in place of the old (according to transaction isolation rules). Asynchronous tasks then combine all of the updates and rewrite data chunks in main storage as necessary to house the updated data.
An additional asynchronous task (called snap delete) will unlink old chunks once they are no longer part of an existing snapshot and no longer needed by an ongoing transaction.
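The unlinking rule can be modeled as a simple reference check. This is a toy model, not the actual snap-delete implementation: a chunk is reclaimable only when no snapshot and no open transaction still references it.

```python
# Toy model of the snap-delete reclamation rule (illustration only):
# a chunk may be unlinked only when it is referenced by no snapshot
# and by no open transaction.

def reclaimable(chunk, snapshots, open_txns):
    referenced = any(chunk in s for s in snapshots) or \
                 any(chunk in t for t in open_txns)
    return not referenced

snapshots = [{"chunk1", "chunk2"}]   # chunks pinned by snapshots
open_txns = [{"chunk3"}]             # chunks pinned by open transactions

print(reclaimable("chunk4", snapshots, open_txns))  # True  (unreferenced)
print(reclaimable("chunk2", snapshots, open_txns))  # False (in a snapshot)
```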
Usage: Updates a subset of cells in this table.

Parameters:
- `rows` (Union[pa.RecordBatch, pa.Table]): The rows to be updated. Must include a special field named `$row_id` of uint64 type.
- `columns` (Optional[List[str]]): The subset of columns to be updated. If None, all columns are updated.

Raises:
- `errors.NotSupportedCommand`: If the operation is not supported on the current table.
- `errors.MissingRowIdColumn`: If the `$row_id` field is missing from the rows.

Note: This function assumes that the `$row_id` field is part of the input rows. The `$row_id` field is used to specify the row IDs to be updated.
List the table#
```python
list_rows()
```
Listing rows in: Database='demo-database' Schema='python-sdk-schema' Table='pythonsdkcitizen'
|   | Citizen_Age | Citizen_Name | Citizen_experience | Is_married |
|---|---|---|---|---|
| 0 | 45 | Alice | 25.5 | True |
| 1 | 38 | Bob | 17.9 | False |
| 2 | 27 | Koko | 5.3 | False |
| 3 | 51 | Menny | 28.2 | True |
| 4 | 25 | Alice | NaN | None |
| 5 | 24 | Bob | NaN | None |
Update the Citizen_experience and Is_married status for Alice & Bob#
```python
# Define the fields and their types
FIELDS = [
    ("$row_id", pa.uint64()),
    ("Citizen_experience", pa.float64()),
    ("Is_married", pa.bool_())
]

# Define the values for each field
VALUES = [
    [4, 5],           # values for "$row_id"
    [43, 28],         # values for "Citizen_experience"
    [False, True]     # values for "Is_married"
]

# Create a record batch
RECORD_BATCH = pa.record_batch(VALUES, schema=pa.schema(FIELDS))

with session.transaction() as tx:
    try:
        schema = tx.bucket(DATABASE_NAME).schema(name=DATABASE_SCHEMA, fail_if_missing=False)
        if schema:
            try:
                table = schema.table(name=TABLE_NAME)
                # Update the table
                table.update(RECORD_BATCH)
                print("Data updated.")
            except Exception as e:
                print("Couldn't update data:", e)
    except Exception as e:
        print("Schema doesn't exist:", e)
```
Data updated.
Validate the updates for Alice & Bob#
```python
list_rows()
```
Listing rows in: Database='demo-database' Schema='python-sdk-schema' Table='pythonsdkcitizen'
|   | Citizen_Age | Citizen_Name | Citizen_experience | Is_married |
|---|---|---|---|---|
| 0 | 45 | Alice | 25.5 | True |
| 1 | 38 | Bob | 17.9 | False |
| 2 | 27 | Koko | 5.3 | False |
| 3 | 51 | Menny | 28.2 | True |
| 4 | 25 | Alice | 43.0 | False |
| 5 | 24 | Bob | 28.0 | True |
delete#
Row-level deletes work similarly to updates. Table metadata is updated, logically removing the row (tombstoning it). Asynchronous processes will eventually relocate (rewrite and unlink) data as necessary to reclaim space in main storage, taking care to account for ongoing transactions and snapshots.
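The tombstoning described above can be sketched with a toy model (a conceptual illustration, not VAST-DB code): a delete only marks the row ID, reads filter out marked rows, and a later compaction pass physically reclaims the space.

```python
# Toy tombstone model (illustration only): a delete records the row ID
# in metadata rather than rewriting data; reads skip tombstoned rows;
# a later asynchronous compaction pass drops them physically.

rows = {0: "Alice", 1: "Bob", 2: "Koko"}
tombstones = set()

def delete(row_id):
    tombstones.add(row_id)            # logical delete: metadata only

def read_all():
    return [v for k, v in rows.items() if k not in tombstones]

def compact():
    # asynchronous reclaim: physically drop tombstoned rows
    for row_id in tombstones:
        rows.pop(row_id, None)
    tombstones.clear()

delete(1)
print(read_all())                     # ['Alice', 'Koko']
compact()
print(len(rows))                      # 2
```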
Usage: Deletes a subset of rows in this table.

Parameters:
- `rows` (Union[pa.RecordBatch, pa.Table]): The rows to be deleted. Must include a special field named `$row_id` of uint64 type.

Raises:
- `errors.NotSupportedCommand`: If the operation is not supported on the current table.
- `errors.MissingRowIdColumn`: If the `$row_id` field is missing from the rows.

Note: This function assumes that the `$row_id` field is part of the input rows. The `$row_id` field is used to specify the row IDs to be deleted.
List the rows in the table#
```python
list_rows()
```
Listing rows in: Database='demo-database' Schema='python-sdk-schema' Table='pythonsdkcitizen'
|   | Citizen_Age | Citizen_Name | Citizen_experience | Is_married |
|---|---|---|---|---|
| 0 | 45 | Alice | 25.5 | True |
| 1 | 38 | Bob | 17.9 | False |
| 2 | 27 | Koko | 5.3 | False |
| 3 | 51 | Menny | 28.2 | True |
| 4 | 25 | Alice | 43.0 | False |
| 5 | 24 | Bob | 28.0 | True |
Delete the Row for Bob#
```python
PREDICATE = (_.Citizen_Name == 'Bob') & (_.Is_married == True)
COLUMNS = ['Citizen_Age', 'Citizen_Name', 'Citizen_experience']

with session.transaction() as tx:
    try:
        schema = tx.bucket(DATABASE_NAME).schema(name=DATABASE_SCHEMA, fail_if_missing=False)
        if schema:
            try:
                table = schema.table(name=TABLE_NAME)
                # IMPORTANT: internal_row_id=True returns the $row_id column
                # that table.delete() requires.
                reader = table.select(columns=COLUMNS, predicate=PREDICATE, internal_row_id=True)
                pyarrow_table = reader.read_all()
                if pyarrow_table.num_rows == 0:
                    print("No records found.")
                else:
                    print("Records to delete (note we ran select with `internal_row_id=True`) ...")
                    display(pyarrow_table.to_pandas())
                    print("Deleting rows")
                    table.delete(pyarrow_table)
            except Exception as e:
                import sys, traceback
                traceback.print_exc(file=sys.stdout)
                print("Exception encountered:", e)
    except Exception as e:
        print("Schema doesn't exist:", e)
```
Records to delete (note we ran select with `internal_row_id=True`) ...
|   | Citizen_Age | Citizen_Name | Citizen_experience | $row_id |
|---|---|---|---|---|
| 0 | 38 | Bob | 17.9 | 1 |
Deleting rows
```python
list_rows()
```
Listing rows in: Database='demo-database' Schema='python-sdk-schema' Table='pythonsdkcitizen'
|   | Citizen_Age | Citizen_Name | Citizen_experience | Is_married |
|---|---|---|---|---|
| 0 | 45 | Alice | 25.5 | True |
| 1 | 27 | Koko | 5.3 | False |
| 2 | 51 | Menny | 28.2 | True |
| 3 | 25 | Alice | 43.0 | False |
| 4 | 24 | Bob | 28.0 | True |