32template<concepts::websocket_stream WebSocketStream>
struct transport
42 transport(
const std::shared_ptr<WebSocketStream> &websocket_stream,
43 const std::shared_ptr<boost::asio::io_context> &io_context,
46 : ws_(websocket_stream), io_context_(io_context), in_queue_(in_queue), to_session_queue_(to_session_queue)
54 ws_->async_close([](
const boost::system::error_code & , std::size_t ) {});
70 auto run_once(std::shared_ptr<boost::asio::cancellation_slot> cancel_slot =
nullptr) -> boost::asio::awaitable<void>
72 auto cmd =
co_await in_queue_->pop(cancel_slot);
73 std::visit([&](
auto &&event) { handle(std::forward<
decltype(event)>(event)); }, cmd);
84 auto run(std::shared_ptr<boost::asio::cancellation_slot> cancel_slot =
nullptr) -> boost::asio::awaitable<void>
87 while (
true) {
co_await run_once(cancel_slot); }
88 }
catch (
const boost::system::system_error &e) {
89 if (e.code() == boost::asio::error::operation_aborted
90 or e.code() == boost::asio::experimental::error::channel_cancelled
91 or e.code() == boost::asio::experimental::error::channel_closed) {
92 spdlog::debug(
"[transport] Cancelled, exiting run loop");
95 spdlog::error(
"[transport] Unexpected error in run loop: {}", e.what());
102 bool connected_{
false };
103 std::shared_ptr<WebSocketStream> ws_;
104 std::shared_ptr<boost::asio::io_context> io_context_;
105 std::shared_ptr<async::async_queue<core::events::transport::in_t>> in_queue_;
106 std::shared_ptr<async::async_queue<core::events::session_orchestrator::in_t>> to_session_queue_;
107 static constexpr size_t read_buffer_size = 8192;
108 std::array<std::byte, read_buffer_size> read_buffer_{};
109 std::unordered_map<std::string, std::vector<std::byte>> pending_sends_;
128 auto parse_url(
const std::string_view address) ->
void
130 std::string addr_str(address);
135 static constexpr size_t wss_prefix_length = 6;
137 if (addr_str.starts_with(
"ws://")) {
138 throw std::runtime_error(
"Insecure WebSocket (ws://) not supported. Use wss:// for security.");
140 if (addr_str.starts_with(
"wss://")) {
141 addr_str = addr_str.substr(wss_prefix_length);
143 }
else if (not addr_str.starts_with(
"http")) {
147 auto slash_pos = addr_str.find(
'/');
148 if (slash_pos != std::string::npos) {
149 host_ = addr_str.substr(0, slash_pos);
150 path_ = addr_str.substr(slash_pos);
155 auto colon_pos = host_.find(
':');
156 if (colon_pos != std::string::npos) {
157 port_ = host_.substr(colon_pos + 1);
158 host_.resize(colon_pos);
165 auto start_read() ->
void
168 boost::asio::buffer(read_buffer_), [
this](
const boost::system::error_code &error, std::size_t bytes_transferred) {
169 process_read(error, bytes_transferred);
179 auto process_read(
const boost::system::error_code &error, std::size_t bytes_transferred) ->
void
181 if (not error and bytes_transferred > 0) {
182 std::vector<std::byte> bytes(
183 read_buffer_.begin(), read_buffer_.begin() +
static_cast<std::ptrdiff_t
>(bytes_transferred));
185 core::events::transport::bytes_received evt{ .bytes = bytes };
186 emit_event(std::move(evt));
199 auto handle(
const core::events::transport::connect &evt)
noexcept ->
void
203 }
catch (
const std::runtime_error &e) {
204 core::events::transport::connect_failed failed{
207 emit_event(std::move(failed));
211 ws_->async_connect({ .host = host_, .port = port_, .path = path_ },
212 [
this, url = evt.url](
const boost::system::error_code &error_code, std::size_t ) {
213 if (not error_code) {
216 core::events::transport::connected connected_evt{ .url = url,
218 emit_event(std::move(connected_evt));
220 core::events::transport::connect_failed failed{
223 emit_event(std::move(failed));
233 auto handle(
const core::events::transport::send &evt)
noexcept ->
void
235 if (not connected_) {
236 core::events::transport::send_failed failed{
239 emit_event(std::move(failed));
243 std::shared_ptr<std::vector<std::byte>> data;
245 data = std::make_shared<std::vector<std::byte>>(evt.bytes);
246 }
catch (
const std::bad_alloc &e) {
247 core::events::transport::send_failed failed{
250 emit_event(std::move(failed));
254 auto message_id = evt.message_id;
256 ws_->async_write(std::span<const std::byte>(*data),
257 [
this, data, message_id](
const boost::system::error_code &error, std::size_t bytes_transferred) {
259 spdlog::error(
"[transport] Write failed: {} (attempted {} bytes)", error.message(), data->size());
260 core::events::transport::send_failed failed{
263 emit_event(std::move(failed));
265 spdlog::trace(
"[transport] Wrote {} bytes", bytes_transferred);
266 core::events::transport::sent sent_evt{ .message_id = message_id,
268 emit_event(std::move(sent_evt));
278 auto handle(
const core::events::transport::disconnect & )
noexcept ->
void
282 ws_->async_close([
this](
const boost::system::error_code & , std::size_t ) {
std::variant< send, publish_identity, unpublish_identity, trust, subscribe, subscribe_identities, subscribe_messages, list_identities, connect, transport::bytes_received, transport::connected, transport::connect_failed, transport::sent, transport::send_failed, transport::disconnected, bundle_announcement_received, bundle_announcement_removed > in_t
Variant of all input events to session orchestrator.