Run file-writing jobs with a prespecified number of workers

Hi,

I’m looking for the implementation of doing the following with ray:

  1. With a prespecified number of distributed workers (e.g., 32), all workers concurrently grab jobs from a shared job pool.
  2. Whenever a worker becomes idle, it takes the next remaining job from the pool.
  3. All workers keep running until every job is finished.

I considered using Ray’s ActorPool; however, it didn’t work as I expected.

I guess that is because each job’s result is written directly to a file and nothing is returned to the main process, so ActorPool.map() has nothing to give back to the main process. I might be wrong about this.

Here’s a snippet of code that I used.

@ray.remote(num_cpus=1)
class runner:
     ...
     def run(self, path):
         result = do_some_job(path)
         write_file(result)

if __name__ == "__main__":
     runners = [runner.remote() for _ in range(num_cpus)]
     actor_pool = ray.util.ActorPool(runners)
     jobs = [path1, path2, .... pathN]

     results = actor_pool.map(lambda r,j : r.run(j), jobs)
     results = list(results)

How can I fix my code to get the behavior I want?
Thanks!

I think you need to do actor_pool.map(lambda r,j : r.run.remote(j), jobs)
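For reference, a minimal sketch of the corrected call, assuming the same runner actors and jobs list as in your snippet (each entry of results will just be None, since run() doesn’t return anything):

results = actor_pool.map(lambda r, j: r.run.remote(j), jobs)
results = list(results)  # consuming the iterator waits for all jobs to finish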

however, it didn’t work as I expected.

Can you elaborate this a bit more? What’s the expected behavior and the actual behavior you are seeing?

@sangcho Oh, I mistyped the code. Yes, it should be r.run.remote(j).

Here’s the traceback I got after running the code.
I suspect this happened because run returns None.

Traceback (most recent call last):
  File "/home/silab9/anaconda3/envs/torch190/lib/python3.8/site-packages/ray/actor.py", line 809, in __del__
AttributeError: 'NoneType' object has no attribute 'global_worker'

Sorry, the provided traceback doesn’t seem to be complete. Is it the full traceback, or did you cut part of it?

Also, is it possible to give me code that I can run just by copying and pasting it? (The existing information is not sufficient for me to reproduce the issue.)

Also did you run ray.init() in your main function?
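One more sanity check you could try (just a sketch, assuming write_file, or a small wrapper around it, can return the path it wrote): have run return that path, so that ActorPool.map yields concrete values you can inspect in the driver:

def run(self, path):
    result = do_some_job(path)
    out_path = write_file(result)  # assumes write_file returns the output path
    return out_path                # ActorPool.map will yield this back to the driver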

@sangcho Sorry for the late response.

Here’s the full traceback I got from the run.

Exception ignored in: <function ActorHandle.__del__ at 0x7f11af8ba280>
Traceback (most recent call last):
  File "/home/silab9/anaconda3/envs/torch190/lib/python3.8/site-packages/ray/actor.py", line 809, in __del__
AttributeError: 'NoneType' object has no attribute 'global_worker'
Exception ignored in: <function ActorHandle.__del__ at 0x7f11af8ba280>
Traceback (most recent call last):
  File "/home/silab9/anaconda3/envs/torch190/lib/python3.8/site-packages/ray/actor.py", line 809, in __del__
AttributeError: 'NoneType' object has no attribute 'global_worker'
Exception ignored in: <function ActorHandle.__del__ at 0x7f11af8ba280>
Traceback (most recent call last):
  File "/home/silab9/anaconda3/envs/torch190/lib/python3.8/site-packages/ray/actor.py", line 809, in __del__
AttributeError: 'NoneType' object has no attribute 'global_worker'

Here is the snippet of the code.

import ray
import optuna
import joblib


@ray.remote(num_cpus=1)
class runner:

    def __init__(self, rank: int):
        self.rank = rank
        self.num_runs = 0

    # Run a small Optuna study and dump it to disk; nothing is returned to the driver.
    def run(self, bias, n_trials=10):
        def _objective(trial):
            x = trial.suggest_float("x", -10, 10)
            return (x - bias) ** 2

        study = optuna.create_study()
        study.optimize(_objective,
                       n_trials=n_trials)
        joblib.dump(study, "{}_{}.pkl".format(self.rank, self.num_runs))
        self.num_runs += 1


if __name__ == '__main__':
    ray.init(local_mode=False)
    num_runners = 3
    runners = [runner.remote(i) for i in range(num_runners)]
    actor_pool = ray.util.ActorPool(runners)
    jobs = [i for i in range(64)]

    results = actor_pool.map(lambda r, j: r.run.remote(j), jobs)
    results = list(results)  # run() returns nothing, so these are all None; list() just waits for all jobs
    ray.shutdown()

@sangcho
I found that a similar issue has been observed by many Ray users.
I was able to resolve it by updating Ray from 1.3.0 to 1.9.0.
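In case it helps anyone else who finds this thread, the upgrade itself is just the usual pip upgrade (assuming a pip-managed environment):

pip install -U "ray==1.9.0"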
Thanks for the help :slight_smile:

Best,
Junyoung
