How to pass the result of a group of tasks to a remote function

The following code fails to compile at this line: ray::Task(aggregate_results).Remote(futures);
How can I correct the code?

#include <ray/api.h>
#include <iostream>
#include <vector>
#include <cmath>
#include <numeric>  // std::accumulate

using namespace ray;


double calculate_partial_integral(double a, double b, int steps) {
    const double dx = (b - a) / steps;
    double sum = 0.0;
    
    for(int i = 0; i < steps; ++i) {
        double x = a + (i + 0.5) * dx;
        sum += 4.0 / (1.0 + x*x);
    }
    return sum * dx;
}

double aggregate_results(const std::vector<double>& partial_sums) {
    return std::accumulate(partial_sums.begin(), partial_sums.end(), 0.0);
}

RAY_REMOTE(calculate_partial_integral, aggregate_results);

int main() {
    ray::Init();
    
    const double a = 0.0;      
    const double b = 1.0;      
    const int total_steps = 10000000;  
    const int num_tasks = 8;         
    
    
    const double interval = (b - a) / num_tasks;
    const int steps_per_task = total_steps / num_tasks;
    
    std::vector<ObjectRef<double>> futures;
    
 
    for(int i = 0; i < num_tasks; ++i) {
        double start = a + i * interval;
        double end = start + interval;
        auto future = ray::Task(calculate_partial_integral)
                         .Remote(start, end, steps_per_task);
        futures.push_back(future);
    }
    
  
    auto result_future = ray::Task(aggregate_results).Remote(futures);
    double pi_approx = *result_future.Get();
    
    std::cout.precision(15);
    std::cout << "computed value: " << pi_approx << "\n"
              << "real: " << M_PI << "\n"
              << "gap: " << std::abs(pi_approx - M_PI) << std::endl;
    
    ray::Shutdown();
    return 0;
}

The error message is:


static_check.h:58:86: error: static assertion failed: arguments not match
   58 |   static_assert(is_invocable<Function, typename FilterArgType<Args>::type...>::value ||
      |                                                                                ~~~~~~^~
   59 |                     is_invocable<Function, Args...>::value,
      |                     ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~              

pi/static_check.h:58:86: note: '(((bool)std::integral_constant<bool, false>::value) || ((bool)std::integral_constant<bool, false>::value))' evaluates to false
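
The assertion can be reproduced outside Ray, which may help in reading the message. The sketch below is a simplified stand-in, not Ray's actual header: ObjectRef is a hypothetical empty template, FilterArgType is reconstructed from the name in the error message on the assumption that it unwraps only a top-level ObjectRef<T> into T, and std::is_invocable (C++17) is assumed to behave like the is_invocable used in static_check.h. Under those assumptions, a single ObjectRef<double> matches a double parameter, but std::vector<ObjectRef<double>> is not turned into std::vector<double>, so both halves of the condition are false.

```cpp
#include <type_traits>
#include <vector>

// Hypothetical stand-in for ray::ObjectRef<T>; only the type matters here.
template <typename T>
struct ObjectRef {};

// Assumed behavior: a top-level ObjectRef<T> is replaced by T,
// every other type is passed through unchanged.
template <typename T>
struct FilterArgType { using type = T; };

template <typename T>
struct FilterArgType<ObjectRef<T>> { using type = T; };

// Mirrors the condition in the failing static_assert.
template <typename Function, typename... Args>
constexpr bool arguments_match =
    std::is_invocable<Function, typename FilterArgType<Args>::type...>::value ||
    std::is_invocable<Function, Args...>::value;

double plus_one(double x) { return x + 1.0; }

double aggregate_results(const std::vector<double>& partial_sums) {
    double sum = 0.0;
    for (double v : partial_sums) sum += v;
    return sum;
}

// A single ObjectRef<double> is unwrapped to double, so this matches.
static_assert(arguments_match<decltype(&plus_one), ObjectRef<double>>,
              "single ObjectRef argument should match");

// The vector is not an ObjectRef itself, so nothing is unwrapped and
// vector<ObjectRef<double>> does not convert to vector<double>.
static_assert(!arguments_match<decltype(&aggregate_results),
                               std::vector<ObjectRef<double>>>,
              "vector of ObjectRef should not match");
```

If this reading is right, the compiler is rejecting the element type of the vector, not the aggregate function itself.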

Can you share a minimal reproducible example and also what the error message says?

Take a look at this example of passing the output of one task into another task (in C++): ray/cpp/example/example.cc at master · ray-project/ray · GitHub

There are some more details on the API here: ray/cpp/include/ray/api.h at master · ray-project/ray · GitHub

Hi, rkn:
Thanks for your reply.
I have shared my complete code above. I have looked through all the example code, but there does not seem to be an example of a distributed C++ reducer that pipelines with a vector of mappers (similar to the Python example A Simple MapReduce Example with Ray Core — Ray 2.43.0).

It might work to change the type of aggregate_results to take in a std::vector<ObjectRef<double>> object and then call ray::Get inside, but I’m not 100% sure.

Alternatively, you probably need to call ray::Get first and then pass the result into aggregate_results.
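
A minimal sketch of this second option, assuming that ray::Get called on a std::vector<ObjectRef<double>> returns a std::vector<std::shared_ptr<double>>. The helper name unwrap_values is hypothetical and not part of Ray. The Ray calls appear only in comments, so the block compiles without Ray installed.

```cpp
#include <memory>
#include <numeric>
#include <vector>

// Same reducer as in the question; its signature is unchanged.
double aggregate_results(const std::vector<double>& partial_sums) {
    return std::accumulate(partial_sums.begin(), partial_sums.end(), 0.0);
}

// Hypothetical helper: copies the values out of the shared_ptr results
// so they can be passed as a plain std::vector<double>.
std::vector<double> unwrap_values(
        const std::vector<std::shared_ptr<double>>& results) {
    std::vector<double> values;
    values.reserve(results.size());
    for (const auto& ptr : results) {
        values.push_back(*ptr);
    }
    return values;
}

// Intended use in main() (requires Ray, not compiled here):
//   std::vector<double> partial_sums = unwrap_values(ray::Get(futures));
//   auto result_future = ray::Task(aggregate_results).Remote(partial_sums);
//   double pi_approx = *result_future.Get();
```

Note that this fetches every partial sum into the calling process before the aggregate task is submitted.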

Hi rkn:
Thanks for your prompt reply. I get your point. What I expected was to pass the results of a group of tasks to a remote task without modifying the function signature. This would be similar to the code in ray/cpp/src/ray/test/ray_remote_test.cc at master · ray-project/ray · GitHub, where all the PlusXX functions are normal functions that take no Ray objects as input.

TEST(RayApiTest, CallWithObjectRef) {
  auto rt0 = ray::Task(Return).Remote();
  auto rt1 = ray::Task(PlusOne).Remote(rt0);
  auto rt2 = ray::Task(PlusTwo).Remote(rt1, 3);
  auto rt3 = ray::Task(PlusOne).Remote(3);
  auto rt4 = ray::Task(PlusTwo).Remote(rt2, rt3);

BTW, will calling ray::Get first transfer the objects to the driver node? I expected the remote tasks to communicate with each other directly.

Thanks