- High: It blocks me to complete my task.
I am doing some DB transaction using ray worker. I have two tables one job_id and another one is queue table.
-
I need to create n workers, so that which ever the worker able to insert id(which is primary key) then the same worker will be able to fetch records from queue. later the same worker delete the id and will update the processing is completed.
-
the remaining workers are wait and try to insert again and again till success
-
I have implemented script below. surprisingly its working with one worker, but if i use more than one the first worker itself not able to finish the task
class StreamingTaskAssign:
def __init__(self) -> None:
pass
def take_record(self):
import sys, mysql.connector
connect = mysql.connector.connect \
(host="localhost", user="root", passwd="18621972", db='queue_management')
cursor=connect.cursor()
while True:
try:
print("inserting 1 in job_id table")
cursor.execute(""" INSERT INTO `job_id`(ID) VALUES(%s) """, (1,))
connect.commit()
cursor.execute("SELECT * FROM `queues` WHERE Status='INQUEUE' LIMIT 1")
print("got the records in INQUEUE status")
# cursor.reset()
for row in cursor:
row = list(row)
id = row[0]
url = row[1]
status = row[2]
# print(id,url,status)
print("url",url)
print("update the status to processing")
pid=os.getpid()
cursor.execute("""UPDATE `queues` SET Status=%s,PID=%s where ID=%s """, ('processing',pid, id))
connect.commit()
rand_wait=np.random.randint(1,5)
time.sleep(rand_wait)
cursor.execute("""DELETE FROM `job_id` WHERE ID=%s""", (1,))
connect.commit()
print("video completed")
print("update the status the comment to completed")
cursor.execute("""UPDATE `queues` SET Status=%s where ID=%s """, ('completed', id))
except Exception as e:
# time.sleep(2)
print("Unable to enter the record, waiting for term")
rand_wait=np.random.randint(1,5)
time.sleep(rand_wait)
if __name__=="__main__":
print("connecting to db")
# connect = _get_connection()
# cursor = connect.cursor()
print("connected successfully")
streaming_actors = [StreamingTaskAssign.remote() for _ in range(1)]
ray.get([actor.take_record.remote() for actor in streaming_actors])
output with num_cpu’s actor
**StreamingTaskAssign pid=4106791) inserting 1 in job_id table**
**(StreamingTaskAssign pid=4106791) got the records in INQUEUE status**
**(StreamingTaskAssign pid=4106791) url https://Url.com**
**(StreamingTaskAssign pid=4106791) update the status to processing**
**(StreamingTaskAssign pid=4106789) inserting 1 in job_id table**
**(StreamingTaskAssign pid=4106789) Unable to enter the record, waiting for term**
**(StreamingTaskAssign pid=4106790) inserting 1 in job_id table**
**(StreamingTaskAssign pid=4106790) Unable to enter the record, waiting for term**
**(StreamingTaskAssign pid=4106788) inserting 1 in job_id table**
**(StreamingTaskAssign pid=4106788) Unable to enter the record, waiting for term**
**(StreamingTaskAssign pid=4106790) inserting 1 in job_id table**
with single actor
4
connecting to db
connected successfully
(StreamingTaskAssign pid=4110015) inserting 1 in job_id table
(StreamingTaskAssign pid=4110015) got the records in INQUEUE status
(StreamingTaskAssign pid=4110015) url https://Url.com
(StreamingTaskAssign pid=4110015) update the status to processing
(StreamingTaskAssign pid=4110015) video completed
(StreamingTaskAssign pid=4110015) update the status the comment to completed
(StreamingTaskAssign pid=4110015) inserting 1 in job_id table
(StreamingTaskAssign pid=4110015) got the records in INQUEUE status
(StreamingTaskAssign pid=4110015) url https://www.youtube.com/?v=awda
(StreamingTaskAssign pid=4110015) update the status to processing
(StreamingTaskAssign pid=4110015) video completed
(StreamingTaskAssign pid=4110015) update the status the comment to completed
(StreamingTaskAssign pid=4110015) inserting 1 in job_id table
(StreamingTaskAssign pid=4110015) got the records in INQUEUE status
(StreamingTaskAssign pid=4110015) url https://www.youtube.com/?v=awda_3
(StreamingTaskAssign pid=4110015) update the status to processing
(StreamingTaskAssign pid=4110015) video completed
(StreamingTaskAssign pid=4110015) update the status the comment to completed
(StreamingTaskAssign pid=4110015) inserting 1 in job_id table
(StreamingTaskAssign pid=4110015) got the records in INQUEUE status
(StreamingTaskAssign pid=4110015) url https://www.youtube.com/?v=awda_4
(StreamingTaskAssign pid=4110015) update the status to processing
^C(StreamingTaskAssign pid=4110015) video completed
.............................