Radix Relay
Hybrid mesh communications with Signal Protocol encryption
Loading...
Searching...
No Matches
transport.hpp
Go to the documentation of this file.
1#pragma once
2
5#include <core/events.hpp>
7
8#include <boost/asio.hpp>
9#include <boost/asio/experimental/channel_error.hpp>
10#include <spdlog/spdlog.h>
11
12#include <cstddef>
13#include <memory>
14#include <span>
15#include <stdexcept>
16#include <string>
17#include <string_view>
18#include <unordered_map>
19#include <variant>
20#include <vector>
21
22namespace radix_relay::nostr {
23
32template<concepts::websocket_stream WebSocketStream> struct transport
33{
42 transport(const std::shared_ptr<WebSocketStream> &websocket_stream,
43 const std::shared_ptr<boost::asio::io_context> &io_context,
44 const std::shared_ptr<async::async_queue<core::events::transport::in_t>> &in_queue,
45 const std::shared_ptr<async::async_queue<core::events::session_orchestrator::in_t>> &to_session_queue)
46 : ws_(websocket_stream), io_context_(io_context), in_queue_(in_queue), to_session_queue_(to_session_queue)
47 {}
48
50 {
51 in_queue_->close();
52 if (connected_) {
53 connected_ = false;
54 ws_->async_close([](const boost::system::error_code & /*error*/, std::size_t /*bytes*/) {});
55 }
56 }
57
58 transport(const transport &) = delete;
59 auto operator=(const transport &) -> transport & = delete;
60 transport(transport &&) = delete;
61 auto operator=(transport &&) -> transport & = delete;
62
69 // NOLINTNEXTLINE(performance-unnecessary-value-param)
70 auto run_once(std::shared_ptr<boost::asio::cancellation_slot> cancel_slot = nullptr) -> boost::asio::awaitable<void>
71 {
72 auto cmd = co_await in_queue_->pop(cancel_slot);
73 std::visit([&](auto &&event) { handle(std::forward<decltype(event)>(event)); }, cmd);
74 co_return;
75 }
76
83 // NOLINTNEXTLINE(performance-unnecessary-value-param)
84 auto run(std::shared_ptr<boost::asio::cancellation_slot> cancel_slot = nullptr) -> boost::asio::awaitable<void>
85 {
86 try {
87 while (true) { co_await run_once(cancel_slot); }
88 } catch (const boost::system::system_error &e) {
89 if (e.code() == boost::asio::error::operation_aborted
90 or e.code() == boost::asio::experimental::error::channel_cancelled
91 or e.code() == boost::asio::experimental::error::channel_closed) {
92 spdlog::debug("[transport] Cancelled, exiting run loop");
93 co_return;
94 } else {
95 spdlog::error("[transport] Unexpected error in run loop: {}", e.what());
96 throw;
97 }
98 }
99 }
100
101private:
102 bool connected_{ false };
103 std::shared_ptr<WebSocketStream> ws_;
104 std::shared_ptr<boost::asio::io_context> io_context_;
105 std::shared_ptr<async::async_queue<core::events::transport::in_t>> in_queue_;
106 std::shared_ptr<async::async_queue<core::events::session_orchestrator::in_t>> to_session_queue_;
107 static constexpr size_t read_buffer_size = 8192;
108 std::array<std::byte, read_buffer_size> read_buffer_{};
109 std::unordered_map<std::string, std::vector<std::byte>> pending_sends_;
110
111 std::string host_;
112 std::string port_;
113 std::string path_;
114
120 auto emit_event(core::events::session_orchestrator::in_t evt) -> void { to_session_queue_->push(std::move(evt)); }
121
128 auto parse_url(const std::string_view address) -> void
129 {
130 std::string addr_str(address);
131
132 port_ = "443";
133 path_ = "/";
134
135 static constexpr size_t wss_prefix_length = 6;
136
137 if (addr_str.starts_with("ws://")) {
138 throw std::runtime_error("Insecure WebSocket (ws://) not supported. Use wss:// for security.");
139 }
140 if (addr_str.starts_with("wss://")) {
141 addr_str = addr_str.substr(wss_prefix_length);
142 port_ = "443";
143 } else if (not addr_str.starts_with("http")) {
144 port_ = "443";
145 }
146
147 auto slash_pos = addr_str.find('/');
148 if (slash_pos != std::string::npos) {
149 host_ = addr_str.substr(0, slash_pos);
150 path_ = addr_str.substr(slash_pos);
151 } else {
152 host_ = addr_str;
153 }
154
155 auto colon_pos = host_.find(':');
156 if (colon_pos != std::string::npos) {
157 port_ = host_.substr(colon_pos + 1);
158 host_.resize(colon_pos);
159 }
160 }
161
165 auto start_read() -> void
166 {
167 ws_->async_read(
168 boost::asio::buffer(read_buffer_), [this](const boost::system::error_code &error, std::size_t bytes_transferred) {
169 process_read(error, bytes_transferred);
170 });
171 }
172
179 auto process_read(const boost::system::error_code &error, std::size_t bytes_transferred) -> void
180 {
181 if (not error and bytes_transferred > 0) {
182 std::vector<std::byte> bytes(
183 read_buffer_.begin(), read_buffer_.begin() + static_cast<std::ptrdiff_t>(bytes_transferred));
184
185 core::events::transport::bytes_received evt{ .bytes = bytes };
186 emit_event(std::move(evt));
187
188 start_read();
189 } else {
190 connected_ = false;
191 }
192 }
193
199 auto handle(const core::events::transport::connect &evt) noexcept -> void
200 {
201 try {
202 parse_url(evt.url);
203 } catch (const std::runtime_error &e) {
204 core::events::transport::connect_failed failed{
205 .url = evt.url, .error_message = e.what(), .type = core::events::transport_type::internet
206 };
207 emit_event(std::move(failed));
208 return;
209 }
210
211 ws_->async_connect({ .host = host_, .port = port_, .path = path_ },
212 [this, url = evt.url](const boost::system::error_code &error_code, std::size_t /*bytes*/) {
213 if (not error_code) {
214 connected_ = true;
215 start_read();
216 core::events::transport::connected connected_evt{ .url = url,
218 emit_event(std::move(connected_evt));
219 } else {
220 core::events::transport::connect_failed failed{
221 .url = url, .error_message = error_code.message(), .type = core::events::transport_type::internet
222 };
223 emit_event(std::move(failed));
224 }
225 });
226 }
227
233 auto handle(const core::events::transport::send &evt) noexcept -> void
234 {
235 if (not connected_) {
236 core::events::transport::send_failed failed{
237 .message_id = evt.message_id, .error_message = "Not connected", .type = core::events::transport_type::internet
238 };
239 emit_event(std::move(failed));
240 return;
241 }
242
243 std::shared_ptr<std::vector<std::byte>> data;
244 try {
245 data = std::make_shared<std::vector<std::byte>>(evt.bytes);
246 } catch (const std::bad_alloc &e) {
247 core::events::transport::send_failed failed{
248 .message_id = evt.message_id, .error_message = e.what(), .type = core::events::transport_type::internet
249 };
250 emit_event(std::move(failed));
251 return;
252 }
253
254 auto message_id = evt.message_id;
255
256 ws_->async_write(std::span<const std::byte>(*data),
257 [this, data, message_id](const boost::system::error_code &error, std::size_t bytes_transferred) {
258 if (error) {
259 spdlog::error("[transport] Write failed: {} (attempted {} bytes)", error.message(), data->size());
260 core::events::transport::send_failed failed{
261 .message_id = message_id, .error_message = error.message(), .type = core::events::transport_type::internet
262 };
263 emit_event(std::move(failed));
264 } else {
265 spdlog::trace("[transport] Wrote {} bytes", bytes_transferred);
266 core::events::transport::sent sent_evt{ .message_id = message_id,
268 emit_event(std::move(sent_evt));
269 }
270 });
271 }
272
278 auto handle(const core::events::transport::disconnect & /*evt*/) noexcept -> void
279 {
280 if (connected_) {
281 connected_ = false;
282 ws_->async_close([this](const boost::system::error_code & /*error*/, std::size_t /*bytes*/) {
283 core::events::transport::disconnected evt{ .type = core::events::transport_type::internet };
284 emit_event(evt);
285 });
286 } else {
287 core::events::transport::disconnected evt{ .type = core::events::transport_type::internet };
288 emit_event(evt);
289 }
290 }
291};
292
293}// namespace radix_relay::nostr
Thread-safe asynchronous queue for message passing between coroutines.
std::variant< send, publish_identity, unpublish_identity, trust, subscribe, subscribe_identities, subscribe_messages, list_identities, connect, transport::bytes_received, transport::connected, transport::connect_failed, transport::sent, transport::send_failed, transport::disconnected, bundle_announcement_received, bundle_announcement_removed > in_t
Variant of all input events to session orchestrator.
Definition events.hpp:369
Nostr WebSocket transport layer.
Definition transport.hpp:33
transport(const transport &)=delete
transport(const std::shared_ptr< WebSocketStream > &websocket_stream, const std::shared_ptr< boost::asio::io_context > &io_context, const std::shared_ptr< async::async_queue< core::events::transport::in_t > > &in_queue, const std::shared_ptr< async::async_queue< core::events::session_orchestrator::in_t > > &to_session_queue)
Constructs a Nostr transport.
Definition transport.hpp:42
auto run(std::shared_ptr< boost::asio::cancellation_slot > cancel_slot=nullptr) -> boost::asio::awaitable< void >
Continuously processes transport commands until cancelled.
Definition transport.hpp:84
auto operator=(const transport &) -> transport &=delete
auto operator=(transport &&) -> transport &=delete
auto run_once(std::shared_ptr< boost::asio::cancellation_slot > cancel_slot=nullptr) -> boost::asio::awaitable< void >
Processes a single transport command from the queue.
Definition transport.hpp:70
transport(transport &&)=delete