Merge pull request #1 from BotChain-Robots/js/rpc-calls

Two-way gRPC-like calls
This commit is contained in:
2026-02-10 23:40:41 -05:00
committed by GitHub
18 changed files with 432 additions and 18 deletions

12
.clang-format Normal file
View File

@@ -0,0 +1,12 @@
BasedOnStyle: LLVM
IndentWidth: 4
UseTab: Never
ColumnLimit: 100
KeepEmptyLinesAtTheStartOfBlocks: true
MaxEmptyLinesToKeep: 2
AllowShortBlocksOnASingleLine: Never
AllowShortIfStatementsOnASingleLine: Never
AllowShortLoopsOnASingleLine: false
AllowShortFunctionsOnASingleLine: None
ReflowComments: false

View File

@@ -8,7 +8,7 @@ find_package(Threads REQUIRED)
find_package(flatbuffers REQUIRED) find_package(flatbuffers REQUIRED)
find_package(spdlog REQUIRED) find_package(spdlog REQUIRED)
add_library(rpc src/librpc.cpp src/TCPClient.cpp src/UDPClient.cpp src/mDNSDiscoveryService.cpp src/MPIMessageBuilder.cpp add_library(rpc src/librpc.cpp src/TCPClient.cpp src/UDPClient.cpp src/mDNSDiscoveryService.cpp src/MPIMessageBuilder.cpp src/CallBuilder.cpp
include/util/log.h) include/util/log.h)
target_include_directories(rpc target_include_directories(rpc
PUBLIC PUBLIC

View File

@@ -4,6 +4,7 @@
"conan": {} "conan": {}
}, },
"include": [ "include": [
"build/Release/generators/CMakePresets.json" "build/Release/generators/CMakePresets.json",
"build/RelWithDebInfo/generators/CMakePresets.json"
] ]
} }

View File

@@ -1,11 +1,13 @@
from conan import ConanFile
from conan.tools.cmake import CMake, cmake_layout, CMakeToolchain, CMakeDeps
from conan.tools.files import copy
import os import os
from conan import ConanFile
from conan.tools.cmake import CMake, CMakeDeps, CMakeToolchain, cmake_layout
from conan.tools.files import copy
class MyLibraryConan(ConanFile): class MyLibraryConan(ConanFile):
name = "librpc" name = "librpc"
version = "1.1.6" version = "1.1.7"
settings = "os", "compiler", "build_type", "arch" settings = "os", "compiler", "build_type", "arch"
options = {"shared": [True, False], "fPIC": [True, False]} options = {"shared": [True, False], "fPIC": [True, False]}

View File

@@ -12,4 +12,4 @@ class ICommunicationClient {
virtual int send_msg(void *sendbuff, uint32_t len) = 0; virtual int send_msg(void *sendbuff, uint32_t len) = 0;
}; };
#endif //INETWORKCLIENT_H #endif // INETWORKCLIENT_H

View File

@@ -0,0 +1,29 @@
#ifndef CALLBUILDER_H
#define CALLBUILDER_H

#include <vector>

#include "SerializedMessage.h"
#include "flatbuffers/flatbuffers.h"
#include "flatbuffers_generated/ReturnCall_generated.h"
#include "flatbuffers_generated/SendCall_generated.h"

namespace Flatbuffers {

// Builds and parses the flatbuffer messages used for the RPC round trip:
// SendCall on the request side, ReturnCall on the response side.
class CallBuilder {
  public:
    // Pre-size the builder's scratch buffer to 1 KiB to avoid early regrowth.
    CallBuilder() : builder_(1024) {
    }

    // Serializes a SendCall carrying the function tag, the per-call unique id
    // and the raw parameter bytes.
    // NOTE(review): the returned SerializedMessage points into builder_'s
    // internal buffer, so it is presumably only valid until the next
    // build_send_call() or until this CallBuilder is destroyed — confirm
    // against SerializedMessage's definition before storing it.
    SerializedMessage build_send_call(uint8_t tag, uint8_t unique_id,
                                      const std::vector<uint8_t> &parameters);

    // Interprets `buffer` as the root of a ReturnCall table. Performs no
    // verification; run a flatbuffers::Verifier first on untrusted input.
    static const Messaging::ReturnCall *parse_return_call(const uint8_t *buffer);

  private:
    flatbuffers::FlatBufferBuilder builder_;
};

} // namespace Flatbuffers

#endif // CALLBUILDER_H

View File

