Ray remote doesn't work properly on DB transactions with more than one actor

  • High: It blocks me from completing my task.

I am running DB transactions from Ray workers. I have two tables: `job_id` and a queue table (`queues`).

  1. I need to create n workers so that whichever worker manages to insert the id (the primary key of `job_id`) is the one that fetches a record from the queue. Later, the same worker deletes the id and updates the record's status to completed.

  2. The remaining workers wait and retry the insert until it succeeds.

  3. I have implemented the script below. Surprisingly, it works with one worker, but if I use more than one, even the first worker is not able to finish its task.
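Steps 1 and 2 use the `job_id` primary key as a mutex: an INSERT of the same ID either succeeds (this worker holds the lock) or fails with a duplicate-key error (another worker does). A minimal sketch of that pattern, assuming Python's standard-library `sqlite3` as a stand-in for MySQL so it runs without a server (placeholders are therefore `?` rather than `%s`). `try_acquire` and `release` are hypothetical helper names, and the two connections play the role of two Ray actors:

```python
import os
import sqlite3
import tempfile

LOCK_ID = 1  # the single ID every worker competes for, as in the post


def try_acquire(conn: sqlite3.Connection) -> bool:
    """Try to take the lock by inserting LOCK_ID; the PRIMARY KEY allows one holder."""
    try:
        conn.execute("INSERT INTO job_id (ID) VALUES (?)", (LOCK_ID,))
        conn.commit()
        return True
    except sqlite3.IntegrityError as e:
        # Duplicate key: another worker holds the lock. Print the real error,
        # then roll back so the failed attempt does not leave a transaction open.
        print(f"Unable to enter the record, waiting for turn: {e}")
        conn.rollback()
        return False


def release(conn: sqlite3.Connection) -> None:
    """Delete LOCK_ID so that another worker's INSERT can succeed."""
    conn.execute("DELETE FROM job_id WHERE ID = ?", (LOCK_ID,))
    conn.commit()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "queue_management.db")
        setup = sqlite3.connect(path)
        setup.execute("CREATE TABLE job_id (ID INTEGER PRIMARY KEY)")
        setup.commit()
        setup.close()

        # Two connections to the same file play the role of two Ray actors.
        worker_a = sqlite3.connect(path)
        worker_b = sqlite3.connect(path)
        print(try_acquire(worker_a))  # True: A inserted ID 1
        print(try_acquire(worker_b))  # False after printing the IntegrityError
        release(worker_a)
        print(try_acquire(worker_b))  # True: B wins once A has released
        worker_a.close()
        worker_b.close()
```

Note the `rollback()` in the failure branch: in this sketch, the failed INSERT would otherwise leave worker B's transaction open.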

```python
import os
import time

import mysql.connector
import numpy as np
import ray


@ray.remote
class StreamingTaskAssign:

    def __init__(self) -> None:
        pass

    def take_record(self):
        # Each actor opens its own connection (autocommit is off by default).
        connect = mysql.connector.connect(
            host="localhost", user="root", passwd="18621972", db="queue_management"
        )
        cursor = connect.cursor()
        while True:
            try:
                # Take the "lock": only one worker can insert ID=1 (primary key).
                print("inserting 1 in job_id table")
                cursor.execute(""" INSERT INTO `job_id`(ID) VALUES(%s) """, (1,))
                connect.commit()
                # Fetch one queued record.
                cursor.execute("SELECT  * FROM  `queues` WHERE Status='INQUEUE' LIMIT 1")
                print("got the records in INQUEUE status")
                # cursor.reset()

                for row in cursor:
                    row = list(row)
                    id = row[0]
                    url = row[1]
                    status = row[2]
                # print(id,url,status)
                print("url", url)
                print("update the status to processing")
                pid = os.getpid()
                cursor.execute("""UPDATE `queues` SET Status=%s,PID=%s where ID=%s """, ('processing', pid, id))
                connect.commit()
                # Simulate the processing time.
                rand_wait = np.random.randint(1, 5)
                time.sleep(rand_wait)
                # Release the "lock" so another worker can insert ID=1.
                cursor.execute("""DELETE FROM `job_id` WHERE ID=%s""", (1,))
                connect.commit()

                print("video completed")
                print("update the status the comment to completed")
                cursor.execute("""UPDATE `queues` SET Status=%s where ID=%s """, ('completed', id))
                connect.commit()
            except Exception as e:
                # time.sleep(2)
                # Another worker holds ID=1: wait a random time and retry.
                print("Unable to enter the record, waiting for term")
                rand_wait = np.random.randint(1, 5)
                time.sleep(rand_wait)


if __name__ == "__main__":
    ray.init()
    print("connecting to db")
    # connect = _get_connection()
    # cursor = connect.cursor()
    print("connected successfully")
    # range(1) runs a single actor; the hang appears with more than one.
    streaming_actors = [StreamingTaskAssign.remote() for _ in range(1)]
    ray.get([actor.take_record.remote() for actor in streaming_actors])
```

Output with `num_cpus` actors:

```
(StreamingTaskAssign pid=4106791) inserting 1 in job_id table
(StreamingTaskAssign pid=4106791) got the records in INQUEUE status
(StreamingTaskAssign pid=4106791) url https://Url.com
(StreamingTaskAssign pid=4106791) update the status to processing
(StreamingTaskAssign pid=4106789) inserting 1 in job_id table
(StreamingTaskAssign pid=4106789) Unable to enter the record, waiting for term
(StreamingTaskAssign pid=4106790) inserting 1 in job_id table
(StreamingTaskAssign pid=4106790) Unable to enter the record, waiting for term
(StreamingTaskAssign pid=4106788) inserting 1 in job_id table
(StreamingTaskAssign pid=4106788) Unable to enter the record, waiting for term
(StreamingTaskAssign pid=4106790) inserting 1 in job_id table
```

Output with a single actor:


```
4
connecting to db
connected successfully
(StreamingTaskAssign pid=4110015) inserting 1 in job_id table
(StreamingTaskAssign pid=4110015) got the records in INQUEUE status
(StreamingTaskAssign pid=4110015) url https://Url.com
(StreamingTaskAssign pid=4110015) update the status to processing
(StreamingTaskAssign pid=4110015) video completed
(StreamingTaskAssign pid=4110015) update the status the comment to completed
(StreamingTaskAssign pid=4110015) inserting 1 in job_id table
(StreamingTaskAssign pid=4110015) got the records in INQUEUE status
(StreamingTaskAssign pid=4110015) url https://www.youtube.com/?v=awda
(StreamingTaskAssign pid=4110015) update the status to processing
(StreamingTaskAssign pid=4110015) video completed
(StreamingTaskAssign pid=4110015) update the status the comment to completed
(StreamingTaskAssign pid=4110015) inserting 1 in job_id table
(StreamingTaskAssign pid=4110015) got the records in INQUEUE status
(StreamingTaskAssign pid=4110015) url https://www.youtube.com/?v=awda_3
(StreamingTaskAssign pid=4110015) update the status to processing
(StreamingTaskAssign pid=4110015) video completed
(StreamingTaskAssign pid=4110015) update the status the comment to completed
(StreamingTaskAssign pid=4110015) inserting 1 in job_id table
(StreamingTaskAssign pid=4110015) got the records in INQUEUE status
(StreamingTaskAssign pid=4110015) url https://www.youtube.com/?v=awda_4
(StreamingTaskAssign pid=4110015) update the status to processing
^C(StreamingTaskAssign pid=4110015) video completed
.............................
```

Hi @NSVR, welcome to the community!

This is interesting. I wonder whether we are hitting an issue in the SQL library or a transaction conflict. When it fails, can you print the exception in the `print("Unable to enter the record, waiting for term")` branch so we can see the exact error?

Hi Chen_Shen,
Thanks for your reply. I found that when I move the DELETE statement above the sleep(), it works as expected (but I don't know why).
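One plausible explanation, offered as an assumption rather than a confirmed diagnosis: `mysql.connector` connections have autocommit off by default, and when InnoDB rejects an INSERT with a duplicate-key error it sets a shared lock on the existing index record, held until that transaction ends. The waiting workers never commit or roll back after the error, so the first worker's `DELETE FROM job_id WHERE ID=1`, which needs an exclusive lock on that row, waits behind them. That fits the multi-actor log, where the first worker never gets past "update the status to processing". Moving the DELETE above the sleep shortens the time the row exists, so collisions become rare, but it likely only narrows the race. The sketch below shows one worker iteration that rolls back after a failed INSERT, releases the lock right after claiming a row, and commits the final status update. It uses the standard-library `sqlite3` as a stand-in for MySQL (with `mysql.connector`, use `%s` placeholders); `process_one` and `do_work` are hypothetical names.

```python
import os
import sqlite3
import time

LOCK_ID = 1  # same single mutex ID as the post


def do_work(url: str) -> None:
    """Hypothetical stand-in for the real processing step (the post sleeps 1-4 s)."""
    time.sleep(0.01)


def process_one(conn: sqlite3.Connection) -> bool:
    """Run one worker iteration; return True if a queued row was completed."""
    cur = conn.cursor()
    # 1. Try to take the lock. On failure, roll back so this worker keeps no
    #    open transaction (and, on MySQL, no lock on the duplicate row).
    try:
        cur.execute("INSERT INTO job_id (ID) VALUES (?)", (LOCK_ID,))
        conn.commit()
    except sqlite3.IntegrityError as e:
        print(f"Unable to enter the record, waiting for turn: {e}")
        conn.rollback()
        return False

    # 2. Claim one INQUEUE row, then release the lock no matter what happens.
    row = None
    try:
        row = cur.execute(
            "SELECT ID, URL FROM queues WHERE Status = 'INQUEUE' LIMIT 1"
        ).fetchone()
        if row is not None:
            cur.execute(
                "UPDATE queues SET Status = ?, PID = ? WHERE ID = ?",
                ("processing", os.getpid(), row[0]),
            )
        conn.commit()
    except sqlite3.Error:
        conn.rollback()
        raise
    finally:
        cur.execute("DELETE FROM job_id WHERE ID = ?", (LOCK_ID,))
        conn.commit()

    if row is None:
        return False  # queue is empty

    # 3. The slow work runs after the lock is released; then commit the result.
    do_work(row[1])
    cur.execute("UPDATE queues SET Status = ? WHERE ID = ?", ("completed", row[0]))
    conn.commit()
    return True


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE job_id (ID INTEGER PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE queues (ID INTEGER PRIMARY KEY, URL TEXT, Status TEXT, PID INTEGER)"
    )
    conn.executemany(
        "INSERT INTO queues (URL, Status) VALUES (?, 'INQUEUE')",
        [("https://Url.com",), ("https://www.youtube.com/?v=awda",)],
    )
    conn.commit()
    while process_one(conn):
        pass
    print(conn.execute("SELECT ID, Status FROM queues ORDER BY ID").fetchall())
    # [(1, 'completed'), (2, 'completed')]
```

Releasing the lock before the slow step means workers only serialize on the short claim, not on the whole job.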