Thread run() function not running in the ray-ml Docker image

I am using FastAPI to receive data from the frontend, putting that data into a Python Queue, and reading the values from the queue inside a Thread subclass. The Docker image I am using is ray-ml.

As soon as I call the API, the data is inserted into the queue, but the run() method of the thread class never executes. What could cause this?

The implementation looks something like this:

# Load modules
import os
import time
from queue import Queue
from threading import Thread
from timeit import default_timer as timer

import warnings
warnings.filterwarnings('ignore')

import ray
from fastapi import FastAPI
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware


main_queue = Queue()


class InferenceThread(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.chunk_size = 500
        self.dbname = 'cache'
        os.makedirs(self.dbname, exist_ok=True)
        print("DB NAME IS", self.dbname)

    def run(self):
        # Poll the shared queue once per second and process any pending item
        while True:
            time.sleep(1)
            print("QUEUE SIZE", main_queue.qsize())

            if main_queue.qsize() >= 1:
                print("NEW QUEUE SIZE", main_queue.qsize())
                db_blob_name, experiment_id, file_id = main_queue.get()

                t1 = timer()
                print("STARTED")
                time.sleep(3)  # placeholder for the actual inference step
                print("COMPLETED")
                print("Total time taken", timer() - t1)

middleware = [
    Middleware(
        CORSMiddleware,
        allow_origins=['*'],
        allow_credentials=True,
        allow_methods=['*'],
        allow_headers=['*']
    )
]

app = FastAPI(middleware=middleware, version='v0.1', title='SWIN Inference API',
                description="<b>APIs for performing inference</b></br></br>")


ray.init()

# The polling thread is created and started at import time
inference_thread = InferenceThread()
inference_thread.start()


@app.get('/mlpipeline')
def ml_pipeline(blob_db_path: str, experiment_id: str, file_id: str):
    print("Got value", experiment_id, file_id)
    print("Put in queue", experiment_id, file_id)
    main_queue.put((blob_db_path, experiment_id, file_id))
    print("Already in queue")
    return True


I solved the issue by using FastAPI's BackgroundTasks instead of a Thread.
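
For anyone hitting the same problem, here is a minimal sketch of that approach, assuming the same query parameters as the endpoint above; run_inference is a hypothetical stand-in for the real inference pipeline (its 3-second sleep mirrors the placeholder in the original code):

import time
from timeit import default_timer as timer

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def run_inference(blob_db_path: str, experiment_id: str, file_id: str):
    # Hypothetical placeholder for the actual inference work
    t1 = timer()
    print("STARTED", experiment_id, file_id)
    time.sleep(3)
    print("COMPLETED")
    print("Total time taken", timer() - t1)


@app.get('/mlpipeline')
def ml_pipeline(blob_db_path: str, experiment_id: str, file_id: str,
                background_tasks: BackgroundTasks):
    # Starlette runs the task after the response has been sent,
    # so no manually started thread or shared queue is needed
    background_tasks.add_task(run_inference, blob_db_path, experiment_id, file_id)
    return True

Since Starlette executes sync background tasks in its own thread pool after the response is returned, this avoids depending on a thread started at module import time.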