@@ -30,4 +30,4 @@ class MPIMessageBuilder {
}; };
} // namespace Flatbuffers } // namespace Flatbuffers
#endif //MPIMESSAGEBUILDER_H #endif // MPIMESSAGEBUILDER_H

View File

@@ -12,4 +12,4 @@ struct SerializedMessage {
}; };
} // namespace Flatbuffers } // namespace Flatbuffers
#endif //SERIALIZEDMESSAGE_H #endif // SERIALIZEDMESSAGE_H

View File

@@ -1,6 +1,5 @@
// automatically generated by the FlatBuffers compiler, do not modify // automatically generated by the FlatBuffers compiler, do not modify
#ifndef FLATBUFFERS_GENERATED_MPIMESSAGE_MESSAGING_H_ #ifndef FLATBUFFERS_GENERATED_MPIMESSAGE_MESSAGING_H_
#define FLATBUFFERS_GENERATED_MPIMESSAGE_MESSAGING_H_ #define FLATBUFFERS_GENERATED_MPIMESSAGE_MESSAGING_H_

View File

@@ -0,0 +1,112 @@
// automatically generated by the FlatBuffers compiler, do not modify
#ifndef FLATBUFFERS_GENERATED_RETURNCALL_MESSAGING_H_
#define FLATBUFFERS_GENERATED_RETURNCALL_MESSAGING_H_
#include "flatbuffers/flatbuffers.h"
// Ensure the included flatbuffers.h is the same version as when this file was
// generated, otherwise it may not be compatible.
// static_assert(FLATBUFFERS_VERSION_MAJOR == 25 &&
// FLATBUFFERS_VERSION_MINOR == 2 &&
// FLATBUFFERS_VERSION_REVISION == 10,
// "Non-compatible flatbuffers version included");
namespace Messaging {

struct ReturnCall;
struct ReturnCallBuilder;

// Read-side accessor for a serialized ReturnCall table: the RPC response,
// pairing the originating call's unique id with the returned payload bytes.
// NOTE: flatc-generated code — regenerate from the schema rather than editing.
struct ReturnCall FLATBUFFERS_FINAL_CLASS : private ::flatbuffers::Table {
    typedef ReturnCallBuilder Builder;
    enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
        VT_UNIQUE_ID = 4,
        VT_LENGTH = 6,
        VT_RETURN_VALUE = 8
    };
    // Id of the SendCall this response answers; defaults to 0 when unset.
    uint8_t unique_id() const {
        return GetField<uint8_t>(VT_UNIQUE_ID, 0);
    }
    uint16_t length() const {
        return GetField<uint16_t>(VT_LENGTH, 0);
    }
    // Optional field: returns nullptr when the vector was never set.
    const ::flatbuffers::Vector<uint8_t> *return_value() const {
        return GetPointer<const ::flatbuffers::Vector<uint8_t> *>(VT_RETURN_VALUE);
    }
    bool Verify(::flatbuffers::Verifier &verifier) const {
        return VerifyTableStart(verifier) && VerifyField<uint8_t>(verifier, VT_UNIQUE_ID, 1) &&
               VerifyField<uint16_t>(verifier, VT_LENGTH, 2) &&
               VerifyOffset(verifier, VT_RETURN_VALUE) && verifier.VerifyVector(return_value()) &&
               verifier.EndTable();
    }
};

// Write-side helper: appends ReturnCall fields to a FlatBufferBuilder.
struct ReturnCallBuilder {
    typedef ReturnCall Table;
    ::flatbuffers::FlatBufferBuilder &fbb_;
    ::flatbuffers::uoffset_t start_;
    void add_unique_id(uint8_t unique_id) {
        fbb_.AddElement<uint8_t>(ReturnCall::VT_UNIQUE_ID, unique_id, 0);
    }
    void add_length(uint16_t length) {
        fbb_.AddElement<uint16_t>(ReturnCall::VT_LENGTH, length, 0);
    }
    void add_return_value(::flatbuffers::Offset<::flatbuffers::Vector<uint8_t>> return_value) {
        fbb_.AddOffset(ReturnCall::VT_RETURN_VALUE, return_value);
    }
    explicit ReturnCallBuilder(::flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) {
        start_ = fbb_.StartTable();
    }
    ::flatbuffers::Offset<ReturnCall> Finish() {
        const auto end = fbb_.EndTable(start_);
        auto o = ::flatbuffers::Offset<ReturnCall>(end);
        return o;
    }
};

