Can someone please help me figure out what I am doing wrong here? I am trying to run parallel processing however it seems like there is limited parallel processing happening here. I have a list of about 11k stock symbols that I need to iterate through to get data for the last two years. I am scrolling through the list and then have a nested loop for each symbol to pull data for the last 2 years one day at a time. Here is the screenshot of the debug output I have. Here is my code. By the way, I am using Python.
@ray.remote
def get_trades(symbol, start_date, end_date):
try:
#s = stocks.StocksPolygon
client = polygon.StocksClient(stocks_key)
#request_date = date(2020, 01, 01)
#end_date = date(2021, 12, 04)
delta = timedelta(days=1)
#get_bars(symbol)
while start_date <= end_date:
print(symbol, start_date)
resp = client.get_trades_vx(symbol=symbol, timestamp=start_date, sort=polygon.enums.SortOrder.ASCENDING ,order=polygon.enums.StocksTradesSort.TIMESTAMP, limit=50000)
#resp = s.get_trades(symbol=symbol, requestdate=requestdate, timestamp=timestamp, reverse=reverse, limit=limit)
#print('resp=')
#print(resp)
if resp == 'results':
df = None
else:
if 'results' in resp:
df = pd.DataFrame.from_dict(resp['results'])
#pprint.pprint(df[:5])
#if 'c' in df.columns:
if len(df) > 0:
df = df[['conditions', 'id', 'price', 'sequence_number', 'size', 'sip_timestamp', 'exchange', 'participant_timestamp', 'tape']]
df.columns = ['conditions', 'id', 'price', 'sequence_number',
'size', 'sip_timestamp', 'exchange', 'participant_timestamp', 'tape']
df.insert(0, 'symbol', symbol)
df['tdate'] = df['sip_timestamp'].map(lambda x: unix_convert.remote(x))
df['save_date'] = datetime.utcnow()
df.columns = ['symbol', 'conditions', 'id', 'price', 'sequence_number',
'size', 'sip_timestamp', 'exchange', 'participant_timestamp', 'tape', 'tdate', 'save_date']
#ray.get(redis_message('stocksdata_hist_trades', df.to_dict()))
#print(df.to_dict())
df=None
start_date += delta
start_time2 = time.time()
#print(time.time() - start_time2)
except Exception as exe:
print(exe)
traceback.print_exc()
time.sleep(10)
df = None
ray.init(ignore_reinit_error=True, num_cpus=6)
@timebudget
def download_my_stocks(operation, input):
mydata = ray.get([operation.remote(ticker,start_date,date_end) for ticker in input])
return mydata
mydata=download_my_stocks(get_trades, tickers)
ray.get(mydata)
ray.shutdown()