Ray and Daft#

Example using Daft on a Ray cluster to query a Vast DB table and return it as a Daft Dataframe.

NOTE:

  • This notebook needs to be run with the same version of Python as the Ray Cluster.

  • This notebook does not parallelize reading data from Vast DB.

Install dependencies#

# pip uninstall -y "daft[ray]" "ray[client]==2.41.0"
%%capture --no-stderr
%pip install "ray[client,train]==2.41.0"
%pip install "daft[ray]" --no-deps
%pip install vastdb
!pip list | grep -E "(ray|daft)"
daft                      0.4.8
ray                       2.41.0

Create Vast DB Ray DataSource#

import pyarrow as pa
import vastdb
import daft
import ray
import traceback
import sys
import time
from typing import Dict, Any, Optional

@ray.remote
class VastDataSource:
    """
    Ray actor that reads a single VAST DB table into a PyArrow table.

    The connection parameters are stored on the actor at construction time;
    the actual connection is opened on every call to ``get_vast_data``.
    All diagnostics are written to ``sys.stderr``.
    """

    def __init__(
        self,
        endpoint: str,
        access_key: str,
        secret_key: str,
        bucket_name: str,
        schema_name: str,
        table_name: str
    ):
        """
        Initialize VAST data source connection to be run on Ray cluster

        Args:
            endpoint: VAST DB endpoint passed to ``vastdb.connect``.
            access_key: Access key passed to ``vastdb.connect``.
            secret_key: Secret key passed to ``vastdb.connect``.
            bucket_name: Bucket that contains the schema.
            schema_name: Schema that contains the table.
            table_name: Table to read.
        """
        self.endpoint = endpoint
        self.access_key = access_key
        self.secret_key = secret_key
        self.bucket_name = bucket_name
        self.schema_name = schema_name
        self.table_name = table_name

        # Verbose print for initialization
        print(f"[INIT] VastDataSource initialized for table: {table_name}", file=sys.stderr)

    def verbose_batch_reading(self, reader: pa.RecordBatchReader, max_batches: int = 1000) -> list:
        """
        Drain ``reader`` into a list of record batches, logging progress.

        Args:
            reader: Record batch reader to iterate.
            max_batches: Safety limit on the number of batches collected.
                When it is reached a warning is logged and reading stops, so
                the returned list may then be a truncated result.

        Returns:
            The collected batches, in the order the reader produced them.

        Raises:
            Exception: Any error raised while iterating the reader is logged
                and re-raised, so that a failed read is never returned as a
                short or empty result.
        """
        # Local import: only needed for the host diagnostic below.
        import socket

        print(f"[DEBUG] Starting batch reading for {self.table_name}", file=sys.stderr)

        # Only the hostname is reported. Resolving it to an IP address is not
        # needed for the log line and could fail on hosts without a DNS entry.
        print(f"[HOST] Hostname: {socket.gethostname()}", file=sys.stderr)
        print(f"[DEBUG] Reader type: {type(reader)}", file=sys.stderr)
        print(f"[DEBUG] Reader schema: {reader.schema}", file=sys.stderr)

        batches = []
        start_time = time.perf_counter()

        try:
            for batch in reader:
                print(f"[DEBUG] Collected batch {len(batches)}", file=sys.stderr)
                batches.append(batch)

                # Safety break to prevent unbounded reads
                if len(batches) >= max_batches:
                    print(f"[WARNING] Reached batch limit of {max_batches}", file=sys.stderr)
                    break
        except Exception as iter_err:
            print(f"[ERROR] Iteration error: {iter_err}", file=sys.stderr)
            raise

        # Timing and summary
        elapsed = time.perf_counter() - start_time
        print("[DEBUG] Batch reading summary:", file=sys.stderr)
        print(f"  Total batches: {len(batches)}", file=sys.stderr)
        print(f"  Total time: {elapsed:.2f} seconds", file=sys.stderr)

        return batches

    def get_vast_data(self, columns=None, predicate=None, internal_row_id=False):
        """
        Retrieve data from VAST database with verbose error handling

        Args:
            columns: Column names forwarded to ``table.select``; ``None`` is
                forwarded unchanged.
            predicate: Filter expression forwarded to ``table.select``.
            internal_row_id: Forwarded to ``table.select`` after conversion to
                ``bool``, so callers may pass ``None``.

        Returns:
            A ``pyarrow.Table`` built from the collected batches. When no
            batches were produced, an empty table with the reader's schema.

        Raises:
            ValueError: If connecting, selecting or reading fails. The
                original exception is attached as ``__cause__``.
        """
        print(f"[METHOD] get_vast_data called for {self.table_name}", file=sys.stderr)
        print(f"[PARAMS] Columns: {columns}, Predicate: {predicate}", file=sys.stderr)

        try:
            try:
                session = vastdb.connect(
                    endpoint=self.endpoint,
                    access=self.access_key,
                    secret=self.secret_key
                )
            except Exception as conn_err:
                print(f"[ERROR] Connection failed: {conn_err}", file=sys.stderr)
                raise
            print("[CONNECTION] VAST database connection established", file=sys.stderr)

            # The reader is drained inside the transaction block, before the
            # transaction context manager exits.
            with session.transaction() as tx:
                bucket = tx.bucket(self.bucket_name)
                schema = bucket.schema(self.schema_name)
                table = schema.table(self.table_name)

                try:
                    reader: pa.RecordBatchReader = table.select(
                        columns=columns,
                        predicate=predicate,
                        internal_row_id=bool(internal_row_id)
                    )
                    batches = self.verbose_batch_reading(reader)

                    if not batches:
                        print("[WARNING] No batches collected", file=sys.stderr)
                        # Table.from_batches cannot infer a schema from an
                        # empty list, so the reader's schema is passed in.
                        return pa.Table.from_batches([], schema=reader.schema)

                    result_table = pa.Table.from_batches(batches)
                except Exception as select_err:
                    print(f"[ERROR] Data selection failed: {select_err}", file=sys.stderr)
                    raise

                print(f"[SUCCESS] Retrieved table with {result_table.num_rows} rows", file=sys.stderr)
                return result_table

        except Exception as e:
            # Single place where the traceback is printed for any failure above.
            print(f"[CRITICAL] Comprehensive error in get_vast_data: {e}", file=sys.stderr)
            traceback.print_exc(file=sys.stderr)
            raise ValueError(f"Error reading data from VAST: {e}") from e

