Need help with Parallel Processing

Can someone please help me figure out what I am doing wrong here? I am trying to run parallel processing however it seems like there is limited parallel processing happening here. I have a list of about 11k stock symbols that I need to iterate through to get data for the last two years. I am scrolling through the list and then have a nested loop for each symbol to pull data for the last 2 years one day at a time. Here is the screenshot of the debug output I have. Here is my code. By the way, I am using Python.


@ray.remote
def get_trades(symbol, start_date, end_date):
    try:

        #s = stocks.StocksPolygon
        client = polygon.StocksClient(stocks_key)


        #request_date = date(2020, 01, 01)
        #end_date = date(2021, 12, 04)
        delta = timedelta(days=1)
        #get_bars(symbol)

        while start_date <= end_date:
            print(symbol, start_date)

            resp = client.get_trades_vx(symbol=symbol, timestamp=start_date, sort=polygon.enums.SortOrder.ASCENDING ,order=polygon.enums.StocksTradesSort.TIMESTAMP, limit=50000)
            #resp = s.get_trades(symbol=symbol, requestdate=requestdate, timestamp=timestamp, reverse=reverse, limit=limit)
            #print('resp=')
            #print(resp)
            if resp == 'results':
                df = None
            else:
                if 'results' in resp: 
                    df = pd.DataFrame.from_dict(resp['results'])
                    #pprint.pprint(df[:5])
                
                    #if 'c' in df.columns:
                    if len(df) > 0:
                        df = df[['conditions', 'id', 'price', 'sequence_number', 'size', 'sip_timestamp', 'exchange', 'participant_timestamp', 'tape']]
                        df.columns = ['conditions', 'id', 'price', 'sequence_number',
                                      'size', 'sip_timestamp', 'exchange', 'participant_timestamp', 'tape']
                        df.insert(0, 'symbol', symbol)

                        df['tdate'] = df['sip_timestamp'].map(lambda x: unix_convert.remote(x))
                        df['save_date'] = datetime.utcnow()
                        df.columns = ['symbol', 'conditions', 'id', 'price', 'sequence_number',
                                      'size', 'sip_timestamp', 'exchange', 'participant_timestamp', 'tape', 'tdate', 'save_date']
                    
                        #ray.get(redis_message('stocksdata_hist_trades', df.to_dict()))
                        #print(df.to_dict())
                        df=None
            start_date += delta
        start_time2 = time.time()

        
        #print(time.time() - start_time2)
    except Exception as exe:
        print(exe)
        traceback.print_exc()
        time.sleep(10)
        df = None

ray.init(ignore_reinit_error=True, num_cpus=6)
@timebudget
def download_my_stocks(operation, input):
    mydata = ray.get([operation.remote(ticker,start_date,date_end) for ticker in input]) 
    return mydata

mydata=download_my_stocks(get_trades, tickers)
ray.get(mydata)
ray.shutdown()