From d89c636e2f6436c43665dfe5b2655fd9406e78e3 Mon Sep 17 00:00:00 2001 From: Johnathon Slightham Date: Sat, 24 Jan 2026 10:26:37 -0500 Subject: [PATCH] Prepare files for public release --- .gitignore | 5 + CMakeLists.txt | 25 ++ CMakeUserPresets.json | 9 + README.md | 93 +++++ build_rpc_library.sh | 62 +++ compile_commands.json | 1 + conanfile.py | 44 +++ include/BlockingQueue.h | 53 +++ include/ICommunicationClient.h | 15 + include/IDiscoveryService.h | 24 ++ include/TCPClient.h | 54 +++ include/UDPClient.h | 53 +++ include/constants.h | 7 + include/flatbuffers/MPIMessageBuilder.h | 33 ++ include/flatbuffers/SerializedMessage.h | 15 + .../MPIMessage_generated.h | 188 +++++++++ .../RobotModule_generated.h | 286 ++++++++++++++ include/librpc.h | 59 +++ include/mDNSDiscoveryService.h | 56 +++ include/mDNSRobotModule.h | 19 + include/util/ip.h | 24 ++ include/util/log.h | 32 ++ include/util/string.h | 23 ++ src/MPIMessageBuilder.cpp | 31 ++ src/TCPClient.cpp | 114 ++++++ src/UDPClient.cpp | 187 +++++++++ src/librpc.cpp | 134 +++++++ src/mDNSDiscoveryService.cpp | 367 ++++++++++++++++++ 28 files changed, 2013 insertions(+) create mode 100644 .gitignore create mode 100644 CMakeLists.txt create mode 100644 CMakeUserPresets.json create mode 100644 README.md create mode 100755 build_rpc_library.sh create mode 120000 compile_commands.json create mode 100644 conanfile.py create mode 100644 include/BlockingQueue.h create mode 100644 include/ICommunicationClient.h create mode 100644 include/IDiscoveryService.h create mode 100644 include/TCPClient.h create mode 100644 include/UDPClient.h create mode 100644 include/constants.h create mode 100644 include/flatbuffers/MPIMessageBuilder.h create mode 100644 include/flatbuffers/SerializedMessage.h create mode 100644 include/flatbuffers_generated/MPIMessage_generated.h create mode 100644 include/flatbuffers_generated/RobotModule_generated.h create mode 100644 include/librpc.h create mode 100644 include/mDNSDiscoveryService.h create mode 100644 include/mDNSRobotModule.h create mode 100644 include/util/ip.h create mode 100644 include/util/log.h create mode 100644 include/util/string.h create mode 100644 src/MPIMessageBuilder.cpp create mode 100644 src/TCPClient.cpp create mode 100644 src/UDPClient.cpp create mode 100644 src/librpc.cpp create mode 100644 src/mDNSDiscoveryService.cpp diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7b274aa --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +/build +.DS_Store +/cmake-build-debug +/.cache +**/.DS_Store diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..7d15333 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,25 @@ +cmake_minimum_required(VERSION 3.15) +project(librpc) + +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) +set(CMAKE_COMPILE_WARNING_AS_ERROR ON) + +find_package(Threads REQUIRED) +find_package(flatbuffers REQUIRED) +find_package(spdlog REQUIRED) + +add_library(rpc src/librpc.cpp src/TCPClient.cpp src/UDPClient.cpp src/mDNSDiscoveryService.cpp src/MPIMessageBuilder.cpp + include/util/log.h) +target_include_directories(rpc + PUBLIC + $ + $ +) + +target_link_libraries(rpc PUBLIC flatbuffers::flatbuffers spdlog::spdlog) + +set_property(TARGET rpc PROPERTY CXX_STANDARD 23) +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + +install(TARGETS rpc DESTINATION lib) +install(DIRECTORY include/ DESTINATION include) diff --git a/CMakeUserPresets.json b/CMakeUserPresets.json new file mode 100644 index 0000000..71aeace --- /dev/null +++ b/CMakeUserPresets.json @@ -0,0 +1,9 @@ +{ + "version": 4, + "vendor": { + "conan": {} + }, + "include": [ + "build/Release/generators/CMakePresets.json" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..02b9539 --- /dev/null +++ b/README.md @@ -0,0 +1,93 @@ +# RPC Library +The RPC library provides an interface to interact directly with the BotChain devices. The library is managed by the [conan](https://conan.io) package manager, and is consumed by BotChain's higher level libraries. This library provides the following features: +- mDNS discovery of modules +- TCP connection to modules +- UDP connection to modules +- An MPI like messaging interface + +The latest releases of the RPC library can be found in our [artifactory](http://jslightham.com:8082), or on [Jenkins](https://jenkins.jslightham.com/job/Botchain/job/librpc/). + +## Platform Support +- MacOS (Apple silicon) +- MacoS (x86) +- Ubuntu (x86) +- Windows (x86) + +## Setup +### MacOS +Install xcode command line tools (if you do not already have them) +``` +xcode-select --install +``` + +Install conan and dependencies +``` +brew install conan cmake ninja +``` + +Generate a conan profile +``` +conan profile detect --force +``` + +### Ubuntu +On newer versions of Ubuntu, the package manager is responsible for managing python packages. We use `pipx` to create a virtual environment. + +Install `pipx` and dependencies +``` +sudo apt install pipx cmake ninja-build +``` + +Install conan with pipx +``` +pipx install conan +``` + +Generate a conan profile +``` +conan profile detect --force +``` + +### Artifactory Setup (optional) +These instructions should only be followed after you have completed all setup steps for your platform. + +This is an optional section that is only required if you plan on uploading releases to the artifactory manually. +Releases tagged with new versions in `conanfile.py` that are merged into the main branch are automatically uploaded to the artifactory by [Jenkins](https://jenkins.jslightham.com/job/Botchain/job/librpc/). + +Add the artifactory +``` +conan remote add artifactory http://jslightham.com:8081/artifactory/api/conan/botchain +``` + +Add credentials to connect to the remote artifactory +``` +conan remote login artifactory -p +``` + +Contact Johnathon to get login credentials for the artifactory. + +## Development +``` +# On macos or Linux, you can run +./build_rpc_library + +# Building manually +build_type=Release # change to the build type you want (ex. Debug, RelWithDebInfo). +conan install . --build=missing --output-folder=. -s build_type="${build_type}" +cmake -S . -B "build/${build_type}" -DCMAKE_TOOLCHAIN_FILE="$build/${build_type}/generators/conan_toolchain.cmake" -DCMAKE_BUILD_TYPE="${build_type}" +cmake --build "build/${build_type}" --config "${build_type}" +conan create . +``` + +## Building For Release +Bump the version in `conanfile.py`. + +Create the package +``` +conan create . +``` + +Upload to the artifactory +``` +conan upload librpc -r=artifactory +``` diff --git a/build_rpc_library.sh b/build_rpc_library.sh new file mode 100755 index 0000000..15fd27f --- /dev/null +++ b/build_rpc_library.sh @@ -0,0 +1,62 @@ +#!/bin/bash +set -e + +function usage() { + echo "Usage:" + echo "${SCRIPT_NAME} [-b ] [-h]" + echo " -b | --build-type - The build type (ie. Release, Debug, RelWithDebInfo)" + echo " -h | --help - Print usage" + echo "Example:" + echo "${SCRIPT_NAME} -b Release" + exit 1 +} + +function parse_args() { + while [ -n "${1}" ]; do + case "${1}" in + -h | --help) + usage + ;; + -b | --build-type) + [ -n "${2}" ] || usage || echo "ERROR: Not enough parameters" + build_type="${2}" + shift 2 + ;; + -d | --disable-format) + disable_format=true + shift 1 + ;; + *) + echo "ERROR: Invalid parameter. Exiting..." + usage + exit 1 + ;; + esac + done +} + +function check_pre_req() { + if [ "${build_type}" != "Debug" ] && [ "${build_type}" != "Release" ] && [ "${build_type}" != "RelWithDebInfo" ]; then + usage + echo "ERROR: Build type must be one of: Release, Debug, RelWithDebInfo" + fi +} + +SCRIPT_NAME="$(basename "${0}")" +ROOT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +build_type="" +disable_format=false +parse_args "${@}" +check_pre_req + +if [ "$disable_format" != "true" ]; then + echo "Formatting with clang-format..." + find "${ROOT_DIR}" -iname '*.h' -o -iname '*.cpp' | xargs clang-format -i -style=file +fi + +echo "Building..." +conan install "${ROOT_DIR}" --build=missing --output-folder="${ROOT_DIR}" -s build_type="${build_type}" +cmake -S "${ROOT_DIR}" -B "${ROOT_DIR}/build/${build_type}" -DCMAKE_TOOLCHAIN_FILE="${ROOT_DIR}/build/${build_type}/generators/conan_toolchain.cmake" -DCMAKE_BUILD_TYPE="${build_type}" +cmake --build "${ROOT_DIR}/build/${build_type}" --config "${build_type}" +conan create . diff --git a/compile_commands.json b/compile_commands.json new file mode 120000 index 0000000..195eeaf --- /dev/null +++ b/compile_commands.json @@ -0,0 +1 @@ +build/Release/compile_commands.json \ No newline at end of file diff --git a/conanfile.py b/conanfile.py new file mode 100644 index 0000000..d5e87fa --- /dev/null +++ b/conanfile.py @@ -0,0 +1,44 @@ +from conan import ConanFile +from conan.tools.cmake import CMake, cmake_layout, CMakeToolchain, CMakeDeps +from conan.tools.files import copy +import os + +class MyLibraryConan(ConanFile): + name = "librpc" + version = "1.1.6" + + settings = "os", "compiler", "build_type", "arch" + options = {"shared": [True, False], "fPIC": [True, False]} + default_options = {"shared": False, "fPIC": True} + + exports_sources = "CMakeLists.txt", "src/*", "include/*" + + def layout(self): + cmake_layout(self) + + def generate(self): + deps = CMakeDeps(self) + deps.generate() + tc = CMakeToolchain(self) + tc.generate() + + def build(self): + cmake = CMake(self) + cmake.configure() + cmake.build() + + def package(self): + cmake = CMake(self) + cmake.install() + + def package_info(self): + self.cpp_info.libs = ["rpc"] + self.cpp_info.includedirs = ["include"] + + def requirements(self): + self.requires("flatbuffers/24.12.23") + self.requires("spdlog/1.16.0") + + def configure(self): + if self.settings.os == "Linux": + self.options.fPIC = True diff --git a/include/BlockingQueue.h b/include/BlockingQueue.h new file mode 100644 index 0000000..1f37761 --- /dev/null +++ b/include/BlockingQueue.h @@ -0,0 +1,53 @@ +// +// Created by Johnathon Slightham on 2025-07-10. +// + +#ifndef BLOCKINGQUEUE_H +#define BLOCKINGQUEUE_H + +#include +#include +#include +#include +#include + +template class BlockingQueue { + public: + explicit BlockingQueue(const size_t capacity) : m_capacity(capacity) { + } + + // Enqueue with timeout. Returns true on success, false on timeout. + bool enqueue(T &&item, std::chrono::milliseconds max_wait) { + std::unique_lock lock(m_mutex); + if (!m_cond_not_full.wait_for(lock, max_wait, + [this]() { return m_queue.size() < m_capacity; })) { + return false; + } + + m_queue.push(std::move(item)); + m_cond_not_empty.notify_one(); + return true; + } + + // Dequeue with timeout. Returns optional (empty on timeout). + std::optional dequeue(std::chrono::milliseconds max_wait) { + std::unique_lock lock(m_mutex); + if (!m_cond_not_empty.wait_for(lock, max_wait, [this]() { return !m_queue.empty(); })) { + return std::nullopt; + } + + T item = std::move(m_queue.front()); + m_queue.pop(); + m_cond_not_full.notify_one(); + return item; + } + + private: + std::queue m_queue; + size_t m_capacity; + std::mutex m_mutex; + std::condition_variable m_cond_not_empty; + std::condition_variable m_cond_not_full; +}; + +#endif // BLOCKINGQUEUE_H diff --git a/include/ICommunicationClient.h b/include/ICommunicationClient.h new file mode 100644 index 0000000..65a42e9 --- /dev/null +++ b/include/ICommunicationClient.h @@ -0,0 +1,15 @@ +// +// Created by Johnathon Slightham on 2025-06-10. +// + +#ifndef INETWORKCLIENT_H +#define INETWORKCLIENT_H + +class ICommunicationClient { + public: + virtual ~ICommunicationClient() = default; + virtual int init() = 0; + virtual int send_msg(void *sendbuff, uint32_t len) = 0; +}; + +#endif //INETWORKCLIENT_H diff --git a/include/IDiscoveryService.h b/include/IDiscoveryService.h new file mode 100644 index 0000000..96c3d63 --- /dev/null +++ b/include/IDiscoveryService.h @@ -0,0 +1,24 @@ +// +// Created by Johnathon Slightham on 2025-06-10. +// + +#ifndef IDISCOVERYSERVICE_H +#define IDISCOVERYSERVICE_H +#include + +#include "ICommunicationClient.h" +#include "mDNSRobotModule.h" + +class IDiscoveryService { + public: + virtual ~IDiscoveryService() = default; + virtual std::unordered_set find_modules(std::chrono::duration wait_time) = 0; + virtual std::unordered_map> get_lossy_clients( + const std::shared_ptr>>> &rx_queue, + std::vector &skip_modules) = 0; + virtual std::unordered_map> get_lossless_clients( + const std::shared_ptr>>> &rx_queue, + std::vector &skip_modules) = 0; +}; + +#endif // IDISCOVERYSERVICE_H diff --git a/include/TCPClient.h b/include/TCPClient.h new file mode 100644 index 0000000..0ad718c --- /dev/null +++ b/include/TCPClient.h @@ -0,0 +1,54 @@ +// +// Created by Johnathon Slightham on 2025-06-10. +// + +#ifndef TCPCLIENT_H +#define TCPCLIENT_H +#include +#include + +#include "ICommunicationClient.h" + +#ifdef _WIN32 +#include +#include +#define CLOSE_SOCKET closesocket +#pragma comment(lib, "ws2_32.lib") +typedef SOCKET socket_t; +#else +#include +#include +#include +#include +#define CLOSE_SOCKET close +typedef int socket_t; +#endif + +#include "BlockingQueue.h" + +class TCPClient final : public ICommunicationClient { + + public: + TCPClient(std::string ip, + const std::shared_ptr>>> &rx_queue) + : port{3000}, m_ip{std::move(ip)}, m_stop_flag(false), + m_thread(std::thread(&TCPClient::rx_thread, this)), m_rx_queue(rx_queue) { + } + ~TCPClient() override; + int init() override; + int send_msg(void *sendbuff, uint32_t len) override; + + private: + void deinit(); + void rx_thread() const; + + socket_t m_socket = -1; + int port; + bool m_initialized = false; + std::string m_ip; + std::atomic m_stop_flag; + std::thread m_thread; + std::shared_ptr>>> m_rx_queue; +}; + +#endif // TCPCLIENT_H diff --git a/include/UDPClient.h b/include/UDPClient.h new file mode 100644 index 0000000..97d6059 --- /dev/null +++ b/include/UDPClient.h @@ -0,0 +1,53 @@ +// +// Created by Johnathon Slightham on 2025-12-27. +// + +#ifndef UDPCLIENT_H +#define UDPCLIENT_H +#include +#include + +#include "ICommunicationClient.h" + +#ifdef _WIN32 +#include +#include +#define CLOSE_SOCKET closesocket +#pragma comment(lib, "ws2_32.lib") +typedef SOCKET socket_t; +#else +#include +#include +#include +#include +#define CLOSE_SOCKET close +typedef int socket_t; +#endif + +#include "BlockingQueue.h" + +class UDPClient final : public ICommunicationClient { + + public: + UDPClient(std::string /* ip */, + const std::shared_ptr>>> &rx_queue) + : m_stop_flag(false), m_thread(std::thread(&UDPClient::rx_thread, this)), + m_rx_queue(rx_queue) { + } + ~UDPClient() override; + int init() override; + int send_msg(void *sendbuff, uint32_t len) override; + + private: + void deinit(); + void rx_thread() const; + + socket_t m_tx_socket = -1; + socket_t m_rx_socket = -1; + bool m_initialized = false; + std::atomic m_stop_flag; + std::thread m_thread; + std::shared_ptr>>> m_rx_queue; +}; + +#endif // UDPCLIENT_H diff --git a/include/constants.h b/include/constants.h new file mode 100644 index 0000000..c60ffdc --- /dev/null +++ b/include/constants.h @@ -0,0 +1,7 @@ +#ifndef CONSTANTS_H +#define CONSTANTS_H + +constexpr auto PC_MODULE_ID = 1; +constexpr auto MAX_BUFFER_SIZE = 1024; + +#endif // CONSTANTS_H diff --git a/include/flatbuffers/MPIMessageBuilder.h b/include/flatbuffers/MPIMessageBuilder.h new file mode 100644 index 0000000..6b2567d --- /dev/null +++ b/include/flatbuffers/MPIMessageBuilder.h @@ -0,0 +1,33 @@ +// +// Created by Johnathon Slightham on 2025-06-30. +// + +#ifndef MPIMESSAGEBUILDER_H +#define MPIMESSAGEBUILDER_H + +#include +#include + +#include "../flatbuffers_generated/MPIMessage_generated.h" +#include "SerializedMessage.h" +#include "flatbuffers/flatbuffers.h" + +namespace Flatbuffers { +class MPIMessageBuilder { + public: + MPIMessageBuilder() : builder_(1024) { + } + + SerializedMessage build_mpi_message(Messaging::MessageType type, uint8_t sender, + uint8_t destination, uint16_t sequence_number, + bool is_durable, uint8_t tag, + const std::vector &payload); + + static const Messaging::MPIMessage *parse_mpi_message(const uint8_t *buffer); + + private: + flatbuffers::FlatBufferBuilder builder_; +}; +} // namespace Flatbuffers + +#endif //MPIMESSAGEBUILDER_H diff --git a/include/flatbuffers/SerializedMessage.h b/include/flatbuffers/SerializedMessage.h new file mode 100644 index 0000000..8e19cc9 --- /dev/null +++ b/include/flatbuffers/SerializedMessage.h @@ -0,0 +1,15 @@ +// +// Created by Johnathon Slightham on 2025-07-05. +// + +#ifndef SERIALIZEDMESSAGE_H +#define SERIALIZEDMESSAGE_H + +namespace Flatbuffers { +struct SerializedMessage { + void *data; + size_t size; +}; +} // namespace Flatbuffers + +#endif //SERIALIZEDMESSAGE_H diff --git a/include/flatbuffers_generated/MPIMessage_generated.h b/include/flatbuffers_generated/MPIMessage_generated.h new file mode 100644 index 0000000..8dccc95 --- /dev/null +++ b/include/flatbuffers_generated/MPIMessage_generated.h @@ -0,0 +1,188 @@ +// automatically generated by the FlatBuffers compiler, do not modify + + +#ifndef FLATBUFFERS_GENERATED_MPIMESSAGE_MESSAGING_H_ +#define FLATBUFFERS_GENERATED_MPIMESSAGE_MESSAGING_H_ + +#include "flatbuffers/flatbuffers.h" + +// Ensure the included flatbuffers.h is the same version as when this file was +// generated, otherwise it may not be compatible. +// static_assert(FLATBUFFERS_VERSION_MAJOR == 25 && +// FLATBUFFERS_VERSION_MINOR == 2 && +// FLATBUFFERS_VERSION_REVISION == 10, +// "Non-compatible flatbuffers version included"); + +namespace Messaging { + +struct MPIMessage; +struct MPIMessageBuilder; + +enum MessageType : int8_t { + MessageType_BROADCAST = 0, + MessageType_PTP = 1, + MessageType_MIN = MessageType_BROADCAST, + MessageType_MAX = MessageType_PTP +}; + +inline const MessageType (&EnumValuesMessageType())[2] { + static const MessageType values[] = {MessageType_BROADCAST, MessageType_PTP}; + return values; +} + +inline const char *const *EnumNamesMessageType() { + static const char *const names[3] = {"BROADCAST", "PTP", nullptr}; + return names; +} + +inline const char *EnumNameMessageType(MessageType e) { + if (::flatbuffers::IsOutRange(e, MessageType_BROADCAST, MessageType_PTP)) + return ""; + const size_t index = static_cast(e); + return EnumNamesMessageType()[index]; +} + +struct MPIMessage FLATBUFFERS_FINAL_CLASS : private ::flatbuffers::Table { + typedef MPIMessageBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_TYPE = 4, + VT_SENDER = 6, + VT_DESTINATION = 8, + VT_SEQUENCE_NUMBER = 10, + VT_IS_DURABLE = 12, + VT_LENGTH = 14, + VT_TAG = 16, + VT_PAYLOAD = 18 + }; + Messaging::MessageType type() const { + return static_cast(GetField(VT_TYPE, 0)); + } + uint8_t sender() const { + return GetField(VT_SENDER, 0); + } + uint8_t destination() const { + return GetField(VT_DESTINATION, 0); + } + uint16_t sequence_number() const { + return GetField(VT_SEQUENCE_NUMBER, 0); + } + bool is_durable() const { + return GetField(VT_IS_DURABLE, 0) != 0; + } + uint16_t length() const { + return GetField(VT_LENGTH, 0); + } + uint8_t tag() const { + return GetField(VT_TAG, 0); + } + const ::flatbuffers::Vector *payload() const { + return GetPointer *>(VT_PAYLOAD); + } + bool Verify(::flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && VerifyField(verifier, VT_TYPE, 1) && + VerifyField(verifier, VT_SENDER, 1) && + VerifyField(verifier, VT_DESTINATION, 1) && + VerifyField(verifier, VT_SEQUENCE_NUMBER, 2) && + VerifyField(verifier, VT_IS_DURABLE, 1) && + VerifyField(verifier, VT_LENGTH, 2) && + VerifyField(verifier, VT_TAG, 1) && VerifyOffset(verifier, VT_PAYLOAD) && + verifier.VerifyVector(payload()) && verifier.EndTable(); + } +}; + +struct MPIMessageBuilder { + typedef MPIMessage Table; + ::flatbuffers::FlatBufferBuilder &fbb_; + ::flatbuffers::uoffset_t start_; + void add_type(Messaging::MessageType type) { + fbb_.AddElement(MPIMessage::VT_TYPE, static_cast(type), 0); + } + void add_sender(uint8_t sender) { + fbb_.AddElement(MPIMessage::VT_SENDER, sender, 0); + } + void add_destination(uint8_t destination) { + fbb_.AddElement(MPIMessage::VT_DESTINATION, destination, 0); + } + void add_sequence_number(uint16_t sequence_number) { + fbb_.AddElement(MPIMessage::VT_SEQUENCE_NUMBER, sequence_number, 0); + } + void add_is_durable(bool is_durable) { + fbb_.AddElement(MPIMessage::VT_IS_DURABLE, static_cast(is_durable), 0); + } + void add_length(uint16_t length) { + fbb_.AddElement(MPIMessage::VT_LENGTH, length, 0); + } + void add_tag(uint8_t tag) { + fbb_.AddElement(MPIMessage::VT_TAG, tag, 0); + } + void add_payload(::flatbuffers::Offset<::flatbuffers::Vector> payload) { + fbb_.AddOffset(MPIMessage::VT_PAYLOAD, payload); + } + explicit MPIMessageBuilder(::flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + ::flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = ::flatbuffers::Offset(end); + return o; + } +}; + +inline ::flatbuffers::Offset +CreateMPIMessage(::flatbuffers::FlatBufferBuilder &_fbb, + Messaging::MessageType type = Messaging::MessageType_BROADCAST, uint8_t sender = 0, + uint8_t destination = 0, uint16_t sequence_number = 0, bool is_durable = false, + uint16_t length = 0, uint8_t tag = 0, + ::flatbuffers::Offset<::flatbuffers::Vector> payload = 0) { + MPIMessageBuilder builder_(_fbb); + builder_.add_payload(payload); + builder_.add_length(length); + builder_.add_sequence_number(sequence_number); + builder_.add_tag(tag); + builder_.add_is_durable(is_durable); + builder_.add_destination(destination); + builder_.add_sender(sender); + builder_.add_type(type); + return builder_.Finish(); +} + +inline ::flatbuffers::Offset +CreateMPIMessageDirect(::flatbuffers::FlatBufferBuilder &_fbb, + Messaging::MessageType type = Messaging::MessageType_BROADCAST, + uint8_t sender = 0, uint8_t destination = 0, uint16_t sequence_number = 0, + bool is_durable = false, uint16_t length = 0, uint8_t tag = 0, + const std::vector *payload = nullptr) { + auto payload__ = payload ? _fbb.CreateVector(*payload) : 0; + return Messaging::CreateMPIMessage(_fbb, type, sender, destination, sequence_number, is_durable, + length, tag, payload__); +} + +inline const Messaging::MPIMessage *GetMPIMessage(const void *buf) { + return ::flatbuffers::GetRoot(buf); +} + +inline const Messaging::MPIMessage *GetSizePrefixedMPIMessage(const void *buf) { + return ::flatbuffers::GetSizePrefixedRoot(buf); +} + +inline bool VerifyMPIMessageBuffer(::flatbuffers::Verifier &verifier) { + return verifier.VerifyBuffer(nullptr); +} + +inline bool VerifySizePrefixedMPIMessageBuffer(::flatbuffers::Verifier &verifier) { + return verifier.VerifySizePrefixedBuffer(nullptr); +} + +inline void FinishMPIMessageBuffer(::flatbuffers::FlatBufferBuilder &fbb, + ::flatbuffers::Offset root) { + fbb.Finish(root); +} + +inline void FinishSizePrefixedMPIMessageBuffer(::flatbuffers::FlatBufferBuilder &fbb, + ::flatbuffers::Offset root) { + fbb.FinishSizePrefixed(root); +} + +} // namespace Messaging + +#endif // FLATBUFFERS_GENERATED_MPIMESSAGE_MESSAGING_H_ diff --git a/include/flatbuffers_generated/RobotModule_generated.h b/include/flatbuffers_generated/RobotModule_generated.h new file mode 100644 index 0000000..47071c6 --- /dev/null +++ b/include/flatbuffers_generated/RobotModule_generated.h @@ -0,0 +1,286 @@ +// automatically generated by the FlatBuffers compiler, do not modify + + +#ifndef FLATBUFFERS_GENERATED_ROBOTMODULE_H_ +#define FLATBUFFERS_GENERATED_ROBOTMODULE_H_ + +#include "flatbuffers/flatbuffers.h" + +// Ensure the included flatbuffers.h is the same version as when this file was +// generated, otherwise it may not be compatible. +//static_assert(FLATBUFFERS_VERSION_MAJOR == 25 && +// FLATBUFFERS_VERSION_MINOR == 2 && +// FLATBUFFERS_VERSION_REVISION == 10, +// "Non-compatible flatbuffers version included"); + +struct MotorState; +struct MotorStateBuilder; + +struct RobotModule; +struct RobotModuleBuilder; + +enum ModuleType : int8_t { + ModuleType_SPLITTER = 0, + ModuleType_SERVO_1 = 1, + ModuleType_DC_MOTOR = 2, + ModuleType_BATTERY = 3, + ModuleType_MIN = ModuleType_SPLITTER, + ModuleType_MAX = ModuleType_BATTERY +}; + +inline const ModuleType (&EnumValuesModuleType())[4] { + static const ModuleType values[] = {ModuleType_SPLITTER, ModuleType_SERVO_1, + ModuleType_DC_MOTOR, ModuleType_BATTERY}; + return values; +} + +inline const char *const *EnumNamesModuleType() { + static const char *const names[5] = {"SPLITTER", "SERVO_1", "DC_MOTOR", "BATTERY", nullptr}; + return names; +} + +inline const char *EnumNameModuleType(ModuleType e) { + if (::flatbuffers::IsOutRange(e, ModuleType_SPLITTER, ModuleType_BATTERY)) + return ""; + const size_t index = static_cast(e); + return EnumNamesModuleType()[index]; +} + +enum Orientation : int8_t { + Orientation_Deg0 = 0, + Orientation_Deg90 = 1, + Orientation_Deg180 = 2, + Orientation_Deg270 = 3, + Orientation_MIN = Orientation_Deg0, + Orientation_MAX = Orientation_Deg270 +}; + +inline const Orientation (&EnumValuesOrientation())[4] { + static const Orientation values[] = {Orientation_Deg0, Orientation_Deg90, Orientation_Deg180, + Orientation_Deg270}; + return values; +} + +inline const char *const *EnumNamesOrientation() { + static const char *const names[5] = {"Deg0", "Deg90", "Deg180", "Deg270", nullptr}; + return names; +} + +inline const char *EnumNameOrientation(Orientation e) { + if (::flatbuffers::IsOutRange(e, Orientation_Deg0, Orientation_Deg270)) + return ""; + const size_t index = static_cast(e); + return EnumNamesOrientation()[index]; +} + +enum ModuleState : uint8_t { + ModuleState_NONE = 0, + ModuleState_MotorState = 1, + ModuleState_MIN = ModuleState_NONE, + ModuleState_MAX = ModuleState_MotorState +}; + +inline const ModuleState (&EnumValuesModuleState())[2] { + static const ModuleState values[] = {ModuleState_NONE, ModuleState_MotorState}; + return values; +} + +inline const char *const *EnumNamesModuleState() { + static const char *const names[3] = {"NONE", "MotorState", nullptr}; + return names; +} + +inline const char *EnumNameModuleState(ModuleState e) { + if (::flatbuffers::IsOutRange(e, ModuleState_NONE, ModuleState_MotorState)) + return ""; + const size_t index = static_cast(e); + return EnumNamesModuleState()[index]; +} + +template struct ModuleStateTraits { + static const ModuleState enum_value = ModuleState_NONE; +}; + +template <> struct ModuleStateTraits { + static const ModuleState enum_value = ModuleState_MotorState; +}; + +bool VerifyModuleState(::flatbuffers::Verifier &verifier, const void *obj, ModuleState type); +bool VerifyModuleStateVector(::flatbuffers::Verifier &verifier, + const ::flatbuffers::Vector<::flatbuffers::Offset> *values, + const ::flatbuffers::Vector *types); + +struct MotorState FLATBUFFERS_FINAL_CLASS : private ::flatbuffers::Table { + typedef MotorStateBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_ANGLE = 4 }; + int32_t angle() const { + return GetField(VT_ANGLE, 0); + } + bool Verify(::flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && VerifyField(verifier, VT_ANGLE, 4) && + verifier.EndTable(); + } +}; + +struct MotorStateBuilder { + typedef MotorState Table; + ::flatbuffers::FlatBufferBuilder &fbb_; + ::flatbuffers::uoffset_t start_; + void add_angle(int32_t angle) { + fbb_.AddElement(MotorState::VT_ANGLE, angle, 0); + } + explicit MotorStateBuilder(::flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + ::flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = ::flatbuffers::Offset(end); + return o; + } +}; + +inline ::flatbuffers::Offset CreateMotorState(::flatbuffers::FlatBufferBuilder &_fbb, + int32_t angle = 0) { + MotorStateBuilder builder_(_fbb); + builder_.add_angle(angle); + return builder_.Finish(); +} + +struct RobotModule FLATBUFFERS_FINAL_CLASS : private ::flatbuffers::Table { + typedef RobotModuleBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_ID = 4, + VT_MODULE_TYPE = 6, + VT_CONFIGURATION_TYPE = 8, + VT_CONFIGURATION = 10 + }; + uint8_t id() const { + return GetField(VT_ID, 0); + } + ModuleType module_type() const { + return static_cast(GetField(VT_MODULE_TYPE, 0)); + } + ModuleState configuration_type() const { + return static_cast(GetField(VT_CONFIGURATION_TYPE, 0)); + } + const void *configuration() const { + return GetPointer(VT_CONFIGURATION); + } + template const T *configuration_as() const; + const MotorState *configuration_as_MotorState() const { + return configuration_type() == ModuleState_MotorState + ? static_cast(configuration()) + : nullptr; + } + bool Verify(::flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && VerifyField(verifier, VT_ID, 1) && + VerifyField(verifier, VT_MODULE_TYPE, 1) && + VerifyField(verifier, VT_CONFIGURATION_TYPE, 1) && + VerifyOffset(verifier, VT_CONFIGURATION) && + VerifyModuleState(verifier, configuration(), configuration_type()) && + verifier.EndTable(); + } +}; + +template <> inline const MotorState *RobotModule::configuration_as() const { + return configuration_as_MotorState(); +} + +struct RobotModuleBuilder { + typedef RobotModule Table; + ::flatbuffers::FlatBufferBuilder &fbb_; + ::flatbuffers::uoffset_t start_; + void add_id(uint8_t id) { + fbb_.AddElement(RobotModule::VT_ID, id, 0); + } + void add_module_type(ModuleType module_type) { + fbb_.AddElement(RobotModule::VT_MODULE_TYPE, static_cast(module_type), 0); + } + void add_configuration_type(ModuleState configuration_type) { + fbb_.AddElement(RobotModule::VT_CONFIGURATION_TYPE, + static_cast(configuration_type), 0); + } + void add_configuration(::flatbuffers::Offset configuration) { + fbb_.AddOffset(RobotModule::VT_CONFIGURATION, configuration); + } + explicit RobotModuleBuilder(::flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + ::flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = ::flatbuffers::Offset(end); + return o; + } +}; + +inline ::flatbuffers::Offset +CreateRobotModule(::flatbuffers::FlatBufferBuilder &_fbb, uint8_t id = 0, + ModuleType module_type = ModuleType_SPLITTER, + ModuleState configuration_type = ModuleState_NONE, + ::flatbuffers::Offset configuration = 0) { + RobotModuleBuilder builder_(_fbb); + builder_.add_configuration(configuration); + builder_.add_configuration_type(configuration_type); + builder_.add_module_type(module_type); + builder_.add_id(id); + return builder_.Finish(); +} + +inline bool VerifyModuleState(::flatbuffers::Verifier &verifier, const void *obj, + ModuleState type) { + switch (type) { + case ModuleState_NONE: { + return true; + } + case ModuleState_MotorState: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } + default: + return true; + } +} + +inline bool +VerifyModuleStateVector(::flatbuffers::Verifier &verifier, + const ::flatbuffers::Vector<::flatbuffers::Offset> *values, + const ::flatbuffers::Vector *types) { + if (!values || !types) + return !values && !types; + if (values->size() != types->size()) + return false; + for (::flatbuffers::uoffset_t i = 0; i < values->size(); ++i) { + if (!VerifyModuleState(verifier, values->Get(i), types->GetEnum(i))) { + return false; + } + } + return true; +} + +inline const RobotModule *GetRobotModule(const void *buf) { + return ::flatbuffers::GetRoot(buf); +} + +inline const RobotModule *GetSizePrefixedRobotModule(const void *buf) { + return ::flatbuffers::GetSizePrefixedRoot(buf); +} + +inline bool VerifyRobotModuleBuffer(::flatbuffers::Verifier &verifier) { + return verifier.VerifyBuffer(nullptr); +} + +inline bool VerifySizePrefixedRobotModuleBuffer(::flatbuffers::Verifier &verifier) { + return verifier.VerifySizePrefixedBuffer(nullptr); +} + +inline void FinishRobotModuleBuffer(::flatbuffers::FlatBufferBuilder &fbb, + ::flatbuffers::Offset root) { + fbb.Finish(root); +} + +inline void FinishSizePrefixedRobotModuleBuffer(::flatbuffers::FlatBufferBuilder &fbb, + ::flatbuffers::Offset root) { + fbb.FinishSizePrefixed(root); +} + +#endif // FLATBUFFERS_GENERATED_ROBOTMODULE_H_ diff --git a/include/librpc.h b/include/librpc.h new file mode 100644 index 0000000..576b0dc --- /dev/null +++ b/include/librpc.h @@ -0,0 +1,59 @@ +#ifndef RPC_LIBRARY_H +#define RPC_LIBRARY_H + +#include +#include +#include +#include + +#include "BlockingQueue.h" +#include "constants.h" +#include "mDNSDiscoveryService.h" + + +constexpr auto RX_QUEUE_SIZE = 100; + +struct SizeAndSource { + size_t bytes_written; + uint8_t sender; +}; + +class MessagingInterface { + public: + MessagingInterface() + : m_stop_flag(false), m_rx_thread(std::thread(&MessagingInterface::handle_recv, this)), + m_rx_queue(std::make_shared>>>( + RX_QUEUE_SIZE)) { +#ifdef _WIN32 + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); +#endif + // Initialization must be after call to WSAStartup + m_discovery_service = std::make_unique(); + } + + ~MessagingInterface(); + int send(uint8_t *buffer, size_t size, uint8_t destination, uint8_t tag, bool durable); + int broadcast(uint8_t *buffer, size_t size, bool durable); // todo + std::optional recv(uint8_t *buffer, size_t size, uint8_t tag); + int sendrecv(uint8_t *send_buffer, size_t send_size, uint8_t dest, uint8_t send_tag, + uint8_t *recv_buffer, size_t recv_size, uint8_t recv_tag); // todo + std::unordered_set find_connected_modules(std::chrono::duration scan_duration); + + private: + void handle_recv(); + + uint16_t m_sequence_number = 0; + std::unordered_map> m_id_to_lossless_client; + std::unordered_map> m_id_to_lossy_client; + std::unordered_map>>>> + m_tag_to_queue_map; + std::unique_ptr m_discovery_service; + std::atomic m_stop_flag; + std::thread m_rx_thread; + std::shared_ptr>>> m_rx_queue; + std::shared_mutex m_client_mutex; + std::shared_mutex m_scan_mutex; +}; + +#endif // RPC_LIBRARY_H diff --git a/include/mDNSDiscoveryService.h b/include/mDNSDiscoveryService.h new file mode 100644 index 0000000..8e1ccff --- /dev/null +++ b/include/mDNSDiscoveryService.h @@ -0,0 +1,56 @@ +// +// Created by Johnathon Slightham on 2025-06-10. +// + +#ifndef MDNSDISCOVERYSERVICE_H +#define MDNSDISCOVERYSERVICE_H + +#include +#include + +#include "BlockingQueue.h" +#include "ICommunicationClient.h" +#include "IDiscoveryService.h" +#include "mDNSRobotModule.h" + +#ifdef _WIN32 +#include +#include +#define CLOSE_SOCKET closesocket +#pragma comment(lib, "ws2_32.lib") +typedef SOCKET socket_t; +#else +#include +#include +#include +#include +#define CLOSE_SOCKET close +typedef int socket_t; +#endif + +class mDNSDiscoveryService final : public IDiscoveryService { + + public: + mDNSDiscoveryService(); + ~mDNSDiscoveryService() override; + std::unordered_set find_modules(std::chrono::duration wait_time) override; + std::unordered_map> get_lossy_clients( + const std::shared_ptr>>> &rx_queue, + std::vector &skip_modules) override; + std::unordered_map> get_lossless_clients( + const std::shared_ptr>>> &rx_queue, + std::vector &skip_modules) override; + + private: + template + std::unordered_map> create_clients( + const std::shared_ptr>>> &rx_queue, + std::vector &skip_modules); + static void send_mdns_query(socket_t sock, const sockaddr_in &addr); + static std::optional parse_response(uint8_t *buffer, int size); + static std::tuple read_mdns_name(const uint8_t *buffer, int size, int ptr); + + std::unordered_map module_to_mdns{}; +}; + +#endif // MDNSDISCOVERYSERVICE_H diff --git a/include/mDNSRobotModule.h b/include/mDNSRobotModule.h new file mode 100644 index 0000000..595e96d --- /dev/null +++ b/include/mDNSRobotModule.h @@ -0,0 +1,19 @@ +// +// Created by Johnathon Slightham on 2025-07-05. +// + +#ifndef ROBOTMODULEINSTANCE_H +#define ROBOTMODULEINSTANCE_H + +#include "flatbuffers_generated/RobotModule_generated.h" +#include + +struct mDNSRobotModule { + int id; + std::string ip; + std::string hostname; + ModuleType module_type; + std::vector connected_module_ids; +}; + +#endif //ROBOTMODULEINSTANCE_H diff --git a/include/util/ip.h b/include/util/ip.h new file mode 100644 index 0000000..556fb8a --- /dev/null +++ b/include/util/ip.h @@ -0,0 +1,24 @@ +#ifndef IP_UTIL_H +#define IP_UTIL_H + +#ifdef _WIN32 +#include +#include +#define CLOSE_SOCKET closesocket +#pragma comment(lib, "ws2_32.lib") +typedef SOCKET socket_t; +#else +#include +#include +#include +#include +#define CLOSE_SOCKET close +typedef int socket_t; +#endif + +bool is_valid_ipv4(const std::string &ip) { + struct in_addr addr; + return inet_pton(AF_INET, ip.c_str(), &addr) == 1; +} + +#endif // IP_UTIL_H diff --git a/include/util/log.h b/include/util/log.h new file mode 100644 index 0000000..ab94a9c --- /dev/null +++ b/include/util/log.h @@ -0,0 +1,32 @@ +// +// Created by sligh on 2026-01-09. +// + +#ifndef LOG_H +#define LOG_H + +#define ERRBUF_SIZE 300 + +#include "spdlog/spdlog.h" + +#ifdef _WIN32 + +void print_errno() { + char errbuf[ERRBUF_SIZE]; + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, WSAGetLastError(), 0, errbuf, sizeof(errbuf), + NULL); + spdlog::error("{}", errbuf); +} + +#else + +#include +#include + +void print_errno() { + spdlog::error("{}", strerror(errno)); +} + +#endif + +#endif //LOG_H diff --git a/include/util/string.h b/include/util/string.h new file mode 100644 index 0000000..37cf16e --- /dev/null +++ b/include/util/string.h @@ -0,0 +1,23 @@ +// +// Created by Johnathon Slightham on 2025-07-05. +// + +#ifndef STRING_H +#define STRING_H +#include +#include +#include + +inline std::vector split(const std::string &str, const char delimiter) { + std::vector result; + std::stringstream ss(str); + std::string token; + + while (std::getline(ss, token, delimiter)) { + result.push_back(token); + } + + return result; +} + +#endif //STRING_H diff --git a/src/MPIMessageBuilder.cpp b/src/MPIMessageBuilder.cpp new file mode 100644 index 0000000..24fd4fe --- /dev/null +++ b/src/MPIMessageBuilder.cpp @@ -0,0 +1,31 @@ +// +// Created by Johnathon Slightham on 2025-06-30. +// + +#include "flatbuffers/MPIMessageBuilder.h" +#include "flatbuffers/SerializedMessage.h" + +namespace Flatbuffers { +SerializedMessage MPIMessageBuilder::build_mpi_message(const Messaging::MessageType type, + const uint8_t sender, + const uint8_t destination, + const uint16_t sequence_number, + const bool is_durable, const uint8_t tag, + const std::vector &payload) { + builder_.Clear(); + + const auto payload_vector = builder_.CreateVector(payload); + + const auto message = Messaging::CreateMPIMessage( + builder_, type, sender, destination, sequence_number, is_durable, + static_cast(payload.size()), tag, payload_vector); + + builder_.Finish(message); + + return {builder_.GetBufferPointer(), builder_.GetSize()}; +} + +const Messaging::MPIMessage *MPIMessageBuilder::parse_mpi_message(const uint8_t *buffer) { + return flatbuffers::GetRoot(buffer); +} +} // namespace Flatbuffers diff --git a/src/TCPClient.cpp b/src/TCPClient.cpp new file mode 100644 index 0000000..8884d6b --- /dev/null +++ b/src/TCPClient.cpp @@ -0,0 +1,114 @@ +// +// Created by Johnathon Slightham on 2025-06-10. +// + +#include +#include +#include + +#include "TCPClient.h" +#include "constants.h" +#include "spdlog/spdlog.h" + +constexpr auto SLEEP_WHILE_INITIALIZING = std::chrono::milliseconds(250); +constexpr int PORT = 3001; +constexpr auto QUEUE_ADD_TIMEOUT = std::chrono::milliseconds(100); +constexpr auto RX_SLEEP_ON_ERROR = std::chrono::milliseconds(100); +constexpr auto SOCKET_TIMEOUT_MS = 2500; + +// todo: - add authentication +// - encryption + +TCPClient::~TCPClient() { + this->m_stop_flag = true; + this->m_thread.join(); + this->deinit(); +} + +int TCPClient::init() { + sockaddr_in serv_addr{}; + + if ((this->m_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + spdlog::error("[TCP] Failed to create socket"); + return -2; + } + + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(PORT); + + if (inet_pton(AF_INET, this->m_ip.c_str(), &serv_addr.sin_addr) <= 0) { + spdlog::error("[TCP] Invalid address"); + deinit(); + return -1; + } + + timeval timeout{}; + timeout.tv_sec = SOCKET_TIMEOUT_MS / 1000; + timeout.tv_usec = (SOCKET_TIMEOUT_MS % 1000) * 1000; + +#ifdef _WIN32 + setsockopt(this->m_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)); + setsockopt(this->m_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)); +#else + setsockopt(this->m_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); + setsockopt(this->m_socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); +#endif + + if (connect(this->m_socket, reinterpret_cast(&serv_addr), sizeof(serv_addr)) < 0) { + spdlog::error("[TCP] Connection failed"); + deinit(); + return -1; + } + + this->m_initialized = true; + return 0; +} + +void TCPClient::deinit() { + this->m_initialized = false; + if (this->m_socket > 0) { + CLOSE_SOCKET(this->m_socket); + this->m_socket = -1; + } +} + +int TCPClient::send_msg(void *sendbuff, const uint32_t len) { + if (!m_initialized) { + return -1; + } + + if (send(this->m_socket, (char *)&len, 4, 0) < 4) { + return -1; + } + + return send(this->m_socket, (char *)sendbuff, len, 0); +} + +void TCPClient::rx_thread() const { + while (!m_stop_flag) { + if (!m_initialized) { + std::this_thread::sleep_for(SLEEP_WHILE_INITIALIZING); + continue; + } + + uint32_t data_len = 0; + if (recv(this->m_socket, (char *)&data_len, 4, MSG_WAITALL) < 0) { + std::this_thread::sleep_for(RX_SLEEP_ON_ERROR); + continue; + } + + if (data_len > MAX_BUFFER_SIZE || data_len < 1) { + std::this_thread::sleep_for(RX_SLEEP_ON_ERROR); + continue; + } + + auto buffer = std::make_unique>(); + buffer->resize(MAX_BUFFER_SIZE); + if (const auto read = recv(this->m_socket, (char *)buffer->data(), data_len, MSG_WAITALL); + read > 0) { + m_rx_queue->enqueue(std::move(buffer), QUEUE_ADD_TIMEOUT); + } else { + std::this_thread::sleep_for(RX_SLEEP_ON_ERROR); + } + } +} diff --git a/src/UDPClient.cpp b/src/UDPClient.cpp new file mode 100644 index 0000000..3ad248d --- /dev/null +++ b/src/UDPClient.cpp @@ -0,0 +1,187 @@ +// +// Created by Johnathon Slightham on 2025-06-10. +// + +#include +#include +#include +#include + +#include "UDPClient.h" +#include "spdlog/spdlog.h" +#include "util/log.h" + +constexpr auto SLEEP_WHILE_INITIALIZING = std::chrono::milliseconds(250); +constexpr int TX_PORT = 3101; +constexpr int RX_PORT = 3100; +constexpr std::string RECV_MCAST = "239.1.1.2"; +constexpr std::string SEND_MCAST = "239.1.1.1"; +constexpr auto SOCKET_TIMEOUT_MS = 2500; +constexpr auto QUEUE_ADD_TIMEOUT = std::chrono::milliseconds(100); +constexpr auto RX_SLEEP_ON_ERROR = std::chrono::milliseconds(100); +constexpr auto RX_BUFFER_SIZE = 1024; + +// todo: - add authentication +// - encryption + +UDPClient::~UDPClient() { + this->m_stop_flag = true; + this->m_thread.join(); + this->deinit(); +} + +int UDPClient::init() { + if ((this->m_rx_socket = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + spdlog::error("[UDP] Failed to create socket"); + print_errno(); + return -2; + } + + if ((this->m_tx_socket = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + spdlog::error("[UDP] Failed to create socket"); + print_errno(); + deinit(); + return -2; + } + + timeval timeout{}; + timeout.tv_sec = SOCKET_TIMEOUT_MS / 1000; + timeout.tv_usec = (SOCKET_TIMEOUT_MS % 1000) * 1000; + +#ifdef _WIN32 + setsockopt(this->m_rx_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout, sizeof(timeout)); + setsockopt(this->m_tx_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)); +#else + setsockopt(this->m_rx_socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)); + setsockopt(this->m_tx_socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); +#endif + + sockaddr_in server_addr = { + .sin_family = AF_INET, + .sin_port = htons(RX_PORT), + }; + server_addr.sin_addr.s_addr = INADDR_ANY; + + if (int err = bind(m_rx_socket, reinterpret_cast(&server_addr), + sizeof(server_addr)); + 0 != err) { + spdlog::error("[UDP] Socket unable to bind"); + print_errno(); + deinit(); + return -1; + } + + constexpr int opt = 1; +#ifdef _WIN32 + setsockopt(m_rx_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)); + setsockopt(m_tx_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)); +#else + setsockopt(m_rx_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + setsockopt(m_tx_socket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); +#endif + + ip_mreq mreq; + mreq.imr_multiaddr.s_addr = inet_addr(RECV_MCAST.c_str()); + mreq.imr_interface.s_addr = INADDR_ANY; + +#ifdef _WIN32 + // Get hostname, resolve to primary IPv4 (won't work for all cases) + char hostname[256]; + gethostname(hostname, sizeof(hostname)); + hostent *host = gethostbyname(hostname); + if (host && host->h_addr_list[0]) { + mreq.imr_interface.s_addr = *(uint32_t *)host->h_addr_list[0]; + } else { + mreq.imr_interface.s_addr = INADDR_ANY; // Fallback + } + + spdlog::info("[UDP] Listening on {}", mreq.imr_interface.s_addr); + + if (setsockopt(m_rx_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&mreq, sizeof(mreq)) < 0) { + spdlog::error("[UDP] Failed to join multicast group"); + print_errno(); + deinit(); + return -1; + } +#else + setsockopt(m_rx_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); +#endif + + this->m_initialized = true; + + return 0; +} + +void UDPClient::deinit() { + this->m_initialized = false; + + if (this->m_tx_socket > 0) { + CLOSE_SOCKET(this->m_tx_socket); + this->m_tx_socket = -1; + } + + if (this->m_rx_socket > 0) { + CLOSE_SOCKET(this->m_rx_socket); + this->m_rx_socket = -1; + } +} + +int UDPClient::send_msg(void *sendbuff, const uint32_t len) { + if (!m_initialized) { + return -1; + } + + std::vector buffer; + buffer.resize(len + 4); + + *reinterpret_cast(buffer.data()) = static_cast(len); + std::memcpy(buffer.data() + 4, sendbuff, len); + + sockaddr_in mcast_dest = { + .sin_family = AF_INET, + .sin_port = htons(TX_PORT), + }; + inet_pton(AF_INET, SEND_MCAST.c_str(), &mcast_dest.sin_addr); + +#ifdef _WIN32 + return sendto(m_tx_socket, reinterpret_cast(buffer.data()), buffer.size(), 0, + reinterpret_cast(&mcast_dest), sizeof(mcast_dest)); +#else + return sendto(m_tx_socket, buffer.data(), buffer.size(), 0, + reinterpret_cast(&mcast_dest), sizeof(mcast_dest)); +#endif +} + +void UDPClient::rx_thread() const { + + while (!m_stop_flag) { + if (!m_initialized) { + std::this_thread::sleep_for(RX_SLEEP_ON_ERROR); + continue; + } + + auto buffer = std::make_unique>(); + buffer->resize(RX_BUFFER_SIZE); + +#ifdef _WIN32 + const auto len = recv(m_rx_socket, (char *)buffer->data(), RX_BUFFER_SIZE, 0); +#else + const auto len = recv(m_rx_socket, buffer->data(), RX_BUFFER_SIZE, 0); +#endif + if (len < 0) { + std::this_thread::sleep_for(RX_SLEEP_ON_ERROR); + } else if (len < 4 || len > RX_BUFFER_SIZE) { + spdlog::error("[UDP] Message size of {} incorrect", len); + } else { + uint32_t msg_size = *reinterpret_cast(buffer->data()); + if (msg_size > len - 4) { + spdlog::error("[UDP] Message size incorrect {}", msg_size); + continue; + } + + buffer->erase(buffer->begin(), buffer->begin() + 4); + buffer->resize(msg_size); + m_rx_queue->enqueue(std::move(buffer), QUEUE_ADD_TIMEOUT); + } + } +} diff --git a/src/librpc.cpp b/src/librpc.cpp new file mode 100644 index 0000000..7dfcea0 --- /dev/null +++ b/src/librpc.cpp @@ -0,0 +1,134 @@ +#include "librpc.h" + +#include +#include +#include + +#undef min +#include + +#include "flatbuffers/MPIMessageBuilder.h" +#include "spdlog/spdlog.h" + +constexpr auto MAX_RECV_WAIT_TIME = std::chrono::seconds(3); +constexpr auto PER_TAG_MAX_QUEUE_SIZE = 50; +constexpr auto MAX_WAIT_TIME_TAG_ENQUEUE = std::chrono::milliseconds(250); +constexpr auto MAX_WAIT_TIME_RX_THREAD_DEQUEUE = std::chrono::milliseconds(250); + +MessagingInterface::~MessagingInterface() { + m_stop_flag = true; + m_rx_thread.join(); + +#ifdef _WIN32 + WSACleanup(); +#endif +} + +int MessagingInterface::send(uint8_t *buffer, const size_t size, const uint8_t destination, + const uint8_t tag, const bool durable) { + if (!this->m_id_to_lossless_client.contains(destination)) { + return -1; + } + + Flatbuffers::MPIMessageBuilder builder; + const auto [mpi_buffer, mpi_size] = builder.build_mpi_message( + Messaging::MessageType_PTP, PC_MODULE_ID, destination, m_sequence_number++, durable, tag, + std::vector(buffer, buffer + size)); + + std::shared_lock lock(m_client_mutex); + if (durable) { + this->m_id_to_lossless_client[destination]->send_msg(mpi_buffer, mpi_size); + } else { + this->m_id_to_lossy_client[destination]->send_msg(mpi_buffer, mpi_size); + } + + return 0; +} + +int MessagingInterface::broadcast(uint8_t *buffer, size_t size, bool durable) { + return -1; // todo +} + +std::optional MessagingInterface::recv(uint8_t *buffer, const size_t size, + uint8_t tag) { + if (!m_tag_to_queue_map.contains(tag)) { + m_tag_to_queue_map.insert( + {tag, std::make_unique>>>( + PER_TAG_MAX_QUEUE_SIZE)}); + } + + const auto data = m_tag_to_queue_map[tag]->dequeue(MAX_RECV_WAIT_TIME); + + if (!data.has_value()) { + return std::nullopt; + } + + // Anything in the queue should already be validated + const auto mpi_message = + Flatbuffers::MPIMessageBuilder::parse_mpi_message(data.value()->data()); + const auto data_size = std::min(size, static_cast(mpi_message->length())); + + std::memcpy(buffer, mpi_message->payload()->data(), data_size); + + return std::make_optional({data_size, mpi_message->sender()}); +} + +int MessagingInterface::sendrecv(uint8_t *send_buffer, size_t send_size, uint8_t dest, + uint8_t send_tag, uint8_t *recv_buffer, size_t recv_size, + uint8_t recv_tag) { + // no-op + return -1; +} + +std::unordered_set +MessagingInterface::find_connected_modules(const std::chrono::duration scan_duration) { + // Cannot just skip the call if already running, since the caller needs the list of modules. + std::unique_lock scan_lock(m_scan_mutex); + const auto foundModules = this->m_discovery_service->find_modules(scan_duration); + scan_lock.unlock(); + + std::unique_lock lock(m_client_mutex); + + std::vector existing_clients; + existing_clients.reserve(m_id_to_lossless_client.size()); + for (auto &kv : m_id_to_lossless_client) { + existing_clients.push_back(kv.first); + } + + const auto new_lossless = + this->m_discovery_service->get_lossless_clients(m_rx_queue, existing_clients); + const auto new_lossy = + this->m_discovery_service->get_lossy_clients(m_rx_queue, existing_clients); + + m_id_to_lossless_client.insert(new_lossless.begin(), new_lossless.end()); + m_id_to_lossy_client.insert(new_lossy.begin(), new_lossy.end()); + + return foundModules; +} + +void MessagingInterface::handle_recv() { + while (!m_stop_flag) { + if (auto data = this->m_rx_queue->dequeue(MAX_WAIT_TIME_RX_THREAD_DEQUEUE); + data.has_value()) { + flatbuffers::Verifier verifier(data.value()->data(), data.value()->size()); + bool ok = Messaging::VerifyMPIMessageBuffer(verifier); + if (!ok) { + spdlog::error("[LibRPC] Got invalid flatbuffer data"); + continue; + } + + const auto &mpi_message = + Flatbuffers::MPIMessageBuilder::parse_mpi_message(data.value()->data()); + + if (!m_tag_to_queue_map.contains(mpi_message->tag())) { + m_tag_to_queue_map.insert( + {mpi_message->tag(), + std::make_unique>>>( + PER_TAG_MAX_QUEUE_SIZE)}); + } + + m_tag_to_queue_map[mpi_message->tag()]->enqueue(std::move(data.value()), + MAX_WAIT_TIME_TAG_ENQUEUE); + } + } +} diff --git a/src/mDNSDiscoveryService.cpp b/src/mDNSDiscoveryService.cpp new file mode 100644 index 0000000..ec3257d --- /dev/null +++ b/src/mDNSDiscoveryService.cpp @@ -0,0 +1,367 @@ +// +// Created by Johnathon Slightham on 2025-06-10. +// + +#include +#include +#include +#include +#include + +#include "TCPClient.h" +#include "mDNSDiscoveryService.h" + +#include "UDPClient.h" +#include "spdlog/spdlog.h" +#include "util/ip.h" +#include "util/string.h" + +#define MDNS_PORT 5353 +#define MDNS_GROUP "224.0.0.251" +#define RECV_BLOCK_SIZE 1024 +#define MODULE_TYPE_STR "module_type" +#define MODULE_ID_STR "module_id" +#define CONNECTED_MODULES_STR "connected_modules" + +#pragma pack(push, 1) // prevent padding between struct members +struct query_header { + uint16_t id; + uint16_t flags; + uint16_t num_questions; + uint16_t num_answers; + uint16_t num_authority; + uint16_t num_additional; +}; + +struct query_footer { // footer for the question not for the packet + uint16_t type = htons(0x00FF); + uint16_t class_id = htons(0x8001); +}; + +struct answer { + uint16_t type; + uint16_t answer_class; + uint32_t ttl; + uint16_t data_length; +}; + +#pragma pack(pop) + +mDNSDiscoveryService::mDNSDiscoveryService() = default; + +mDNSDiscoveryService::~mDNSDiscoveryService() = default; + +std::unordered_set +mDNSDiscoveryService::find_modules(const std::chrono::duration wait_time) { + std::unordered_set modules{}; + + const socket_t sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) { + printf("socket() failed: %s\n", strerror(errno)); + return modules; + } + + constexpr int reuse = 1; + timeval tv{}; + tv.tv_sec = 1; + tv.tv_usec = 0; +#ifdef _WIN32 + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)); + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof tv); + // Windows does not support SO_REUSEPORT +#else + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); + setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse)); + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof tv); +#endif + + sockaddr_in localAddr{}; + localAddr.sin_family = AF_INET; + localAddr.sin_port = htons(MDNS_PORT); + localAddr.sin_addr.s_addr = INADDR_ANY; + if (bind(sock, reinterpret_cast(&localAddr), sizeof(localAddr)) < 0) { + printf("bind() failed: %s\n", strerror(errno)); + CLOSE_SOCKET(sock); + return modules; + } + + // Join mDNS multicast group + ip_mreq mreq{}; + mreq.imr_multiaddr.s_addr = inet_addr(MDNS_GROUP); + mreq.imr_interface.s_addr = INADDR_ANY; + +#ifdef _WIN32 + if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&mreq, sizeof(mreq)) < 0) { +#else + if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { +#endif + printf("setsockopt() failed: %s\n", strerror(errno)); + CLOSE_SOCKET(sock); + return modules; + } + + // Send mDNS query and get responses + sockaddr_in mcastAddr{}; + mcastAddr.sin_family = AF_INET; + mcastAddr.sin_port = htons(MDNS_PORT); + inet_pton(AF_INET, MDNS_GROUP, &mcastAddr.sin_addr); + const auto start = std::chrono::system_clock::now(); + std::vector>> responses; + + while (std::chrono::system_clock::now() - start < wait_time) { + send_mdns_query(sock, mcastAddr); + + std::this_thread::sleep_for(wait_time / 5); + + responses.emplace_back(std::make_unique>()); + responses.back()->resize(RECV_BLOCK_SIZE); +#ifdef _WIN32 + const auto len = recv(sock, (char *)responses.back()->data(), RECV_BLOCK_SIZE, 0); +#else + const auto len = recv(sock, responses.back()->data(), RECV_BLOCK_SIZE, 0); +#endif + if (len > 0) { + responses.back()->resize(len); + } else { + responses.pop_back(); + } + } + + CLOSE_SOCKET(sock); + + this->module_to_mdns.clear(); + + for (const auto &response : responses) { + if (const auto parsed_response = parse_response(response->data(), response->size()); + parsed_response.has_value()) { + modules.insert(parsed_response.value().id); + this->module_to_mdns.insert({parsed_response.value().id, parsed_response.value()}); + } + } + + return modules; +} + +std::unordered_map> +mDNSDiscoveryService::get_lossy_clients( + const std::shared_ptr>>> &rx_queue, + std::vector &skip_modules) { + return this->create_clients(rx_queue, skip_modules); +} + +std::unordered_map> +mDNSDiscoveryService::get_lossless_clients( + const std::shared_ptr>>> &rx_queue, + std::vector &skip_modules) { + return this->create_clients(rx_queue, skip_modules); +} + +template +std::unordered_map> +mDNSDiscoveryService::create_clients( + const std::shared_ptr>>> &rx_queue, + std::vector &skip_modules) { + std::unordered_map> clients; + + for (const auto &[id, module] : this->module_to_mdns) { + if (std::find(skip_modules.begin(), skip_modules.end(), id) != skip_modules.end()) { + continue; + } + + const auto client = std::make_shared(module.ip, rx_queue); + client->init(); + + for (const auto &connected_module : module.connected_module_ids) { + // todo: add only if not connected directly + clients[connected_module] = client; + } + + clients[id] = client; + } + + return clients; +} + +void mDNSDiscoveryService::send_mdns_query(const socket_t sock, const sockaddr_in &addr) { + query_header header{}; + header.id = htons(0); + header.flags = htons(0x0000); + header.num_questions = htons(1); + header.num_answers = htons(0); + header.num_authority = htons(0); + header.num_additional = htons(0); + + constexpr uint8_t domain_name[] = { + 13, '_', 'r', 'o', 'b', 'o', 't', 'c', 'o', 'n', 't', 'r', 'o', + 'l', 4, '_', 't', 'c', 'p', 5, 'l', 'o', 'c', 'a', 'l', 0, + }; + + query_footer footer; + footer.type = htons(0x00FF); + footer.class_id = htons(0x0001); + + uint8_t buffer[1024] = {}; + memcpy(buffer, &header, sizeof(header)); + memcpy(buffer + sizeof(header), &domain_name, sizeof(domain_name)); + memcpy(buffer + sizeof(header) + sizeof(domain_name), &footer, sizeof(footer)); +#ifdef _WIN32 + sendto(sock, (char *)&buffer, sizeof(header) + sizeof(domain_name) + sizeof(footer), 0, + (sockaddr *)&addr, sizeof(addr)); +#else + sendto(sock, &buffer, sizeof(header) + sizeof(domain_name) + sizeof(footer), 0, + (sockaddr *)&addr, sizeof(addr)); +#endif +} + +std::optional mDNSDiscoveryService::parse_response(uint8_t *buffer, + const int size) { + int ptr = 0; + mDNSRobotModule response{}; + + // Header + if (size < sizeof(query_header)) { + return std::nullopt; + } + const auto h = reinterpret_cast(buffer + ptr); + ptr += sizeof(query_header); + h->num_questions = ntohs(h->num_questions); + h->num_answers = ntohs(h->num_answers); + h->num_authority = ntohs(h->num_authority); + h->num_additional = ntohs(h->num_additional); + + // Questions + for (int i = 0; i < h->num_questions; i++) { + if (ptr > size) { + return std::nullopt; + } + + // We ignore questions for now + auto [name, new_ptr] = read_mdns_name(buffer, size, ptr); + if (new_ptr < 1) { + return std::nullopt; + } + ptr = new_ptr; + ptr += sizeof(query_footer); + } + + // Answers and authority (we do not care about authority). + bool robot_module = false; + for (int i = 0; i < h->num_answers + h->num_authority + h->num_additional; i++) { + if (ptr > size) { + return std::nullopt; + } + // We assume that the boards mdns does not send any questions asking for + // other boards (and thus does not compress the domain name we are looking + // for). + + const auto [name, new_ptr] = read_mdns_name(buffer, size, ptr); + if (new_ptr < 1) { + return std::nullopt; + } + ptr = new_ptr; + + robot_module |= name.find("_robotcontrol") != std::string::npos; + response.hostname = name; + + const auto a = reinterpret_cast(buffer + ptr); + a->type = ntohs(a->type); + a->answer_class = ntohs(a->answer_class); + a->ttl = ntohs(a->ttl); + a->data_length = ntohs(a->data_length); + ptr += sizeof(answer); + + // A-Record + if (a->type == 1 && robot_module) { + std::vector data; + data.resize(a->data_length); + std::memcpy(data.data(), buffer + ptr, a->data_length); + + std::stringstream ip; + for (int j = 0; j < a->data_length; j++) { + ip << static_cast(data[j]); + if (j < a->data_length - 1) { + ip << '.'; + } + } + response.ip = ip.str(); + } + + // TXT-Recrod + if (a->type == 16 && robot_module) { + int inner_ptr = ptr; + while (inner_ptr < a->data_length + ptr) { + const int len = buffer[inner_ptr++]; + std::string s(reinterpret_cast(buffer + inner_ptr), len); + inner_ptr += len; + + const auto split_string = split(s, '='); + if (split_string.size() != 2) { + continue; + } + + if (split_string[0] == MODULE_ID_STR) { + response.id = stoi(split_string[1]); + } + + if (split_string[0] == MODULE_TYPE_STR) { + response.module_type = static_cast(stoi(split_string[1])); + } + + if (split_string[0] == CONNECTED_MODULES_STR) { + for (const auto connected_modules = split(split_string[1], ','); + const auto &module_id : connected_modules) { + response.connected_module_ids.emplace_back(stoi(module_id)); + } + } + } + } + + ptr += a->data_length; + } + + return robot_module && is_valid_ipv4(response.ip) ? std::optional{response} : std::nullopt; +} + +std::tuple mDNSDiscoveryService::read_mdns_name(const uint8_t *buffer, + const int size, int ptr) { + int len = 0; + std::stringstream ss; + + int i = 0; + while (ptr < size) { + if (0 >= len) { + if (0 == buffer[ptr]) { // end + ptr++; + break; + } + + if (0 != i) { + ss << "."; + } + + if (buffer[ptr] >= 0xC0) { // compressed + ptr++; + if (buffer[ptr] < 0 || buffer[ptr] > ptr) { + return {"", -1}; + } + const auto [name, l] = read_mdns_name(buffer, size, buffer[ptr]); + if (l < 1) { + return {"", -1}; + } + ptr++; + ss << name; + break; + } + + len = buffer[ptr]; // update length + } else { + len--; + ss << buffer[ptr]; + } + ptr++; + + i++; + } + + return {ss.str(), ptr}; +}