Radix Relay
Hybrid mesh communications with Signal Protocol encryption
Loading...
Searching...
No Matches
async_queue.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <atomic>
4#include <boost/asio/awaitable.hpp>
5#include <boost/asio/bind_cancellation_slot.hpp>
6#include <boost/asio/cancellation_signal.hpp>
7#include <boost/asio/experimental/concurrent_channel.hpp>
8#include <boost/asio/io_context.hpp>
9#include <boost/asio/redirect_error.hpp>
10#include <boost/asio/use_awaitable.hpp>
11#include <cstddef>
12#include <memory>
13#include <optional>
14
16
25template<typename T> class async_queue
26{
27public:
29 static const std::size_t channel_size{ 1024 };
30
36 explicit async_queue(const std::shared_ptr<boost::asio::io_context> &io_context)
37 : io_context_(io_context), channel_(*io_context_, channel_size), size_(0)
38 {}
39
40 async_queue(const async_queue &) = delete;
41 auto operator=(const async_queue &) -> async_queue & = delete;
43 auto operator=(async_queue &&) -> async_queue & = delete;
44 ~async_queue() = default;
45
51 auto push(T value) -> void
52 {
53 channel_.try_send(boost::system::error_code{}, std::move(value));
54 ++size_;
55 }
56
64 auto pop(std::shared_ptr<boost::asio::cancellation_slot> cancel_slot = nullptr) -> boost::asio::awaitable<T>
65 {
66 boost::system::error_code err;
67
68 if (cancel_slot) {
69 auto val = co_await channel_.async_receive(boost::asio::bind_cancellation_slot(
70 *cancel_slot, boost::asio::redirect_error(boost::asio::use_awaitable, err)));
71 if (err) { throw boost::system::system_error(err); }
72 --size_;
73 co_return val;
74 } else {
75 auto val = co_await channel_.async_receive(boost::asio::redirect_error(boost::asio::use_awaitable, err));
76 if (err) { throw boost::system::system_error(err); }
77 --size_;
78 co_return val;
79 }
80 }
81
89 auto try_pop() -> std::optional<T>
90 {
91 T value;
92 bool received =
93 channel_.try_receive([&value](boost::system::error_code /*ec*/, T rx_value) { value = std::move(rx_value); });
94
95 if (received) {
96 --size_;
97 return value;
98 }
99 return std::nullopt;
100 }
101
107 [[nodiscard]] auto empty() const -> bool { return size_.load() == 0; }
108
114 [[nodiscard]] auto size() const -> std::size_t { return size_.load(); }
115
119 auto close() -> void { channel_.close(); }
120
121private:
122 std::shared_ptr<boost::asio::io_context> io_context_;
123 boost::asio::experimental::concurrent_channel<void(boost::system::error_code, T)> channel_;
124 std::atomic<std::size_t> size_;
125};
126
127}// namespace radix_relay::async
Thread-safe asynchronous queue for message passing between coroutines.
auto push(T value) -> void
Pushes a value onto the queue (non-blocking).
auto size() const -> std::size_t
Returns the current number of elements in the queue.
auto operator=(const async_queue &) -> async_queue &=delete
async_queue(async_queue &&)=delete
static const std::size_t channel_size
Maximum number of elements the queue can hold.
auto pop(std::shared_ptr< boost::asio::cancellation_slot > cancel_slot=nullptr) -> boost::asio::awaitable< T >
Asynchronously pops a value from the queue (coroutine).
auto try_pop() -> std::optional< T >
Attempts to pop a value without blocking.
auto operator=(async_queue &&) -> async_queue &=delete
async_queue(const std::shared_ptr< boost::asio::io_context > &io_context)
Constructs a new async queue.
auto close() -> void
Closes the queue, preventing further operations.
auto empty() const -> bool
Checks if the queue is empty.
async_queue(const async_queue &)=delete