How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I installed Ray on my Databricks cluster following the installation guide.
My goal is to use Ray Tune inside a Spark pandas UDF, as follows:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import numpy as np
from sklearn import linear_model
import ray
from ray import tune

# Load the COVID-19 deaths dataset shipped with Databricks.
covid_df = (spark
            .read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv("/databricks-datasets/COVID/USAFacts/covid_deaths_usafacts.csv"))

# Collapse the per-date death-count columns into one array column per county.
select_cols = covid_df.columns[4:]
df = (covid_df
      .select(
          col("County Name").alias("county_name"),
          array([col(n) for n in select_cols]).alias("deaths")))


@ray.remote
def linear_pred(x, y, i):
    # Fit an ElasticNet on the points seen so far and predict the next one.
    reg = linear_model.ElasticNet().fit(x, y)
    p = reg.predict(np.array([[i + 1]]))
    return p[0]


@pandas_udf(ArrayType(LongType()))
def ray_udf(s):
    # 1. Define an objective function for Ray Tune.
    def objective(config):
        score = config["a"] ** 2 + config["b"]
        return {"score": score}

    # 2. Define a search space.
    search_space = {
        "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
        "b": tune.choice([1, 2, 3]),
    }

    # 3. Start a Tune run and print the best result.
    analysis = tune.run(objective, config=search_space, fail_fast="raise", verbose=3)
    print(analysis.get_best_config(metric="score", mode="min"))

    # Fan out one Ray task per prefix of the series and gather the predictions.
    s = list(s)
    workers = []
    for i in range(len(s)):
        x = np.asarray([[n] for n in range(i + 1)])
        y = np.asarray(s[:i + 1])
        workers.append(linear_pred.remote(x, y, i))
    pred = ray.get(workers)
    return pd.Series(pred)


res = df.select("county_name", "deaths", ray_udf("deaths").alias("preds"))
display(res)
But I am seeing the following error on the Spark workers:
Traceback (most recent call last):
  File "python/ray/_raylet.pyx", line 799, in ray._raylet.task_execution_handler
  File "python/ray/_raylet.pyx", line 618, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 740, in ray._raylet.execute_task
  File "/databricks/python/lib/python3.8/site-packages/ray/_private/function_manager.py", line 589, in temporary_actor_method
    raise RuntimeError(
RuntimeError: The actor with name ImplicitFunc failed to import on the worker. This may be because needed library dependencies are not installed in the worker environment:

Traceback (most recent call last):
  File "/databricks/python/lib/python3.8/site-packages/ray/_private/function_manager.py", line 628, in _load_actor_class_from_gcs
    actor_class = pickle.loads(pickled_class)
  File "/databricks/python/lib/python3.8/site-packages/ray/tune/__init__.py", line 2, in <module>
    from ray.tune.tune import run_experiments, run
  File "/databricks/python/lib/python3.8/site-packages/ray/tune/tune.py", line 17, in <module>
    from ray.tune.analysis import ExperimentAnalysis
  File "/databricks/python/lib/python3.8/site-packages/ray/tune/analysis/__init__.py", line 1, in <module>
    from ray.tune.analysis.experiment_analysis import Analysis, ExperimentAnalysis
  File "/databricks/python/lib/python3.8/site-packages/ray/tune/analysis/experiment_analysis.py", line 18, in <module>
    import pandas as pd
  File "/databricks/spark/python/pyspark/pandas/__init__.py", line 32, in <module>
    require_minimum_pandas_version()
  File "/databricks/spark/python/pyspark/sql/pandas/utils.py", line 35, in require_minimum_pandas_version
    if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):
AttributeError: partially initialized module 'pandas' has no attribute '__version__' (most likely due to a circular import)

An unexpected internal error occurred while the worker was executing a task.
(TemporaryActor pid=20686) 2022-06-22 12:00:00,095 ERROR serialization.py:342 -- partially initialized module 'pandas' has no attribute '__version__' (most likely due to a circular import)
(TemporaryActor pid=20686) Traceback (most recent call last):
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/serialization.py", line 340, in deserialize_objects
(TemporaryActor pid=20686)     obj = self._deserialize_object(data, metadata, object_ref)
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/serialization.py", line 237, in _deserialize_object
(TemporaryActor pid=20686)     return self._deserialize_msgpack_data(data, metadata_fields)
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/serialization.py", line 192, in _deserialize_msgpack_data
(TemporaryActor pid=20686)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/serialization.py", line 182, in _deserialize_pickle5_data
(TemporaryActor pid=20686)     obj = pickle.loads(in_band)
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/tune/__init__.py", line 2, in <module>
(TemporaryActor pid=20686)     from ray.tune.tune import run_experiments, run
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/tune/tune.py", line 17, in <module>
(TemporaryActor pid=20686)     from ray.tune.analysis import ExperimentAnalysis
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/tune/analysis/__init__.py", line 1, in <module>
(TemporaryActor pid=20686)     from ray.tune.analysis.experiment_analysis import Analysis, ExperimentAnalysis
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/tune/analysis/experiment_analysis.py", line 18, in <module>
(TemporaryActor pid=20686)     import pandas as pd
(TemporaryActor pid=20686)   File "/databricks/spark/python/pyspark/pandas/__init__.py", line 32, in <module>
(TemporaryActor pid=20686)     require_minimum_pandas_version()
(TemporaryActor pid=20686)   File "/databricks/spark/python/pyspark/sql/pandas/utils.py", line 35, in require_minimum_pandas_version
(TemporaryActor pid=20686)     if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):
(TemporaryActor pid=20686) AttributeError: partially initialized module 'pandas' has no attribute '__version__' (most likely due to a circular import)
(TemporaryActor pid=20686) 2022-06-22 12:00:00,098 ERROR serialization.py:342 -- partially initialized module 'pandas' has no attribute '__version__' (most likely due to a circular import)
(TemporaryActor pid=20686) [the same deserialization traceback as above is repeated here]
(TemporaryActor pid=20686) 2022-06-22 12:00:00,099 ERROR worker.py:451 -- SystemExit was raised from the worker.
(TemporaryActor pid=20686) Traceback (most recent call last):
(TemporaryActor pid=20686)   File "python/ray/_raylet.pyx", line 625, in ray._raylet.execute_task
(TemporaryActor pid=20686)   File "python/ray/_raylet.pyx", line 649, in ray._raylet.execute_task
(TemporaryActor pid=20686)   File "python/ray/_raylet.pyx", line 480, in ray._raylet.raise_if_dependency_failed
(TemporaryActor pid=20686) ray.exceptions.RaySystemError: System error: partially initialized module 'pandas' has no attribute '__version__' (most likely due to a circular import)
(TemporaryActor pid=20686) traceback: Traceback (most recent call last):
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/serialization.py", line 340, in deserialize_objects
(TemporaryActor pid=20686)     obj = self._deserialize_object(data, metadata, object_ref)
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/serialization.py", line 237, in _deserialize_object
(TemporaryActor pid=20686)     return self._deserialize_msgpack_data(data, metadata_fields)
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/serialization.py", line 192, in _deserialize_msgpack_data
(TemporaryActor pid=20686)     python_objects = self._deserialize_pickle5_data(pickle5_data)
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/serialization.py", line 182, in _deserialize_pickle5_data
(TemporaryActor pid=20686)     obj = pickle.loads(in_band)
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/tune/__init__.py", line 2, in <module>
(TemporaryActor pid=20686)     from ray.tune.tune import run_experiments, run
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/tune/tune.py", line 17, in <module>
(TemporaryActor pid=20686)     from ray.tune.analysis import ExperimentAnalysis
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/tune/analysis/__init__.py", line 1, in <module>
(TemporaryActor pid=20686)     from ray.tune.analysis.experiment_analysis import Analysis, ExperimentAnalysis
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/tune/analysis/experiment_analysis.py", line 18, in <module>
(TemporaryActor pid=20686)     import pandas as pd
(TemporaryActor pid=20686)   File "/databricks/spark/python/pyspark/pandas/__init__.py", line 32, in <module>
(TemporaryActor pid=20686)     require_minimum_pandas_version()
(TemporaryActor pid=20686)   File "/databricks/spark/python/pyspark/sql/pandas/utils.py", line 35, in require_minimum_pandas_version
(TemporaryActor pid=20686)     if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):
(TemporaryActor pid=20686) AttributeError: partially initialized module 'pandas' has no attribute '__version__' (most likely due to a circular import)
(TemporaryActor pid=20686)
(TemporaryActor pid=20686)
(TemporaryActor pid=20686) During handling of the above exception, another exception occurred:
(TemporaryActor pid=20686)
(TemporaryActor pid=20686) Traceback (most recent call last):
(TemporaryActor pid=20686)   File "python/ray/_raylet.pyx", line 799, in ray._raylet.task_execution_handler
(TemporaryActor pid=20686)   File "python/ray/_raylet.pyx", line 618, in ray._raylet.execute_task
(TemporaryActor pid=20686)   File "python/ray/_raylet.pyx", line 740, in ray._raylet.execute_task
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/_private/function_manager.py", line 589, in temporary_actor_method
(TemporaryActor pid=20686)     raise RuntimeError(
(TemporaryActor pid=20686) RuntimeError: The actor with name ImplicitFunc failed to import on the worker. This may be because needed library dependencies are not installed in the worker environment:
(TemporaryActor pid=20686)
(TemporaryActor pid=20686) Traceback (most recent call last):
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/_private/function_manager.py", line 628, in _load_actor_class_from_gcs
(TemporaryActor pid=20686)     actor_class = pickle.loads(pickled_class)
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/tune/__init__.py", line 2, in <module>
(TemporaryActor pid=20686)     from ray.tune.tune import run_experiments, run
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/tune/tune.py", line 17, in <module>
(TemporaryActor pid=20686)     from ray.tune.analysis import ExperimentAnalysis
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/tune/analysis/__init__.py", line 1, in <module>
(TemporaryActor pid=20686)     from ray.tune.analysis.experiment_analysis import Analysis, ExperimentAnalysis
(TemporaryActor pid=20686)   File "/databricks/python/lib/python3.8/site-packages/ray/tune/analysis/experiment_analysis.py", line 18, in <module>
(TemporaryActor pid=20686)     import pandas as pd
(TemporaryActor pid=20686)   File "/databricks/spark/python/pyspark/pandas/__init__.py", line 32, in <module>
(TemporaryActor pid=20686)     require_minimum_pandas_version()
(TemporaryActor pid=20686)   File "/databricks/spark/python/pyspark/sql/pandas/utils.py", line 35, in require_minimum_pandas_version
(TemporaryActor pid=20686)     if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):
(TemporaryActor pid=20686) AttributeError: partially initialized module 'pandas' has no attribute '__version__' (most likely due to a circular import)
(TemporaryActor pid=20686)
(TemporaryActor pid=20686)
(TemporaryActor pid=20686) During handling of the above exception, another exception occurred:
(TemporaryActor pid=20686)
(TemporaryActor pid=20686) Traceback (most recent call last):
(TemporaryActor pid=20686)   File "python/ray/_raylet.pyx", line 828, in ray._raylet.task_execution_handler
It looks like Ray is importing pandas from pyspark (/databricks/spark/python/pyspark/pandas) on the Ray worker instead of the pandas package from the base environment.
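
A minimal sketch to check that suspicion, assuming the notebook is already connected to the Ray cluster (the function name where_is_pandas is just for illustration):

import importlib.util
import sys

import ray

@ray.remote
def where_is_pandas():
    # Report which file the name "pandas" resolves to on this Ray worker,
    # plus the sys.path ordering that drives that resolution. find_spec only
    # locates the module, so it should not trigger the circular import itself.
    spec = importlib.util.find_spec("pandas")
    return (spec.origin if spec else None), sys.path

origin, path = ray.get(where_is_pandas.remote())
print(origin)  # expected: .../site-packages/pandas/__init__.py
print(path)    # a pyspark directory appearing before site-packages would explain the shadowing

If the printed origin points under /databricks/spark/python rather than the base environment's site-packages, that would confirm the path-ordering problem on the Ray workers.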