#include <ray/api.h>
#include <iostream>
#include <unistd.h>
#include <vector>
#include <unordered_map>
#include <thread>
#include <chrono>
double integrate(double start, double end) {
char hostname[256];
gethostname(hostname, sizeof(hostname));
pid_t pid = getpid();
std::this_thread::sleep_for(std::chrono::seconds(3));
double sum = 0.0;
const long long steps = 10000000;
double step = (end - start) / steps;
for (long long i = 0; i < steps; ++i) {
double x = start + i * step;
sum += std::sin(x) * std::cos(x);
}
return sum * step;
}
RAY_REMOTE(integrate);
int main() {
try {
ray::RayConfig config;
const char* ray_address = std::getenv("RAY_ADDRESS");
if (ray_address) {
config.address = ray_address;
} else {
config.address = "192.168.0.103:6379";
}
std::cout << "===== connect to Ray: " << config.address << " =====" << std::endl;
ray::Init(config);
std::cout << "===== Ray connect success =====" << std::endl;
std::vector<ray::ObjectRef<double>> results;
std::cout << "\n===== sub task =====" << std::endl;
for (int i = 0; i < 100; ++i) {
double start = i * 10000.0;
double end = (i + 1) * 10000.0;
auto task = ray::Task(integrate).Remote(start, end);
results.push_back(task);
}
std::cout << "\n===== wait =====" << std::endl;
std::vector<double> task_results;
for (auto& ref : results) {
std::shared_ptr<double> result_ptr = ray::Get(ref);
task_results.push_back(*result_ptr);
}
double total = 0.0;
for (int i = 0; i < task_results.size(); i++) {
total += task_results[i];
}
std::cout << "\n===== resulet =====" << std::endl;
std::cout << "sum: " << total << std::endl;
ray::Shutdown();
return 0;
} catch (const std::exception& e) {
std::cerr << "\n===== error =====" << std::endl;
std::cerr << "error message: " << e.what() << std::endl;
ray::Shutdown();
return 1;
}
}
// the code
I have three machine but only the .104 server get the task