Custom metrics work on local Ray but not on the cluster

I have a fairly simple data collector that exposes some of its keys as custom metrics:

from typing import Any


class TransformStatisticsRay(TransformStatistics):
    """
    Basic statistics class collecting basic execution statistics.
    It can be extended for specific processors
    """

    def __init__(self, params: dict[str, Any]):
        from ray.util.metrics import Counter

        super().__init__()
        self.data_write_counter = Counter("data_written", "Total data written bytes")
        self.data_read_counter = Counter("data_read", "Total data read bytes")
        self.source_files_counter = Counter("source_files_processed", "Total source files processed")
        self.result_files_counter = Counter("result_files_written", "Total result files written")
        self.source_documents_counter = Counter("source_documents_processed", "Total source document processed")
        self.result_documents_counter = Counter("result_documents_written", "Total result documents written")
        self.empty_table_counter = Counter("empty_tables", "Total empty tables read")
        self.failed_read_counter = Counter("failed_read_files", "Total read failed files")
        self.failed_write_counter = Counter("failed_write_files", "Total write failed files")
        self.transform_exceptions_counter = Counter("transform_exceptions", "Transform exception occurred")
        self.data_retries_counter = Counter("data_access_retries", "Data access retries")

    def add_stats(self, stats: dict[str, Any]) -> None:
        """
        Add statistics
        :param stats: dictionary of statistics to add to the current totals
        :return: None
        """
        for key, val in stats.items():
            if val > 0:
                self.stats[key] = self.stats.get(key, 0) + val
                if key == "source_files":
                    self.source_files_counter.inc(val)
                if key == "source_size":
                    self.data_read_counter.inc(val)
                if key == "result_files":
                    self.result_files_counter.inc(val)
                if key == "source_doc_count":
                    self.source_documents_counter.inc(val)
                if key == "result_doc_count":
                    self.result_documents_counter.inc(val)
                if key == "skipped empty tables":
                    self.empty_table_counter.inc(val)
                if key == "failed_reads":
                    self.failed_read_counter.inc(val)
                if key == "failed_writes":
                    self.failed_write_counter.inc(val)
                if key == "transform execution exception":
                    self.transform_exceptions_counter.inc(val)
                if key == "data access retries":
                    self.data_retries_counter.inc(val)

When I use this class with local Ray, it works fine, but when I run the same code with Ray on Kubernetes, I see all the “standard” metrics but not my custom ones.

I am sure I am doing something wrong, but I do not see it. The Ray version I am currently using is 2.9.3. I think it worked with 2.9.1. I also tried the latest version, 2.24.0, and the result is the same.


Hi @blublinsky,

Do you run Ray on KubeRay? What’s your setup?

I figured it out, thanks. The documentation is not completely clear, but once I realized that not all metrics are propagated to the head, I found my metrics on the worker nodes.


Glad that you figured it out. Could you tell me which part of the doc is not clear so I can fix it?

I was a bit confused about which pod actually has the metrics. It looks like the standard metrics are always exposed by the head, but custom metrics are exposed by the workers.
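
For anyone who lands here with the same symptom, here is a minimal sketch of how one might check each pod's metrics output for the custom counters. It only parses Prometheus text-format output and picks out samples whose names contain the counter names from the class above. The helper names (`find_custom_metrics`, `scrape`), the pod IP, and port 8080 are assumptions for illustration, not something confirmed in this thread; substitute the metrics export port your cluster is actually configured with. Matching is by substring because the exported name may carry a prefix such as `ray_`.

```python
import re
from urllib.request import urlopen

# Counter names taken from the class in the question; extend as needed.
CUSTOM_COUNTERS = ("data_written", "data_read", "source_files_processed")

# One Prometheus sample line: metric name, optional {labels}, then the value.
_SAMPLE_LINE = re.compile(r"^([A-Za-z_:][A-Za-z0-9_:]*)(\{.*\})?\s+(\S+)")


def find_custom_metrics(exposition_text, names=CUSTOM_COUNTERS):
    """Return {metric_name: summed value} for samples whose name contains one of `names`."""
    found = {}
    for line in exposition_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = _SAMPLE_LINE.match(line)
        if match is None:
            continue
        metric, _labels, value = match.groups()
        if any(name in metric for name in names):
            # Sum across label sets (for example, one sample per worker process).
            found[metric] = found.get(metric, 0.0) + float(value)
    return found


def scrape(url, timeout=5.0):
    """Fetch the raw metrics text from one pod (hypothetical helper)."""
    with urlopen(url, timeout=timeout) as response:
        return response.read().decode("utf-8")


# Usage sketch (assumed address and port; run once per pod, head and workers):
# print(find_custom_metrics(scrape("http://<worker-pod-ip>:8080/metrics")))
```

Running this against the head pod and against each worker pod should show which pods actually report the custom counters. If they only appear on workers, the Prometheus scrape configuration needs to cover the worker pods as well as the head.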