Issue in Ray dataset sharding

High: It blocks me from completing my task.

I am trying to train a TensorFlow model using Ray Train. I want to use Ray Dataset sharding to implement distributed data ingest with Ray Datasets.

My training data is in the following format:
X_train is a 3-dimensional NumPy array of shape (512, 10, 200), and y_train is the target-variable NumPy array of shape (512,).
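For reference, dummy arrays with the same shapes can be generated like this (a minimal sketch; np.random data stands in for whatever read_input_data() actually returns):

import numpy as np

# dummy stand-ins with the shapes described above:
# 512 samples, each a (10, 200) float matrix, plus one scalar target per sample
X_train = np.random.rand(512, 10, 200).astype("float32")
y_train = np.random.rand(512).astype("float32")

print(X_train.shape, y_train.shape)  # (512, 10, 200) (512,)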

I have referred to the Ray documentation (link1, link2 (Tensorflow)) and refactored the code for Ray dataset sharding as below, to generate a dataset generator object suitable for my use case.

# imports required by the snippets below
import ray
import tensorflow as tf
from ray.air import session
from ray.air.checkpoint import Checkpoint
from ray.air.config import CheckpointConfig, RunConfig, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer, prepare_dataset_shard

def train_func(config):
    CONF, lstm_params= config.values()
     
    start_epoch = 0
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
      model = lstm_model()  # returns a sequential model

      checkpoint = session.get_checkpoint()
      if checkpoint:
        # assume that we have run the session.report() example
        # and successfully save some model weights
        checkpoint_dict = checkpoint.to_dict()
        model.set_weights(checkpoint_dict.get("model_weights"))
        start_epoch = checkpoint_dict.get("epoch", -1) + 1

      model.compile(loss=CONF['lstm_params']['loss'], 
            optimizer=CONF['lstm_params']['optimizer'], 
            metrics=list(CONF['lstm_params']['metrics']))
    
    dataset_shard = session.get_dataset_shard("train")

    def to_tf_dataset(dataset, batch_size):
      def to_tensor_iterator():
        for batch in dataset.iter_tf_batches(batch_size=batch_size, dtypes=tf.float32):
          yield tf.expand_dims(batch["x"], 10, 200), batch["y"]
      
      output_signature = (
      tf.TensorSpec(shape=(None,10,200), dtype=tf.float32),
      tf.TensorSpec(shape=(None,), dtype=tf.float32),)

      tf_dataset = tf.data.Dataset.from_generator(
          to_tensor_iterator, output_signature=output_signature
      )
      return prepare_dataset_shard(tf_dataset)

    for epoch in range(start_epoch, lstm_params['epochs']):
      tf_dataset = to_tf_dataset(dataset=dataset_shard, batch_size= 64)

      model.fit(tf_dataset, shuffle=True)

      checkpoint = Checkpoint.from_dict(dict(epoch=epoch, model_weights=model.get_weights()))
      session.report({}, checkpoint=checkpoint)


config = {...} # hyperparameters for LSTM model 

checkpoint_config = CheckpointConfig(checkpoint_score_attribute="accuracy", checkpoint_score_order="max")

X_train, y_train = read_input_data()  #function returns training data numpy arrays

dataset = ray.data.from_items([{"x": X_train[index,:,:], "y": y_train[index]} for index in range(y_train.shape[0])])

trainer = TensorflowTrainer(train_func, train_loop_config=config,
                            scaling_config=ScalingConfig(num_workers=2),
                            run_config=RunConfig(checkpoint_config=checkpoint_config),
                            datasets={"train": dataset})
result = trainer.fit()

But when I pass this dataset shard object to model.fit(), I don't get the desired execution:
the model trains on None steps and then the execution fails. Please refer to the output logs below.

(RayTrainWorker pid=8089) Train on None steps
(RayTrainWorker pid=8088) Train on None steps

Can you please help me understand what I am missing here and point out the mistake, if any?
Also, it would be helpful if you could provide some documentation links that I can refer to for this.

Hi Team,

I am still waiting for a response. This problem is a blocker for me.
Can you please guide me to resolve this?
Also, please let me know if any further elaboration of the issue is required.

Hi @suraj-gade , sorry for the late response. Can you share two additional pieces of info:

  1. After dataset = ray.data.from_items([{"x": X_train[index,:,:], "y": y_train[index]} for index in range(y_train.shape[0])]), what does the sample data look like when you do dataset.take(1)?
  2. In your tensor iterator for batch in dataset.iter_tf_batches(batch_size=batch_size, dtypes=tf.float32):, what are the type, size, and shape of batch?

Hi @suraj-gade,

I ran your code with some dummy data, and it seems like the error is in this logic: tf.expand_dims(batch["x"], 10, 200). I replaced it with just batch["x"] (and you could also do tf.reshape(batch["x"], (-1, 10, 200)) if you need to reshape), and it seems to be working fine.

Let me know if this fixes it for you, too. I'm definitely interested in hearing more about your debugging experience and what you think should be improved the most.
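For reference, the generator body with that change would look roughly like this (a sketch based on your snippet; the tf.reshape variant is only needed if you actually have to reshape):

def to_tensor_iterator():
    for batch in dataset.iter_tf_batches(batch_size=batch_size, dtypes=tf.float32):
        # batch["x"] already comes back as (batch_size, 10, 200), so yield it as-is
        x = batch["x"]  # or: tf.reshape(batch["x"], (-1, 10, 200))
        yield x, batch["y"]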

Hi @Jiao_Dong ,
Thanks for your response.
Providing the info you asked for below.

  1. What does the sample data look like when you do dataset.take(1)?
    Ans:
    [screenshot of dataset.take(1) output]

  2. In your tensor iterator for batch in dataset.iter_tf_batches(batch_size=batch_size, dtypes=tf.float32):, what are the type, size, and shape of batch?
    Ans:
    The batch inside the tensor iterator is a dictionary with 2 items, as below:

batch =  {'x': <tf.Tensor 'Const_6:0' shape=(64, 10, 200) dtype=float32>, 'y': <tf.Tensor 'Const_7:0' shape=(64,) dtype=float32>}
Type of batch: <class 'dict'>
Size of batch: 2

I hope this answers your questions.

Hi @justinvyu ,
Thanks for your response.
I tried your suggestion, but it didn't resolve the issue for me.

@suraj-gade what if you just replace

yield tf.expand_dims(batch["x"], 10, 200), batch["y"]
with
batch["x"], batch["y"]
?

In your data's generator loop you already had a dictionary:

batch =  {
   'x': <tf.Tensor 'Const_6:0' shape=(64, 10, 200) dtype=float32>, 
   'y': <tf.Tensor 'Const_7:0' shape=(64,) dtype=float32>
}

It has the right data type as well as an outer dimension of 64 representing the batch_size, which matches your input of to_tf_dataset(dataset=dataset_shard, batch_size=64). These should match your output signature of

x -> shape=(None, 10, 200), dtype=tf.float32
y -> shape=(None,), dtype=tf.float32

where None represents the batch dimension. No extra expand_dims is needed.

Hi @Jiao_Dong ,
I tried your suggestion but got the same error again.

Train on None steps
(RayTrainWorker pid=1323) Train on None steps
(RayTrainWorker pid=1323) Error From /job:worker/replica:0/task:1:
(RayTrainWorker pid=1323) TypeError: 'NoneType' object is not iterable
(RayTrainWorker pid=1323) Traceback (most recent call last):
(RayTrainWorker pid=1323) 
(RayTrainWorker pid=1323)   File "/usr/local/lib/python3.7/dist-packages/tensorflow/python/data/ops/dataset_ops.py", line 851, in get_iterator
(RayTrainWorker pid=1323)     return self._iterators[iterator_id]
(RayTrainWorker pid=1323) 
(RayTrainWorker pid=1323) KeyError: 0
(RayTrainWorker pid=1323) 
(RayTrainWorker pid=1323) 
(RayTrainWorker pid=1323) During handling of the above exception, another exception occurred:
(RayTrainWorker pid=1323) 
(RayTrainWorker pid=1323) 
(RayTrainWorker pid=1323) Traceback (most recent call last):
(RayTrainWorker pid=1323) 
(RayTrainWorker pid=1323)   File "/usr/local/lib/python3.7/dist-packages/tensorflow/python/ops/script_ops.py", line 271, in __call__
(RayTrainWorker pid=1323)     ret = func(*args)
(RayTrainWorker pid=1323) 
(RayTrainWorker pid=1323)   File "/usr/local/lib/python3.7/dist-packages/tensorflow/python/autograph/impl/api.py", line 642, in wrapper
(RayTrainWorker pid=1323)     return func(*args, **kwargs)
(RayTrainWorker pid=1323) 
(RayTrainWorker pid=1323)   File "/usr/local/lib/python3.7/dist-packages/tensorflow/python/data/ops/dataset_ops.py", line 1035, in generator_py_func
(RayTrainWorker pid=1323)     values = next(generator_state.get_iterator(iterator_id))
(RayTrainWorker pid=1323) 
(RayTrainWorker pid=1323)   File "/usr/local/lib/python3.7/dist-packages/tensorflow/python/data/ops/dataset_ops.py", line 853, in get_iterator
(RayTrainWorker pid=1323)     iterator = iter(self._generator(*self._args.pop(iterator_id)))
(RayTrainWorker pid=1323) 
(RayTrainWorker pid=1323) TypeError: 'NoneType' object is not iterable

Hi @Jiao_Dong ,

Sharing my development notebook for this issue here: Notebook

For better visibility of the issue, you can run it and reproduce the error.

Please let me know if you get any findings.

Thanks.

Thanks for providing the notebook, this is much easier to debug and I can reproduce what you’re seeing. Let me hack around your workload a bit and get back to you later today.

@suraj-gade OK, I got it working now on your Colab notebook.

So the error is thrown from the Ray dataset iterator; in your case you'll need:

dataset_shard = session.get_dataset_shard("train")
train_dataset_iterator = dataset_shard.iter_epochs()

Then, in the training loop, access the current train_dataset from the dataset iterator:

for epoch in range(start_epoch, lstm_params['epochs']):
  print(f"Starting epoch {epoch}. Current memory usage - {psutil.virtual_memory()[2]}%")
  train_dataset = next(train_dataset_iterator)
  tf_dataset = to_tf_dataset(dataset=train_dataset, batch_size=lstm_params['batch_size'])

The full working script snippet is:

dataset_shard = session.get_dataset_shard("train")
train_dataset_iterator = dataset_shard.iter_epochs()

def to_tf_dataset(dataset, batch_size):
  def to_tensor_iterator():
    for batch in dataset.iter_tf_batches(batch_size=batch_size, dtypes=tf.float32):
      print(f"x: {batch['x'].shape}")
      print(f"y: {batch['y'].shape}")
      batch["x"], batch["y"]
  
  output_signature = (
  tf.TensorSpec(shape=(None,10,1016),dtype=tf.float32),
  tf.TensorSpec(shape=(None,),dtype=tf.float32))

  tf_dataset = tf.data.Dataset.from_generator(
      to_tensor_iterator, output_signature=output_signature
  )
  return prepare_dataset_shard(tf_dataset)

results = []
for epoch in range(start_epoch, lstm_params['epochs']):
  print(f"Starting epoch {epoch}. Current memory usage - {psutil.virtual_memory()[2]}%")
  train_dataset = next(train_dataset_iterator)
  tf_dataset = to_tf_dataset(dataset=train_dataset, batch_size=lstm_params['batch_size'])

  print("******tf_dataset:  ", type(tf_dataset),"\n", tf_dataset,"\n**********************")

  history = model.fit(tf_dataset, shuffle=lstm_params['shuffle'])

Hi @Jiao_Dong

Thank you for your reply !!!

I tried your solution using train_dataset_iterator = dataset_shard.iter_epochs(), but I am getting the below error.

Error 'Dataset' object has no attribute 'iter_epochs'

I think iter_epochs() is a method of Ray DatasetPipeline, hence I tried

train_dataset_iterator = dataset_shard.repeat().iter_epochs()

But with that also, I am getting the below error.

(RayTrainWorker pid=1836) 2022-10-13 05:48:11.099279: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
(RayTrainWorker pid=1837) 2022-10-13 05:48:11.102215: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
(RayTrainWorker pid=1836) Error Graph execution error:
(RayTrainWorker pid=1836) 
(RayTrainWorker pid=1836) TypeError: iter_tf_batches() got an unexpected keyword argument 'dtypes'
(RayTrainWorker pid=1836) Traceback (most recent call last):
(RayTrainWorker pid=1836) 
(RayTrainWorker pid=1836)   File "/usr/local/lib/python3.7/dist-packages/tensorflow/python/data/ops/dataset_ops.py", line 846, in get_iterator
(RayTrainWorker pid=1836)     return self._iterators[iterator_id]
(RayTrainWorker pid=1836) 
(RayTrainWorker pid=1836) KeyError: 0
(RayTrainWorker pid=1836) 
(RayTrainWorker pid=1836) 
(RayTrainWorker pid=1836) During handling of the above exception, another exception occurred:
(RayTrainWorker pid=1836) 
(RayTrainWorker pid=1836) 
(RayTrainWorker pid=1836) Traceback (most recent call last):
(RayTrainWorker pid=1836) 
(RayTrainWorker pid=1836)   File "/usr/local/lib/python3.7/dist-packages/tensorflow/python/ops/script_ops.py", line 270, in __call__
(RayTrainWorker pid=1836)     ret = func(*args)
(RayTrainWorker pid=1836) 
(RayTrainWorker pid=1836)   File "/usr/local/lib/python3.7/dist-packages/tensorflow/python/autograph/impl/api.py", line 642, in wrapper
(RayTrainWorker pid=1836)     return func(*args, **kwargs)
(RayTrainWorker pid=1836) 
(RayTrainWorker pid=1836)   File "/usr/local/lib/python3.7/dist-packages/tensorflow/python/data/ops/dataset_ops.py", line 1030, in generator_py_func
(RayTrainWorker pid=1836)     values = next(generator_state.get_iterator(iterator_id))
(RayTrainWorker pid=1836) 
(RayTrainWorker pid=1836)   File "/usr/local/lib/python3.7/dist-packages/tensorflow/python/data/ops/dataset_ops.py", line 848, in get_iterator
(RayTrainWorker pid=1836)     iterator = iter(self._generator(*self._args.pop(iterator_id)))
(RayTrainWorker pid=1836) 
(RayTrainWorker pid=1836)   File "<ipython-input-7-4cccbfb874d9>", line 31, in to_tensor_iterator

Please have a look and let me know if I am missing something.
You can refer to the Notebook.

@suraj-gade I just realized I made a simple mistake of forgetting to yield the output from each batch … iter_epochs is not the key here; essentially, all we need is to simply yield batch["x"], batch["y"], since the batch we get already has the right dimensions, as opposed to calling another expand_dims.
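For reference, the corrected iterator then looks roughly like this (a sketch mirroring the earlier snippet):

def to_tensor_iterator():
    for batch in dataset.iter_tf_batches(batch_size=batch_size, dtypes=tf.float32):
        # yield each batch (instead of just evaluating it) so that
        # tf.data.Dataset.from_generator actually receives data
        yield batch["x"], batch["y"]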

You can see the full working gist in lstm_with_ray_train.ipynb · GitHub