4#include <boost/asio/awaitable.hpp>
5#include <boost/asio/bind_cancellation_slot.hpp>
6#include <boost/asio/cancellation_signal.hpp>
7#include <boost/asio/experimental/concurrent_channel.hpp>
8#include <boost/asio/io_context.hpp>
9#include <boost/asio/redirect_error.hpp>
10#include <boost/asio/use_awaitable.hpp>
36 explicit async_queue(
const std::shared_ptr<boost::asio::io_context> &io_context)
37 : io_context_(io_context), channel_(*io_context_,
channel_size), size_(0)
53 channel_.try_send(boost::system::error_code{}, std::move(value));
64 auto pop(std::shared_ptr<boost::asio::cancellation_slot> cancel_slot =
nullptr) -> boost::asio::awaitable<T>
66 boost::system::error_code err;
69 auto val =
co_await channel_.async_receive(boost::asio::bind_cancellation_slot(
70 *cancel_slot, boost::asio::redirect_error(boost::asio::use_awaitable, err)));
71 if (err) {
throw boost::system::system_error(err); }
75 auto val =
co_await channel_.async_receive(boost::asio::redirect_error(boost::asio::use_awaitable, err));
76 if (err) {
throw boost::system::system_error(err); }
93 channel_.try_receive([&value](boost::system::error_code , T rx_value) { value = std::move(rx_value); });
107 [[nodiscard]]
auto empty() const ->
bool {
return size_.load() == 0; }
114 [[nodiscard]]
auto size() const -> std::
size_t {
return size_.load(); }
119 auto close() ->
void { channel_.close(); }
122 std::shared_ptr<boost::asio::io_context> io_context_;
123 boost::asio::experimental::concurrent_channel<void(boost::system::error_code, T)> channel_;
124 std::atomic<std::size_t> size_;
Thread-safe asynchronous queue for message passing between coroutines.
auto push(T value) -> void
Pushes a value onto the queue (non-blocking).
auto size() const -> std::size_t
Returns the current number of elements in the queue.
auto operator=(const async_queue &) -> async_queue &=delete
async_queue(async_queue &&)=delete
static const std::size_t channel_size
Maximum number of elements the queue can hold.
auto pop(std::shared_ptr< boost::asio::cancellation_slot > cancel_slot=nullptr) -> boost::asio::awaitable< T >
Asynchronously pops a value from the queue (coroutine).
auto try_pop() -> std::optional< T >
Attempts to pop a value without blocking.
auto operator=(async_queue &&) -> async_queue &=delete
async_queue(const std::shared_ptr< boost::asio::io_context > &io_context)
Constructs a new async queue.
auto close() -> void
Closes the queue, preventing further operations.
auto empty() const -> bool
Checks if the queue is empty.
async_queue(const async_queue &)=delete