Write Parquet adds new column "value"

Hi,

So I’m saving a dataset to parquet using .write_parquet(); however, when I load the resulting parquet file I get a single column “value” that contains my columns as dictionaries. How can I save without this value column?

Hi @ssamdav, what does the dataset look like? What schema does it have? It’d be even better to post a repro script for the issue.

When I do .take(2) I receive a list of dictionaries of the form:
{'key1': str, 'key2': string}

So probably the problem is that the dataset isn’t recognizing the keys as columns.

Dataset(num_blocks=7, num_rows=2, schema=<class 'dict'>)

Do you know how I can solve this?

Even if I process the dataset with pandas so that the columns sit outside the value column, when I try to open it with Ray I get the following error:

pyarrow.lib.ArrowInvalid: Schema at index 0 was different: 

vs
factory: string
flow: string

So if I understand correctly, this is the schema before you wrote it out to parquet? In that case, when you read it back it remains a dict-typed column, which is expected.
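
(For illustration: one way to see this is to inspect the Arrow schema of a written file directly. The file name below is a placeholder, since Ray generates the actual names. With dict rows, the whole row is stored under a single struct column named "value" rather than one column per key.)

import pyarrow.parquet as pq

# "part.parquet" is a placeholder for one of the generated files in the
# output directory written by write_parquet.
print(pq.read_schema("test.parquet/part.parquet"))
# roughly: value: struct<key1: string, key2: string>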

So I’m facing two problems:

  • the first is that I define each data point as a dictionary, and when I save the dataset I get a schema with a value column containing the data points.
  • the second is that when I save it I can’t load it back without providing the schema; if I don’t provide the schema I get the previous error (a sketch of passing an explicit schema follows below).
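
(For illustration, a minimal sketch of what “providing the schema” looks like, assuming a pyarrow schema built from the two string columns in the error above; the directory name is a placeholder, and whether read_parquet accepts a schema keyword depends on the Ray version.)

import pyarrow as pa
import ray

# Schema matching the columns shown in the ArrowInvalid error above.
schema = pa.schema([("factory", pa.string()), ("flow", pa.string())])

# "my_dataset_dir" is a placeholder; passing the schema explicitly makes
# every file, including empty ones, read back with the same columns.
dataset = ray.data.read_parquet("my_dataset_dir", schema=schema)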

@ssamdav Is it possible for you to provide a simple code snippet that we can run and reproduce your error? It doesn’t have to be a huge dataset. Fake data that reproduces the problem would be sufficient.

Thanks, HTH,
jules

So the first problem can be reproduced with this snippet:

import ray
from pathlib import Path

DATA_FOLDER = Path("fake_data")  # not shown in the original post; any output directory works

def func(batch):
    # Each element of the batch becomes a plain Python dict.
    return [{'a': d * 2, 'b': d ** 4} for d in batch]

ds = ray.data.range(1000)
ds = ds.map_batches(func, zero_copy_batch=True)
ds.write_parquet(DATA_FOLDER / 'test.parquet')

dataset = ray.data.read_parquet(DATA_FOLDER / 'test.parquet')
result = dataset.take(1)

Where the result is equal to

[{'value': {'a': 0, 'b': 0}}]

However, I was expecting to have

[{'a': 0, 'b': 0}]
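
(One way to get the expected columnar result, sketched here assuming the batch arrives as a sequence of integers as in the snippet above, is to return a dict of column arrays instead of a list of dicts, so that Ray Data builds a tabular block with named columns.)

import numpy as np

def func(batch):
    arr = np.asarray(batch)
    # Returning a dict of column arrays (rather than a list of dicts) gives
    # Ray Data named columns, so write_parquet produces separate 'a' and 'b'
    # columns instead of a single 'value' column.
    return {'a': arr * 2, 'b': arr ** 4}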

@ssamdav I slightly changed the code to create a pandas DataFrame without doing map_batches:

import ray
import pandas as pd

DATA_FOLDER = "fake_data"
OUTFILE = f"{DATA_FOLDER} / 'test.parquet_2'"

def func(batch):
    df = pd.DataFrame([{'a': d*2, 'b': d**4} for d in range(batch)])
    return df

ds = ray.data.from_pandas(func(1000))
print(ds.take(2))
print(ds.schema)

# write to disk 
ds.write_parquet(OUTFILE)
dataset = ray.data.read_parquet(OUTFILE)
result = dataset.take(2)
print(result)

2023-04-14 09:57:52,110 INFO worker.py:1544 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8266 
[{'a': 0, 'b': 0}, {'a': 2, 'b': 1}]
<bound method Dataset.schema of Dataset(num_blocks=1, num_rows=1000, schema={a: int64, b: int64})>
Write Progress: 100%|███████████████████████████████████████████| 1/1 [00:00<00:00,  4.39it/s]
Parquet Files Sample:   0%|          | 0/2 [00:00<?, ?it/s]
2023-04-14 09:57:54,558 WARNING read_api.py:330 -- ⚠️  The number of blocks in this dataset (2) limits its parallelism to 2 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
[{'a': 0, 'b': 0}, {'a': 2, 'b': 1}]
Parquet Files Sample: 100%|██████████| 2/2 [00:00<00:00,  8.85it/s]

Then, taking your code snippet and returning a pandas DataFrame rather than a list of dictionaries also seems to preserve the schema:

import ray
import pandas as pd

DATA_FOLDER = "fake_data"
OUTFILE = f"{DATA_FOLDER} / 'test.parquet'"

def func(batch):
    df = pd.DataFrame([{'a': d*2, 'b': d**4} for d in batch])
    return df

ds = ray.data.range(1000)
ds = ds.map_batches(func, zero_copy_batch=True)
print(ds.take(2))
print(ds.schema)

# write to disk 
ds.write_parquet(OUTFILE)
dataset = ray.data.read_parquet(OUTFILE)
result = dataset.take(2)
print(result)
print(dataset.schema)

2023-04-14 10:03:20,476 INFO worker.py:1544 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8266 
2023-04-14 10:03:22,359 INFO bulk_executor.py:39 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[read->MapBatches(func)]
read->MapBatches(func): 100%|█████████████████████████████████| 20/20 [00:00<00:00, 37.77it/s]
[{'a': 0, 'b': 0}, {'a': 2, 'b': 1}]
<bound method Dataset.schema of Dataset(num_blocks=20, num_rows=1000, schema={a: int64, b: int64})>
Write Progress: 100%|███████████████████████████████████████| 20/20 [00:00<00:00, 1152.00it/s]
2023-04-14 10:03:22,996 WARNING plan.py:523 -- Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune
2023-04-14 10:03:22,999 WARNING plan.py:523 -- Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune
[{'a': 0, 'b': 0}, {'a': 2, 'b': 1}]
<bound method Dataset.schema of Dataset(num_blocks=20, num_rows=12000, schema={a: int64, b: int64})>
Metadata Fetch Progress: 100%|██████████| 40/40 [00:00<00:00, 1427.00it/s]
Parquet Files Sample: 100%|██████████| 2/2 [00:00<00:00, 1085.62it/s]

cc: @jjyao any comments here?

I think I found the error that I was having when loading the parquet.
In my pipeline I have a filter, and I suppose one data block was filtered out completely, so when I saved the dataset and tried to load it, that parquet file didn’t have any values.

The way I tested it was by calling repartition before saving, and the issue stopped.
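
(A minimal sketch of the scenario and the workaround, assuming the integer rows produced by ray.data.range in the snippets above and a hypothetical filter threshold; the output path is a placeholder.)

import ray

ds = ray.data.range(1000)
# Hypothetical filter that leaves most blocks completely empty.
ds = ds.filter(lambda x: x > 990)
# Merging blocks before writing avoids emitting empty parquet files whose
# schema differs from the non-empty ones.
ds = ds.repartition(2)
ds.write_parquet("filtered_output")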


@ssamdav Excellent. Can we then close this issue?

Yes! Thanks for the help!