Ray Job Progress Tracker
Hello Team, I would like to build a distributed Ray job progress tracker that keeps track of how many records have been processed so far, how many succeeded, and how many failed, and shows this on the console as the Ray job makes progress.
I have implemented the following solution and would love to get your review and feedback on the implementation. Is there a better, simpler way to do this via the Ray APIs?
1. Console logging with ProgressReporter (progress_tracker.py)
A ProgressReporter background thread polls the ProgressTracker actor at a configurable interval and logs progress to the console. The actor tracks counts internally; the reporter reads them via get_counts(). Together they track success/failure counts and throughput across distributed Ray Data map_batches workers.
Architecture
┌─────────────────────┐      increment()       ┌──────────────────────┐
│ map_batches worker  │ ── fire-and-forget ──► │                      │
├─────────────────────┤                        │   ProgressTracker    │
│ map_batches worker  │ ── fire-and-forget ──► │     (Ray actor)      │
├─────────────────────┤                        │                      │
│ map_batches worker  │ ── fire-and-forget ──► │  success, failure,   │
└─────────────────────┘                        │     timestamps       │
                                               └──────────┬───────────┘
                                                          │
                                              get_counts() every N sec
                                                          │
                                                          v
                                               ┌──────────────────────┐
                                               │   ProgressReporter   │
                                               │ (background thread)  │
                                               │   logs to console    │
                                               └──────────────────────┘
There are three components:
- ProgressTracker — a Ray actor that accumulates success/failure counters. Workers call increment() as fire-and-forget (tracker.increment.remote()), so processing is never blocked by progress bookkeeping.
- ProgressReporter — a local background thread that polls the tracker at a configurable interval and logs progress (count, percentage, throughput, elapsed time).
- process_batch — a map_batches-compatible function that processes each batch and reports results back to the tracker.
Design Decisions
Concurrency groups for reads and writes
By default, Ray actors process method calls one at a time in FIFO order. When map_batches workers flood the actor with increment() calls, a get_counts() call from the reporter gets queued behind them and cannot be served until the backlog clears. This starves the reporter — it only sees results at the very end.
A naive fix like max_concurrency=2 gives the actor a thread pool of 2 threads, but Ray makes no guarantee about which methods land on which threads. Two increment() calls could run simultaneously, and since += is a read-modify-write across multiple bytecodes, concurrent increments would lose updates.
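The lost-update hazard is easy to reproduce in plain Python, without Ray. The sketch below (names are illustrative, not from the implementation) widens the read-modify-write window with a sleep so two threads reliably clobber each other:

```python
import threading
import time

counter = 0

def racy_increment():
    """A read-modify-write with a widened window, mimicking a non-atomic +=."""
    global counter
    value = counter        # read
    time.sleep(0.2)        # widen the gap between read and write
    counter = value + 1    # write: overwrites any concurrent increment

threads = [threading.Thread(target=racy_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 1, not 2: both threads read 0, so one increment was lost
```

With max_concurrency=2 and no concurrency groups, two increment() calls landing on different threads are exposed to exactly this interleaving.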
The solution is Ray’s concurrency groups, which pin methods to dedicated thread pools:
@ray.remote(concurrency_groups={"writes": 1, "reads": 1})
class ProgressTracker:
@ray.method(concurrency_group="writes")
def increment(self, ...): ...
@ray.method(concurrency_group="reads")
def get_counts(self): ...
- “writes” group (1 thread) — all increment() calls from all workers are serialized through a single thread. No matter how many workers call increment.remote() concurrently, the calls queue within this group and execute one at a time. No updates are lost.
- “reads” group (1 thread) — get_counts() runs on its own dedicated thread, completely independent of the writes queue. The reporter is never starved.
The two groups run concurrently (separate thread pools), so reads and writes overlap freely. But writes among themselves are always sequential — exactly the guarantee we need.
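For intuition, the two-pool guarantee can be modeled without Ray using two single-worker ThreadPoolExecutors, one per "group". This is only a sketch of the idea (TwoPoolTracker is a hypothetical name), not how Ray implements concurrency groups internally:

```python
from concurrent.futures import ThreadPoolExecutor

class TwoPoolTracker:
    """Plain-Python model of an actor with separate read/write groups."""

    def __init__(self):
        self.success = 0
        # One single-threaded pool per "group": writes serialize among
        # themselves, while reads run independently on their own thread.
        self._writes = ThreadPoolExecutor(max_workers=1)
        self._reads = ThreadPoolExecutor(max_workers=1)

    def increment(self, n: int):
        # Every increment funnels through the single writes thread, so the
        # read-modify-write in _apply can never interleave with another.
        return self._writes.submit(self._apply, n)

    def _apply(self, n: int):
        self.success += n

    def get_counts(self):
        # Served by the reads thread, never queued behind pending writes.
        return self._reads.submit(lambda: self.success)

tracker = TwoPoolTracker()
futures = [tracker.increment(1) for _ in range(10_000)]
for f in futures:
    f.result()
result = tracker.get_counts().result()
print(result)  # 10000: no updates lost, since writes are serialized
```

The single-worker writes pool plays the role of the "writes" concurrency group; a pool with two or more workers would reintroduce the lost-update race described above.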
Logging
Ray Data prints its own progress bars and status lines to the console. Using the Python logging module keeps our progress output on its own lines, so it coexists cleanly with Ray’s console output.
Implementation
import ray
import time
import logging
import threading
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
@ray.remote(concurrency_groups={"writes": 1, "reads": 1})
class ProgressTracker:
"""Actor to track success/failure counts across distributed workers.
Uses concurrency groups to run increment() and get_counts() on separate
threads. All increment() calls are serialized in the "writes" group so
no updates are lost. get_counts() runs in the "reads" group so it is
never starved behind queued writes.
Ref: https://docs.ray.io/en/latest/ray-core/actors/concurrency_group_api.html
"""
def __init__(self):
self.success = 0
self.failure = 0
self.start_time = time.time()
self.last_update_time = self.start_time
@ray.method(concurrency_group="writes")
def increment(self, success: int = 0, failure: int = 0):
self.success += success
self.failure += failure
self.last_update_time = time.time()
@ray.method(concurrency_group="reads")
def get_counts(self) -> Dict[str, Any]:
elapsed = self.last_update_time - self.start_time
total = self.success + self.failure
rate = total / elapsed if elapsed > 0 else 0
return {
"success": self.success,
"failure": self.failure,
"total": total,
"elapsed_sec": round(elapsed, 1),
"records_per_sec": round(rate, 1),
}
class ProgressReporter:
"""Background thread that polls the tracker and logs progress."""
def __init__(self, tracker, interval: float = 2.0, expected_total: Optional[int] = None):
self.tracker = tracker
self.interval = interval
self.expected_total = expected_total
self._stop_event = threading.Event()
self._thread = None
def _poll_loop(self):
while not self._stop_event.is_set():
try:
counts = ray.get(self.tracker.get_counts.remote())
self._print_progress(counts)
except Exception as e:
logger.warning(f"Progress polling error: {e}")
            self._stop_event.wait(self.interval)  # wakes early if stop() is called
def _print_progress(self, counts: Dict[str, Any]):
total = counts["total"]
success = counts["success"]
failure = counts["failure"]
rate = counts["records_per_sec"]
elapsed = counts["elapsed_sec"]
if self.expected_total is not None:
pct = (total / self.expected_total) * 100
logger.info(
f"Progress: {total:,}/{self.expected_total:,} ({pct:.1f}%) | "
f"success: {success:,} | failure: {failure:,} | "
f"{rate:,.0f} rec/s | {elapsed}s elapsed"
)
else:
logger.info(
f"Processed: {total:,} | success: {success:,} | failure: {failure:,} | "
f"{rate:,.0f} rec/s | {elapsed}s elapsed"
)
def start(self):
self._thread = threading.Thread(target=self._poll_loop, daemon=True)
self._thread.start()
def stop(self):
self._stop_event.set()
if self._thread:
self._thread.join(timeout=5)
# Print final counts
counts = ray.get(self.tracker.get_counts.remote())
print()
print("-" * 50)
print(f"Final: {counts['total']:,} total | {counts['success']:,} success | {counts['failure']:,} failure")
print(f"Time: {counts['elapsed_sec']}s | Rate: {counts['records_per_sec']:,.0f} rec/s")
print("-" * 50)
return counts
def process_batch(batch: Dict[str, Any], tracker) -> Dict[str, Any]:
"""Example batch processor - replace with your logic."""
success, failure = 0, 0
results = []
for item in batch["data"]:
try:
# === Your processing logic here ===
processed = item * 2 # Example transformation
results.append(processed)
success += 1
except Exception:
results.append(None)
failure += 1
# Update tracker (fire-and-forget)
tracker.increment.remote(success=success, failure=failure)
return {"data": results}
# =============================================================================
# Main execution
# =============================================================================
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
    reporter = None  # defined up front so the finally block is safe if setup fails
    try:
ray.init()
# Create tracker actor
tracker = ProgressTracker.remote()
# Create sample dataset (replace with your actual data source)
ds = ray.data.from_items([{"data": list(range(1000))} for _ in range(100000)])
# Start progress reporter
reporter = ProgressReporter(
tracker=tracker,
interval=0.5, # Print every 0.5 seconds
expected_total=100_000,
)
reporter.start()
# Process the dataset
result_ds = ds.map_batches(
process_batch,
fn_kwargs={"tracker": tracker},
batch_size=100,
)
# Trigger execution (write to sink or consume)
result_ds.write_parquet("~/ray-experiments/progress-tracker/output.parquet")
    finally:
        # Always stop reporter to get final counts
        if reporter is not None:
            final_counts = reporter.stop()
if ray.is_initialized():
ray.shutdown()
How to run it
Setup
uv sync
Run
uv run python src/progress_tracker.py
python3 src/progress_tracker.py
2026-02-08 00:40:19,675 INFO worker.py:1998 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
/home/asaha/ray-experiments/progress-tracker/.venv/lib/python3.12/site-packages/ray/_private/worker.py:2046: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
warnings.warn(
2026-02-08 00:40:33,134 INFO logging.py:397 -- Registered dataset logger for dataset dataset_3_0
2026-02-08 00:40:33,143 INFO streaming_executor.py:178 -- Starting execution of Dataset dataset_3_0. Full logs are in /tmp/ray/session_2026-02-08_00-40-17_130068_2917011/logs/ray-data
2026-02-08 00:40:33,143 INFO streaming_executor.py:179 -- Execution plan of Dataset dataset_3_0: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(process_batch)->Write]
2026-02-08 00:40:33,143 INFO streaming_executor.py:686 -- [dataset]: A new progress UI is available. To enable, set `ray.data.DataContext.get_current().enable_rich_progress_bars = True` and `ray.data.DataContext.get_current().use_ray_tqdm = False`.
[dataset]: Run `pip install tqdm` to enable progress reporting.
2026-02-08 00:40:33,144 WARNING resource_manager.py:136 -- ⚠️ Ray's object store is configured to use only 42.9% of available memory (39.9GiB out of 93.2GiB total). For optimal Ray Data performance, we recommend setting the object store to at least 50% of available memory. You can do this by setting the 'object_store_memory' parameter when calling ray.init() or by setting the RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION environment variable.
2026-02-08 00:40:33,260 INFO __main__: Progress: 0/100,000 (0.0%) | success: 0 | failure: 0 | 0 rec/s | 0.0s elapsed
2026-02-08 00:40:33,763 INFO __main__: Progress: 0/100,000 (0.0%) | success: 0 | failure: 0 | 0 rec/s | 0.0s elapsed
2026-02-08 00:40:34,269 INFO __main__: Progress: 49,800/100,000 (49.8%) | success: 49,800 | failure: 0 | 3,658 rec/s | 13.6s elapsed
2026-02-08 00:40:34,679 INFO streaming_executor.py:304 -- ✔️ Dataset dataset_3_0 execution finished in 1.54 seconds
2026-02-08 00:40:34,827 INFO __main__: Progress: 100,000/100,000 (100.0%) | success: 100,000 | failure: 0 | 7,148 rec/s | 14.0s elapsed
2026-02-08 00:40:34,827 INFO dataset.py:5344 -- Data sink Parquet finished. 100000 rows and 763.7MiB data written.
--------------------------------------------------
Final: 100,000 total | 100,000 success | 0 failure
Time: 14.0s | Rate: 7,148 rec/s
--------------------------------------------------