def fetch_as_daft_dataframe(
    endpoint: str, 
    access_key: str, 
    secret_key: str,
    bucket_name: str, 
    schema_name: str, 
    table_name: str,
    columns: Optional[list] = None,
    predicate: Optional[Any] = None,
    internal_row_id: Optional[bool] = None,
    context = None
) -> daft.DataFrame:
    """
    Create a Daft DataFrame from a VAST data source running on Ray cluster

    A ``VastDataSource`` actor is created, its ``get_vast_data`` method is
    called remotely, and the Arrow table it returns is converted with
    ``daft.from_arrow``.

    Args:
        endpoint: VAST DB endpoint, forwarded to the actor.
        access_key: VAST DB access key, forwarded to the actor.
        secret_key: VAST DB secret key, forwarded to the actor.
        bucket_name: Bucket that contains the schema.
        schema_name: Schema that contains the table.
        table_name: Table to read.
        columns: Optional list of columns to select.
        predicate: Optional filter expression.
        internal_row_id: Whether to request the internal row id column;
            ``None`` is treated as ``False``.
        context: Not used by this function; accepted so existing callers
            that pass it keep working.

    Returns:
        A Daft DataFrame built from the Arrow table returned by the actor.

    Raises:
        Exception: Any failure is logged with its traceback and re-raised.
    """
    # NOTE(review): the tag below names "create_vast_table", not this function.
    # The string is left unchanged so existing log output keeps matching.
    print(f"[FUNCTION] create_vast_table called for {table_name}", file=sys.stderr)

    try:
        # Create a Ray actor for the VAST data source
        vast_actor = VastDataSource.remote(
            endpoint, access_key, secret_key,
            bucket_name, schema_name, table_name
        )

        # The actor returns an Arrow table, not a Daft DataFrame.
        arrow_table = ray.get(
            vast_actor.get_vast_data.remote(columns, predicate, bool(internal_row_id))
        )

        # Convert before reporting success, so a failed conversion is logged
        # as an error rather than after a success message.
        daft_dataframe = daft.from_arrow(arrow_table)

    except Exception as e:
        print(f"[ERROR] Failed to create VAST table: {e}", file=sys.stderr)
        traceback.print_exc(file=sys.stderr)
        raise

    print(f"[SUCCESS] Created Daft DataFrame from {table_name}", file=sys.stderr)
    return daft_dataframe

Connect to ray cluster#

import ray
from daft import context

# Address of the Ray cluster, used both for Ray itself and for Daft's Ray runner.
RAY_ADDRESS = "ray://10.143.11.241:30001"

# Connect to the cluster only once, so that re-running this cell does not
# call ray.init() on an already initialized client.
if not ray.is_initialized():
    ray.init(address=RAY_ADDRESS, runtime_env={"pip": ["daft", "vastdb", "pyarrow"]})

