Ray Dataset cannot read and parse an image dataset from S3

Hello, I am trying to read JPG images from S3, but I am getting the error below. I am using:
Ray 2.6.0
Pillow 9.5.0
pyarrow: 12.0.1
numpy: 1.21.6

My code:

import ray
from IPython.display import display


if ray.is_initialized():
    ray.shutdown()
ray.init()

print(ray.cluster_resources())
s3_uri = "s3://data-science/data-preparation/test"

ds = ray.data.read_images(s3_uri, include_paths=True)
display(ds)
ds.schema()

This is how images are stored on S3.

/usr/local/bin/python3.7 /Users/anup.rawka/IdeaProjects/data-science/raysdk/streaming_data_train.py
PIL: 9.5.0
2023-08-01 11:57:10,517	INFO worker.py:1621 -- Started a local Ray instance.
{'memory': 5961287680.0, 'CPU': 12.0, 'node:127.0.0.1': 1.0, 'object_store_memory': 2147483648.0, 'node:__internal_head__': 1.0}
2023-08-01 11:57:16,159	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadImage]
2023-08-01 11:57:16,159	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-08-01 11:57:16,159	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(ReadImage pid=72712) Task failed with retryable exception: TaskID(4bd1bc551048d2a5ffffffffffffffffffffffff01000000).
(ReadImage pid=72712) Traceback (most recent call last):
(ReadImage pid=72712)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(ReadImage pid=72712)   File "python/ray/_raylet.pyx", line 3667, in ray._raylet.CoreWorker.store_task_outputs
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(ReadImage pid=72712)     for b_out in fn(iter(blocks), ctx):
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/planner/plan_read_op.py", line 67, in do_read
(ReadImage pid=72712)     yield from read_task()
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/datasource.py", line 214, in __call__
(ReadImage pid=72712)     for block in result:
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 478, in read_files
(ReadImage pid=72712)     for data in read_stream(f, read_path, **reader_args):
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream
(ReadImage pid=72712)     yield self._read_file(f, path, **reader_args)
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/image_datasource.py", line 85, in _read_file
(ReadImage pid=72712)     image = Image.open(io.BytesIO(data))
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/PIL/Image.py", line 3298, in open
(ReadImage pid=72712)     raise UnidentifiedImageError(msg)
(ReadImage pid=72712) PIL.UnidentifiedImageError: cannot identify image file <_io.BytesIO object at 0x19c7437d0>
(ReadImage pid=72712) Task failed with retryable exception: TaskID(4bd1bc551048d2a5ffffffffffffffffffffffff01000000).
(ReadImage pid=72712) Traceback (most recent call last):
(ReadImage pid=72712)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(ReadImage pid=72712)   File "python/ray/_raylet.pyx", line 3667, in ray._raylet.CoreWorker.store_task_outputs
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(ReadImage pid=72712)     for b_out in fn(iter(blocks), ctx):
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/planner/plan_read_op.py", line 67, in do_read
(ReadImage pid=72712)     yield from read_task()
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/datasource.py", line 214, in __call__
(ReadImage pid=72712)     for block in result:
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 478, in read_files
(ReadImage pid=72712)     for data in read_stream(f, read_path, **reader_args):
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream
(ReadImage pid=72712)     yield self._read_file(f, path, **reader_args)
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/image_datasource.py", line 85, in _read_file
(ReadImage pid=72712)     image = Image.open(io.BytesIO(data))
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/PIL/Image.py", line 3298, in open
(ReadImage pid=72712)     raise UnidentifiedImageError(msg)
(ReadImage pid=72712) PIL.UnidentifiedImageError: cannot identify image file <_io.BytesIO object at 0x19ca14170>
(ReadImage pid=72712) Task failed with retryable exception: TaskID(4bd1bc551048d2a5ffffffffffffffffffffffff01000000).
(ReadImage pid=72712) Traceback (most recent call last):
(ReadImage pid=72712)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(ReadImage pid=72712)   File "python/ray/_raylet.pyx", line 3667, in ray._raylet.CoreWorker.store_task_outputs
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(ReadImage pid=72712)     for b_out in fn(iter(blocks), ctx):
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/planner/plan_read_op.py", line 67, in do_read
(ReadImage pid=72712)     yield from read_task()
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/datasource.py", line 214, in __call__
(ReadImage pid=72712)     for block in result:
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 478, in read_files
(ReadImage pid=72712)     for data in read_stream(f, read_path, **reader_args):
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream
(ReadImage pid=72712)     yield self._read_file(f, path, **reader_args)
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/image_datasource.py", line 85, in _read_file
(ReadImage pid=72712)     image = Image.open(io.BytesIO(data))
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/PIL/Image.py", line 3298, in open
(ReadImage pid=72712)     raise UnidentifiedImageError(msg)
(ReadImage pid=72712) PIL.UnidentifiedImageError: cannot identify image file <_io.BytesIO object at 0x19cbe7dd0>
(ReadImage pid=72712) Task failed with retryable exception: TaskID(4bd1bc551048d2a5ffffffffffffffffffffffff01000000). [repeated 2x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)
(ReadImage pid=72712) Traceback (most recent call last): [repeated 2x across cluster]
(ReadImage pid=72712)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs [repeated 2x across cluster]
(ReadImage pid=72712)   File "python/ray/_raylet.pyx", line 3667, in ray._raylet.CoreWorker.store_task_outputs [repeated 2x across cluster]
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task [repeated 2x across cluster]
(ReadImage pid=72712)     for b_out in fn(iter(blocks), ctx): [repeated 2x across cluster]
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/planner/plan_read_op.py", line 67, in do_read [repeated 2x across cluster]
(ReadImage pid=72712)     yield from read_task() [repeated 2x across cluster]
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/datasource.py", line 214, in __call__ [repeated 2x across cluster]
(ReadImage pid=72712)     for block in result: [repeated 2x across cluster]
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 478, in read_files [repeated 2x across cluster]
(ReadImage pid=72712)     for data in read_stream(f, read_path, **reader_args): [repeated 2x across cluster]
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream [repeated 2x across cluster]
(ReadImage pid=72712)     yield self._read_file(f, path, **reader_args) [repeated 2x across cluster]
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/image_datasource.py", line 85, in _read_file [repeated 2x across cluster]
(ReadImage pid=72712)     image = Image.open(io.BytesIO(data)) [repeated 2x across cluster]
(ReadImage pid=72712)   File "/usr/local/lib/python3.7/site-packages/PIL/Image.py", line 3298, in open [repeated 2x across cluster]
(ReadImage pid=72712)     raise UnidentifiedImageError(msg) [repeated 2x across cluster]
(ReadImage pid=72712) PIL.UnidentifiedImageError: cannot identify image file <_io.BytesIO object at 0x19cbe7e90> [repeated 2x across cluster]
(ReadImage pid=72705)     builder.add(item)
(ReadImage pid=72705)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/delegating_block_builder.py", line 24, in add
(ReadImage pid=72705)     check.build()
(ReadImage pid=72705)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/table_block.py", line 118, in build
(ReadImage pid=72705)     tables = [self._table_from_pydict(columns)]
(ReadImage pid=72705)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/arrow_block.py", line 123, in _table_from_pydict
(ReadImage pid=72705)     columns[col_name] = ArrowTensorArray.from_numpy(col)
(ReadImage pid=72705)   File "/usr/local/lib/python3.7/site-packages/ray/air/util/tensor_extensions/arrow.py", line 312, in from_numpy
(ReadImage pid=72705)     return ArrowVariableShapedTensorArray.from_numpy(arr)
(ReadImage pid=72705)   File "/usr/local/lib/python3.7/site-packages/ray/air/util/tensor_extensions/arrow.py", line 717, in from_numpy
(ReadImage pid=72705)     pa_dtype = pa.from_numpy_dtype(dtype)
(ReadImage pid=72705)   File "pyarrow/types.pxi", line 4911, in pyarrow.lib.from_numpy_dtype
(ReadImage pid=72705)   File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
(ReadImage pid=72705) pyarrow.lib.ArrowNotImplementedError: Unsupported numpy type 17
Traceback (most recent call last):
  File "/Users/anup.rawka/IdeaProjects/data-science/raysdk/streaming_data_train.py", line 70, in <module>
    for batch in ds.iter_batches(batch_size=100, batch_format="numpy", prefetch_batches=10):
  File "/usr/local/lib/python3.7/site-packages/ray/data/iterator.py", line 200, in iter_batches
    prefetch_batches=prefetch_batches,
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 176, in iter_batches
    next_batch = next(async_batch_iter)
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/util.py", line 289, in make_async_gen
    raise next_item
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/util.py", line 266, in execute_computation
    for item in fn(thread_safe_generator):
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 167, in _async_iter_batches
    yield from extract_data_from_batch(batch_iter)
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/util.py", line 210, in extract_data_from_batch
    for batch in batch_iter:
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 306, in restore_original_order
    for batch in batch_iter:
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/util.py", line 289, in make_async_gen
    raise next_item
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/util.py", line 266, in execute_computation
    for item in fn(thread_safe_generator):
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 218, in threadpool_computations_format_collate
    yield from formatted_batch_iter
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/util.py", line 158, in format_batches
    for batch in block_iter:
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/util.py", line 246, in __next__
    return next(self.it)
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/util.py", line 117, in blocks_to_batches
    for block in block_iter:
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/util.py", line 54, in resolve_block_refs
    for block_ref in block_ref_iter:
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/iter_batches.py", line 271, in prefetch_batches_locally
    next_block_ref_and_metadata = next(block_ref_iter)
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/block_batching/util.py", line 246, in __next__
    return next(self.it)
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/legacy_compat.py", line 51, in execute_to_legacy_block_iterator
    for bundle in bundle_iter:
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/interfaces.py", line 548, in __next__
    return self.get_next()
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/streaming_executor.py", line 129, in get_next
    raise item
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/streaming_executor.py", line 187, in run
    while self._scheduling_loop_step(self._topology) and not self._shutdown:
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/streaming_executor.py", line 235, in _scheduling_loop_step
    process_completed_tasks(topology)
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 333, in process_completed_tasks
    op.notify_work_completed(ref)
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/operators/task_pool_map_operator.py", line 65, in notify_work_completed
    task.output = self._map_ref_to_ref_bundle(ref)
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 357, in _map_ref_to_ref_bundle
    all_refs = list(ray.get(ref))
  File "/usr/local/lib/python3.7/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/ray/_private/worker.py", line 2493, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(UnidentifiedImageError): ray::ReadImage() (pid=72712, ip=127.0.0.1)
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
    for b_out in fn(iter(blocks), ctx):
  File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/planner/plan_read_op.py", line 67, in do_read
    yield from read_task()
  File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/datasource.py", line 214, in __call__
    for block in result:
  File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 478, in read_files
    for data in read_stream(f, read_path, **reader_args):
  File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream
    yield self._read_file(f, path, **reader_args)
  File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/image_datasource.py", line 85, in _read_file
    image = Image.open(io.BytesIO(data))
  File "/usr/local/lib/python3.7/site-packages/PIL/Image.py", line 3298, in open
    raise UnidentifiedImageError(msg)
PIL.UnidentifiedImageError: cannot identify image file <_io.BytesIO object at 0x19cbe7e90>
(ReadImage pid=72705) Task failed with retryable exception: TaskID(82d7af811dbf3277ffffffffffffffffffffffff01000000).
(ReadImage pid=72705) Traceback (most recent call last):
(ReadImage pid=72705)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(ReadImage pid=72705)   File "python/ray/_raylet.pyx", line 3667, in ray._raylet.CoreWorker.store_task_outputs
(ReadImage pid=72705)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(ReadImage pid=72705)     for b_out in fn(iter(blocks), ctx):
(ReadImage pid=72705)   File "/usr/local/lib/python3.7/site-packages/ray/data/_internal/planner/plan_read_op.py", line 67, in do_read
(ReadImage pid=72705)     yield from read_task()
(ReadImage pid=72705)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/datasource.py", line 214, in __call__
(ReadImage pid=72705)     for block in result:
(ReadImage pid=72705)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 478, in read_files
(ReadImage pid=72705)     for data in read_stream(f, read_path, **reader_args):
(ReadImage pid=72705)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream
(ReadImage pid=72705)     yield self._read_file(f, path, **reader_args)
(ReadImage pid=72705)   File "/usr/local/lib/python3.7/site-packages/ray/data/datasource/image_datasource.py", line 98, in _read_file

Process finished with exit code 1

@anuprawka Can you read or open any of the images without the ray.data.read_images() API? Curious.

cc: @bveeramani

Hi @Jules_Damji, yes, the code below works fine:

import boto3
from io import BytesIO
import matplotlib.image as mpimg
import matplotlib.pyplot as plt

resource = boto3.resource('s3', region_name='us-east-1')
bucket = resource.Bucket('data-science-virginia')

image_object = bucket.Object('data-preparation/test/1000010.jpg')
image = mpimg.imread(BytesIO(image_object.get()['Body'].read()), 'jpg')

plt.figure(0)
plt.imshow(image)
plt.show()
![1000010|666x500](upload://f8GHV3MolydL1H2vKZhdD5dL5w9.jpeg)

All the images are from this dataset: laion/laion-high-resolution (Datasets at Hugging Face).
I downloaded them and uploaded them to S3.
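
Since the files were fetched from URLs listed in the dataset, one thing that may be worth ruling out is that some downloads are not actually JPEG data, for example an empty file or an HTML error page saved under a .jpg name. Below is a minimal sketch, using only the standard library, that checks the leading bytes of a payload. The helper names `looks_like_jpeg` and `describe_payload` and the sample payloads are made up for illustration and are not part of Ray or Pillow.

```python
# JPEG streams begin with the SOI marker FF D8, followed by another marker byte FF.
JPEG_SIGNATURE = b"\xff\xd8\xff"


def looks_like_jpeg(data: bytes) -> bool:
    """Return True if the payload starts with the JPEG signature."""
    return data[:3] == JPEG_SIGNATURE


def describe_payload(data: bytes) -> str:
    """Give a rough, human-readable guess at what a downloaded payload is."""
    if not data:
        return "empty file"
    if looks_like_jpeg(data):
        return "jpeg"
    if data.lstrip()[:1] == b"<":
        return "markup (possibly an HTML error page)"
    return "unknown"


if __name__ == "__main__":
    # Hypothetical payloads standing in for downloaded files.
    samples = {
        "ok.jpg": b"\xff\xd8\xff\xe0" + b"\x00" * 16,
        "empty.jpg": b"",
        "error_page.jpg": b"<html><body>404</body></html>",
    }
    for name, payload in samples.items():
        print(name, "->", describe_payload(payload))
```

A payload that passes this check can still be truncated or otherwise undecodable, so this only filters out the obvious cases.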

What happens if you use Pillow rather than mpimg to load the image? Does that work?

@bveeramani The code below, which uses PIL, works fine too:

import PIL
import boto3
from PIL import Image
print(f"PIL: {PIL.__version__}")
import io

s3 = boto3.resource('s3', region_name='us-east-1')

def image_from_s3(bucket, key):
    bucket = s3.Bucket(bucket)
    image = bucket.Object(key)
    img_data = image.get().get('Body').read()
    return Image.open(io.BytesIO(img_data))

# call the function
image_from_s3("di2-data-science", "data-preparation/test/1000010.jpg").show()

Adding more context: I downloaded part of the Hugging Face dataset mentioned above using the Python script mentioned below, uploaded it to S3, and am now trying to read it back.

Hmm…that’s weird. I suspect that PIL is failing to open specific images.
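
One way to test that suspicion outside of Ray would be to run every object under the prefix through Pillow and collect the ones that fail, since the check earlier in the thread only opened a single key. The following is a rough sketch, not a definitive tool: `pil_error`, `find_unreadable`, and `iter_s3_objects` are hypothetical helper names, and the S3 part assumes boto3 with working credentials.

```python
import io
from typing import Callable, Iterable, Iterator, List, Optional, Tuple


def pil_error(data: bytes) -> Optional[str]:
    """Return None if Pillow can parse the bytes, else a short error string."""
    from PIL import Image  # imported lazily so the other helpers work without Pillow

    try:
        # verify() attempts to detect a broken file without decoding the pixel data.
        Image.open(io.BytesIO(data)).verify()
        return None
    except Exception as exc:  # e.g. UnidentifiedImageError, OSError
        return f"{type(exc).__name__}: {exc}"


def find_unreadable(
    items: Iterable[Tuple[str, bytes]],
    check: Callable[[bytes], Optional[str]] = pil_error,
) -> List[Tuple[str, str]]:
    """Return (key, error) pairs for every payload that fails the check."""
    failures = []
    for key, data in items:
        error = check(data)
        if error is not None:
            failures.append((key, error))
    return failures


def iter_s3_objects(bucket_name: str, prefix: str) -> Iterator[Tuple[str, bytes]]:
    """Yield (key, bytes) for each object under the prefix (needs boto3 and credentials)."""
    import boto3

    bucket = boto3.resource("s3").Bucket(bucket_name)
    for summary in bucket.objects.filter(Prefix=prefix):
        yield summary.key, summary.get()["Body"].read()


# Example call (bucket and prefix are placeholders; not executed here):
# for key, error in find_unreadable(iter_s3_objects("my-bucket", "data-preparation/test/")):
#     print(key, error)
```

The `check` parameter is there so the scan can be tried with a different validator, or exercised without Pillow installed.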

@anuprawka Could you try using the latest version of Ray and re-running read_images? It should tell you the name of the files that error (see this PR).

Once we know which images are problematic, it’ll be easier for us to reproduce.
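
In the meantime, a possible workaround is to read the raw bytes and decode them in a function that skips files Pillow cannot parse, so a single bad file does not fail the whole read. This is only a sketch under assumptions: `decode_or_skip` and `_pil_decode` are hypothetical names, and the commented-out usage assumes that `ray.data.read_binary_files(..., include_paths=True)` yields rows with `bytes` and `path` columns, which is worth confirming against the installed Ray version.

```python
import io
from typing import Any, Callable, Dict, List


def _pil_decode(data: bytes) -> Any:
    """Decode image bytes into an RGB numpy array (needs Pillow and numpy)."""
    import numpy as np
    from PIL import Image

    return np.asarray(Image.open(io.BytesIO(data)).convert("RGB"))


def decode_or_skip(
    row: Dict[str, Any],
    decode: Callable[[bytes], Any] = _pil_decode,
) -> List[Dict[str, Any]]:
    """Return a one-row list on success, or an empty list if decoding fails."""
    try:
        image = decode(row["bytes"])
    except Exception as exc:
        # Log the offending path so it can be inspected later.
        print(f"Skipping {row.get('path')}: {type(exc).__name__}: {exc}")
        return []
    return [{"path": row.get("path"), "image": image}]


# Intended use with Ray Data (not run here; requires ray and S3 access):
# ds = ray.data.read_binary_files(s3_uri, include_paths=True).flat_map(decode_or_skip)
```

Returning a list lets the function be used with `flat_map`, where an empty list drops the row. The printed paths double as a list of files to send over for reproduction.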

Hi @bveeramani sure I will try today.

@bveeramani thanks for following up.

Sorry for the late response, @bveeramani. I ran the code below with the latest version and have added the error stack for it.

import ray
print(f"ray: {ray.__version__}")
import PIL
print(f"PIL: {PIL.__version__}")
import pyarrow
print(f"pyarrow: {pyarrow.__version__}")
import numpy as np
print(f"numpy: {np.__version__}")


if ray.is_initialized():
    ray.shutdown()
ray.init()

print(ray.cluster_resources())

s3_uri = "s3://data-science-virginia/data-preparation/test"

ds = ray.data.read_images(s3_uri, include_paths=True)

print(f"Number of rows after transformation: {ds.count()}")
ds.schema()

for batch in ds.iter_batches(batch_size=100, batch_format="numpy", prefetch_batches=10):
    # print(batch)
    print("Image shape", batch["image"].shape)
[ec2-user@ip-10-0-11-148 ~]$ python3 test.py 
ray: 2.6.2
PIL: 9.5.0
pyarrow: 12.0.1
numpy: 1.23.5
2023-08-14 04:35:35,944	INFO worker.py:1431 -- Connecting to existing Ray cluster at address: 10.0.11.148:6379...
2023-08-14 04:35:35,988	INFO worker.py:1621 -- Connected to Ray cluster.
{'node:__internal_head__': 1.0, 'CPU': 32.0, 'object_store_memory': 39443417088.0, 'node:10.0.11.148': 1.0, 'memory': 82034639872.0}
Number of rows after transformation: 613
(_execute_read_task_split pid=7817) Task failed with retryable exception: TaskID(39088be3736e590affffffffffffffffffffffff02000000).
(_execute_read_task_split pid=7817) Traceback (most recent call last):
(_execute_read_task_split pid=7817)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(_execute_read_task_split pid=7817)   File "python/ray/_raylet.pyx", line 3681, in ray._raylet.CoreWorker.store_task_outputs
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/_internal/lazy_block_list.py", line 695, in _execute_read_task_split
(_execute_read_task_split pid=7817)     for block in blocks:
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/datasource.py", line 214, in __call__
(_execute_read_task_split pid=7817)     for block in result:
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 478, in read_files
(_execute_read_task_split pid=7817)     for data in read_stream(f, read_path, **reader_args):
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream
(_execute_read_task_split pid=7817)     yield self._read_file(f, path, **reader_args)
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/image_datasource.py", line 85, in _read_file
(_execute_read_task_split pid=7817)     image = Image.open(io.BytesIO(data))
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/PIL/Image.py", line 3298, in open
(_execute_read_task_split pid=7817)     raise UnidentifiedImageError(msg)
(_execute_read_task_split pid=7817) PIL.UnidentifiedImageError: cannot identify image file <_io.BytesIO object at 0x7ef9421beca0>
(_execute_read_task_split pid=7817) Task failed with retryable exception: TaskID(39088be3736e590affffffffffffffffffffffff02000000).
(_execute_read_task_split pid=7817) Traceback (most recent call last):
(_execute_read_task_split pid=7817)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(_execute_read_task_split pid=7817)   File "python/ray/_raylet.pyx", line 3681, in ray._raylet.CoreWorker.store_task_outputs
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/_internal/lazy_block_list.py", line 695, in _execute_read_task_split
(_execute_read_task_split pid=7817)     for block in blocks:
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/datasource.py", line 214, in __call__
(_execute_read_task_split pid=7817)     for block in result:
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 478, in read_files
(_execute_read_task_split pid=7817)     for data in read_stream(f, read_path, **reader_args):
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream
(_execute_read_task_split pid=7817)     yield self._read_file(f, path, **reader_args)
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/image_datasource.py", line 85, in _read_file
(_execute_read_task_split pid=7817)     image = Image.open(io.BytesIO(data))
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/PIL/Image.py", line 3298, in open
(_execute_read_task_split pid=7817)     raise UnidentifiedImageError(msg)
(_execute_read_task_split pid=7817) PIL.UnidentifiedImageError: cannot identify image file <_io.BytesIO object at 0x7ef8e0165850>
(_execute_read_task_split pid=7817) Task failed with retryable exception: TaskID(39088be3736e590affffffffffffffffffffffff02000000).
(_execute_read_task_split pid=7817) Traceback (most recent call last):
(_execute_read_task_split pid=7817)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(_execute_read_task_split pid=7817)   File "python/ray/_raylet.pyx", line 3681, in ray._raylet.CoreWorker.store_task_outputs
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/_internal/lazy_block_list.py", line 695, in _execute_read_task_split
(_execute_read_task_split pid=7817)     for block in blocks:
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/datasource.py", line 214, in __call__
(_execute_read_task_split pid=7817)     for block in result:
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 478, in read_files
(_execute_read_task_split pid=7817)     for data in read_stream(f, read_path, **reader_args):
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream
(_execute_read_task_split pid=7817)     yield self._read_file(f, path, **reader_args)
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/image_datasource.py", line 85, in _read_file
(_execute_read_task_split pid=7817)     image = Image.open(io.BytesIO(data))
(_execute_read_task_split pid=7817)   File "/opt/tensorflow/lib/python3.10/site-packages/PIL/Image.py", line 3298, in open
(_execute_read_task_split pid=7817)     raise UnidentifiedImageError(msg)
(_execute_read_task_split pid=7817) PIL.UnidentifiedImageError: cannot identify image file <_io.BytesIO object at 0x7ef8c81da700>
Traceback (most recent call last):
  File "/home/ec2-user/test.py", line 43, in <module>
    ds.schema()
  File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/dataset.py", line 2296, in schema
    base_schema = self._plan.schema(fetch_if_missing=False)
  File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 424, in schema
    self._schema = self._get_unified_blocks_schema(blocks, fetch_if_missing)
  File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 444, in _get_unified_blocks_schema
    blocks.ensure_metadata_for_first_block()
  File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/_internal/lazy_block_list.py", line 398, in ensure_metadata_for_first_block
    generator = ray.get(block_partition_ref)
  File "/opt/tensorflow/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/opt/tensorflow/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/opt/tensorflow/lib/python3.10/site-packages/ray/_private/worker.py", line 2520, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(UnidentifiedImageError): ray::_execute_read_task_split() (pid=7817, ip=10.0.11.148)
  File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/_internal/lazy_block_list.py", line 695, in _execute_read_task_split
    for block in blocks:
  File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/datasource.py", line 214, in __call__
    for block in result:
  File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 478, in read_files
    for data in read_stream(f, read_path, **reader_args):
  File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream
    yield self._read_file(f, path, **reader_args)
  File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/image_datasource.py", line 85, in _read_file
    image = Image.open(io.BytesIO(data))
  File "/opt/tensorflow/lib/python3.10/site-packages/PIL/Image.py", line 3298, in open
    raise UnidentifiedImageError(msg)

@bveeramani More details:
I also printed the path of the S3 file for which we are getting the error, and attached the file.

import ray
print(f"ray: {ray.__version__}")
import PIL
print(f"PIL: {PIL.__version__}")
import pyarrow
print(f"pyarrow: {pyarrow.__version__}")
import numpy as np
print(f"numpy: {np.__version__}")


if ray.is_initialized():
    ray.shutdown()
ray.init()

print(ray.cluster_resources())

s3_uri = "s3://vd-dev-smarttvdata-di2-data-science-virginia/data-preparation/test"

ds = ray.data.read_images(s3_uri, include_paths=True)

print("Path: "+ds.take(1)[0]["path"])
print("Image:", ds.take(1)[0]["image"])  # pass the array as a separate argument; "str + ndarray" raises an error

print(f"Number of rows after transformation: {ds.count()}")
ds.schema()

for batch in ds.iter_batches(batch_size=100, batch_format="numpy", prefetch_batches=10):
    # print(batch)
    print("S3 path:", batch["path"])
    print("Image shape", batch["image"].shape)
ray: 2.6.2
PIL: 9.5.0
pyarrow: 12.0.1
numpy: 1.23.5
2023-08-14 04:55:41,635 INFO worker.py:1431 -- Connecting to existing Ray cluster at address: 10.0.11.148:6379...
2023-08-14 04:55:41,679 INFO worker.py:1621 -- Connected to Ray cluster.
{'CPU': 32.0, 'object_store_memory': 39443417088.0, 'node:__internal_head__': 1.0, 'node:10.0.11.148': 1.0, 'memory': 82034639872.0}
2023-08-14 04:55:43,277 INFO dataset.py:2180 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2023-08-14 04:55:43,278 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadImage]
2023-08-14 04:55:43,278 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-08-14 04:55:43,278 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(ReadImage pid=18859) Task failed with retryable exception: TaskID(28e8e8a8dec79d58ffffffffffffffffffffffff08000000).
(ReadImage pid=18859) Traceback (most recent call last):
(ReadImage pid=18859)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(ReadImage pid=18859)   File "python/ray/_raylet.pyx", line 3681, in ray._raylet.CoreWorker.store_task_outputs
(ReadImage pid=18859)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(ReadImage pid=18859)     for b_out in fn(iter(blocks), ctx):
(ReadImage pid=18859)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/_internal/planner/plan_read_op.py", line 67, in do_read
(ReadImage pid=18859)     yield from read_task()
(ReadImage pid=18859)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/datasource.py", line 214, in __call__
(ReadImage pid=18859)     for block in result:
(ReadImage pid=18859)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 478, in read_files
(ReadImage pid=18859)     for data in read_stream(f, read_path, **reader_args):
(ReadImage pid=18859)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream
(ReadImage pid=18859)     yield self._read_file(f, path, **reader_args)
(ReadImage pid=18859)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/image_datasource.py", line 85, in _read_file
(ReadImage pid=18859)     image = Image.open(io.BytesIO(data))
(ReadImage pid=18859)   File "/opt/tensorflow/lib/python3.10/site-packages/PIL/Image.py", line 3298, in open
(ReadImage pid=18859)     raise UnidentifiedImageError(msg)
(ReadImage pid=18859) PIL.UnidentifiedImageError: cannot identify image file <_io.BytesIO object at 0x7f0c881e3e20>
Path: di2-data-science-virginia/data-preparation/test/1000005.jpg

2023-08-14 04:55:45,612 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadImage]
2023-08-14 04:55:45,612 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-08-14 04:55:45,612 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-08-14 04:55:46,371 WARNING worker.py:2033 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: e692f617874353bda0285ee39cb769af2861763d08000000 Worker ID: 793b2958d37fbd18325385694c7ca3238283132ec924704cea171298 Node ID: 88f9e34e9365dee872c6aee68821ccb0cbd3f2f223700f49a5ed936e Worker IP address: 10.0.11.148 Worker port: 10079 Worker PID: 18972 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
Traceback (most recent call last):
  File "/home/ec2-user/test.py", line 43, in <module>
    print("Image: "+ ds.take(1)[0]["image"])
numpy.core._exceptions._UFuncNoLoopError: ufunc 'add' did not contain a loop with signature matching types (dtype('<U7'), dtype('uint8')) -> None
(ReadImage pid=18960) Task failed with retryable exception: TaskID(c113a36f8234970fffffffffffffffffffffffff08000000). [repeated 4x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)
(ReadImage pid=18960) Traceback (most recent call last): [repeated 4x across cluster]
(ReadImage pid=18960)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs [repeated 4x across cluster]
(ReadImage pid=18960)   File "python/ray/_raylet.pyx", line 3681, in ray._raylet.CoreWorker.store_task_outputs [repeated 4x across cluster]
(ReadImage pid=18960)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task [repeated 4x across cluster]
(ReadImage pid=18960)     for b_out in fn(iter(blocks), ctx): [repeated 4x across cluster]
(ReadImage pid=18960)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/_internal/planner/plan_read_op.py", line 67, in do_read [repeated 4x across cluster]
(ReadImage pid=18960)     yield from read_task() [repeated 4x across cluster]
(ReadImage pid=18960)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/datasource.py", line 214, in __call__ [repeated 4x across cluster]
(ReadImage pid=18960)     for block in result: [repeated 4x across cluster]
(ReadImage pid=18960)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 478, in read_files [repeated 4x across cluster]
(ReadImage pid=18960)     for data in read_stream(f, read_path, **reader_args): [repeated 4x across cluster]
(ReadImage pid=18960)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 242, in _read_stream [repeated 4x across cluster]
(ReadImage pid=18960)     yield self._read_file(f, path, **reader_args) [repeated 4x across cluster]
(ReadImage pid=18960)   File "/opt/tensorflow/lib/python3.10/site-packages/ray/data/datasource/image_datasource.py", line 85, in _read_file [repeated 4x across cluster]
(ReadImage pid=18960)     image = Image.open(io.BytesIO(data)) [repeated 4x across cluster]
(ReadImage pid=18960)   File "/opt/tensorflow/lib/python3.10/site-packages/PIL/Image.py", line 3298, in open [repeated 4x across cluster]
(ReadImage pid=18960)     raise UnidentifiedImageError(msg) [repeated 4x across cluster]
(ReadImage pid=18960) PIL.UnidentifiedImageError: cannot identify image file <_io.BytesIO object at 0x7f23ade1bdd0> [repeated 4x across cluster]
[ec2-user@ip-10-0-11-148 ~]$ 

Ah, sorry. I meant could you use the nightly version of Ray?

I tried loading the image locally and it works fine, so I suspect that the attached image isn’t the problematic one. When you call ds.take(1)[0], Ray Data might load more than one image.
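
One way to follow up on this is to scan every object under the prefix instead of checking a single file. The sketch below is only an illustration and is not from the thread: `looks_like_jpeg` and `find_suspect_files` are hypothetical helper names, the check only looks at the JPEG start-of-image bytes (FF D8 FF), and it assumes `ray.data.read_binary_files(..., include_paths=True)` yields rows with `bytes` and `path` columns, as it does in recent Ray releases.

```python
# Bytes FF D8 are the JPEG start-of-image marker; the next marker starts with FF.
JPEG_MAGIC = b"\xff\xd8\xff"


def looks_like_jpeg(data: bytes) -> bool:
    """Cheap header check; it does not prove the file decodes cleanly."""
    return data[:3] == JPEG_MAGIC


def find_suspect_files(uri: str) -> list:
    """Return paths under `uri` whose contents do not start like a JPEG."""
    import ray  # imported lazily so the header check works without Ray installed

    # Read raw bytes instead of decoding, so one bad object cannot fail the read.
    ds = ray.data.read_binary_files(uri, include_paths=True)
    return [
        row["path"]
        for row in ds.iter_rows()
        if not looks_like_jpeg(row["bytes"])
    ]
```

Calling `find_suspect_files(s3_uri)` should list objects that are empty or are not JPEG data (for example a zero-byte placeholder object under the prefix). Files that pass this check can still fail in Pillow if they are truncated, so treat the result as a first filter.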


I ran with the latest nightly release, and I get the error below:

ray: 3.0.0.dev0
PIL: 9.5.0
pyarrow: 12.0.1
numpy: 1.21.6
2023-08-14 21:25:29,840	INFO worker.py:1431 -- Connecting to existing Ray cluster at address: 10.0.11.171:6379...
2023-08-14 21:25:29,915	INFO worker.py:1618 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265 
{'object_store_memory': 1096546713.0, 'node:10.0.11.171': 1.0, 'memory': 2193093428.0, 'CPU': 2.0, 'node:__internal_head__': 1.0}
2023-08-14 21:25:31,137	INFO dataset.py:2356 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2023-08-14 21:25:31,138	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadImage]
2023-08-14 21:25:31,138	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-08-14 21:25:31,139	INFO streaming_executor.py:97 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[dataset]: Run `pip install tqdm` to enable progress reporting.
Path: di2-data-science/data-preparation/test/1000005.jpg
2023-08-14 21:25:33,752	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadImage]
2023-08-14 21:25:33,752	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-08-14 21:25:33,753	INFO streaming_executor.py:97 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
Traceback (most recent call last):
  File "test.py", line 22, in <module>
    print("Image: "+ ds.take(1)[0]["image"])
numpy.core._exceptions.UFuncTypeError: ufunc 'add' did not contain a loop with signature matching types (dtype('<U7'), dtype('uint8')) -> None
[ec2-user@ip-10-0-11-171 ~]$
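
Note that this last traceback comes from the script itself, not from reading the image: `"Image: " + ds.take(1)[0]["image"]` asks NumPy to add a 7-character string to a `uint8` array, which is the `ufunc 'add'` error shown. A minimal sketch of a print helper that avoids the concatenation is below; `describe` is a hypothetical name, and it assumes the value exposes `shape` and `dtype` attributes the way a NumPy array does.

```python
def describe(label, value):
    """Build a printable line without using `+` between a str and an array."""
    shape = getattr(value, "shape", None)
    dtype = getattr(value, "dtype", None)
    if shape is not None:
        # Array-like value: report its shape and dtype instead of its contents.
        return f"{label}: array with shape={tuple(shape)}, dtype={dtype}"
    # Anything else (e.g. the path string) is formatted as-is.
    return f"{label}: {value}"
```

In the script this would be used as `row = ds.take(1)[0]` followed by `print(describe("Path", row["path"]))` and `print(describe("Image", row["image"]))`, which also avoids executing the dataset twice.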