From 880e7ab454f870f3b0d79884ed2f6464838b6118 Mon Sep 17 00:00:00 2001 From: Johnathon Slightham Date: Tue, 10 Feb 2026 23:11:23 -0500 Subject: [PATCH] Finish implementation --- .clang-format | 12 ++ CMakeLists.txt | 2 +- CMakeUserPresets.json | 3 +- conanfile.py | 10 +- include/ICommunicationClient.h | 2 +- include/flatbuffers/CallBuilder.h | 29 +++ include/flatbuffers/MPIMessageBuilder.h | 2 +- include/flatbuffers/SerializedMessage.h | 2 +- .../MPIMessage_generated.h | 1 - .../ReturnCall_generated.h | 152 +++++++-------- .../RobotModule_generated.h | 3 +- .../SendCall_generated.h | 174 ++++++++---------- include/librpc.h | 21 ++- include/mDNSRobotModule.h | 2 +- include/util/log.h | 2 +- include/util/string.h | 2 +- src/CallBuilder.cpp | 23 +++ src/librpc.cpp | 100 +++++++++- 18 files changed, 344 insertions(+), 198 deletions(-) create mode 100644 .clang-format create mode 100644 include/flatbuffers/CallBuilder.h create mode 100644 src/CallBuilder.cpp diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..77e61eb --- /dev/null +++ b/.clang-format @@ -0,0 +1,12 @@ +BasedOnStyle: LLVM +IndentWidth: 4 +UseTab: Never +ColumnLimit: 100 + +KeepEmptyLinesAtTheStartOfBlocks: true +MaxEmptyLinesToKeep: 2 +AllowShortBlocksOnASingleLine: Never +AllowShortIfStatementsOnASingleLine: Never +AllowShortLoopsOnASingleLine: false +AllowShortFunctionsOnASingleLine: None +ReflowComments: false diff --git a/CMakeLists.txt b/CMakeLists.txt index 7d15333..154e5fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ find_package(Threads REQUIRED) find_package(flatbuffers REQUIRED) find_package(spdlog REQUIRED) -add_library(rpc src/librpc.cpp src/TCPClient.cpp src/UDPClient.cpp src/mDNSDiscoveryService.cpp src/MPIMessageBuilder.cpp +add_library(rpc src/librpc.cpp src/TCPClient.cpp src/UDPClient.cpp src/mDNSDiscoveryService.cpp src/MPIMessageBuilder.cpp src/CallBuilder.cpp include/util/log.h) target_include_directories(rpc PUBLIC diff --git a/CMakeUserPresets.json b/CMakeUserPresets.json index 71aeace..6fc3dc2 100644 --- a/CMakeUserPresets.json +++ b/CMakeUserPresets.json @@ -4,6 +4,7 @@ "conan": {} }, "include": [ - "build/Release/generators/CMakePresets.json" + "build/Release/generators/CMakePresets.json", + "build/RelWithDebInfo/generators/CMakePresets.json" ] } \ No newline at end of file diff --git a/conanfile.py b/conanfile.py index d5e87fa..c62be91 100644 --- a/conanfile.py +++ b/conanfile.py @@ -1,11 +1,13 @@ -from conan import ConanFile -from conan.tools.cmake import CMake, cmake_layout, CMakeToolchain, CMakeDeps -from conan.tools.files import copy import os +from conan import ConanFile +from conan.tools.cmake import CMake, CMakeDeps, CMakeToolchain, cmake_layout +from conan.tools.files import copy + + class MyLibraryConan(ConanFile): name = "librpc" - version = "1.1.6" + version = "1.1.7" settings = "os", "compiler", "build_type", "arch" options = {"shared": [True, False], "fPIC": [True, False]} diff --git a/include/ICommunicationClient.h b/include/ICommunicationClient.h index 65a42e9..74e26b7 100644 --- a/include/ICommunicationClient.h +++ b/include/ICommunicationClient.h @@ -12,4 +12,4 @@ class ICommunicationClient { virtual int send_msg(void *sendbuff, uint32_t len) = 0; }; -#endif //INETWORKCLIENT_H +#endif // INETWORKCLIENT_H diff --git a/include/flatbuffers/CallBuilder.h b/include/flatbuffers/CallBuilder.h new file mode 100644 index 0000000..36b8fc2 --- /dev/null +++ b/include/flatbuffers/CallBuilder.h @@ -0,0 +1,29 @@ + +#ifndef CALLBUILDER_H +#define CALLBUILDER_H + +#include + +#include "SerializedMessage.h" +#include "flatbuffers/flatbuffers.h" +#include "flatbuffers_generated/ReturnCall_generated.h" +#include "flatbuffers_generated/SendCall_generated.h" + +namespace Flatbuffers { + +class CallBuilder { + public: + CallBuilder() : builder_(1024) { + } + + SerializedMessage build_send_call(uint8_t tag, uint8_t unique_id, + const std::vector ¶meters); + + static const Messaging::ReturnCall *parse_return_call(const uint8_t *buffer); + + private: + flatbuffers::FlatBufferBuilder builder_; +}; +} // namespace Flatbuffers + +#endif // CALLBUILDER_H diff --git a/include/flatbuffers/MPIMessageBuilder.h b/include/flatbuffers/MPIMessageBuilder.h index 6b2567d..08e1c79 100644 --- a/include/flatbuffers/MPIMessageBuilder.h +++ b/include/flatbuffers/MPIMessageBuilder.h @@ -30,4 +30,4 @@ class MPIMessageBuilder { }; } // namespace Flatbuffers -#endif //MPIMESSAGEBUILDER_H +#endif // MPIMESSAGEBUILDER_H diff --git a/include/flatbuffers/SerializedMessage.h b/include/flatbuffers/SerializedMessage.h index 8e19cc9..e797262 100644 --- a/include/flatbuffers/SerializedMessage.h +++ b/include/flatbuffers/SerializedMessage.h @@ -12,4 +12,4 @@ struct SerializedMessage { }; } // namespace Flatbuffers -#endif //SERIALIZEDMESSAGE_H +#endif // SERIALIZEDMESSAGE_H diff --git a/include/flatbuffers_generated/MPIMessage_generated.h b/include/flatbuffers_generated/MPIMessage_generated.h index 8dccc95..2d9ddfd 100644 --- a/include/flatbuffers_generated/MPIMessage_generated.h +++ b/include/flatbuffers_generated/MPIMessage_generated.h @@ -1,6 +1,5 @@ // automatically generated by the FlatBuffers compiler, do not modify - #ifndef FLATBUFFERS_GENERATED_MPIMESSAGE_MESSAGING_H_ #define FLATBUFFERS_GENERATED_MPIMESSAGE_MESSAGING_H_ diff --git a/include/flatbuffers_generated/ReturnCall_generated.h b/include/flatbuffers_generated/ReturnCall_generated.h index 0f4f951..f40ed08 100644 --- a/include/flatbuffers_generated/ReturnCall_generated.h +++ b/include/flatbuffers_generated/ReturnCall_generated.h @@ -1,6 +1,5 @@ // automatically generated by the FlatBuffers compiler, do not modify - #ifndef FLATBUFFERS_GENERATED_RETURNCALL_MESSAGING_H_ #define FLATBUFFERS_GENERATED_RETURNCALL_MESSAGING_H_ @@ -19,110 +18,95 @@ struct ReturnCall; struct ReturnCallBuilder; struct ReturnCall FLATBUFFERS_FINAL_CLASS : private ::flatbuffers::Table { - typedef ReturnCallBuilder Builder; - enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_UNIQUE_ID = 4, - VT_LENGTH = 6, - VT_RETURN_VALUE = 8 - }; - uint8_t unique_id() const { - return GetField(VT_UNIQUE_ID, 0); - } - uint16_t length() const { - return GetField(VT_LENGTH, 0); - } - const ::flatbuffers::Vector *return_value() const { - return GetPointer *>(VT_RETURN_VALUE); - } - bool Verify(::flatbuffers::Verifier &verifier) const { - return VerifyTableStart(verifier) && - VerifyField(verifier, VT_UNIQUE_ID, 1) && - VerifyField(verifier, VT_LENGTH, 2) && - VerifyOffset(verifier, VT_RETURN_VALUE) && - verifier.VerifyVector(return_value()) && - verifier.EndTable(); - } + typedef ReturnCallBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_UNIQUE_ID = 4, + VT_LENGTH = 6, + VT_RETURN_VALUE = 8 + }; + uint8_t unique_id() const { + return GetField(VT_UNIQUE_ID, 0); + } + uint16_t length() const { + return GetField(VT_LENGTH, 0); + } + const ::flatbuffers::Vector *return_value() const { + return GetPointer *>(VT_RETURN_VALUE); + } + bool Verify(::flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && VerifyField(verifier, VT_UNIQUE_ID, 1) && + VerifyField(verifier, VT_LENGTH, 2) && + VerifyOffset(verifier, VT_RETURN_VALUE) && verifier.VerifyVector(return_value()) && + verifier.EndTable(); + } }; struct ReturnCallBuilder { - typedef ReturnCall Table; - ::flatbuffers::FlatBufferBuilder &fbb_; - ::flatbuffers::uoffset_t start_; - void add_unique_id(uint8_t unique_id) { - fbb_.AddElement(ReturnCall::VT_UNIQUE_ID, unique_id, 0); - } - void add_length(uint16_t length) { - fbb_.AddElement(ReturnCall::VT_LENGTH, length, 0); - } - void add_return_value(::flatbuffers::Offset<::flatbuffers::Vector> return_value) { - fbb_.AddOffset(ReturnCall::VT_RETURN_VALUE, return_value); - } - explicit ReturnCallBuilder(::flatbuffers::FlatBufferBuilder &_fbb) - : fbb_(_fbb) { - start_ = fbb_.StartTable(); - } - ::flatbuffers::Offset Finish() { - const auto end = fbb_.EndTable(start_); - auto o = ::flatbuffers::Offset(end); - return o; - } + typedef ReturnCall Table; + ::flatbuffers::FlatBufferBuilder &fbb_; + ::flatbuffers::uoffset_t start_; + void add_unique_id(uint8_t unique_id) { + fbb_.AddElement(ReturnCall::VT_UNIQUE_ID, unique_id, 0); + } + void add_length(uint16_t length) { + fbb_.AddElement(ReturnCall::VT_LENGTH, length, 0); + } + void add_return_value(::flatbuffers::Offset<::flatbuffers::Vector> return_value) { + fbb_.AddOffset(ReturnCall::VT_RETURN_VALUE, return_value); + } + explicit ReturnCallBuilder(::flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + ::flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = ::flatbuffers::Offset(end); + return o; + } }; -inline ::flatbuffers::Offset CreateReturnCall( - ::flatbuffers::FlatBufferBuilder &_fbb, - uint8_t unique_id = 0, - uint16_t length = 0, - ::flatbuffers::Offset<::flatbuffers::Vector> return_value = 0) { - ReturnCallBuilder builder_(_fbb); - builder_.add_return_value(return_value); - builder_.add_length(length); - builder_.add_unique_id(unique_id); - return builder_.Finish(); +inline ::flatbuffers::Offset +CreateReturnCall(::flatbuffers::FlatBufferBuilder &_fbb, uint8_t unique_id = 0, uint16_t length = 0, + ::flatbuffers::Offset<::flatbuffers::Vector> return_value = 0) { + ReturnCallBuilder builder_(_fbb); + builder_.add_return_value(return_value); + builder_.add_length(length); + builder_.add_unique_id(unique_id); + return builder_.Finish(); } -inline ::flatbuffers::Offset CreateReturnCallDirect( - ::flatbuffers::FlatBufferBuilder &_fbb, - uint8_t unique_id = 0, - uint16_t length = 0, - const std::vector *return_value = nullptr) { - auto return_value__ = return_value ? _fbb.CreateVector(*return_value) : 0; - return Messaging::CreateReturnCall( - _fbb, - unique_id, - length, - return_value__); +inline ::flatbuffers::Offset +CreateReturnCallDirect(::flatbuffers::FlatBufferBuilder &_fbb, uint8_t unique_id = 0, + uint16_t length = 0, const std::vector *return_value = nullptr) { + auto return_value__ = return_value ? _fbb.CreateVector(*return_value) : 0; + return Messaging::CreateReturnCall(_fbb, unique_id, length, return_value__); } inline const Messaging::ReturnCall *GetReturnCall(const void *buf) { - return ::flatbuffers::GetRoot(buf); + return ::flatbuffers::GetRoot(buf); } inline const Messaging::ReturnCall *GetSizePrefixedReturnCall(const void *buf) { - return ::flatbuffers::GetSizePrefixedRoot(buf); + return ::flatbuffers::GetSizePrefixedRoot(buf); } -inline bool VerifyReturnCallBuffer( - ::flatbuffers::Verifier &verifier) { - return verifier.VerifyBuffer(nullptr); +inline bool VerifyReturnCallBuffer(::flatbuffers::Verifier &verifier) { + return verifier.VerifyBuffer(nullptr); } -inline bool VerifySizePrefixedReturnCallBuffer( - ::flatbuffers::Verifier &verifier) { - return verifier.VerifySizePrefixedBuffer(nullptr); +inline bool VerifySizePrefixedReturnCallBuffer(::flatbuffers::Verifier &verifier) { + return verifier.VerifySizePrefixedBuffer(nullptr); } -inline void FinishReturnCallBuffer( - ::flatbuffers::FlatBufferBuilder &fbb, - ::flatbuffers::Offset root) { - fbb.Finish(root); +inline void FinishReturnCallBuffer(::flatbuffers::FlatBufferBuilder &fbb, + ::flatbuffers::Offset root) { + fbb.Finish(root); } -inline void FinishSizePrefixedReturnCallBuffer( - ::flatbuffers::FlatBufferBuilder &fbb, - ::flatbuffers::Offset root) { - fbb.FinishSizePrefixed(root); +inline void FinishSizePrefixedReturnCallBuffer(::flatbuffers::FlatBufferBuilder &fbb, + ::flatbuffers::Offset root) { + fbb.FinishSizePrefixed(root); } -} // namespace Messaging +} // namespace Messaging -#endif // FLATBUFFERS_GENERATED_RETURNCALL_MESSAGING_H_ +#endif // FLATBUFFERS_GENERATED_RETURNCALL_MESSAGING_H_ diff --git a/include/flatbuffers_generated/RobotModule_generated.h b/include/flatbuffers_generated/RobotModule_generated.h index 47071c6..342aeb9 100644 --- a/include/flatbuffers_generated/RobotModule_generated.h +++ b/include/flatbuffers_generated/RobotModule_generated.h @@ -1,6 +1,5 @@ // automatically generated by the FlatBuffers compiler, do not modify - #ifndef FLATBUFFERS_GENERATED_ROBOTMODULE_H_ #define FLATBUFFERS_GENERATED_ROBOTMODULE_H_ @@ -8,7 +7,7 @@ // Ensure the included flatbuffers.h is the same version as when this file was // generated, otherwise it may not be compatible. -//static_assert(FLATBUFFERS_VERSION_MAJOR == 25 && +// static_assert(FLATBUFFERS_VERSION_MAJOR == 25 && // FLATBUFFERS_VERSION_MINOR == 2 && // FLATBUFFERS_VERSION_REVISION == 10, // "Non-compatible flatbuffers version included"); diff --git a/include/flatbuffers_generated/SendCall_generated.h b/include/flatbuffers_generated/SendCall_generated.h index 8f2e626..d96091b 100644 --- a/include/flatbuffers_generated/SendCall_generated.h +++ b/include/flatbuffers_generated/SendCall_generated.h @@ -1,6 +1,5 @@ // automatically generated by the FlatBuffers compiler, do not modify - #ifndef FLATBUFFERS_GENERATED_SENDCALL_MESSAGING_H_ #define FLATBUFFERS_GENERATED_SENDCALL_MESSAGING_H_ @@ -19,122 +18,105 @@ struct SendCall; struct SendCallBuilder; struct SendCall FLATBUFFERS_FINAL_CLASS : private ::flatbuffers::Table { - typedef SendCallBuilder Builder; - enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_TAG = 4, - VT_UNIQUE_ID = 6, - VT_LENGTH = 8, - VT_PARAMETERS = 10 - }; - uint8_t tag() const { - return GetField(VT_TAG, 0); - } - uint8_t unique_id() const { - return GetField(VT_UNIQUE_ID, 0); - } - uint16_t length() const { - return GetField(VT_LENGTH, 0); - } - const ::flatbuffers::Vector *parameters() const { - return GetPointer *>(VT_PARAMETERS); - } - bool Verify(::flatbuffers::Verifier &verifier) const { - return VerifyTableStart(verifier) && - VerifyField(verifier, VT_TAG, 1) && - VerifyField(verifier, VT_UNIQUE_ID, 1) && - VerifyField(verifier, VT_LENGTH, 2) && - VerifyOffset(verifier, VT_PARAMETERS) && - verifier.VerifyVector(parameters()) && - verifier.EndTable(); - } + typedef SendCallBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_TAG = 4, + VT_UNIQUE_ID = 6, + VT_LENGTH = 8, + VT_PARAMETERS = 10 + }; + uint8_t tag() const { + return GetField(VT_TAG, 0); + } + uint8_t unique_id() const { + return GetField(VT_UNIQUE_ID, 0); + } + uint16_t length() const { + return GetField(VT_LENGTH, 0); + } + const ::flatbuffers::Vector *parameters() const { + return GetPointer *>(VT_PARAMETERS); + } + bool Verify(::flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && VerifyField(verifier, VT_TAG, 1) && + VerifyField(verifier, VT_UNIQUE_ID, 1) && + VerifyField(verifier, VT_LENGTH, 2) && + VerifyOffset(verifier, VT_PARAMETERS) && verifier.VerifyVector(parameters()) && + verifier.EndTable(); + } }; struct SendCallBuilder { - typedef SendCall Table; - ::flatbuffers::FlatBufferBuilder &fbb_; - ::flatbuffers::uoffset_t start_; - void add_tag(uint8_t tag) { - fbb_.AddElement(SendCall::VT_TAG, tag, 0); - } - void add_unique_id(uint8_t unique_id) { - fbb_.AddElement(SendCall::VT_UNIQUE_ID, unique_id, 0); - } - void add_length(uint16_t length) { - fbb_.AddElement(SendCall::VT_LENGTH, length, 0); - } - void add_parameters(::flatbuffers::Offset<::flatbuffers::Vector> parameters) { - fbb_.AddOffset(SendCall::VT_PARAMETERS, parameters); - } - explicit SendCallBuilder(::flatbuffers::FlatBufferBuilder &_fbb) - : fbb_(_fbb) { - start_ = fbb_.StartTable(); - } - ::flatbuffers::Offset Finish() { - const auto end = fbb_.EndTable(start_); - auto o = ::flatbuffers::Offset(end); - return o; - } + typedef SendCall Table; + ::flatbuffers::FlatBufferBuilder &fbb_; + ::flatbuffers::uoffset_t start_; + void add_tag(uint8_t tag) { + fbb_.AddElement(SendCall::VT_TAG, tag, 0); + } + void add_unique_id(uint8_t unique_id) { + fbb_.AddElement(SendCall::VT_UNIQUE_ID, unique_id, 0); + } + void add_length(uint16_t length) { + fbb_.AddElement(SendCall::VT_LENGTH, length, 0); + } + void add_parameters(::flatbuffers::Offset<::flatbuffers::Vector> parameters) { + fbb_.AddOffset(SendCall::VT_PARAMETERS, parameters); + } + explicit SendCallBuilder(::flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + ::flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = ::flatbuffers::Offset(end); + return o; + } }; -inline ::flatbuffers::Offset CreateSendCall( - ::flatbuffers::FlatBufferBuilder &_fbb, - uint8_t tag = 0, - uint8_t unique_id = 0, - uint16_t length = 0, - ::flatbuffers::Offset<::flatbuffers::Vector> parameters = 0) { - SendCallBuilder builder_(_fbb); - builder_.add_parameters(parameters); - builder_.add_length(length); - builder_.add_unique_id(unique_id); - builder_.add_tag(tag); - return builder_.Finish(); +inline ::flatbuffers::Offset +CreateSendCall(::flatbuffers::FlatBufferBuilder &_fbb, uint8_t tag = 0, uint8_t unique_id = 0, + uint16_t length = 0, + ::flatbuffers::Offset<::flatbuffers::Vector> parameters = 0) { + SendCallBuilder builder_(_fbb); + builder_.add_parameters(parameters); + builder_.add_length(length); + builder_.add_unique_id(unique_id); + builder_.add_tag(tag); + return builder_.Finish(); } -inline ::flatbuffers::Offset CreateSendCallDirect( - ::flatbuffers::FlatBufferBuilder &_fbb, - uint8_t tag = 0, - uint8_t unique_id = 0, - uint16_t length = 0, - const std::vector *parameters = nullptr) { - auto parameters__ = parameters ? _fbb.CreateVector(*parameters) : 0; - return Messaging::CreateSendCall( - _fbb, - tag, - unique_id, - length, - parameters__); +inline ::flatbuffers::Offset +CreateSendCallDirect(::flatbuffers::FlatBufferBuilder &_fbb, uint8_t tag = 0, uint8_t unique_id = 0, + uint16_t length = 0, const std::vector *parameters = nullptr) { + auto parameters__ = parameters ? _fbb.CreateVector(*parameters) : 0; + return Messaging::CreateSendCall(_fbb, tag, unique_id, length, parameters__); } inline const Messaging::SendCall *GetSendCall(const void *buf) { - return ::flatbuffers::GetRoot(buf); + return ::flatbuffers::GetRoot(buf); } inline const Messaging::SendCall *GetSizePrefixedSendCall(const void *buf) { - return ::flatbuffers::GetSizePrefixedRoot(buf); + return ::flatbuffers::GetSizePrefixedRoot(buf); } -inline bool VerifySendCallBuffer( - ::flatbuffers::Verifier &verifier) { - return verifier.VerifyBuffer(nullptr); +inline bool VerifySendCallBuffer(::flatbuffers::Verifier &verifier) { + return verifier.VerifyBuffer(nullptr); } -inline bool VerifySizePrefixedSendCallBuffer( - ::flatbuffers::Verifier &verifier) { - return verifier.VerifySizePrefixedBuffer(nullptr); +inline bool VerifySizePrefixedSendCallBuffer(::flatbuffers::Verifier &verifier) { + return verifier.VerifySizePrefixedBuffer(nullptr); } -inline void FinishSendCallBuffer( - ::flatbuffers::FlatBufferBuilder &fbb, - ::flatbuffers::Offset root) { - fbb.Finish(root); +inline void FinishSendCallBuffer(::flatbuffers::FlatBufferBuilder &fbb, + ::flatbuffers::Offset root) { + fbb.Finish(root); } -inline void FinishSizePrefixedSendCallBuffer( - ::flatbuffers::FlatBufferBuilder &fbb, - ::flatbuffers::Offset root) { - fbb.FinishSizePrefixed(root); +inline void FinishSizePrefixedSendCallBuffer(::flatbuffers::FlatBufferBuilder &fbb, + ::flatbuffers::Offset root) { + fbb.FinishSizePrefixed(root); } -} // namespace Messaging +} // namespace Messaging -#endif // FLATBUFFERS_GENERATED_SENDCALL_MESSAGING_H_ +#endif // FLATBUFFERS_GENERATED_SENDCALL_MESSAGING_H_ diff --git a/include/librpc.h b/include/librpc.h index 576b0dc..8e04406 100644 --- a/include/librpc.h +++ b/include/librpc.h @@ -3,15 +3,18 @@ #include #include +#include #include #include #include "BlockingQueue.h" #include "constants.h" +#include "flatbuffers/CallBuilder.h" #include "mDNSDiscoveryService.h" - constexpr auto RX_QUEUE_SIZE = 100; +constexpr auto FN_CALL_TAG = 100; // reserved tag for RPC functionality +constexpr auto FN_CALL_TIMEOUT = std::chrono::seconds(3); struct SizeAndSource { size_t bytes_written; @@ -22,6 +25,7 @@ class MessagingInterface { public: MessagingInterface() : m_stop_flag(false), m_rx_thread(std::thread(&MessagingInterface::handle_recv, this)), + m_fn_rx_thread(std::thread(&MessagingInterface::handle_fn_recv, this)), m_rx_queue(std::make_shared>>>( RX_QUEUE_SIZE)) { #ifdef _WIN32 @@ -37,23 +41,36 @@ class MessagingInterface { int broadcast(uint8_t *buffer, size_t size, bool durable); // todo std::optional recv(uint8_t *buffer, size_t size, uint8_t tag); int sendrecv(uint8_t *send_buffer, size_t send_size, uint8_t dest, uint8_t send_tag, - uint8_t *recv_buffer, size_t recv_size, uint8_t recv_tag); // todo + uint8_t *recv_buffer, size_t recv_size, + uint8_t recv_tag); // todo + std::optional>> + remote_call(uint8_t function_tag, uint8_t module, const std::vector ¶meters); std::unordered_set find_connected_modules(std::chrono::duration scan_duration); private: void handle_recv(); + void handle_fn_recv(); uint16_t m_sequence_number = 0; + uint8_t unique_fn_call_id = 0; // this is designed to overflow, change to uint16_t if we plan on + // having way more calls per second. std::unordered_map> m_id_to_lossless_client; std::unordered_map> m_id_to_lossy_client; std::unordered_map>>>> m_tag_to_queue_map; + // The semaphore needs to be in a unique_ptr, since it is not copyable or + // movable unordered_maps need to copy/move to reshuffle. + std::unordered_map> m_fn_call_to_semaphore; + std::unordered_map>> m_fn_call_to_result; std::unique_ptr m_discovery_service; std::atomic m_stop_flag; std::thread m_rx_thread; + std::thread m_fn_rx_thread; std::shared_ptr>>> m_rx_queue; std::shared_mutex m_client_mutex; std::shared_mutex m_scan_mutex; + std::mutex m_fn_call_mutex; + std::mutex m_tag_queue_mutex; }; #endif // RPC_LIBRARY_H diff --git a/include/mDNSRobotModule.h b/include/mDNSRobotModule.h index 595e96d..7e80f27 100644 --- a/include/mDNSRobotModule.h +++ b/include/mDNSRobotModule.h @@ -16,4 +16,4 @@ struct mDNSRobotModule { std::vector connected_module_ids; }; -#endif //ROBOTMODULEINSTANCE_H +#endif // ROBOTMODULEINSTANCE_H diff --git a/include/util/log.h b/include/util/log.h index ab94a9c..31a69ac 100644 --- a/include/util/log.h +++ b/include/util/log.h @@ -29,4 +29,4 @@ void print_errno() { #endif -#endif //LOG_H +#endif // LOG_H diff --git a/include/util/string.h b/include/util/string.h index 37cf16e..45b8ef9 100644 --- a/include/util/string.h +++ b/include/util/string.h @@ -20,4 +20,4 @@ inline std::vector split(const std::string &str, const char delimit return result; } -#endif //STRING_H +#endif // STRING_H diff --git a/src/CallBuilder.cpp b/src/CallBuilder.cpp new file mode 100644 index 0000000..ffcbcf4 --- /dev/null +++ b/src/CallBuilder.cpp @@ -0,0 +1,23 @@ +#include "flatbuffers/CallBuilder.h" +#include "flatbuffers/SerializedMessage.h" + +namespace Flatbuffers { +SerializedMessage CallBuilder::build_send_call(uint8_t tag, uint8_t unique_id, + const std::vector ¶meters) { + builder_.Clear(); + + const auto parameters_vector = builder_.CreateVector(parameters); + + const auto message = Messaging::CreateSendCall( + builder_, tag, unique_id, static_cast(parameters.size()), parameters_vector); + + builder_.Finish(message); + + return {builder_.GetBufferPointer(), builder_.GetSize()}; +} + +const Messaging::ReturnCall *CallBuilder::parse_return_call(const uint8_t *buffer) { + return flatbuffers::GetRoot(buffer); +} + +} // namespace Flatbuffers diff --git a/src/librpc.cpp b/src/librpc.cpp index 7dfcea0..ffbf627 100644 --- a/src/librpc.cpp +++ b/src/librpc.cpp @@ -1,12 +1,19 @@ #include "librpc.h" +#include "flatbuffers_generated/ReturnCall_generated.h" +#include +#include +#include +#include #include #include +#include #include #undef min #include +#include "flatbuffers/CallBuilder.h" #include "flatbuffers/MPIMessageBuilder.h" #include "spdlog/spdlog.h" @@ -14,10 +21,12 @@ constexpr auto MAX_RECV_WAIT_TIME = std::chrono::seconds(3); constexpr auto PER_TAG_MAX_QUEUE_SIZE = 50; constexpr auto MAX_WAIT_TIME_TAG_ENQUEUE = std::chrono::milliseconds(250); constexpr auto MAX_WAIT_TIME_RX_THREAD_DEQUEUE = std::chrono::milliseconds(250); +constexpr auto FN_RETURN_BUFFER_SIZE = 1024; MessagingInterface::~MessagingInterface() { m_stop_flag = true; m_rx_thread.join(); + m_fn_rx_thread.join(); #ifdef _WIN32 WSACleanup(); @@ -51,11 +60,13 @@ int MessagingInterface::broadcast(uint8_t *buffer, size_t size, bool durable) { std::optional MessagingInterface::recv(uint8_t *buffer, const size_t size, uint8_t tag) { + std::unique_lock lock(m_tag_queue_mutex); if (!m_tag_to_queue_map.contains(tag)) { m_tag_to_queue_map.insert( {tag, std::make_unique>>>( PER_TAG_MAX_QUEUE_SIZE)}); } + lock.unlock(); const auto data = m_tag_to_queue_map[tag]->dequeue(MAX_RECV_WAIT_TIME); @@ -82,7 +93,8 @@ int MessagingInterface::sendrecv(uint8_t *send_buffer, size_t send_size, uint8_t std::unordered_set MessagingInterface::find_connected_modules(const std::chrono::duration scan_duration) { - // Cannot just skip the call if already running, since the caller needs the list of modules. + // Cannot just skip the call if already running, since the caller needs the + // list of modules. std::unique_lock scan_lock(m_scan_mutex); const auto foundModules = this->m_discovery_service->find_modules(scan_duration); scan_lock.unlock(); @@ -120,15 +132,101 @@ void MessagingInterface::handle_recv() { const auto &mpi_message = Flatbuffers::MPIMessageBuilder::parse_mpi_message(data.value()->data()); + std::unique_lock lock(m_tag_queue_mutex); if (!m_tag_to_queue_map.contains(mpi_message->tag())) { m_tag_to_queue_map.insert( {mpi_message->tag(), std::make_unique>>>( PER_TAG_MAX_QUEUE_SIZE)}); } + lock.unlock(); m_tag_to_queue_map[mpi_message->tag()]->enqueue(std::move(data.value()), MAX_WAIT_TIME_TAG_ENQUEUE); } } } + +std::optional>> +MessagingInterface::remote_call(uint8_t function_tag, uint8_t module_id, + const std::vector ¶meters) { + std::unique_lock lock(m_fn_call_mutex); + const auto unique_id = unique_fn_call_id++; + auto sem = std::make_unique>(0); + m_fn_call_to_semaphore.insert_or_assign(unique_id, std::move(sem)); + lock.unlock(); + + Flatbuffers::CallBuilder builder{}; + auto [data, size] = builder.build_send_call(function_tag, unique_id, parameters); + + // Assume durable transmission, non-durable RPC calls do not make sense. + // If a message is lost, we will block unnecessarily until the timeout. + // todo: is this thread safe? especially if other threads will be calling send + // themselves. + send((uint8_t *)data, size, module_id, FN_CALL_TAG, true); + + if (m_fn_call_to_semaphore[unique_id]->try_acquire_for(FN_CALL_TIMEOUT)) { + lock.lock(); + + if (!m_fn_call_to_result[unique_id]) { + m_fn_call_to_semaphore.erase(unique_id); + return std::nullopt; + } + + auto result = std::move(m_fn_call_to_result[unique_id]); + m_fn_call_to_result.erase(unique_id); + m_fn_call_to_semaphore.erase(unique_id); + return std::move(result); + } + + lock.lock(); + m_fn_call_to_semaphore.erase(unique_id); + + return std::nullopt; +} + +void MessagingInterface::handle_fn_recv() { + while (!m_stop_flag) { + auto buffer = std::make_unique>(); + buffer->resize(FN_RETURN_BUFFER_SIZE); + + std::optional maybe_result; + SizeAndSource result; + do { + maybe_result = recv(buffer->data(), FN_RETURN_BUFFER_SIZE, FN_CALL_TAG); + } while (!m_stop_flag && !maybe_result); + if (m_stop_flag) { + return; + } + + Flatbuffers::CallBuilder builder{}; + std::unique_lock lock(m_fn_call_mutex); + result = *maybe_result; + + flatbuffers::Verifier verifier(buffer->data(), result.bytes_written); + bool ok = Messaging::VerifyReturnCallBuffer(verifier); + if (!ok) { + spdlog::error("[LibRPC] Got an invalid return buffer"); + continue; + } + + const auto return_data = builder.parse_return_call(buffer->data()); + if (return_data->length() > FN_RETURN_BUFFER_SIZE) { + spdlog::warn("[LibRPC] Got a return buffer with return data that is too large"); + continue; + } + + auto it = m_fn_call_to_semaphore.find(return_data->unique_id()); + if (it == m_fn_call_to_semaphore.end() || !it->second) { + spdlog::warn("[LibRPC] Previously timed out RPC call completed, " + "discarding result"); + continue; + } + + auto raw_data = std::make_unique>(); + raw_data->resize(return_data->length()); + std::memcpy(raw_data->data(), return_data->return_value()->data(), return_data->length()); + m_fn_call_to_result[return_data->unique_id()] = std::move(raw_data); + it->second->release(); + } +}