Read_sql with parallelism and write out as soon as a parallel task returns

I am trying to use read_sql to read in parallel w/parallelism set to 100.
What I’d like to do is as soon as a chunk of rows come back from any of those parallel tasks, write them out to another table.

This looked like an appropriate-ish pattern:Pattern: Using pipelining to increase throughput — Ray 2.7.1
But I am having trouble to figure out how my “queue” would get populated with batches of rows as soon as they come back from any of the read tasks that are running in parallel.

Something like this…but ideally I can just pass pointers to the memory containing rows to be written to another table.

@ray.remote
class WorkQueue:
    def __init__(self, args):
       dataset = ray.data.read_sql(
              sql=query,
              connection_factory=connection_factory,
              parallelism=parallelism,
       )
      self.shards = dataset.streaming_split(1)
      self.queue = deque()
      for shard in self.shards:
          for batch in shard.iter_batches(batch_format="pandas", batch_size=10*1024):
              self.queue.append(batch)


     def get_work_item(self):
        if self.queue:
            return self.queue.popleft(0)
        else:
            return None


@ray.remote
class WorkerWithPipelining:
    def __init__(self, work_queue):
        self.work_queue = work_queue

    def process(self, work_item):
       ... write to another table ...

    def run(self):
        self.work_item_ref = self.work_queue.get_work_item.remote()

        while True:
            # Get work from the remote queue.
            work_item = ray.get(self.work_item_ref)

            if work_item is None:
                break

            self.work_item_ref = self.work_queue.get_work_item.remote()

            # Do work while we are fetching the next work item.
            self.process(work_item)

work_queue = WorkQueue.remote(args)
worker_with_pipelining = WorkerWithPipelining.remote(work_queue)
ray.get(worker_with_pipelining.run.remote())

There must be a better way to do this…I am new to Ray.

Hi @Priyanka_Sengupta, have you tried to use Dataset.write_sql directly - https://github.com/ray-project/ray/blob/master/python/ray/data/dataset.py#L3218-L3262 ?

ds = ray.data.read_sql(
    sql=query,
    connection_factory=connection_factory,
    parallelism=parallelism,
)
ds.write_sql(...)