// Convenience creator; `return_value` must already live in `_fbb`.
inline ::flatbuffers::Offset<ReturnCall>
CreateReturnCall(::flatbuffers::FlatBufferBuilder &_fbb, uint8_t unique_id = 0, uint16_t length = 0,
                 ::flatbuffers::Offset<::flatbuffers::Vector<uint8_t>> return_value = 0) {
    ReturnCallBuilder builder_(_fbb);
    builder_.add_return_value(return_value);
    builder_.add_length(length);
    builder_.add_unique_id(unique_id);
    return builder_.Finish();
}

// As above, but copies the std::vector payload into the buffer first.
inline ::flatbuffers::Offset<ReturnCall>
CreateReturnCallDirect(::flatbuffers::FlatBufferBuilder &_fbb, uint8_t unique_id = 0,
                       uint16_t length = 0, const std::vector<uint8_t> *return_value = nullptr) {
    auto return_value__ = return_value ? _fbb.CreateVector<uint8_t>(*return_value) : 0;
    return Messaging::CreateReturnCall(_fbb, unique_id, length, return_value__);
}

inline const Messaging::ReturnCall *GetReturnCall(const void *buf) {
    return ::flatbuffers::GetRoot<Messaging::ReturnCall>(buf);
}

inline const Messaging::ReturnCall *GetSizePrefixedReturnCall(const void *buf) {
    return ::flatbuffers::GetSizePrefixedRoot<Messaging::ReturnCall>(buf);
}

inline bool VerifyReturnCallBuffer(::flatbuffers::Verifier &verifier) {
    return verifier.VerifyBuffer<Messaging::ReturnCall>(nullptr);
}

inline bool VerifySizePrefixedReturnCallBuffer(::flatbuffers::Verifier &verifier) {
    return verifier.VerifySizePrefixedBuffer<Messaging::ReturnCall>(nullptr);
}

inline void FinishReturnCallBuffer(::flatbuffers::FlatBufferBuilder &fbb,
                                   ::flatbuffers::Offset<Messaging::ReturnCall> root) {
    fbb.Finish(root);
}

inline void FinishSizePrefixedReturnCallBuffer(::flatbuffers::FlatBufferBuilder &fbb,
                                               ::flatbuffers::Offset<Messaging::ReturnCall> root) {
    fbb.FinishSizePrefixed(root);
}

} // namespace Messaging

#endif // FLATBUFFERS_GENERATED_RETURNCALL_MESSAGING_H_

View File

@@ -1,6 +1,5 @@
// automatically generated by the FlatBuffers compiler, do not modify // automatically generated by the FlatBuffers compiler, do not modify
#ifndef FLATBUFFERS_GENERATED_ROBOTMODULE_H_ #ifndef FLATBUFFERS_GENERATED_ROBOTMODULE_H_
#define FLATBUFFERS_GENERATED_ROBOTMODULE_H_ #define FLATBUFFERS_GENERATED_ROBOTMODULE_H_
@@ -8,7 +7,7 @@
// Ensure the included flatbuffers.h is the same version as when this file was // Ensure the included flatbuffers.h is the same version as when this file was
// generated, otherwise it may not be compatible. // generated, otherwise it may not be compatible.
//static_assert(FLATBUFFERS_VERSION_MAJOR == 25 && // static_assert(FLATBUFFERS_VERSION_MAJOR == 25 &&
// FLATBUFFERS_VERSION_MINOR == 2 && // FLATBUFFERS_VERSION_MINOR == 2 &&
// FLATBUFFERS_VERSION_REVISION == 10, // FLATBUFFERS_VERSION_REVISION == 10,
// "Non-compatible flatbuffers version included"); // "Non-compatible flatbuffers version included");

View File

