Ray Job Progress Tracker

Hello team, I would like to build a distributed Ray job progress tracker that keeps track of how many records have been processed so far, how many succeeded, and how many failed, and shows this on the console as the Ray job makes progress.

I have implemented the following solution and would love to get your review and feedback on the implementation. Is there a better, simpler way to do it via the Ray APIs?

1. Console logging with ProgressReporter (progress_tracker.py)

A ProgressReporter background thread polls the ProgressTracker actor at a configurable interval and logs progress to the console. The actor tracks counts internally; the reporter reads them via get_counts().

Track success/failure counts and throughput across distributed Ray Data map_batches workers.

Architecture

┌─────────────────────┐       increment()         ┌──────────────────────┐
│  map_batches worker │ ──── fire-and-forget ──►  │                      │
├─────────────────────┤                           │   ProgressTracker    │
│  map_batches worker │ ──── fire-and-forget ──►  │   (Ray actor)        │
├─────────────────────┤                           │                      │
│  map_batches worker │ ──── fire-and-forget ──►  │   success, failure,  │
└─────────────────────┘                           │   timestamps         │
                                                  └──────────┬───────────┘
                                                             │
                                                  get_counts() every N sec
                                                             │
                                                             v
                                                  ┌──────────────────────┐
                                                  │  ProgressReporter    │
                                                  │  (background thread) │
                                                  │  logs to console     │
                                                  └──────────────────────┘

There are three components:

  • ProgressTracker — a Ray actor that accumulates success/failure counters. Workers call increment() as fire-and-forget (tracker.increment.remote()), so processing is never blocked by progress bookkeeping.

  • ProgressReporter — a local background thread that polls the tracker at a configurable interval and logs progress (count, percentage, throughput, elapsed time).

  • process_batch — a map_batches-compatible function that processes each batch and reports results back to the tracker.

Design Decisions

Concurrency groups for reads and writes

By default, Ray actors process method calls one at a time in FIFO order. When map_batches workers flood the actor with increment() calls, a get_counts() call from the reporter gets queued behind them and cannot be served until the backlog clears. This starves the reporter — it only sees results at the very end.

A naive fix like max_concurrency=2 gives the actor a thread pool of 2 threads, but Ray makes no guarantee about which methods land on which threads. Two increment() calls could run simultaneously, and since += is a read-modify-write across multiple bytecodes, concurrent increments would lose updates.
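To see why the lock matters, here is what the max_concurrency route would require instead: guarding every read-modify-write with an explicit lock. This is a minimal sketch of that alternative (a hypothetical class, not part of the implementation below), kept only to contrast with the concurrency-group approach:

```python
import threading

class LockedCounter:
    """Sketch of thread-safe counters for an actor run with max_concurrency > 1.

    Every read-modify-write on the counts happens under a lock, so even if
    two increment() calls land on different threads, no update is lost.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self.success = 0
        self.failure = 0

    def increment(self, success=0, failure=0):
        with self._lock:  # serialize the read-modify-write
            self.success += success
            self.failure += failure

    def get_counts(self):
        with self._lock:  # consistent snapshot of both counters
            return {"success": self.success, "failure": self.failure}
```

This works, but every current and future method has to remember to take the lock; concurrency groups give the same guarantee declaratively, per method.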

The solution is Ray’s concurrency groups, which pin methods to dedicated thread pools:

@ray.remote(concurrency_groups={"writes": 1, "reads": 1})
class ProgressTracker:
    @ray.method(concurrency_group="writes")
    def increment(self, ...): ...

    @ray.method(concurrency_group="reads")
    def get_counts(self): ...

  • “writes” group (1 thread) — all increment() calls from all workers are serialized through a single thread. No matter how many workers call increment.remote() concurrently, the calls queue within this group and execute one at a time. No updates are lost.
  • “reads” group (1 thread) — get_counts() runs on its own dedicated thread, completely independent of the writes queue. The reporter is never starved.

The two groups run concurrently (separate thread pools), so reads and writes overlap freely. But writes among themselves are always sequential — exactly the guarantee we need.
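The same two-pool scheduling guarantee can be emulated outside Ray with plain single-thread executors. This is a local analogy only (not Ray's actual mechanism), sketched to make the guarantee concrete:

```python
from concurrent.futures import ThreadPoolExecutor

class TwoGroupTracker:
    """Local analogy of concurrency groups: one single-thread pool per group.

    All writes funnel through writes_pool, so they execute one at a time;
    reads run on their own thread and are never queued behind writes.
    """

    def __init__(self):
        self.writes_pool = ThreadPoolExecutor(max_workers=1)  # "writes" group
        self.reads_pool = ThreadPoolExecutor(max_workers=1)   # "reads" group
        self.success = 0

    def increment(self, n=1):
        # Fire-and-forget: returns a future immediately, like .remote()
        return self.writes_pool.submit(self._do_increment, n)

    def _do_increment(self, n):
        self.success += n  # safe: only ever runs on the writes thread

    def get_counts(self):
        # Served by the reads thread, independent of any writes backlog
        return self.reads_pool.submit(lambda: {"success": self.success}).result()
```

The write path stays sequential by construction, while reads are served from a separate thread, which is exactly the property the concurrency groups give the actor.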

Logging

Ray Data prints its own progress bars and status lines to the console. Emitting progress through the standard Python logging module writes each update on its own line, so it coexists cleanly with Ray's console output.

Implementation

import ray
import time
import logging
import threading
from typing import Dict, Any, Optional

logger = logging.getLogger(__name__)


@ray.remote(concurrency_groups={"writes": 1, "reads": 1})
class ProgressTracker:
    """Actor to track success/failure counts across distributed workers.

    Uses concurrency groups to run increment() and get_counts() on separate
    threads. All increment() calls are serialized in the "writes" group so
    no updates are lost. get_counts() runs in the "reads" group so it is
    never starved behind queued writes.
    Ref: https://docs.ray.io/en/latest/ray-core/actors/concurrency_group_api.html
    """

    def __init__(self):
        self.success = 0
        self.failure = 0
        self.start_time = time.time()
        self.last_update_time = self.start_time

    @ray.method(concurrency_group="writes")
    def increment(self, success: int = 0, failure: int = 0):
        self.success += success
        self.failure += failure
        self.last_update_time = time.time()

    @ray.method(concurrency_group="reads")
    def get_counts(self) -> Dict[str, Any]:
        elapsed = self.last_update_time - self.start_time
        total = self.success + self.failure
        rate = total / elapsed if elapsed > 0 else 0
        return {
            "success": self.success,
            "failure": self.failure,
            "total": total,
            "elapsed_sec": round(elapsed, 1),
            "records_per_sec": round(rate, 1),
        }


class ProgressReporter:
    """Background thread that polls the tracker and logs progress."""

    def __init__(self, tracker, interval: float = 2.0, expected_total: Optional[int] = None):
        self.tracker = tracker
        self.interval = interval
        self.expected_total = expected_total
        self._stop_event = threading.Event()
        self._thread = None

    def _poll_loop(self):
        while not self._stop_event.is_set():
            try:
                counts = ray.get(self.tracker.get_counts.remote())
                self._print_progress(counts)
            except Exception as e:
                logger.warning(f"Progress polling error: {e}")
            # Wait with a timeout instead of sleeping so stop() takes effect promptly
            self._stop_event.wait(self.interval)

    def _print_progress(self, counts: Dict[str, Any]):
        total = counts["total"]
        success = counts["success"]
        failure = counts["failure"]
        rate = counts["records_per_sec"]
        elapsed = counts["elapsed_sec"]

        if self.expected_total is not None:
            pct = (total / self.expected_total) * 100
            logger.info(
                f"Progress: {total:,}/{self.expected_total:,} ({pct:.1f}%) | "
                f"success: {success:,} | failure: {failure:,} | "
                f"{rate:,.0f} rec/s | {elapsed}s elapsed"
            )
        else:
            logger.info(
                f"Processed: {total:,} | success: {success:,} | failure: {failure:,} | "
                f"{rate:,.0f} rec/s | {elapsed}s elapsed"
            )

    def start(self):
        self._thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        # Print final counts
        counts = ray.get(self.tracker.get_counts.remote())
        print()
        print("-" * 50)
        print(f"Final: {counts['total']:,} total | {counts['success']:,} success | {counts['failure']:,} failure")
        print(f"Time: {counts['elapsed_sec']}s | Rate: {counts['records_per_sec']:,.0f} rec/s")
        print("-" * 50)
        return counts


def process_batch(batch: Dict[str, Any], tracker) -> Dict[str, Any]:
    """Example batch processor - replace with your logic."""
    success, failure = 0, 0
    results = []

    for item in batch["data"]:
        try:
            # === Your processing logic here ===
            processed = item * 2  # Example transformation
            results.append(processed)
            success += 1
        except Exception:
            results.append(None)
            failure += 1

    # Update tracker (fire-and-forget)
    tracker.increment.remote(success=success, failure=failure)

    return {"data": results}


# =============================================================================
# Main execution
# =============================================================================

if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    try:
        ray.init()

        # Create tracker actor
        tracker = ProgressTracker.remote()

        # Create sample dataset (replace with your actual data source)
        ds = ray.data.from_items([{"data": list(range(1000))} for _ in range(100000)])

        # Start progress reporter
        reporter = ProgressReporter(
            tracker=tracker,
            interval=0.5,  # Print every 0.5 seconds
            expected_total=100_000,
        )
        reporter.start()

        # Process the dataset
        result_ds = ds.map_batches(
            process_batch,
            fn_kwargs={"tracker": tracker},
            batch_size=100,
        )

        # Trigger execution (write to sink or consume)
        result_ds.write_parquet("~/ray-experiments/progress-tracker/output.parquet")

    finally:
        # Stop the reporter (if it was created) to print final counts
        if "reporter" in locals():
            final_counts = reporter.stop()

        if ray.is_initialized():
            ray.shutdown()
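One small extension the reporter could layer on top of this: deriving an ETA from the running rate. This is a sketch of a hypothetical helper (not part of the implementation above), assuming the counts dict shape returned by get_counts():

```python
def format_eta(counts, expected_total):
    """Estimate seconds remaining from the counts dict returned by get_counts().

    counts is expected to contain "total" and "records_per_sec" keys, as in
    the ProgressTracker above. Returns a short string for the progress line.
    """
    rate = counts["records_per_sec"]
    remaining = expected_total - counts["total"]
    if rate <= 0:
        return "eta: --"  # no throughput yet, nothing to extrapolate
    return f"eta: {remaining / rate:.0f}s"
```

_print_progress could append this to the line whenever expected_total is known; the estimate is naive (assumes a constant rate) but usually good enough for console output.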

How to run it

Setup

uv sync

Run

uv run python src/progress_tracker.py
python3 src/progress_tracker.py 
2026-02-08 00:40:19,675 INFO worker.py:1998 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
/home/asaha/ray-experiments/progress-tracker/.venv/lib/python3.12/site-packages/ray/_private/worker.py:2046: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
2026-02-08 00:40:33,134 INFO logging.py:397 -- Registered dataset logger for dataset dataset_3_0
2026-02-08 00:40:33,143 INFO streaming_executor.py:178 -- Starting execution of Dataset dataset_3_0. Full logs are in /tmp/ray/session_2026-02-08_00-40-17_130068_2917011/logs/ray-data
2026-02-08 00:40:33,143 INFO streaming_executor.py:179 -- Execution plan of Dataset dataset_3_0: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(process_batch)->Write]
2026-02-08 00:40:33,143 INFO streaming_executor.py:686 -- [dataset]: A new progress UI is available. To enable, set `ray.data.DataContext.get_current().enable_rich_progress_bars = True` and `ray.data.DataContext.get_current().use_ray_tqdm = False`.
[dataset]: Run `pip install tqdm` to enable progress reporting.
2026-02-08 00:40:33,144 WARNING resource_manager.py:136 -- ⚠️  Ray's object store is configured to use only 42.9% of available memory (39.9GiB out of 93.2GiB total). For optimal Ray Data performance, we recommend setting the object store to at least 50% of available memory. You can do this by setting the 'object_store_memory' parameter when calling ray.init() or by setting the RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION environment variable.
2026-02-08 00:40:33,260 INFO __main__: Progress: 0/100,000 (0.0%) | success: 0 | failure: 0 | 0 rec/s | 0.0s elapsed
2026-02-08 00:40:33,763 INFO __main__: Progress: 0/100,000 (0.0%) | success: 0 | failure: 0 | 0 rec/s | 0.0s elapsed
2026-02-08 00:40:34,269 INFO __main__: Progress: 49,800/100,000 (49.8%) | success: 49,800 | failure: 0 | 3,658 rec/s | 13.6s elapsed
2026-02-08 00:40:34,679 INFO streaming_executor.py:304 -- ✔️  Dataset dataset_3_0 execution finished in 1.54 seconds
2026-02-08 00:40:34,827 INFO __main__: Progress: 100,000/100,000 (100.0%) | success: 100,000 | failure: 0 | 7,148 rec/s | 14.0s elapsed
2026-02-08 00:40:34,827 INFO dataset.py:5344 -- Data sink Parquet finished. 100000 rows and 763.7MiB data written.

--------------------------------------------------
Final: 100,000 total | 100,000 success | 0 failure
Time: 14.0s | Rate: 7,148 rec/s
--------------------------------------------------

Your approach is sound. It leverages Ray’s concurrency groups to ensure that increment (write) and get_counts (read) operations on the ProgressTracker actor do not block each other, which is the recommended way to avoid read starvation and lost updates in high-concurrency scenarios. This pattern aligns with best practices for distributed progress tracking in Ray, as discussed in the Ray codebase and community forums. Logging progress from a background thread is also compatible with Ray Data’s own progress bars and logging output: Ray Data logs its progress to the console and to log files, and your approach will not interfere with that output (monitoring-your-workload.rst, logging_progress.py).

Your use of fire-and-forget increment.remote() calls ensures that worker throughput is not bottlenecked by progress reporting, and the separation of read/write concurrency groups guarantees that the reporter can always fetch up-to-date counts. This is a robust and scalable solution for distributed progress tracking in Ray jobs. Would you like a step-by-step breakdown of the concurrency group mechanism or suggestions for further enhancements?
