Filter and Pivot Large Datasets on S3 by Symbol

I have a large number of market quote data files on S3 with one file per minute of the trading day, for each trading day of the year, adding up to about 100K files. Each file is a .csv.gz, about 100MB in size, that contains data for all stocks (around 5000) for that minute.

I want to filter the data in each file down to a set of 50-100 stocks of interest, and then regroup the data across all minutes in a given day by stock instead of by minute, as it is organized now. For example, right now the data looks like:

DAY 1 (directory)
File_Minute1: [(TSLA Minute 1) (AAPL Minute 1) … (MSFT Minute 1)] (5000 stocks)
File_Minute2: [(TSLA Minute 2) (AAPL Minute 2) … (MSFT Minute 2)]

File_Minute390: [(TSLA Minute 390) (AAPL Minute 390) … (MSFT Minute 390)]

I would like to pivot it to:

DAY 1 (directory)
File_TSLA: [(TSLA Minute 1) (TSLA Minute 2) … (TSLA Minute 390)]
File_AAPL: [(AAPL Minute 1) (AAPL Minute 2) … (AAPL Minute 390)]

File_MSFT: [(MSFT Minute 1) (MSFT Minute 2) … (MSFT Minute 390)]

Thanks!

A screenshot of a pared-down sample file is shown below (the file doesn't actually contain the ellipsis; it just stands in for omitted rows). There are also more columns to the right, past column S.
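To make the target concrete, here is roughly what I would do for a single day in plain pandas if the data were small enough to handle locally (the "ticker" column name and the paths are just placeholders; the real files have many more columns):

import glob
import os

import pandas as pd

stocks_of_interest = {"TSLA", "AAPL", "MSFT"}

# Read every per-minute file for the day, keeping only the tickers of interest.
frames = []
for path in sorted(glob.glob("day1/minute_*.csv.gz")):
    df = pd.read_csv(path, compression="gzip")
    frames.append(df[df["ticker"].isin(stocks_of_interest)])
day = pd.concat(frames, ignore_index=True)

# Regroup by ticker and write one file per stock.
os.makedirs("day1_by_ticker", exist_ok=True)
for ticker, group in day.groupby("ticker"):
    group.to_csv(f"day1_by_ticker/{ticker}.csv", index=False)

The problem is doing this at the scale of ~100K files on S3 without pulling everything onto one machine.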

Hi @bonnyjain, thanks for the question! This is nearly doable in Datasets with something like:

from typing import Optional

import pyarrow
import ray
from ray.data.block import Block
from ray.data.datasource import BlockWritePathProvider
from ray.types import ObjectRef


class TickerBlockWritePathProvider(BlockWritePathProvider):
    def _get_write_path_for_block(
        self,
        base_path: str,
        *,
        filesystem: Optional["pyarrow.fs.FileSystem"] = None,
        dataset_uuid: Optional[str] = None,
        block: Optional[ObjectRef[Block]] = None,
        block_index: Optional[int] = None,
        file_format: Optional[str] = None,
    ) -> str:
        # Each block holds the rows for a single ticker, so name the file after it.
        block = ray.get(block)
        ticker = str(block["ticker"][0])
        return f"{base_path}/{ticker}.csv"

stocks_of_interest = ["TSLA", "AAPL", ..., "MSFT"]

ds = (
    ray.data.read_csv("s3://your/bucket")
    .filter(lambda row: row["ticker"] in stocks_of_interest)
    .groupby("ticker")
    .map_groups(lambda key, df: df.assign(ticker=key), block_per_group=True)
)
ds.write_csv("s3://your/new/bucket", block_path_provider=TickerBlockWritePathProvider())

The only thing Datasets is missing here is that .map_groups() hasn’t been implemented yet (it’s currently slated for implementation in the next week or so), and we weren’t planning on .map_groups() guaranteeing a block per group key (stock/ticker, in your case), so that would be a new requirement for the API. I think we could expose something like .map_groups(..., block_per_group=True), which we should be able to support.
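In the meantime, if you need something before that lands, a possible workaround is to do the filtering in Datasets and the per-ticker split in pandas, one day at a time. This is just a sketch under the assumption that a day’s worth of filtered data fits in memory; the bucket paths and the "ticker" column name are placeholders, and writing straight to S3 from pandas assumes s3fs is installed:

import ray

stocks_of_interest = {"TSLA", "AAPL", "MSFT"}

# One day's worth of per-minute files.
ds = ray.data.read_csv("s3://your/bucket/day1")
ds = ds.filter(lambda row: row["ticker"] in stocks_of_interest)

# Collect the filtered day into a single pandas frame
# (raise to_pandas' default row cap to fit your data).
day_df = ds.to_pandas(limit=10_000_000)

# One output file per ticker for the day.
for ticker, group in day_df.groupby("ticker"):
    group.to_csv(f"s3://your/new/bucket/day1/{ticker}.csv", index=False)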

This is a very useful use case for us to hear about right as we’re about to implement group mapping! Could you bump the open feature request for .map_groups() and leave a comment describing your use case?

Following up here: the canonical way to do this would be to use ds.groupby(key).map_groups(write_file), with a function that writes out a file for each group. Something like this should work:

def write_file(records):
    # Write this group's records (all rows for one ticker) out to its own file.
    write_to_disk(records)
    # Nothing needs to flow downstream; the write is the side effect we want.
    return []

ds.groupby("ticker").map_groups(write_file)
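For what it’s worth, here is a sketch of what write_file could look like if the groups arrive as pandas DataFrames; the output bucket and the "ticker" column are placeholders, and writing directly to S3 again assumes s3fs is installed:

import pandas as pd

def write_file(group: pd.DataFrame) -> pd.DataFrame:
    # Every row in the group shares one ticker, so use it to name the file.
    ticker = group["ticker"].iloc[0]
    group.to_csv(f"s3://your/new/bucket/day1/{ticker}.csv", index=False)
    # Return an empty frame; the write itself is the result we care about.
    return pd.DataFrame()

ds.groupby("ticker").map_groups(write_file)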