Modin with ray client on k8s

Not sure if I should post this on Modin’s GitHub, but is there a way to get Modin working with Ray client on k8s?
Right now Modin requires you to run ray.init() on the head node and then initialize Modin as described in "Using Modin" (Modin 0.8.3 documentation).

However, when I run it through Ray client from a different node, Modin is unable to detect the existing Ray runtime and tries to start a fresh instance of Ray. Is there any workaround for this?

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-3-7e7b44bd7fd7> in <module>()
----> 1 import modin.pandas as pd

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/pandas/__init__.py in <module>()
    178 
    179 
--> 180 Engine.subscribe(_update_engine)
    181 
    182 from .. import __version__

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/config/pubsub.py in subscribe(cls, callback)
    105     def subscribe(cls, callback):
    106         cls._subs.append(callback)
--> 107         callback(cls)
    108 
    109     @classmethod

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/pandas/__init__.py in _update_engine(publisher)
    117             os.environ["OMP_NUM_THREADS"] = str(multiprocessing.cpu_count())
    118         if _is_first_update.get("Ray", True):
--> 119             initialize_ray()
    120         num_cpus = ray.cluster_resources()["CPU"]
    121     elif publisher.get() == "Dask":  # pragma: no cover

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/ray/utils.py in initialize_ray(override_is_cluster, override_redis_address, override_redis_password)
    147                 logging_level=100,
    148                 _memory=object_store_memory,
--> 149                 _lru_evict=True,
    150             )
    151 

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
     44         global _client_hook_enabled
     45         if client_mode_enabled and _client_hook_enabled:
---> 46             return getattr(ray, func.__name__)(*args, **kwargs)
     47         return func(*args, **kwargs)
     48 

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/ray/util/client/__init__.py in init(self, *args, **kwargs)
     98         self._server, address_info = ray_client_server.init_and_serve(
     99             "localhost:50051", *args, **kwargs)
--> 100         self.connect("localhost:50051")
    101         self._connected_with_init = True
    102         return address_info

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/ray/util/client/__init__.py in connect(self, conn_str, secure, metadata, connection_retries)
     44                 return
     45             raise Exception(
---> 46                 "ray.connect() called, but ray client is already connected")
     47         if not self._inside_client_test:
     48             # If we're calling a client connect specifically and we're not

Exception: ray.connect() called, but ray client is already connected

@Bhavya_Agarwal how are you using Ray client? Are you making sure you connect to Ray client before importing Modin?

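import ray
import ray.util
ray.util.connect("SERVICE_IP:50051")  # connect to the Ray client server first
import modin.pandas as pd  # only then import Modin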

@Alex yes, I did connect to ray client

import ray
import ray.util
ray.util.connect("SERVICE_IP:50051")

The connection succeeded and returned this output:

{'num_clients': 1,
 'python_version': '3.7.7',
 'ray_version': '2.0.0.dev0',
 'ray_commit': 'd96a9fa19225b95b51d9d4422ad82324e75ad6d0'}

@barakmich is this a potential ray client bug? Modin does this to initialize Ray.

I’m not sure where this lives; I don’t have enough information to say if it’s a Modin bug or a ray client bug.

Given that Modin does the Ray initialization itself (rather than just being a library atop Ray), it’s probably up to Modin to connect appropriately (i.e., connect to the existing cluster rather than initialize a new one).

So I’d ask them first, and come back around if they need something like a way to detect client mode (which we can easily do).

@devin-petersohn do you have any thoughts?

@barakmich @Alex I’ll give context from the Modin side.

Currently we detect whether Ray is already initialized via ray.is_initialized() (code), and we also pass ignore_reinit_error=True to ray.init() (code).
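The guard described above only asks whether a Ray runtime is initialized. A client-aware variant might check for an existing Ray client connection first and fall back to ray.init(ignore_reinit_error=True) only when neither a client connection nor a runtime exists. This is a sketch: ensure_ray is a hypothetical helper, not Modin's API, and the three checks are injected as callables so the decision logic runs without a cluster. Wiring is_client_connected to ray.util.client.ray.is_connected() is an assumption about the Ray 1.x client API that this thread does not confirm.

```python
from typing import Callable


def ensure_ray(
    is_client_connected: Callable[[], bool],
    is_initialized: Callable[[], bool],
    init: Callable[..., object],
) -> str:
    """Decide whether to reuse an existing Ray session or start a new one.

    Hypothetical helper. In a real deployment the callables might be
    ray.util.client.ray.is_connected (assumed), ray.is_initialized and ray.init.
    """
    if is_client_connected():
        # Already connected via Ray client (e.g. ray.util.connect("SERVICE_IP:50051")).
        # Calling ray.init() here is what raised "ray client is already connected".
        return "client"
    if is_initialized():
        # A Ray runtime already exists in this process; reuse it.
        return "existing"
    # Nothing running yet: start Ray, tolerating a concurrent init elsewhere.
    init(ignore_reinit_error=True)
    return "started"


# Usage with stand-in callables: a client connection short-circuits ray.init().
init_calls = []
print(ensure_ray(lambda: True, lambda: False, lambda **kw: init_calls.append(kw)))  # client
print(init_calls)  # []
```

Checking the client connection first matters because, as the first traceback shows, ray.init() in client mode is redirected to the client and fails instead of being a no-op.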

I don’t really think this is a Modin bug, unless I’m using the builtin utilities wrong. Is there something else I should be doing?

OK, so it looks like updating to the GitHub master branch fixes this; another user asked the same question on Modin’s Discourse (link).

@Bhavya_Agarwal can you update to the latest Modin on the GitHub master to see if that works for you?

@devin-petersohn The connection error goes away with the master branch. I’ll try that first next time!

@devin-petersohn Follow-up question: I am getting an error when I try to read a CSV file directly from S3. I am using 100k rows from the Higgs example here: xgboost_ray/higgs.py at master · ray-project/xgboost_ray · GitHub

This is the code:

import ray
import ray.util
ray.util.connect("<service_ip>:50051")  # connect to Ray client before importing Modin
import modin.pandas as pd
pd.DEFAULT_NPARTITIONS = 10  # number of partitions to split the DataFrame into
df = pd.read_csv("s3://<bucket>/HIGGS_100k.csv")

This fails with:

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-10-9b3c648a226d> in <module>()
----> 1 df = pd.read_csv("s3://<s3_bucket>/HIGGS_100k.csv")

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/pandas/io.py in parser_func(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, escapechar, comment, encoding, dialect, error_bad_lines, warn_bad_lines, skipfooter, doublequote, delim_whitespace, low_memory, memory_map, float_precision)
    114 
    115         kwargs = {k: v for k, v in f_locals.items() if k in _pd_read_csv_signature}
--> 116         return _read(**kwargs)
    117 
    118     return parser_func

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/pandas/io.py in _read(**kwargs)
    133 
    134     Engine.subscribe(_update_engine)
--> 135     pd_obj = EngineDispatcher.read_csv(**kwargs)
    136     # This happens when `read_csv` returns a TextFileReader object for iterating through
    137     if isinstance(pd_obj, pandas.io.parsers.TextFileReader):

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/data_management/factories/dispatcher.py in read_csv(cls, **kwargs)
    102     @classmethod
    103     def read_csv(cls, **kwargs):
--> 104         return cls.__engine._read_csv(**kwargs)
    105 
    106     @classmethod

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/data_management/factories/factories.py in _read_csv(cls, **kwargs)
     85     @classmethod
     86     def _read_csv(cls, **kwargs):
---> 87         return cls.io_cls.read_csv(**kwargs)
     88 
     89     @classmethod

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/base/io/file_dispatcher.py in read(cls, *args, **kwargs)
     27     @classmethod
     28     def read(cls, *args, **kwargs):
---> 29         query_compiler = cls._read(*args, **kwargs)
     30         # TODO (devin-petersohn): Make this section more general for non-pandas kernel
     31         # implementations.

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/base/io/text/csv_dispatcher.py in _read(cls, filepath_or_buffer, **kwargs)
    192         dtypes = cls.get_dtypes(dtypes_ids) if len(dtypes_ids) > 0 else None
    193 
--> 194         partition_ids = cls.build_partition(partition_ids, row_lengths, column_widths)
    195         # If parse_dates is present, the column names that we have might not be
    196         # the same length as the returned column names. If we do need to modify

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/base/io/text/text_file_dispatcher.py in build_partition(cls, partition_ids, row_lengths, column_widths)
     51                     for j in range(len(partition_ids[i]))
     52                 ]
---> 53                 for i in range(len(partition_ids))
     54             ]
     55         )

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/base/io/text/text_file_dispatcher.py in <listcomp>(.0)
     51                     for j in range(len(partition_ids[i]))
     52                 ]
---> 53                 for i in range(len(partition_ids))
     54             ]
     55         )

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/base/io/text/text_file_dispatcher.py in <listcomp>(.0)
     49                         width=column_widths[j],
     50                     )
---> 51                     for j in range(len(partition_ids[i]))
     52                 ]
     53                 for i in range(len(partition_ids))

/home/bhavya.agarwal/.local/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/partition.py in __init__(self, object_id, length, width, ip, call_queue)
     25 class PandasOnRayFramePartition(BaseFramePartition):
     26     def __init__(self, object_id, length=None, width=None, ip=None, call_queue=None):
---> 27         assert type(object_id) is ray.ObjectID
     28 
     29         self.oid = object_id

AssertionError:

We can fix that. Could you raise an issue on Modin’s GitHub? Issues · modin-project/modin · GitHub

Ray renamed ObjectID to ObjectRef, and it looks like we missed a place or two when updating the name. Great catch!
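The failing line in the traceback is an exact-type check, assert type(object_id) is ray.ObjectID, so a rename trips it. One more tolerant approach is to collect whichever reference classes the installed ray module exposes and test with isinstance. This is a sketch, not the patch Modin merged. The helper names are hypothetical, and stand-in modules replace the real ray import so it runs anywhere. If Ray client hands back references of yet another class (an assumption not verified in this thread), that class would need to be accepted as well.

```python
from types import SimpleNamespace
from typing import Any, Tuple


def object_ref_types(ray_module: Any) -> Tuple[type, ...]:
    """Return the object-reference classes exposed by a Ray-like module.

    Newer Ray uses ObjectRef; older releases used ObjectID. Looking up both
    names keeps the check working across the rename.
    """
    names = ("ObjectRef", "ObjectID")
    found = [getattr(ray_module, name) for name in names if hasattr(ray_module, name)]
    # Deduplicate in case one name is an alias of the other.
    return tuple(dict.fromkeys(found))


def check_object_ref(object_id: Any, ray_module: Any) -> None:
    # isinstance() also accepts subclasses, unlike `type(x) is ...`.
    assert isinstance(object_id, object_ref_types(ray_module)), type(object_id)


# Usage with stand-in modules (real code would pass the imported `ray`).
class FakeObjectRef:
    pass


new_style = SimpleNamespace(ObjectRef=FakeObjectRef)
old_style = SimpleNamespace(ObjectID=FakeObjectRef)
aliased = SimpleNamespace(ObjectRef=FakeObjectRef, ObjectID=FakeObjectRef)

check_object_ref(FakeObjectRef(), new_style)  # passes after the rename
check_object_ref(FakeObjectRef(), old_style)  # and before it
```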

Thanks for taking this up! I’ve created Modin fails to load csv from s3 with ray client · Issue #2688 · modin-project/modin · GitHub to track it there.