# Set Daft to use the Ray runner. This is done outside the block above so it
# also happens when Ray was already initialized. noop_if_initialized=True is
# intended to make a repeated call harmless when the cell is re-run.
context.set_runner_ray(address=RAY_ADDRESS, noop_if_initialized=True)
2025-03-26 16:44:19,259	INFO client_builder.py:244 -- Passing the following kwargs to ray.init() on the server: log_to_driver
SIGTERM handler is not set because current thread is not the main thread.
(VastDataSource pid=65110, ip=10.1.171.2) [INIT] VastDataSource initialized for table: tweets
(VastDataSource pid=65110, ip=10.1.171.2) [METHOD] get_vast_data called for tweets
(VastDataSource pid=65110, ip=10.1.171.2) [PARAMS] Columns: ['id', 'text'], Predicate: _.text.contains('AI')
(VastDataSource pid=65110, ip=10.1.171.2) [CONNECTION] VAST database connection established
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Starting batch reading for tweets
(VastDataSource pid=65110, ip=10.1.171.2) [HOST] Hostname: raycluster-kuberay-workergroup-worker-m4vp6
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Reader type: <class 'pyarrow.lib.RecordBatchReader'>
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Reader schema: id: int64
(VastDataSource pid=65110, ip=10.1.171.2)   -- field metadata --
(VastDataSource pid=65110, ip=10.1.171.2)   VAST:column_id: '65'
(VastDataSource pid=65110, ip=10.1.171.2) text: string
(VastDataSource pid=65110, ip=10.1.171.2)   -- field metadata --
(VastDataSource pid=65110, ip=10.1.171.2)   VAST:column_id: '67'
(VastDataSource pid=65110, ip=10.1.171.2) $row_id: uint64
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Collected batch 0
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Collected batch 1
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Collected batch 2
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Collected batch 3
(VastDataSource pid=65110, ip=10.1.171.2) [DEBUG] Batch reading summary:
(VastDataSource pid=65110, ip=10.1.171.2)   Total batches: 4
(VastDataSource pid=65110, ip=10.1.171.2)   Total time: 0.40 seconds
(VastDataSource pid=65110, ip=10.1.171.2) [SUCCESS] Retrieved table with 130600 rows

Ingest Vast DB table as Daft Dataframe#

from ibis import _

import os

# Create Daft DataFrame from VAST
# The credentials are read from the environment so that secrets are not stored
# in the notebook. Set VASTDB_ACCESS_KEY and VASTDB_SECRET_KEY before running
# this cell; a missing variable raises KeyError here.
df = fetch_as_daft_dataframe(
    endpoint='http://172.200.204.2:80',
    access_key=os.environ['VASTDB_ACCESS_KEY'],
    secret_key=os.environ['VASTDB_SECRET_KEY'],
    bucket_name='csnow-db',
    schema_name='social_media',
    table_name='tweets',
    columns=['id', 'text'],
    predicate=(_.text.contains('AI')),
    internal_row_id=True
)
[FUNCTION] create_vast_table called for tweets
[SUCCESS] Created Daft DataFrame from tweets

Basic Data Engineering on the Daft Dataframe#

We aren’t really doing anything substantial here - see the Daft DataFrame API for more interesting methods.

# Display a preview of the DataFrame's leading rows.
df.show()
id
Int64
text
Utf8
$row_id
UInt64
4972466409915944915
can't believe how incredible AI is!
1572879
-1590415462612194030
impressed with how cool AI is!
1572903
8486930471810695546
finally got how kind AI is!
1572932
-8343872824439823953
ready to see how wonderful AI is!
1573017
5043897367125593095
motivated by how awesome AI is!
1573018
-1657734256534097438
just discovered how powerful AI is!
1573030
-1815539176794851507
can't wait to see how brilliant AI is!
1573053
-5287482691421413087
can't believe how wonderful AI is!
1573131
(Showing first 8 of 130600 rows)
# Count the rows; show() is called on the result of count() to display the value.
df.count().show()
count
UInt64
130600
(Showing first 1 of 1 rows)
# Restrict the DataFrame to 5 rows and display them.
df.limit(5).show()
id
Int64
text
Utf8
$row_id
UInt64
4972466409915944915
can't believe how incredible AI is!
1572879
-1590415462612194030
impressed with how cool AI is!
1572903
8486930471810695546
finally got how kind AI is!
1572932
-8343872824439823953
ready to see how wonderful AI is!
1573017
5043897367125593095
motivated by how awesome AI is!
1573018
(Showing first 5 of 5 rows)
# Query with SQL: the table name `df` in the statement matches the Python
# variable holding the DataFrame. '%love%' matches any text that contains
# "love", so words such as "lovely" match as well.
daft.sql("SELECT * FROM df WHERE text like '%love%'").show()
id
Int64
text
Utf8
$row_id
UInt64
-1002806103367248550
totally in love with how perfect AI is!
1573338
4483608969875055006
totally in love with how clever AI is!
1573501
-3485059169440733923
looking forward to see how lovely AI is!
1573670
9061853070521936991
totally in love with how perfect AI is!
1573721
4765031840801089800
just discovered how lovely AI is!
1573810
-195608600448131006
totally in love with how lovely AI is!
1574541
-3990254773133657667
totally in love with how awesome AI is!
1574564
-7108812032965286885
totally in love with how beautiful AI is!
1574910
(Showing first 8 rows)

Train ML model#

Okay, the model is a bit useless, but you get to see the mechanics of using Ray Train on Vast DB data.

# Convert the Daft DataFrame to a Ray Dataset.

rdf = df.to_ray_dataset()
# Inspect the type of the converted object.
type(rdf)
ray.data.dataset.MaterializedDataset

Coming soon: a Ray Train example.

Shutdown Ray client#

Shutdown the ray client when you have finished using it.

You can also achieve the same by stopping the Jupyter notebook's kernel.

# ray.shutdown()