@@ -0,0 +1,122 @@
// automatically generated by the FlatBuffers compiler, do not modify
#ifndef FLATBUFFERS_GENERATED_SENDCALL_MESSAGING_H_
#define FLATBUFFERS_GENERATED_SENDCALL_MESSAGING_H_
#include "flatbuffers/flatbuffers.h"
// Ensure the included flatbuffers.h is the same version as when this file was
// generated, otherwise it may not be compatible.
// static_assert(FLATBUFFERS_VERSION_MAJOR == 25 &&
// FLATBUFFERS_VERSION_MINOR == 2 &&
// FLATBUFFERS_VERSION_REVISION == 10,
// "Non-compatible flatbuffers version included");
namespace Messaging {

struct SendCall;
struct SendCallBuilder;

// Read-side accessor for a serialized SendCall table: the RPC request,
// carrying the function tag, a per-call unique id and raw parameter bytes.
// NOTE: flatc-generated code — regenerate from the schema rather than editing.
struct SendCall FLATBUFFERS_FINAL_CLASS : private ::flatbuffers::Table {
    typedef SendCallBuilder Builder;
    enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
        VT_TAG = 4,
        VT_UNIQUE_ID = 6,
        VT_LENGTH = 8,
        VT_PARAMETERS = 10
    };
    uint8_t tag() const {
        return GetField<uint8_t>(VT_TAG, 0);
    }
    // Id echoed back in the matching ReturnCall so callers can correlate.
    uint8_t unique_id() const {
        return GetField<uint8_t>(VT_UNIQUE_ID, 0);
    }
    uint16_t length() const {
        return GetField<uint16_t>(VT_LENGTH, 0);
    }
    // Optional field: returns nullptr when the vector was never set.
    const ::flatbuffers::Vector<uint8_t> *parameters() const {
        return GetPointer<const ::flatbuffers::Vector<uint8_t> *>(VT_PARAMETERS);
    }
    bool Verify(::flatbuffers::Verifier &verifier) const {
        return VerifyTableStart(verifier) && VerifyField<uint8_t>(verifier, VT_TAG, 1) &&
               VerifyField<uint8_t>(verifier, VT_UNIQUE_ID, 1) &&
               VerifyField<uint16_t>(verifier, VT_LENGTH, 2) &&
               VerifyOffset(verifier, VT_PARAMETERS) && verifier.VerifyVector(parameters()) &&
               verifier.EndTable();
    }
};

// Write-side helper: appends SendCall fields to a FlatBufferBuilder.
struct SendCallBuilder {
    typedef SendCall Table;
    ::flatbuffers::FlatBufferBuilder &fbb_;
    ::flatbuffers::uoffset_t start_;
    void add_tag(uint8_t tag) {
        fbb_.AddElement<uint8_t>(SendCall::VT_TAG, tag, 0);
    }
    void add_unique_id(uint8_t unique_id) {
        fbb_.AddElement<uint8_t>(SendCall::VT_UNIQUE_ID, unique_id, 0);
    }
    void add_length(uint16_t length) {
        fbb_.AddElement<uint16_t>(SendCall::VT_LENGTH, length, 0);
    }
    void add_parameters(::flatbuffers::Offset<::flatbuffers::Vector<uint8_t>> parameters) {
        fbb_.AddOffset(SendCall::VT_PARAMETERS, parameters);
    }
    explicit SendCallBuilder(::flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) {
        start_ = fbb_.StartTable();
    }
    ::flatbuffers::Offset<SendCall> Finish() {
        const auto end = fbb_.EndTable(start_);
        auto o = ::flatbuffers::Offset<SendCall>(end);
        return o;
    }
};

// Convenience creator; `parameters` must already live in `_fbb`.
inline ::flatbuffers::Offset<SendCall>
CreateSendCall(::flatbuffers::FlatBufferBuilder &_fbb, uint8_t tag = 0, uint8_t unique_id = 0,
               uint16_t length = 0,
               ::flatbuffers::Offset<::flatbuffers::Vector<uint8_t>> parameters = 0) {
    SendCallBuilder builder_(_fbb);
    builder_.add_parameters(parameters);
    builder_.add_length(length);
    builder_.add_unique_id(unique_id);
    builder_.add_tag(tag);
    return builder_.Finish();
}

// As above, but copies the std::vector payload into the buffer first.
inline ::flatbuffers::Offset<SendCall>
CreateSendCallDirect(::flatbuffers::FlatBufferBuilder &_fbb, uint8_t tag = 0, uint8_t unique_id = 0,
                     uint16_t length = 0, const std::vector<uint8_t> *parameters = nullptr) {
    auto parameters__ = parameters ? _fbb.CreateVector<uint8_t>(*parameters) : 0;
    return Messaging::CreateSendCall(_fbb, tag, unique_id, length, parameters__);
}

inline const Messaging::SendCall *GetSendCall(const void *buf) {
    return ::flatbuffers::GetRoot<Messaging::SendCall>(buf);
}

inline const Messaging::SendCall *GetSizePrefixedSendCall(const void *buf) {
    return ::flatbuffers::GetSizePrefixedRoot<Messaging::SendCall>(buf);
}

inline bool VerifySendCallBuffer(::flatbuffers::Verifier &verifier) {
    return verifier.VerifyBuffer<Messaging::SendCall>(nullptr);
}

inline bool VerifySizePrefixedSendCallBuffer(::flatbuffers::Verifier &verifier) {
    return verifier.VerifySizePrefixedBuffer<Messaging::SendCall>(nullptr);
}

