The number of actors doesn't grow beyond what fits on the head node. Earlier I was using Ray Client to submit jobs; I have since switched to the Ray Jobs API, as that seems to be the recommended approach in production. With Ray Client, autoscaling happens as expected. Also, when concurrency is specified directly as a fixed (higher) number of workers, it works correctly. But when I pass concurrency as a tuple and expect the cluster to scale up, it doesn't happen.
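For reference, these are the two forms of the concurrency argument I am comparing (a minimal sketch with a placeholder dataset and callable class, not the real job):

import ray


class MyMetricClass:
    # Stand-in for the real metric callable class.
    def __call__(self, row):
        return row


ds = ray.data.range(1000)

# Fixed-size actor pool: passing an int brings up the expected number of workers.
ds.map(MyMetricClass, concurrency=8).materialize()

# Autoscaling actor pool (min, max): this is the form that never grows beyond the head node for me.
ds.map(MyMetricClass, concurrency=(1, 8)).materialize()

The full job script is below: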
import argparse

import polars as pl
import ray


def main(df_path, metric_name, device, num_cpus, concurrency, model_id, dataset_id, **kwargs):
    df = pl.read_parquet(df_path)
    print("iterating over files")
    print(df)
    ray_dataset = ray.data.from_items(df.rows(named=True))
    # load_metric_function returns the callable (actor) class for the requested metric
    callable_class = load_metric_function(metric_name)
    print("Going to start the mapping")
    if device == "cpu":
        (
            ray_dataset.map(
                callable_class,
                # fn_kwargs={"write_to_db": False},
                fn_constructor_kwargs={"device": device, **kwargs},
                num_cpus=num_cpus,
                concurrency=(1, concurrency),
            )
            .materialize()
            .take_all()
        )
    else:
        (
            ray_dataset.map(
                callable_class,
                # fn_kwargs={"write_to_db": False},
                fn_constructor_kwargs={"device": device, **kwargs},
                num_gpus=1,
                concurrency=(1, concurrency),
            )
            .materialize()
            .take_all()
        )
    evaluation_aggregate_v2(model_id, dataset_id, metric_name)

def parse_arguments():
    parser = argparse.ArgumentParser(description="Run the ray job with specified parameters.")
    parser.add_argument("--df_path", required=True, help="DataFrame input path")
    parser.add_argument("--metric", required=True, help="Metric name")
    parser.add_argument("--device", required=True, choices=["cpu", "cuda"], help="Device to use")
    parser.add_argument("--num-cpus", type=float, required=True, help="Number of CPUs to use")
    parser.add_argument("--concurrency", type=int, required=True, help="Concurrency level")
    parser.add_argument("--model_id", required=True, help="Model ID")
    parser.add_argument("--dataset_id", required=True, help="Dataset ID")
    return parser.parse_args()

if __name__ == "__main__":
    args = parse_arguments()
    result = main(
        args.df_path, args.metric, args.device, args.num_cpus, args.concurrency, args.model_id, args.dataset_id
    )
    print(result)
The above is the Ray job script, and it is submitted as follows:
def submit_job(self, df_path, metric_name, device, num_cpus, concurrency, env_vars, model_id, dataset_id):
    print(f"{self.ray_dashboard_url}/api/jobs/")
    res = requests.post(
        f"{self.ray_dashboard_url}/api/jobs/",
        json={
            "entrypoint": f"python ray_job.py --df_path {df_path} --metric {metric_name} --device {device} --num-cpus {num_cpus} --concurrency {concurrency} --model_id {model_id} --dataset_id {dataset_id}",
            "runtime_env": {"env_vars": env_vars},
        },
    )
    self.job_id = res.json()["job_id"]
    return self.job_id
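For completeness, the equivalent submission through the Ray JobSubmissionClient SDK would look roughly like this (a sketch; the dashboard address and the entrypoint argument values are placeholders for what the method above builds dynamically):

from ray.job_submission import JobSubmissionClient

# Placeholder address; in my setup this is self.ray_dashboard_url.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
    entrypoint=(
        "python ray_job.py --df_path data.parquet --metric some_metric --device cpu "
        "--num-cpus 1 --concurrency 8 --model_id m1 --dataset_id d1"
    ),
    runtime_env={"env_vars": {"EXAMPLE_VAR": "value"}},
)
print(job_id)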
Any ideas?
