Problem performing a vector search on a Lance dataset generated with ray.data.Dataset.write_lance

1. Severity of the issue:
High: Completely blocks me.

2. Environment:

  • Ray version: 2.48.0
  • Python version: 3.10.12
  • OS: Ubuntu 22.04.5 LTS
  • Other libs/tools (if relevant): pylance 0.32.0

3. What happened vs. what you expected:
I am currently evaluating Lance's vector search capability and have run into a problem. Hopefully I can get some help here.

I am using a dataset of my own, a large collection of essays and articles, with the following columns:

  • id, the id of the article
  • title, the title of the article
  • abstract, the abstract of the article
  • abstract_embedding, the embedding of the abstract, generated with an embedding model; each vector has 4096 dimensions with data type float32
  • some other columns that are not relevant here
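
For concreteness, the relevant part of the schema looks roughly like this (a sketch; the embedding is stored as a variable-length list of float32 in the intermediate Parquet files, and the remaining columns are omitted):

import pyarrow as pa

# Sketch of the dataset schema as described above; only the relevant columns.
article_schema = pa.schema([
    pa.field("id", pa.string()),
    pa.field("title", pa.string()),
    pa.field("abstract", pa.string()),
    # 4096-dimensional float32 embedding, stored as a variable-length list
    pa.field("abstract_embedding", pa.list_(pa.float32())),
])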

The steps are as follows:

  1. Use the Ray Data API with the QWen embedding model to generate embeddings and write the results to Parquet files for intermediate storage. The embeddings are produced as torch.Tensor, which Ray treats as numpy.ndarray; a rough sketch of this step is shown below.
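A sketch of step 1, for illustration only (the model call is a placeholder and the helper name is hypothetical, not my actual code):

import numpy as np
import ray

def embed_batch(batch):
    # Placeholder for the real QWen embedding call: one 4096-dim float32
    # vector per abstract. Ray represents these per-row vectors as
    # numpy.ndarray.
    batch["abstract_embedding"] = np.stack(
        [np.zeros(4096, dtype=np.float32) for _ in batch["abstract"]]
    )
    return batch

ds = ray.data.read_parquet("/path/to/raw-articles")  # hypothetical input path
ds = ds.map_batches(embed_batch, batch_format="numpy")
ds.write_parquet("/path/to/parquet-files")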
  2. Read the data back from the Parquet files and write it out as a Lance dataset using ray.data.Dataset.write_lance:
import ray

ds = ray.data.read_parquet("/path/to/parquet-files")
ds.write_lance("/path/to/lance-dataset.lance",
                mode="create",
                max_rows_per_file=20000)
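
As a quick sanity check, the column types Ray is about to write can be printed first; I would expect abstract_embedding to appear as a variable-length list of floats here:

print(ds.schema())  # column types as Ray sees them before writing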
  3. Use the Lance API to perform a vector search (no index involved yet):
import lance
import time
import pyarrow as pa

# embedding of 4096 dimensions generated using the same QWen model
embedding = [0.005702990107238293, -0.012437371537089348, 0.033975258469581604, -0.0012740722158923745, ......]

ds = lance.dataset("/path/to/lance-dataset.lance")

start = time.time()
tbl = ds.to_table(columns=["title", "abstract"], 
                    nearest={
                        "column": "abstract_embedding",
                        "q": embedding,
                        "k": 10,
                        "metric": "cosine",
                        "minimum_nprobes": 20,
                        "maximum_nprobes": 50,
                        "refine_factor": 1
                    },
                    scan_in_order=False,
                )
end = time.time()
print(f"Time(sec): {end-start}")
print(tbl.to_pandas())

The first two steps completed successfully, but the third step raised the following exception:

Traceback (most recent call last):
  File "/....../read_lance.py", line 10, in <module>
    tbl = ds.to_table(columns=["title", "abstract"], 
  File "/usr/local/lib/python3.10/dist-packages/lance/dataset.py", line 741, in to_table
    return self.scanner(
  File "/usr/local/lib/python3.10/dist-packages/lance/dataset.py", line 593, in scanner
    builder = builder.nearest(**nearest)
  File "/usr/local/lib/python3.10/dist-packages/lance/dataset.py", line 3731, in nearest
    raise TypeError(
TypeError: Query column abstract_embedding must be a vector. Got large_list<item: float>.
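
If I read this correctly, Lance only treats a column as a vector if it is a fixed-size list of floats (e.g. fixed_size_list<float32, 4096>), while my column was written out as the variable-length large_list<item: float>. The type actually stored on disk can be double-checked like this:

import lance

ds = lance.dataset("/path/to/lance-dataset.lance")
print(ds.schema)  # expect abstract_embedding: large_list<item: float>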

I then tried to enforce the schema when writing the Lance dataset with Ray, as follows:

import pyarrow as pa
import ray

ds = ray.data.read_parquet("/path/to/parquet-files")
ds.write_lance("/path/to/lance-dataset.lance",
                mode="create",
                schema=pa.schema([
                    pa.field("id", pa.string()),
                    pa.field("abstract", pa.string()),
                    pa.field("title", pa.string()),
                    pa.field("abstract_embedding", pa.list_(pa.float32()))
                ]),
                max_rows_per_file=20000)

but this attempt failed with a different exception:

ray.exceptions.RayTaskError(OSError): ray::Write() (pid=28210, ip=172.20.40.0)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/operators/map_transformer.py", line 377, in __call__
    yield from self._block_fn(input, ctx)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/planner/plan_write_op.py", line 68, in fn
    block_accessors = [BlockAccessor.for_block(block) for block in blocks]
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/planner/plan_write_op.py", line 68, in <listcomp>
    block_accessors = [BlockAccessor.for_block(block) for block in blocks]
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/execution/operators/map_transformer.py", line 377, in __call__
    yield from self._block_fn(input, ctx)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/planner/plan_write_op.py", line 48, in fn
    ctx.kwargs["_datasink_write_return"] = datasink_or_legacy_datasource.write(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/datasource/lance_datasink.py", line 227, in write
    fragments_and_schema = _write_fragment(
  File "/usr/local/lib/python3.10/dist-packages/ray/data/_internal/datasource/lance_datasink.py", line 62, in _write_fragment
    fragments = write_fragments(
  File "/usr/local/lib/python3.10/dist-packages/lance/fragment.py", line 836, in write_fragments
    return function(
OSError: Append with different schema: `abstract_embedding` should have type large_list but type was list, location: /home/runner/work/lance/lance/rust/lance-core/src/datatypes/schema.rs:149:27
ray.data.exceptions.SystemException
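
Looking at the two errors together, I suspect two separate issues: the write path stores the embeddings as large_list even though my explicit schema said list (hence the append mismatch), and even pa.list_(pa.float32()) is variable-length, whereas the vector search apparently requires a fixed-size list. A workaround I am considering, as an untested sketch that assumes every row holds exactly 4096 non-null float32 values, is to rebuild the column as fixed_size_list<float32, 4096> before calling write_lance:

import pyarrow as pa
import ray

DIM = 4096

def to_fixed_size_list(batch: pa.Table) -> pa.Table:
    # Rebuild abstract_embedding as fixed_size_list<float32, DIM> so that
    # Lance recognizes it as a vector column. Assumes no nulls and exactly
    # DIM values per row.
    col = batch.column("abstract_embedding")
    chunks = []
    for chunk in col.chunks:
        flat = chunk.flatten().cast(pa.float32())  # all values, row-major
        chunks.append(pa.FixedSizeListArray.from_arrays(flat, DIM))
    field = pa.field("abstract_embedding", pa.list_(pa.float32(), DIM))
    idx = batch.schema.get_field_index("abstract_embedding")
    return batch.set_column(idx, field, pa.chunked_array(chunks, type=field.type))

ds = ray.data.read_parquet("/path/to/parquet-files")
ds = ds.map_batches(to_fixed_size_list, batch_format="pyarrow")
ds.write_lance("/path/to/lance-dataset.lance",
               mode="create",
               max_rows_per_file=20000)

Is this the intended way to get a vector column out of ray.data.Dataset.write_lance, or is there a supported option that avoids the list/large_list mismatch?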