inline void FinishSendCallBuffer(::flatbuffers::FlatBufferBuilder &fbb,
                                 ::flatbuffers::Offset<Messaging::SendCall> root) {
    fbb.Finish(root);
}

inline void FinishSizePrefixedSendCallBuffer(::flatbuffers::FlatBufferBuilder &fbb,
                                             ::flatbuffers::Offset<Messaging::SendCall> root) {
    fbb.FinishSizePrefixed(root);
}

} // namespace Messaging

#endif // FLATBUFFERS_GENERATED_SENDCALL_MESSAGING_H_

View File

@@ -3,15 +3,18 @@
#include <chrono> #include <chrono>
#include <memory> #include <memory>
#include <semaphore>
#include <shared_mutex> #include <shared_mutex>
#include <thread> #include <thread>
#include "BlockingQueue.h" #include "BlockingQueue.h"
#include "constants.h" #include "constants.h"
#include "flatbuffers/CallBuilder.h"
#include "mDNSDiscoveryService.h" #include "mDNSDiscoveryService.h"
constexpr auto RX_QUEUE_SIZE = 100; constexpr auto RX_QUEUE_SIZE = 100;
constexpr auto FN_CALL_TAG = 100; // reserved tag for RPC functionality
constexpr auto FN_CALL_TIMEOUT = std::chrono::seconds(3);
struct SizeAndSource { struct SizeAndSource {
size_t bytes_written; size_t bytes_written;
@@ -22,6 +25,7 @@ class MessagingInterface {
public: public:
MessagingInterface() MessagingInterface()
: m_stop_flag(false), m_rx_thread(std::thread(&MessagingInterface::handle_recv, this)), : m_stop_flag(false), m_rx_thread(std::thread(&MessagingInterface::handle_recv, this)),
m_fn_rx_thread(std::thread(&MessagingInterface::handle_fn_recv, this)),
m_rx_queue(std::make_shared<BlockingQueue<std::unique_ptr<std::vector<uint8_t>>>>( m_rx_queue(std::make_shared<BlockingQueue<std::unique_ptr<std::vector<uint8_t>>>>(
RX_QUEUE_SIZE)) { RX_QUEUE_SIZE)) {
#ifdef _WIN32 #ifdef _WIN32
@@ -37,23 +41,36 @@ class MessagingInterface {
int broadcast(uint8_t *buffer, size_t size, bool durable); // todo int broadcast(uint8_t *buffer, size_t size, bool durable); // todo
std::optional<SizeAndSource> recv(uint8_t *buffer, size_t size, uint8_t tag); std::optional<SizeAndSource> recv(uint8_t *buffer, size_t size, uint8_t tag);
int sendrecv(uint8_t *send_buffer, size_t send_size, uint8_t dest, uint8_t send_tag, int sendrecv(uint8_t *send_buffer, size_t send_size, uint8_t dest, uint8_t send_tag,
uint8_t *recv_buffer, size_t recv_size, uint8_t recv_tag); // todo uint8_t *recv_buffer, size_t recv_size,
uint8_t recv_tag); // todo
std::optional<std::unique_ptr<std::vector<uint8_t>>>
remote_call(uint8_t function_tag, uint8_t module, const std::vector<uint8_t> &parameters);
std::unordered_set<uint8_t> find_connected_modules(std::chrono::duration<double> scan_duration); std::unordered_set<uint8_t> find_connected_modules(std::chrono::duration<double> scan_duration);
private: private:
void handle_recv(); void handle_recv();
void handle_fn_recv();
uint16_t m_sequence_number = 0; uint16_t m_sequence_number = 0;
uint8_t unique_fn_call_id = 0; // this is designed to overflow, change to uint16_t if we plan on
// having way more calls per second.
std::unordered_map<uint8_t, std::shared_ptr<ICommunicationClient>> m_id_to_lossless_client; std::unordered_map<uint8_t, std::shared_ptr<ICommunicationClient>> m_id_to_lossless_client;
std::unordered_map<uint8_t, std::shared_ptr<ICommunicationClient>> m_id_to_lossy_client; std::unordered_map<uint8_t, std::shared_ptr<ICommunicationClient>> m_id_to_lossy_client;
std::unordered_map<int, std::unique_ptr<BlockingQueue<std::unique_ptr<std::vector<uint8_t>>>>> std::unordered_map<int, std::unique_ptr<BlockingQueue<std::unique_ptr<std::vector<uint8_t>>>>>
m_tag_to_queue_map; m_tag_to_queue_map;
// The semaphore needs to be in a unique_ptr, since it is not copyable or
// movable unordered_maps need to copy/move to reshuffle.
std::unordered_map<uint8_t, std::unique_ptr<std::binary_semaphore>> m_fn_call_to_semaphore;
std::unordered_map<uint8_t, std::unique_ptr<std::vector<uint8_t>>> m_fn_call_to_result;
std::unique_ptr<IDiscoveryService> m_discovery_service; std::unique_ptr<IDiscoveryService> m_discovery_service;
std::atomic<bool> m_stop_flag; std::atomic<bool> m_stop_flag;
std::thread m_rx_thread; std::thread m_rx_thread;
std::thread m_fn_rx_thread;
std::shared_ptr<BlockingQueue<std::unique_ptr<std::vector<uint8_t>>>> m_rx_queue; std::shared_ptr<BlockingQueue<std::unique_ptr<std::vector<uint8_t>>>> m_rx_queue;
std::shared_mutex m_client_mutex; std::shared_mutex m_client_mutex;
std::shared_mutex m_scan_mutex; std::shared_mutex m_scan_mutex;
std::mutex m_fn_call_mutex;
std::mutex m_tag_queue_mutex;
}; };
#endif // RPC_LIBRARY_H #endif // RPC_LIBRARY_H

View File

@@ -16,4 +16,4 @@ struct mDNSRobotModule {
std::vector<int> connected_module_ids; std::vector<int> connected_module_ids;
}; };
#endif //ROBOTMODULEINSTANCE_H #endif // ROBOTMODULEINSTANCE_H

View File

@@ -29,4 +29,4 @@ void print_errno() {
#endif #endif
#endif //LOG_H #endif // LOG_H

View File

@@ -20,4 +20,4 @@ inline std::vector<std::string> split(const std::string &str, const char delimit
return result; return result;
} }
#endif //STRING_H #endif // STRING_H

23
src/CallBuilder.cpp Normal file
View File

@@ -0,0 +1,23 @@
#include "flatbuffers/CallBuilder.h"
#include "flatbuffers/SerializedMessage.h"
namespace Flatbuffers {

// Serializes a SendCall message with the given function tag, per-call unique
// id and raw parameter bytes. The returned SerializedMessage points into this
// builder's internal buffer and is only valid until the next call to
// build_send_call() (which Clear()s the buffer) or until *this is destroyed.
SerializedMessage CallBuilder::build_send_call(uint8_t tag, uint8_t unique_id,
                                               const std::vector<uint8_t> &parameters) {
    builder_.Clear();
    const auto parameters_vector = builder_.CreateVector(parameters);
    // The schema's length field is uint16; cast to the field's actual type
    // instead of int (the previous int cast narrowed implicitly and would
    // trip -Wconversion). Sizes above 65535 still wrap — see the recv-side
    // FN_RETURN_BUFFER_SIZE cap, which keeps payloads far below that.
    const auto message = Messaging::CreateSendCall(
        builder_, tag, unique_id, static_cast<uint16_t>(parameters.size()), parameters_vector);
    builder_.Finish(message);
    return {builder_.GetBufferPointer(), builder_.GetSize()};
}

// Interprets `buffer` as the root of a ReturnCall table. Performs no
// verification; callers must run a flatbuffers::Verifier on untrusted input.
const Messaging::ReturnCall *CallBuilder::parse_return_call(const uint8_t *buffer) {
    return flatbuffers::GetRoot<Messaging::ReturnCall>(buffer);
}

} // namespace Flatbuffers

View File

@@ -1,12 +1,19 @@
#include "librpc.h" #include "librpc.h"
#include "flatbuffers_generated/ReturnCall_generated.h"
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex> #include <mutex>
#include <optional> #include <optional>
#include <semaphore>
#include <vector> #include <vector>
#undef min #undef min
#include <algorithm> #include <algorithm>
#include "flatbuffers/CallBuilder.h"
#include "flatbuffers/MPIMessageBuilder.h" #include "flatbuffers/MPIMessageBuilder.h"
#include "spdlog/spdlog.h" #include "spdlog/spdlog.h"
@@ -14,10 +21,12 @@ constexpr auto MAX_RECV_WAIT_TIME = std::chrono::seconds(3);
constexpr auto PER_TAG_MAX_QUEUE_SIZE = 50; constexpr auto PER_TAG_MAX_QUEUE_SIZE = 50;
constexpr auto MAX_WAIT_TIME_TAG_ENQUEUE = std::chrono::milliseconds(250); constexpr auto MAX_WAIT_TIME_TAG_ENQUEUE = std::chrono::milliseconds(250);
constexpr auto MAX_WAIT_TIME_RX_THREAD_DEQUEUE = std::chrono::milliseconds(250); constexpr auto MAX_WAIT_TIME_RX_THREAD_DEQUEUE = std::chrono::milliseconds(250);
constexpr auto FN_RETURN_BUFFER_SIZE = 1024;
MessagingInterface::~MessagingInterface() { MessagingInterface::~MessagingInterface() {
m_stop_flag = true; m_stop_flag = true;
m_rx_thread.join(); m_rx_thread.join();
m_fn_rx_thread.join();
#ifdef _WIN32 #ifdef _WIN32
WSACleanup(); WSACleanup();
@@ -51,11 +60,13 @@ int MessagingInterface::broadcast(uint8_t *buffer, size_t size, bool durable) {
std::optional<SizeAndSource> MessagingInterface::recv(uint8_t *buffer, const size_t size, std::optional<SizeAndSource> MessagingInterface::recv(uint8_t *buffer, const size_t size,
uint8_t tag) { uint8_t tag) {
std::unique_lock lock(m_tag_queue_mutex);
if (!m_tag_to_queue_map.contains(tag)) { if (!m_tag_to_queue_map.contains(tag)) {
m_tag_to_queue_map.insert( m_tag_to_queue_map.insert(
{tag, std::make_unique<BlockingQueue<std::unique_ptr<std::vector<uint8_t>>>>( {tag, std::make_unique<BlockingQueue<std::unique_ptr<std::vector<uint8_t>>>>(
PER_TAG_MAX_QUEUE_SIZE)}); PER_TAG_MAX_QUEUE_SIZE)});
} }
lock.unlock();
const auto data = m_tag_to_queue_map[tag]->dequeue(MAX_RECV_WAIT_TIME); const auto data = m_tag_to_queue_map[tag]->dequeue(MAX_RECV_WAIT_TIME);
@@ -82,7 +93,8 @@ int MessagingInterface::sendrecv(uint8_t *send_buffer, size_t send_size, uint8_t
std::unordered_set<uint8_t> std::unordered_set<uint8_t>
MessagingInterface::find_connected_modules(const std::chrono::duration<double> scan_duration) { MessagingInterface::find_connected_modules(const std::chrono::duration<double> scan_duration) {
// Cannot just skip the call if already running, since the caller needs the list of modules. // Cannot just skip the call if already running, since the caller needs the
// list of modules.
std::unique_lock scan_lock(m_scan_mutex); std::unique_lock scan_lock(m_scan_mutex);
const auto foundModules = this->m_discovery_service->find_modules(scan_duration); const auto foundModules = this->m_discovery_service->find_modules(scan_duration);
scan_lock.unlock(); scan_lock.unlock();
@@ -120,15 +132,101 @@ void MessagingInterface::handle_recv() {
const auto &mpi_message = const auto &mpi_message =
Flatbuffers::MPIMessageBuilder::parse_mpi_message(data.value()->data()); Flatbuffers::MPIMessageBuilder::parse_mpi_message(data.value()->data());
std::unique_lock lock(m_tag_queue_mutex);
if (!m_tag_to_queue_map.contains(mpi_message->tag())) { if (!m_tag_to_queue_map.contains(mpi_message->tag())) {
m_tag_to_queue_map.insert( m_tag_to_queue_map.insert(
{mpi_message->tag(), {mpi_message->tag(),
std::make_unique<BlockingQueue<std::unique_ptr<std::vector<uint8_t>>>>( std::make_unique<BlockingQueue<std::unique_ptr<std::vector<uint8_t>>>>(
PER_TAG_MAX_QUEUE_SIZE)}); PER_TAG_MAX_QUEUE_SIZE)});
} }
lock.unlock();
m_tag_to_queue_map[mpi_message->tag()]->enqueue(std::move(data.value()), m_tag_to_queue_map[mpi_message->tag()]->enqueue(std::move(data.value()),
MAX_WAIT_TIME_TAG_ENQUEUE); MAX_WAIT_TIME_TAG_ENQUEUE);
} }
} }
} }
// Performs a blocking RPC call to `module_id`: serializes `parameters` with
// `function_tag`, sends it durably on the reserved FN_CALL_TAG channel, and
// waits up to FN_CALL_TIMEOUT for handle_fn_recv() to deliver the result.
// Returns std::nullopt on timeout or when no payload arrived.
std::optional<std::unique_ptr<std::vector<uint8_t>>>
MessagingInterface::remote_call(uint8_t function_tag, uint8_t module_id,
                                const std::vector<uint8_t> &parameters) {
    // Register a completion semaphore under the lock. Keep a raw pointer so we
    // can wait on it WITHOUT holding m_fn_call_mutex: the map itself must not
    // be accessed unlocked (concurrent remote_call()s may insert and rehash
    // it), but the pointee stays alive until this function erases the entry.
    std::unique_lock lock(m_fn_call_mutex);
    const auto unique_id = unique_fn_call_id++;
    auto sem = std::make_unique<std::counting_semaphore<1>>(0);
    auto *call_done = sem.get();
    m_fn_call_to_semaphore.insert_or_assign(unique_id, std::move(sem));
    lock.unlock();

    Flatbuffers::CallBuilder builder{};
    auto [data, size] = builder.build_send_call(function_tag, unique_id, parameters);
    // Assume durable transmission, non-durable RPC calls do not make sense.
    // If a message is lost, we will block unnecessarily until the timeout.
    // todo: is this thread safe? especially if other threads will be calling
    // send themselves.
    send(reinterpret_cast<uint8_t *>(data), size, module_id, FN_CALL_TAG, true);

    if (call_done->try_acquire_for(FN_CALL_TIMEOUT)) {
        lock.lock();
        m_fn_call_to_semaphore.erase(unique_id);
        // find() instead of operator[]: operator[] default-inserts a null
        // entry that would never be cleaned up on the no-result path.
        const auto it = m_fn_call_to_result.find(unique_id);
        if (it == m_fn_call_to_result.end()) {
            return std::nullopt;
        }
        auto result = std::move(it->second);
        m_fn_call_to_result.erase(it);
        if (!result) {
            return std::nullopt;
        }
        return result;
    }

    // Timed out; handle_fn_recv() will discard any late-arriving result.
    lock.lock();
    m_fn_call_to_semaphore.erase(unique_id);
    return std::nullopt;
}
// Background thread body: receives ReturnCall messages on FN_CALL_TAG, stores
// the payload for the matching remote_call() waiter and wakes it. Exits when
// m_stop_flag is set (recv() times out periodically, so the flag is observed).
void MessagingInterface::handle_fn_recv() {
    while (!m_stop_flag) {
        auto buffer = std::make_unique<std::vector<uint8_t>>();
        buffer->resize(FN_RETURN_BUFFER_SIZE);
        std::optional<SizeAndSource> maybe_result;
        do {
            maybe_result = recv(buffer->data(), FN_RETURN_BUFFER_SIZE, FN_CALL_TAG);
        } while (!m_stop_flag && !maybe_result);
        if (m_stop_flag) {
            return;
        }

        // Validate the buffer before touching shared state — no lock needed
        // yet, which keeps the critical section below short.
        flatbuffers::Verifier verifier(buffer->data(), maybe_result->bytes_written);
        if (!Messaging::VerifyReturnCallBuffer(verifier)) {
            spdlog::error("[LibRPC] Got an invalid return buffer");
            continue;
        }
        // parse_return_call is static; no CallBuilder instance required.
        const auto return_data = Flatbuffers::CallBuilder::parse_return_call(buffer->data());
        if (return_data->length() > FN_RETURN_BUFFER_SIZE) {
            spdlog::warn("[LibRPC] Got a return buffer with return data that is too large");
            continue;
        }
        const auto *return_value = return_data->return_value();
        if (return_value == nullptr) {
            // The return_value vector is an optional flatbuffer field; a
            // missing vector would otherwise be dereferenced below.
            spdlog::warn("[LibRPC] Got a return buffer without a return value");
            continue;
        }

        std::unique_lock lock(m_fn_call_mutex);
        const auto it = m_fn_call_to_semaphore.find(return_data->unique_id());
        if (it == m_fn_call_to_semaphore.end() || !it->second) {
            spdlog::warn("[LibRPC] Previously timed out RPC call completed, "
                         "discarding result");
            continue;
        }
        // Copy the bytes actually present in the flatbuffer vector rather than
        // trusting the sender-supplied length field, which could exceed the
        // vector's real size and cause an out-of-bounds read via memcpy.
        auto raw_data = std::make_unique<std::vector<uint8_t>>(return_value->begin(),
                                                               return_value->end());
        m_fn_call_to_result[return_data->unique_id()] = std::move(raw_data);
        it->second->release();
    }
}