I have a fairly simple data collector that exposes some of its statistics keys as custom metrics:
class TransformStatisticsRay(TransformStatistics):
    """
    Basic statistics class collecting basic execution statistics.

    Values are accumulated in ``self.stats`` (provided by the
    ``TransformStatistics`` base class). A subset of the statistics keys is
    additionally mirrored into Ray custom ``Counter`` metrics.
    It can be extended for specific processors.

    NOTE(review): Ray exports custom metrics from the node on which the
    object recording them runs. On a multi-node cluster (e.g. Ray on
    Kubernetes) confirm that the metrics endpoint of every worker pod, not
    only the head pod, is being scraped; otherwise metrics recorded on
    workers will not show up.
    """

    def __init__(self, params: dict[str, Any]):
        """
        Create the Ray counters used to mirror selected statistics.
        :param params: not used by this class
        """
        # Imported lazily so that ray is only required when this class is used.
        from ray.util.metrics import Counter

        super().__init__()
        self.data_write_counter = Counter("data_written", "Total data written bytes")
        self.data_read_counter = Counter("data_read", "Total data read bytes")
        self.source_files_counter = Counter("source_files_processed", "Total source files processed")
        self.result_files_counter = Counter("result_files_written", "Total result files written")
        self.source_documents_counter = Counter("source_documents_processed", "Total source document processed")
        self.result_documents_counter = Counter("result_documents_written", "Total result documents written")
        self.empty_table_counter = Counter("empty_tables", "Total empty tables read")
        self.failed_read_counter = Counter("failed_read_files", "Total read failed files")
        self.failed_write_counter = Counter("failed_write_files", "Total write failed files")
        self.transform_exceptions_counter = Counter("transform_exceptions", "Transform exception occurred")
        self.data_retries_counter = Counter("data_access_retries", "Data access retries")
        # Statistics key -> Ray counter that mirrors it. Keys not listed here
        # are only accumulated in self.stats.
        # TODO(review): data_write_counter is not mapped to any key, so it is
        # never incremented. Confirm which key (possibly "result_size") should
        # feed it.
        self._counters_by_key = {
            "source_files": self.source_files_counter,
            "source_size": self.data_read_counter,
            "result_files": self.result_files_counter,
            "source_doc_count": self.source_documents_counter,
            "result_doc_count": self.result_documents_counter,
            "skipped empty tables": self.empty_table_counter,
            "failed_reads": self.failed_read_counter,
            "failed_writes": self.failed_write_counter,
            "transform execution exception": self.transform_exceptions_counter,
            "data access retries": self.data_retries_counter,
        }

    def add_stats(self, stats: dict[str, Any]) -> None:
        """
        Add statistics.

        Only positive values are processed. Each is added to ``self.stats[key]``
        and, if the key has an associated Ray counter, that counter is
        incremented by the same amount. The positivity check presumably exists
        because Ray ``Counter.inc`` rejects non-positive values.
        :param stats: dictionary of statistics to add (key -> numeric value)
        :return: None
        """
        for key, val in stats.items():
            if val > 0:
                self.stats[key] = self.stats.get(key, 0) + val
                counter = self._counters_by_key.get(key)
                if counter is not None:
                    counter.inc(val)
When I use this class with a local Ray instance, it works fine, but when I run the same code on Ray on Kubernetes, I see all the “standard” metrics but not my custom ones.