Radix Relay
Hybrid mesh communications with Signal Protocol encryption
Loading...
Searching...
No Matches
processor_runner.hpp
Go to the documentation of this file.
1#pragma once
2
#include <atomic>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/experimental/channel_error.hpp>
#include <boost/asio/io_context.hpp>
#include <concepts>
#include <memory>
#include <spdlog/spdlog.h>
#include <string>
#include <string_view>
14
15namespace radix_relay::core {
16
22template<typename T>
23concept Processor = requires(T proc, std::shared_ptr<boost::asio::cancellation_slot> slot) {
24 { proc.run(slot) } -> std::same_as<boost::asio::awaitable<void>>;
25};
26
36template<Processor P>
37auto run_processor(std::shared_ptr<P> proc,
38 std::shared_ptr<boost::asio::cancellation_slot> cancel_slot,
39 std::string_view processor_name) -> boost::asio::awaitable<void>
40{
41 spdlog::trace("[{}] Coroutine started", processor_name);
42 try {
43 co_await proc->run(cancel_slot);
44 } catch (const boost::system::system_error &err) {
45 if (err.code() == boost::asio::error::operation_aborted
46 or err.code() == boost::asio::experimental::error::channel_cancelled
47 or err.code() == boost::asio::experimental::error::channel_closed) {
48 spdlog::debug("[{}] Cancelled, exiting run loop", processor_name);
49 co_return;
50 }
51 spdlog::error("[{}] Unexpected error in run_once: {}", processor_name, err.what());
52 } catch (const std::exception &err) {
53 spdlog::error("[{}] Unknown exception in run_once: {}", processor_name, err.what());
54 }
55 spdlog::trace("[{}] Coroutine exiting", processor_name);
56}
57
/// @brief Tracks the lifecycle state of a spawned coroutine.
///
/// Both flags start out `false`. They are atomics so that the code observing
/// the state may read them while the coroutine writes them.
struct coroutine_state
{
  /// True when coroutine has started execution.
  std::atomic<bool> started{ false };
  /// True when coroutine has completed.
  std::atomic<bool> done{ false };
};
66
77template<Processor P>
78auto spawn_processor(const std::shared_ptr<boost::asio::io_context> &io_ctx,
79 std::shared_ptr<P> proc,
80 std::shared_ptr<boost::asio::cancellation_slot> cancel_slot,
81 std::string_view processor_name) -> std::shared_ptr<coroutine_state>
82{
83 auto state = std::make_shared<coroutine_state>();
84 boost::asio::co_spawn(
85 *io_ctx,
86 [](std::shared_ptr<P> processor,
87 std::shared_ptr<boost::asio::cancellation_slot> c_slot,
88 std::string_view name,
89 std::shared_ptr<coroutine_state> coro_state) -> boost::asio::awaitable<void> {
90 coro_state->started = true;
91 co_await run_processor(processor, c_slot, name);
92 coro_state->done = true;
93 }(proc, cancel_slot, processor_name, state),
94 boost::asio::detached);
95 return state;
96}
97
98}// namespace radix_relay::core
Concept for types that can be run as processors.
auto spawn_processor(const std::shared_ptr< boost::asio::io_context > &io_ctx, std::shared_ptr< P > proc, std::shared_ptr< boost::asio::cancellation_slot > cancel_slot, std::string_view processor_name) -> std::shared_ptr< coroutine_state >
Spawns a processor as a detached coroutine.
auto run_processor(std::shared_ptr< P > proc, std::shared_ptr< boost::asio::cancellation_slot > cancel_slot, std::string_view processor_name) -> boost::asio::awaitable< void >
Runs a processor coroutine with error handling.
Tracks the lifecycle state of a spawned coroutine.
std::atomic< bool > done
True when coroutine has completed.
std::atomic< bool > started
True when coroutine has started execution.