Ray always uses a single actor

Hi guys, I'm coming from Thespian and I have some doubts about my code. Basically, I have a list of crypto pairs like ['btc/usd', 'eth/usd', …]. Because fetching the historical data is really slow, my idea was something like this:

  1. Create a Dispatcher actor. This actor receives the pair list and asks the AsyncFetcher actors for the historical data.
  2. The AsyncFetcher retrieves the data for 1000 ticks and calls the Dispatcher.save_data method.
  3. The Dispatcher.save_data method stores the data and asks the AsyncFetcher for the next batch, that is, the next 1000 ticks.
  4. This is an infinite loop: the Dispatcher keeps asking for new data, and the AsyncFetcher keeps receiving requests and sending the data to save_data.
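The heart of steps 2 and 3 is a `since` cursor that moves forward after every batch. A minimal sketch of that step follows, assuming ccxt-style OHLCV rows `[timestamp_ms, open, high, low, close, volume]` and 1-minute candles; `next_since` is a hypothetical helper, not part of Ray or ccxt. If the exchange treats `since` as inclusive, reusing the last candle's own timestamp would re-download that candle, so the cursor moves one timeframe past it.

```python
from typing import List, Optional, Sequence

# A ccxt-style OHLCV row: [timestamp_ms, open, high, low, close, volume].
Row = Sequence[float]

ONE_MINUTE_MS = 60_000


def next_since(rows: List[Row], timeframe_ms: int = ONE_MINUTE_MS) -> Optional[int]:
    """Return the `since` value for the next request, or None when done.

    An empty batch means there is nothing newer to fetch (yet). Otherwise the
    next request starts one timeframe after the last candle received, so
    consecutive batches do not overlap.
    """
    if not rows:
        return None
    return int(rows[-1][0]) + timeframe_ms
```

For a batch whose last candle is at `60_000`, the next request would use `since=120_000`; an empty batch returns `None`, which is a natural place to stop a historical backfill instead of looping forever.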

Here I have some doubts. If I need many actors of one kind (AsyncFetcher), must I create a pool? I didn't find much documentation about queues; would using queues be a good approach here?
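To make the pool/queue idea concrete, here is a plain-`asyncio` sketch of the pattern: one shared queue of pairs and several workers that each take the next pair as soon as they are free. It runs without a Ray cluster; in Ray the same roles could be filled by several actor handles, `ray.util.ActorPool`, or `ray.util.queue.Queue`. `run_pool` and `fake_fetch_all` are hypothetical names, and the fake fetcher only simulates latency.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

# Any coroutine function that downloads the full history for one pair.
FetchAll = Callable[[str], Awaitable[list]]


async def run_pool(pairs: List[str], fetch_all: FetchAll, n_workers: int = 4) -> Dict[str, list]:
    """Fetch every pair using n_workers concurrent consumers of one queue."""
    queue: asyncio.Queue = asyncio.Queue()
    for pair in pairs:
        queue.put_nowait(pair)

    results: Dict[str, list] = {}

    async def worker() -> None:
        # Each worker keeps taking pairs until the queue is empty.
        while True:
            try:
                pair = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results[pair] = await fetch_all(pair)

    await asyncio.gather(*(worker() for _ in range(n_workers)))
    return results


async def fake_fetch_all(pair: str) -> list:
    """Stand-in for a real fetcher: pretends to download one batch."""
    await asyncio.sleep(0.01)  # simulated network latency
    return [pair.upper()]


results = asyncio.run(run_pool(["btc/usd", "eth/usd", "sol/usd"], fake_fetch_all, n_workers=2))
```

Because workers pull from the queue instead of being handed a fixed share, a slow pair does not hold up the others.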

Right now my code looks something like this (it's really ugly because I'm continually making changes and testing):

```python
import logging
from typing import List

import ray

# TickerAndDate, OHLCVData and `settings` are defined elsewhere in my project.


@ray.remote
class Dispatcher:
    def __init__(self):
        # Actor handle of the AsyncFetcher, set later by set_properties().
        self.fetcher = None

    def set_properties(
        self,
        fetcher,
        test=False,
    ):
        logging.info("initializing dispatcher with required db and exchange")
        self.fetcher = fetcher
        # db/exchange setup not shown; it also sets self.exchange and
        # self.timeframes_mapper, which the methods below use.

    def get_historical_data(self, tickers: List[TickerAndDate]):
        # One request per pair; each batch comes back through save_data().
        for ticker in tickers:
            self.fetcher.fetch_ohlcv.remote(
                self.exchange,
                self.timeframes_mapper,
                ticker["pair"],
                ticker["init_date"],
            )

    def save_data(self, pair: str, data: List[OHLCVData], unix_date, wait_time=0):
        # save data (code not shown; it also computes last_candle_time)
        # ask for the next candles
        self.fetcher.fetch_ohlcv.remote(
            self.exchange, self.timeframes_mapper, pair, int(last_candle_time)
        )


# fetcher actor
@ray.remote
class AsyncFetcher:
    def __init__(self, dispatcher):
        # Dispatcher handle, used to send every batch back to save_data().
        self.dispatcher = dispatcher

    async def fetch_ohlcv(
        self, exchange, timeframes_mapper, pair, unix_date: int, wait_time=0
    ):
        data = await exchange.fetch_ohlcv(
            pair,
            timeframes_mapper["1m"],
            since=unix_date,
            limit=settings.ACTOR_MAX_CANDLES_TO_RETRIEVE_BY_CALL,
        )
        # some code here (produces formated_data from data)
        self.dispatcher.save_data.remote(pair, formated_data, unix_date, wait_time)
        # and here


actor_dispatcher = Dispatcher.remote()
# Pass the dispatcher handle so the fetcher can call save_data.
actor_fetcher = AsyncFetcher.remote(actor_dispatcher)

# Same here: pass the fetcher handle so the dispatcher can call fetch_ohlcv.
actor_dispatcher.set_properties.remote(fetcher=actor_fetcher)

actor_dispatcher.get_historical_data.remote(pairs_to_get_historical)
```

This code works, but I'm using a single AsyncFetcher actor: when I open the dashboard I can see one AsyncFetcher and many IDLE processes. I'd like to have several AsyncFetcher actors running, because fetching the data is slow and this is the bottleneck.
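Each `AsyncFetcher.remote(...)` call starts exactly one actor, so the code above has one fetcher process no matter how many pairs it is asked for. An async actor can overlap many awaited requests on its event loop, but it still lives in one process, which matches what the dashboard shows (the IDLE entries are likely spare Ray worker processes). To spread the work, one option is to create several handles, e.g. `fetchers = [AsyncFetcher.remote(actor_dispatcher) for _ in range(n)]`, and route each pair to one of them. The sketch below shows only the routing, with plain strings standing in for actor handles so it runs without Ray; `assign_round_robin` is a hypothetical helper.

```python
from typing import Dict, Sequence, TypeVar

H = TypeVar("H")  # an actor handle in real code; any object works here


def assign_round_robin(pairs: Sequence[str], handles: Sequence[H]) -> Dict[str, H]:
    """Map each pair to one handle, cycling through the handles in order."""
    if not handles:
        raise ValueError("need at least one handle")
    return {pair: handles[i % len(handles)] for i, pair in enumerate(pairs)}


routing = assign_round_robin(["btc/usd", "eth/usd", "sol/usd"], ["fetcher-0", "fetcher-1"])
# routing == {"btc/usd": "fetcher-0", "eth/usd": "fetcher-1", "sol/usd": "fetcher-0"}
```

If the Dispatcher stores such a mapping and calls `routing[pair].fetch_ohlcv.remote(...)` in both `get_historical_data` and `save_data`, every follow-up request for a pair goes to the same fetcher, while different pairs are fetched by different processes.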

  1. Right now I'm passing the fetcher actor to my dispatcher and the dispatcher to my fetcher actor, so that each one can call the method it needs. Is this the best approach?
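Passing actor handles both ways does work in Ray, but one alternative that removes the cycle is to let the fetcher simply return each batch and have the dispatcher await it and own the paging loop. Inside an async Ray actor this would look like `rows = await self.fetcher.fetch_ohlcv.remote(...)`, assuming the Dispatcher is itself an async actor. The sketch below is a plain-`asyncio` stand-in: `FakeFetcher`, `backfill` and the in-memory `saved` dict are hypothetical, and rows are assumed to be ccxt-style `[timestamp_ms, open, high, low, close, volume]`.

```python
import asyncio
from typing import Dict, List, Sequence

Row = Sequence[float]  # ccxt-style: [timestamp_ms, open, high, low, close, volume]


class FakeFetcher:
    """Stand-in for AsyncFetcher: returns batches and holds no dispatcher handle."""

    def __init__(self, history: Dict[str, List[Row]]):
        self.history = history

    async def fetch_ohlcv(self, pair: str, since: int, limit: int) -> List[Row]:
        await asyncio.sleep(0)  # stands in for the network call
        rows = [r for r in self.history[pair] if r[0] >= since]
        return rows[:limit]


class Dispatcher:
    """Owns the paging loop; only the dispatcher references the fetcher."""

    def __init__(self, fetcher: FakeFetcher, limit: int = 1000, timeframe_ms: int = 60_000):
        self.fetcher = fetcher
        self.limit = limit
        self.timeframe_ms = timeframe_ms
        self.saved: Dict[str, List[Row]] = {}

    async def backfill(self, pair: str, since: int) -> int:
        """Fetch batches until an empty one arrives; return rows saved for pair."""
        while True:
            rows = await self.fetcher.fetch_ohlcv(pair, since, self.limit)
            if not rows:
                return len(self.saved.get(pair, []))
            self.saved.setdefault(pair, []).extend(rows)  # "save data"
            since = int(rows[-1][0]) + self.timeframe_ms


history = {"btc/usd": [[t * 60_000, 1, 1, 1, 1, 1] for t in range(5)]}
dispatcher = Dispatcher(FakeFetcher(history), limit=2)
count = asyncio.run(dispatcher.backfill("btc/usd", since=0))
```

With this shape the fetcher needs no handle back to the dispatcher, so the two actors can be created in any order and `set_properties` is no longer needed just to close the loop.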

Hope you can help me improve this code.
Thank you so much, guys!