Ray and Daft#
An example that uses Daft on a Ray cluster to query a Vast DB table and return the result as a Daft DataFrame.
NOTE:
This notebook needs to be run with the same version of Python as the Ray cluster.
This notebook does not parallelize reading data from Vast DB.
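To check the client side, you can print the local Python version and compare it with the version your Ray cluster reports (for example, in the Ray dashboard or cluster logs):
import sys

# Must match the Python version used by the Ray cluster
print(sys.version)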
Install dependencies#
# pip uninstall -y "daft[ray]" "ray[client]==2.41.0"
%%capture --no-stderr
%pip install "ray[client,train]==2.41.0"
%pip install "daft[ray]" --no-deps
%pip install vastdb
!pip list | grep -E "(ray|daft)"
daft 0.4.8
ray 2.41.0
Create Vast DB Ray DataSource#
import pyarrow as pa
import vastdb
import daft
import ray
import traceback
import sys
import time
from typing import Any, Optional
@ray.remote
class VastDataSource:
def __init__(
self,
endpoint: str,
access_key: str,
secret_key: str,
bucket_name: str,
schema_name: str,
table_name: str
):
"""
        Initialize a VAST data source connection; this actor runs on the Ray cluster
"""
self.endpoint = endpoint
self.access_key = access_key
self.secret_key = secret_key
self.bucket_name = bucket_name
self.schema_name = schema_name
self.table_name = table_name
# Verbose print for initialization
print(f"[INIT] VastDataSource initialized for table: {table_name}", file=sys.stderr)
def verbose_batch_reading(self, reader: pa.RecordBatchReader):
"""
Verbose batch reading with detailed diagnostics
"""
print(f"[DEBUG] Starting batch reading for {self.table_name}", file=sys.stderr)
try:
            import socket

            # Identify which Ray worker this actor is running on
            hostname = socket.gethostname()
            print(f"[HOST] Hostname: {hostname}", file=sys.stderr)
print(f"[DEBUG] Reader type: {type(reader)}", file=sys.stderr)
print(f"[DEBUG] Reader schema: {reader.schema}", file=sys.stderr)
# Batch collection with verbose logging
batches = []
batch_count = 0
start_time = time.time()
try:
                # Iterate over the reader's record batches, logging each one
for batch in reader:
print(f"[DEBUG] Collected batch {batch_count}", file=sys.stderr)
batches.append(batch)
batch_count += 1
# Safety break to prevent infinite loops
if batch_count >= 1000: # Adjust as needed
print(f"[WARNING] Reached batch limit of 1000", file=sys.stderr)
break
except Exception as iter_err:
print(f"[ERROR] Iteration error: {iter_err}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
# Timing and summary
end_time = time.time()
print(f"[DEBUG] Batch reading summary:", file=sys.stderr)
print(f" Total batches: {batch_count}", file=sys.stderr)
print(f" Total time: {end_time - start_time:.2f} seconds", file=sys.stderr)
return batches
except Exception as e:
print(f"[CRITICAL] Batch reading failed: {e}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
return []
def get_vast_data(self, columns=None, predicate=None, internal_row_id=False):
"""
Retrieve data from VAST database with verbose error handling
"""
print(f"[METHOD] get_vast_data called for {self.table_name}", file=sys.stderr)
print(f"[PARAMS] Columns: {columns}, Predicate: {predicate}", file=sys.stderr)
try:
# Connection establishment with verbose logging
try:
session = vastdb.connect(
endpoint=self.endpoint,
access=self.access_key,
secret=self.secret_key
)
print("[CONNECTION] VAST database connection established", file=sys.stderr)
except Exception as conn_err:
print(f"[ERROR] Connection failed: {conn_err}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
raise
with session.transaction() as tx:
bucket = tx.bucket(self.bucket_name)
schema = bucket.schema(self.schema_name)
table = schema.table(self.table_name)
try:
# Select data with comprehensive error handling
reader: pa.RecordBatchReader = table.select(
columns=columns,
predicate=predicate,
internal_row_id=internal_row_id
)
# Verbose batch reading
batches = self.verbose_batch_reading(reader)
                    if not batches:
                        print("[WARNING] No batches collected", file=sys.stderr)
                        # pa.Table.from_batches() requires an explicit schema when the batch list is empty
                        return pa.Table.from_batches([], schema=reader.schema)
# Convert to table
result_table = pa.Table.from_batches(batches)
print(f"[SUCCESS] Retrieved table with {result_table.num_rows} rows", file=sys.stderr)
return result_table
except Exception as select_err:
print(f"[ERROR] Data selection failed: {select_err}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
raise
except Exception as e:
print(f"[CRITICAL] Comprehensive error in get_vast_data: {e}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
raise ValueError(f"Error reading data from VAST: {e}")
def fetch_as_daft_dataframe(
endpoint: str,
access_key: str,
secret_key: str,
bucket_name: str,
schema_name: str,
table_name: str,
columns: Optional[list] = None,
predicate: Optional[Any] = None,
    internal_row_id: bool = False
) -> daft.DataFrame:
"""
Create a Daft DataFrame from a VAST data source running on Ray cluster
"""
print(f"[FUNCTION] create_vast_table called for {table_name}", file=sys.stderr)
try:
# Create a Ray actor for the VAST data source
vast_actor = VastDataSource.remote(
endpoint, access_key, secret_key,
bucket_name, schema_name, table_name
)
        # Retrieve an Arrow table via Ray, with optional column projection and predicate pushdown
        arrow_table = ray.get(vast_actor.get_vast_data.remote(columns, predicate, internal_row_id))
        print(f"[SUCCESS] Created Daft DataFrame from {table_name}", file=sys.stderr)
        return daft.from_arrow(arrow_table)
except Exception as e:
print(f"[ERROR] Failed to create VAST table: {e}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
raise
Connect to Ray cluster#
import ray
from daft import context
# Initialize Ray with the specified address
if not ray.is_initialized():
ray.init(address="ray://10.143.11.241:30001", runtime_env={"pip": ["daft", "vastdb", "pyarrow"]})
# Set Daft to use the Ray runner so Daft queries execute on the cluster.
# Note: daft.context.get_context() always returns a context object, so it
# cannot be used as an "already configured" guard; call set_runner_ray() directly.
context.set_runner_ray(address="ray://10.143.11.241:30001")
2025-03-26 16:44:19,259 INFO client_builder.py:244 -- Passing the following kwargs to ray.init() on the server: log_to_driver
SIGTERM handler is not set because current thread is not the main thread.
(VastDataSource pid=65110, ip=10.1.171.2) [INIT] VastDataSource initialized for table: tweets
(VastDataSource pid=65110, ip=10.1.171.2) [METHOD] get_vast_data called for tweets
(VastDataSource pid=65110, ip=10.1.171.2) [PARAMS] Columns: ['id', 'text'], Predicate: _.text.contains('AI')
(VastDataSource pid=65110, ip=10.1.171.2) [CONNECTION] VAST database connection established
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Starting batch reading for tweets
(VastDataSource pid=65110, ip=10.1.171.2) [HOST] Hostname: raycluster-kuberay-workergroup-worker-m4vp6
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Reader type: <class 'pyarrow.lib.RecordBatchReader'>
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Reader schema: id: int64
(VastDataSource pid=65110, ip=10.1.171.2) -- field metadata --
(VastDataSource pid=65110, ip=10.1.171.2) VAST:column_id: '65'
(VastDataSource pid=65110, ip=10.1.171.2) text: string
(VastDataSource pid=65110, ip=10.1.171.2) -- field metadata --
(VastDataSource pid=65110, ip=10.1.171.2) VAST:column_id: '67'
(VastDataSource pid=65110, ip=10.1.171.2) $row_id: uint64
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Collected batch 0
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Collected batch 1
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Collected batch 2
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Collected batch 3
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Batch reading summary:
(VastDataSource pid=65110, ip=10.1.171.2) Total batches: 4
(VastDataSource pid=65110, ip=10.1.171.2) Total time: 0.40 seconds
(VastDataSource pid=65110, ip=10.1.171.2) [SUCCESS] Retrieved table with 130600 rows
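Before submitting work, it can be worth confirming the client really is connected and seeing what the cluster has available:
import ray

# Optional sanity check on the client connection and cluster capacity
print(ray.is_initialized())
print(ray.cluster_resources())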
Ingest Vast DB table as Daft DataFrame#
from ibis import _
# Create Daft DataFrame from VAST
df = fetch_as_daft_dataframe(
    endpoint='http://172.200.204.2:80',
    access_key='YOUR_ACCESS_KEY',
    secret_key='YOUR_SECRET_KEY',
bucket_name='csnow-db',
schema_name='social_media',
table_name='tweets',
columns=['id', 'text'],
predicate=(_.text.contains('AI')),
internal_row_id=True
)
[FUNCTION] fetch_as_daft_dataframe called for tweets
[SUCCESS] Created Daft DataFrame from tweets
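The predicate argument is an ibis deferred expression that Vast DB pushes down to the server, so filtering happens before data leaves the database. A couple of other predicate shapes you could pass (sketches only; adjust column names to your own table):
from ibis import _

# Combine conditions with & / |, or compare a column against a literal
predicate = _.text.contains('AI') & (_.id > 0)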
Basic Data Engineering on the Daft DataFrame#
We aren’t really doing anything substantial here - see the Daft DataFrame API for more interesting methods.
df.show()
| id Int64 | text Utf8 | $row_id UInt64 |
|---|---|---|
| 4972466409915944915 | can't believe how incredible AI is! | 1572879 |
| -1590415462612194030 | impressed with how cool AI is! | 1572903 |
| 8486930471810695546 | finally got how kind AI is! | 1572932 |
| -8343872824439823953 | ready to see how wonderful AI is! | 1573017 |
| 5043897367125593095 | motivated by how awesome AI is! | 1573018 |
| -1657734256534097438 | just discovered how powerful AI is! | 1573030 |
| -1815539176794851507 | can't wait to see how brilliant AI is! | 1573053 |
| -5287482691421413087 | can't believe how wonderful AI is! | 1573131 |
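Daft evaluates lazily: .show() only materializes a small preview. To inspect the query plan that will run on Ray instead of fetching rows, Daft provides an explain() method:
# Print Daft's query plans (unoptimized and optimized) without executing them
df.explain(show_all=True)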
df.count().show()
| count UInt64 |
|---|
| 130600 |
df.limit(5).show()
| id Int64 | text Utf8 | $row_id UInt64 |
|---|---|---|
| 4972466409915944915 | can't believe how incredible AI is! | 1572879 |
| -1590415462612194030 | impressed with how cool AI is! | 1572903 |
| 8486930471810695546 | finally got how kind AI is! | 1572932 |
| -8343872824439823953 | ready to see how wonderful AI is! | 1573017 |
| 5043897367125593095 | motivated by how awesome AI is! | 1573018 |
daft.sql("SELECT * FROM df WHERE text like '%love%'").show()
| id Int64 | text Utf8 | $row_id UInt64 |
|---|---|---|
| -1002806103367248550 | totally in love with how perfect AI is! | 1573338 |
| 4483608969875055006 | totally in love with how clever AI is! | 1573501 |
| -3485059169440733923 | looking forward to see how lovely AI is! | 1573670 |
| 9061853070521936991 | totally in love with how perfect AI is! | 1573721 |
| 4765031840801089800 | just discovered how lovely AI is! | 1573810 |
| -195608600448131006 | totally in love with how lovely AI is! | 1574541 |
| -3990254773133657667 | totally in love with how awesome AI is! | 1574564 |
| -7108812032965286885 | totally in love with how beautiful AI is! | 1574910 |
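The same filter can be written with Daft's native expression API instead of SQL:
from daft import col

# Equivalent to the SQL above: keep rows whose text contains 'love'
df.where(col("text").str.contains("love")).show()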
Train ML model#
Okay, the model is a bit useless, but you get to see the mechanics of using Ray Train on Vast DB data.
# Convert the Daft dataframe to a Ray Dataset
rdf = df.to_ray_dataset()
type(rdf)
ray.data.dataset.MaterializedDataset
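Before wiring the dataset into Ray Train, a quick peek confirms the schema and rows survived the conversion:
# Inspect the materialized Ray Dataset
print(rdf.schema())
print(rdf.take(2))  # first two rows, returned as dicts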
Coming soon: Ray Train example …
Shutdown Ray client#
Shut down the Ray client when you have finished using it.
You can also achieve the same by stopping the Jupyter notebook kernel.
# ray.shutdown()