DataSource blocks from Arrow RecordBatchReader?

I have a backend that is capable of returning an Arrow RecordBatchReader.

Can I create Dataset blocks using the read_next_batch method so that batches are processed in a distributed fashion across the Ray cluster?

I think the RecordBatchReader would need to be a singleton across the cluster, with each call to read_next_batch() distributed across the workers?
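
Roughly what I have in mind is something like the sketch below (make_reader and process_batch are just placeholders for opening the backend query and doing the per-batch work; the reader itself would still live in one process):

import ray

@ray.remote
class ReaderActor:
    """Holds the single RecordBatchReader; workers pull batches through it."""

    def __init__(self, make_reader):
        # make_reader: zero-arg callable that opens the backend query and returns
        # a pyarrow.RecordBatchReader. It is constructed inside the actor because
        # a live reader generally can't be pickled across processes.
        self._reader = make_reader()

    def next_batch(self):
        try:
            return self._reader.read_next_batch()
        except StopIteration:
            return None  # stream exhausted

@ray.remote
def process_batch(batch):
    # placeholder for the actual per-batch work
    return batch.num_rows

actor = ReaderActor.remote(make_reader)  # make_reader is hypothetical
pending = []
while (batch := ray.get(actor.next_batch.remote())) is not None:
    pending.append(process_batch.remote(batch))
print(sum(ray.get(pending)))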

Hi @chris_snow - what are you trying to do exactly?

Depending on the throughput you need, you could easily farm this out via ray.remote calls.
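
For example, a minimal sketch (the driver still does the reading; process_batch stands in for whatever per-batch work you need):

import ray

@ray.remote
def process_batch(batch):
    # hypothetical per-batch work
    return batch.num_rows

# A RecordBatchReader is iterable, so the driver can fan batches out to the cluster:
refs = [process_batch.remote(batch) for batch in reader]
print(sum(ray.get(refs)))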

I have an API to query my VAST Data lakehouse database:

with session.transaction() as tx:
    try:
        schema = tx.bucket(DATABASE_NAME).schema(name=DATABASE_SCHEMA, fail_if_missing=False)
        if schema:
            try:
                table = schema.table(name=TABLE_NAME)
                reader = table.select(predicate=predicate)  # pyarrow.RecordBatchReader
            except Exception:
                raise  # (error handling trimmed for brevity)
    except Exception:
        raise  # (error handling trimmed for brevity)

I would like the data processing to scale across multiple Ray nodes.

Currently, the VAST Python API doesn't support sharding the reads across multiple clients.

A bit more info on the Vast interaction with RecordBatchReader:

https://vast-data.github.io/data-platform-field-docs/vast_database/sdk_ref/15_recordbatchreader.html

Any thoughts on this @rliaw? Should I just bug my engineering team to update the client with a shard id?

You should implement your own custom datasource.

See: Loading Data — Ray 2.42.1

Here is an example datasource: ray/python/ray/data/_internal/datasource/mongo_datasource.py at master · ray-project/ray · GitHub

And then you can do something like:

read_datasource(VastDatasource(…))
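
A skeleton of what that datasource could look like is below. This is a sketch only: it assumes the Ray 2.42 Datasource / ReadTask / BlockMetadata interfaces used by the mongo example, and the make_session and shard_predicates arguments are made up — shard_predicates stands in for however you end up splitting the query (e.g. non-overlapping ranges of some column), since the client has to be able to partition the read somehow:

from ray.data.block import BlockMetadata
from ray.data.datasource import Datasource, ReadTask


class VastDatasource(Datasource):
    def __init__(self, make_session, bucket, schema_name, table_name, shard_predicates):
        # make_session: zero-arg callable that opens a vastdb session on a worker.
        self._make_session = make_session
        self._bucket = bucket
        self._schema = schema_name
        self._table = table_name
        # One predicate per shard (hypothetical -- this is the piece the
        # vastdb client doesn't help with yet).
        self._shard_predicates = shard_predicates

    def estimate_inmemory_data_size(self):
        return None  # size not known up front

    def get_read_tasks(self, parallelism):
        # The parallelism hint is ignored here; we create one task per predicate.
        tasks = []
        for predicate in self._shard_predicates:
            # Exact BlockMetadata fields can vary slightly across Ray versions.
            metadata = BlockMetadata(
                num_rows=None, size_bytes=None, schema=None,
                input_files=None, exec_stats=None,
            )

            def read_fn(predicate=predicate, ds=self):
                # Runs on a worker: open a fresh session and read only this shard.
                session = ds._make_session()
                with session.transaction() as tx:
                    table = (
                        tx.bucket(ds._bucket)
                        .schema(name=ds._schema)
                        .table(name=ds._table)
                    )
                    reader = table.select(predicate=predicate)
                    return [reader.read_all()]  # one pyarrow.Table per block

            tasks.append(ReadTask(read_fn, metadata))
        return tasks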

Thanks @rliaw. I had tried the datasource approach (here), but without vastdb client support for partitioning the query response, the only option seems to be to read the whole query response onto a single node.
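
One stopgap I might try is keeping the single-node read but putting each batch into the object store as it arrives, so at least the downstream processing is distributed. A sketch, using the reader from the snippet above:

import pyarrow as pa
import ray

# Single-node read, but each batch is pushed straight to the object store so
# the resulting Dataset is processed across the cluster.
refs = [ray.put(pa.Table.from_batches([batch])) for batch in reader]
ds = ray.data.from_arrow_refs(refs)
print(ds.count())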