Hi guys, I'm coming from Thespian and I have some questions about my code. Basically, I have a list of crypto pairs like `['btc/usd', 'eth/usd', …]`. Because fetching the historical data is really slow, my idea was something like this:
- create a `Dispatcher` actor; this actor receives the pair list and asks the `AsyncFetcher` actors for the historical data
- the `AsyncFetcher` retrieves the data for 1000 ticks and calls the `Dispatcher.save_data` method
- the `Dispatcher.save_data` method stores the data and asks the `AsyncFetcher` for new data, that is, the next 1000 ticks
- this is an infinite loop: the `Dispatcher` is always asking for new data, and the `AsyncFetcher` is always receiving requests and sending data to `save_data`
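To make the paging in this loop concrete, here is a minimal sketch in plain `asyncio` (no Ray yet), assuming candles shaped like ccxt's `[timestamp_ms, open, high, low, close, volume]` rows. `fake_fetch_ohlcv` is a hypothetical stand-in for the exchange call, and unlike the infinite loop described above, this sketch stops once a call returns fewer than `limit` candles.

```python
import asyncio
from typing import Dict, List

Candle = List[float]  # [timestamp_ms, open, high, low, close, volume]
ONE_MINUTE_MS = 60_000
HISTORY_END_MS = 2500 * ONE_MINUTE_MS  # hypothetical: 2500 one-minute candles per pair


async def fake_fetch_ohlcv(pair: str, since: int, limit: int) -> List[Candle]:
    """Hypothetical stand-in for exchange.fetch_ohlcv: up to `limit` 1m candles from `since`."""
    await asyncio.sleep(0.01)  # simulate network latency
    start = -(-since // ONE_MINUTE_MS) * ONE_MINUTE_MS  # round up to the next minute boundary
    stop = min(start + limit * ONE_MINUTE_MS, HISTORY_END_MS)
    return [[float(t), 1.0, 1.0, 1.0, 1.0, 0.0] for t in range(start, stop, ONE_MINUTE_MS)]


async def fetch_pair(pair: str, since: int, limit: int, store: Dict[str, List[Candle]]) -> None:
    """Fetch one pair page by page; each page starts right after the last saved candle."""
    while True:
        candles = await fake_fetch_ohlcv(pair, since, limit)
        store.setdefault(pair, []).extend(candles)  # the "save_data" step
        if len(candles) < limit:  # a short page means no more history for now
            return
        since = int(candles[-1][0]) + 1  # ask for the next page


async def main(pairs: List[str]) -> Dict[str, List[Candle]]:
    store: Dict[str, List[Candle]] = {}
    # all pairs are fetched concurrently; the pages of one pair stay sequential
    await asyncio.gather(*(fetch_pair(p, 0, 1000, store) for p in pairs))
    return store


if __name__ == "__main__":
    result = asyncio.run(main(["btc/usd", "eth/usd"]))
    print({pair: len(candles) for pair, candles in result.items()})
```

The key design point is that only the pages of a single pair depend on each other (each `since` comes from the previous page), so different pairs can be fetched in parallel.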
Here is where I have some doubts: if I need many actors of the same kind (`AsyncFetcher`), must I create a pool? I didn't find much documentation about Queues, but would using queues be a good approach here?
Right now my code looks something like this (it's really ugly because I'm continually making changes and testing it):
```python
import logging
from typing import List

import ray

# TickerAndDate, OHLCVData and settings are defined elsewhere in my project


@ray.remote
class Dispatcher:
    def __init__(self):
        self.fetcher = None

    def set_properties(self, fetcher, test=False):
        logging.info("initializing dispatcher with required db and exchange")
        self.fetcher = fetcher
        # setup of self.exchange and self.timeframes_mapper omitted

    def get_historical_data(self, tickers: List[TickerAndDate]):
        # kick off the first fetch for every pair
        for ticker in tickers:
            self.fetcher.fetch_ohlcv.remote(
                self.exchange,
                self.timeframes_mapper,
                ticker["pair"],
                ticker["init_date"],
            )

    def save_data(self, pair: str, data: List[OHLCVData], unix_date, wait_time=0):
        # save data (omitted); last_candle_time comes from the saved data
        # ask for the next candles
        self.fetcher.fetch_ohlcv.remote(
            self.exchange, self.timeframes_mapper, pair, int(last_candle_time)
        )


# fetcher actor
@ray.remote
class AsyncFetcher:
    def __init__(self, dispatcher):
        self.dispatcher = dispatcher

    async def fetch_ohlcv(self, exchange, timeframes_mapper, pair, unix_date: int, wait_time=0):
        data = await exchange.fetch_ohlcv(
            pair,
            timeframes_mapper["1m"],
            since=unix_date,
            limit=settings.ACTOR_MAX_CANDLES_TO_RETRIEVE_BY_CALL,
        )
        # some code here
        self.dispatcher.save_data.remote(pair, formated_data, unix_date, wait_time)
        # and here


actor_dispatcher = Dispatcher.remote()
# here I pass the dispatcher actor so the fetcher can call save_data
actor_fetcher = AsyncFetcher.remote(actor_dispatcher)
# same here: I pass the fetcher actor so the dispatcher can call fetch_ohlcv
actor_dispatcher.set_properties.remote(fetcher=actor_fetcher)
actor_dispatcher.get_historical_data.remote(pairs_to_get_historical)
```
This code works, but I'm using a single `AsyncFetcher` actor: when I open the dashboard I can see a single `AsyncFetcher` and many IDLE processes. I'd like to have several `AsyncFetcher` actors running, because this is the bottleneck; fetching the data can be a slow process.
- Right now I'm passing the fetcher actor to my dispatcher and the dispatcher to my fetcher actor, so that each one can call the method it needs on the other. Is this the best approach?
Hope you can help me improve this code.
Thank you so much, guys!