Radix Relay
Hybrid mesh communications with Signal Protocol encryption
Loading...
Searching...
No Matches
session_orchestrator.hpp
Go to the documentation of this file.
1#pragma once
2
4#include <bit>
5#include <boost/asio/bind_executor.hpp>
6#include <boost/asio/co_spawn.hpp>
7#include <boost/asio/detached.hpp>
8#include <boost/asio/experimental/channel_error.hpp>
9#include <boost/asio/io_context.hpp>
10#include <boost/asio/post.hpp>
11#include <boost/asio/strand.hpp>
12#include <chrono>
15#include <core/events.hpp>
17#include <cstddef>
18#include <memory>
19#include <nlohmann/json.hpp>
20#include <nostr/events.hpp>
22#include <nostr/protocol.hpp>
23#include <spdlog/spdlog.h>
24#include <variant>
25#include <vector>
26
27namespace radix_relay::nostr {
28
/// Information about a discovered prekey bundle.
struct discovered_bundle
{
  std::string rdx_fingerprint;///< RDX fingerprint from bundle.
  std::string nostr_pubkey;///< Nostr public key.
  std::string bundle_base64;///< Base64-encoded bundle data.
  std::string event_id;///< Event id copied from the bundle announcement that carried the bundle.
};
39
50template<concepts::signal_bridge Bridge, concepts::request_tracker Tracker>
51struct session_orchestrator : public std::enable_shared_from_this<session_orchestrator<Bridge, Tracker>>
52{
65 session_orchestrator(const std::shared_ptr<Bridge> &bridge,
66 const std::shared_ptr<Tracker> &tracker,
67 const std::shared_ptr<boost::asio::io_context> &io_context,
69 const std::shared_ptr<async::async_queue<core::events::transport::in_t>> &transport_out_queue,
70 const std::shared_ptr<async::async_queue<core::events::presentation_event_variant_t>> &presentation_out_queue,
71 const std::shared_ptr<async::async_queue<core::events::connection_monitor::in_t>> &connection_monitor_out_queue,
72 std::chrono::milliseconds timeout = std::chrono::seconds(15))
73 : bridge_(bridge), handler_(bridge_), tracker_(tracker), request_timeout_(timeout), io_context_(io_context),
74 in_queue_(in_queue), transport_out_queue_(transport_out_queue), presentation_out_queue_(presentation_out_queue),
75 connection_monitor_out_queue_(connection_monitor_out_queue)
76 {}
77
84 // NOLINTNEXTLINE(performance-unnecessary-value-param)
85 auto run_once(std::shared_ptr<boost::asio::cancellation_slot> cancel_slot = nullptr) -> boost::asio::awaitable<void>
86 {
87 auto evt = co_await in_queue_->pop(cancel_slot);
88 std::visit([&](auto &&event) { handle(std::forward<decltype(event)>(event)); }, evt);
89 co_return;
90 }
91
98 // NOLINTNEXTLINE(performance-unnecessary-value-param)
99 auto run(std::shared_ptr<boost::asio::cancellation_slot> cancel_slot = nullptr) -> boost::asio::awaitable<void>
100 {
101 try {
102 while (true) { co_await run_once(cancel_slot); }
103 } catch (const boost::system::system_error &e) {
104 if (e.code() == boost::asio::error::operation_aborted
105 or e.code() == boost::asio::experimental::error::channel_cancelled
106 or e.code() == boost::asio::experimental::error::channel_closed) {
107 spdlog::debug("[session_orchestrator] Cancelled, exiting run loop");
108 co_return;
109 } else {
110 spdlog::error("[session_orchestrator] Unexpected error in run loop: {}", e.what());
111 throw;
112 }
113 }
114 }
115
116private:
122 [[nodiscard]] auto get_discovered_bundles() const -> const std::vector<discovered_bundle> &
123 {
124 return discovered_bundles_;
125 }
126
// Signal bridge: used here for contact lookup, bundle fingerprint extraction,
// key maintenance, self-subscription creation and recording published bundles.
127 std::shared_ptr<Bridge> bridge_;
// NOTE(review): the constructor also initialises a `handler_` member from `bridge_`
// right after it; that declaration is not visible here — confirm it sits between
// `bridge_` and `tracker_` so the initialisation order matches.
// Tracker used to await relay OK/EOSE responses and to resolve them on arrival.
129 std::shared_ptr<Tracker> tracker_;
// Timeout passed to `tracker_->async_track` for every tracked request.
130 std::chrono::milliseconds request_timeout_;
// io_context on which the per-command coroutines are spawned.
131 std::shared_ptr<boost::asio::io_context> io_context_;
// Input queue that run_once() pops events from.
132 std::shared_ptr<async::async_queue<core::events::session_orchestrator::in_t>> in_queue_;
// Output queue for transport commands (see emit_transport_event).
133 std::shared_ptr<async::async_queue<core::events::transport::in_t>> transport_out_queue_;
// Output queue for presentation events (see emit_presentation_event).
134 std::shared_ptr<async::async_queue<core::events::presentation_event_variant_t>> presentation_out_queue_;
// Output queue for connection monitor events (see emit_connection_monitor_event).
135 std::shared_ptr<async::async_queue<core::events::connection_monitor::in_t>> connection_monitor_out_queue_;
// Bundles learned from bundle announcements; the announcement handler keeps one entry per Nostr pubkey.
136 std::vector<discovered_bundle> discovered_bundles_;
137
143 auto emit_transport_event(core::events::transport::in_t evt) -> void { transport_out_queue_->push(std::move(evt)); }
144
150 auto emit_presentation_event(core::events::presentation_event_variant_t evt) -> void
151 {
152 presentation_out_queue_->push(std::move(evt));
153 }
154
160 auto emit_connection_monitor_event(core::events::connection_monitor::in_t evt) -> void
161 {
162 connection_monitor_out_queue_->push(std::move(evt));
163 }
164
170 auto handle(const core::events::send &cmd) -> void
171 {
172 boost::asio::co_spawn(
173 *io_context_,
174 // NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines)
175 [self = this->shared_from_this(), cmd]() -> boost::asio::awaitable<void> {
176 auto [event_id, bytes] = self->handler_.handle(cmd);
177
178 core::events::transport::send transport_cmd{ .message_id = core::uuid_generator::generate(),
179 .bytes = std::move(bytes) };
180 self->emit_transport_event(transport_cmd);
181
182 try {
183 auto ok_response =
184 co_await self->tracker_->template async_track<nostr::protocol::ok>(event_id, self->request_timeout_);
185 self->emit_presentation_event(core::events::message_sent{ cmd.peer, event_id, ok_response.accepted });
186 } catch (const std::exception &) {
187 self->emit_presentation_event(
188 core::events::message_sent{ .peer = cmd.peer, .event_id = "", .accepted = false });
189 }
190 },
191 boost::asio::detached);
192 }
193
199 auto handle(const core::events::publish_identity &cmd) -> void
200 {
201 boost::asio::co_spawn(
202 *io_context_,
203 // NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines)
204 [self = this->shared_from_this(), cmd]() -> boost::asio::awaitable<void> {
205 auto result = self->handler_.handle(cmd);
206
207 core::events::transport::send transport_cmd{ .message_id = core::uuid_generator::generate(),
208 .bytes = std::move(result.bytes) };
209 self->emit_transport_event(transport_cmd);
210
211 try {
212 auto ok_response =
213 co_await self->tracker_->template async_track<nostr::protocol::ok>(result.event_id, self->request_timeout_);
214 if (ok_response.accepted) {
215 self->bridge_->record_published_bundle(
216 result.pre_key_id, result.signed_pre_key_id, result.kyber_pre_key_id);
217 }
218 self->emit_presentation_event(
219 core::events::bundle_published{ .event_id = result.event_id, .accepted = ok_response.accepted });
220 } catch (const std::exception &e) {
221 spdlog::warn("[session_orchestrator] OK timeout for event: {} - {}", result.event_id, e.what());
222 self->emit_presentation_event(core::events::bundle_published{ .event_id = "", .accepted = false });
223 }
224 },
225 boost::asio::detached);
226 }
227
233 auto handle(const core::events::unpublish_identity &cmd) -> void
234 {
235 boost::asio::co_spawn(
236 *io_context_,
237 // NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines)
238 [self = this->shared_from_this(), cmd]() -> boost::asio::awaitable<void> {
239 auto [event_id, bytes] = self->handler_.handle(cmd);
240
241 core::events::transport::send transport_cmd{ .message_id = core::uuid_generator::generate(),
242 .bytes = std::move(bytes) };
243 self->emit_transport_event(transport_cmd);
244
245 try {
246 auto ok_response =
247 co_await self->tracker_->template async_track<nostr::protocol::ok>(event_id, self->request_timeout_);
248 self->emit_presentation_event(
249 core::events::bundle_published{ .event_id = event_id, .accepted = ok_response.accepted });
250 } catch (const std::exception &e) {
251 spdlog::warn("[session_orchestrator] OK timeout for event: {} - {}", event_id, e.what());
252 self->emit_presentation_event(core::events::bundle_published{ .event_id = "", .accepted = false });
253 }
254 },
255 boost::asio::detached);
256 }
257
263 auto handle(const core::events::trust &cmd) -> void
264 {
265 bool contact_exists = false;
266 try {
267 std::ignore = bridge_->lookup_contact(cmd.peer);
268 contact_exists = true;
269 } catch (const std::exception &e) {
270 spdlog::debug("Contact {} not found, will check discovered bundles: {}", cmd.peer, e.what());
271 }
272
273 if (contact_exists) {
274 if (not cmd.alias.empty()) {
275 handler_.handle(cmd);
276 spdlog::info("Updated alias for existing contact: {}", cmd.peer);
277 }
278 return;
279 }
280
281 auto bundle_iter = std::ranges::find_if(
282 discovered_bundles_, [&cmd](const discovered_bundle &bundle) { return bundle.rdx_fingerprint == cmd.peer; });
283
284 if (bundle_iter != discovered_bundles_.end()) {
285 auto session_result =
286 handler_.handle(core::events::establish_session{ .bundle_data = bundle_iter->bundle_base64 });
287 if (session_result and not cmd.alias.empty()) { handler_.handle(cmd); }
288 if (session_result) { emit_presentation_event(*session_result); }
289 } else {
290 spdlog::error(
291 "Cannot establish session with {}: identity not found in discovered bundles and no existing contact", cmd.peer);
292 }
293 }
294
300 auto handle(const core::events::subscribe &cmd) -> void
301 {
302 boost::asio::co_spawn(
303 *io_context_,
304 // NOLINTNEXTLINE(cppcoreguidelines-avoid-capturing-lambda-coroutines)
305 [self = this->shared_from_this(), cmd]() -> boost::asio::awaitable<void> {
306 auto [subscription_id, bytes] = self->handler_.handle(cmd);
307
308 core::events::transport::send transport_cmd{ .message_id = core::uuid_generator::generate(),
309 .bytes = std::move(bytes) };
310 self->emit_transport_event(transport_cmd);
311
312 try {
313 auto eose = co_await self->tracker_->template async_track<nostr::protocol::eose>(
314 subscription_id, self->request_timeout_);
315 self->emit_presentation_event(core::events::subscription_established{ eose.subscription_id });
316 } catch (const std::exception &e) {
317 spdlog::warn("[session_orchestrator] EOSE timeout for subscription: {} - {}", subscription_id, e.what());
318 self->emit_presentation_event(core::events::subscription_established{ "" });
319 }
320 },
321 boost::asio::detached);
322 }
323
329 auto handle(const core::events::subscribe_identities & /*cmd*/) -> void
330 {
331 const auto subscription_id = core::uuid_generator::generate();
333
334 const auto kind_value = std::to_string(
335 static_cast<std::underlying_type_t<nostr::protocol::kind>>(nostr::protocol::kind::bundle_announcement));
336 const std::string subscription_json =
337 R"(["REQ",")" + subscription_id + R"(",{"kinds":[)" + kind_value + R"(],"#d":["radix_prekey_bundle_v1"]}])";
338 handle(core::events::subscribe{ .subscription_json = subscription_json });
339 }
340
346 auto handle(const core::events::bundle_announcement_received &event) -> void
347 {
348 auto rdx_fingerprint = bridge_->extract_rdx_from_bundle_base64(event.bundle_content);
349
350 auto existing = std::ranges::find_if(
351 discovered_bundles_, [&event](const auto &bundle) { return bundle.nostr_pubkey == event.pubkey; });
352
353 if (existing != discovered_bundles_.end()) {
354 existing->rdx_fingerprint = rdx_fingerprint;
355 existing->bundle_base64 = event.bundle_content;
356 existing->event_id = event.event_id;
357 } else {
358 discovered_bundles_.push_back(discovered_bundle{ .rdx_fingerprint = rdx_fingerprint,
359 .nostr_pubkey = event.pubkey,
360 .bundle_base64 = event.bundle_content,
361 .event_id = event.event_id });
362 }
363 }
364
370 auto handle(const core::events::bundle_announcement_removed &event) -> void
371 {
372 std::erase_if(discovered_bundles_, [&event](const auto &bundle) { return bundle.nostr_pubkey == event.pubkey; });
373 }
374
380 auto handle(const core::events::list_identities & /*cmd*/) -> void
381 {
382 std::vector<core::events::discovered_identity> identities;
383 identities.reserve(discovered_bundles_.size());
384 std::ranges::transform(discovered_bundles_, std::back_inserter(identities), [](const auto &bundle) {
385 return core::events::discovered_identity{
386 .rdx_fingerprint = bundle.rdx_fingerprint, .nostr_pubkey = bundle.nostr_pubkey, .event_id = bundle.event_id
387 };
388 });
389 emit_presentation_event(core::events::identities_listed{ .identities = std::move(identities) });
390 }
391
397 auto handle(const core::events::subscribe_messages & /*cmd*/) -> void
398 {
399 const auto subscription_id = core::uuid_generator::generate();
401
402 auto subscription_json = bridge_->create_subscription_for_self(subscription_id, 0);
403 handle(core::events::subscribe{ .subscription_json = subscription_json });
404 }
405
411 auto handle(const core::events::transport::bytes_received &evt) noexcept -> void
412 {
413 try {
414 std::string json_str;
415 json_str.resize(evt.bytes.size());
416 std::ranges::transform(evt.bytes, json_str.begin(), [](std::byte byte) { return std::bit_cast<char>(byte); });
417
418 try {
419 auto parsed = nlohmann::json::parse(json_str);
420 if (not parsed.is_array() or parsed.empty() or not parsed[0].is_string()) {
421 nostr::events::incoming::unknown_protocol evt_inner{ json_str };
422 handler_.handle(evt_inner);
423 return;
424 }
425
426 const auto msg_type = parsed[0].get<std::string>();
427
428 if (msg_type == "OK") {
429 auto ok_msg = nostr::protocol::ok::deserialize(json_str);
430 if (ok_msg) {
431 tracker_->resolve(ok_msg->event_id, *ok_msg);
432 nostr::events::incoming::ok evt_inner{ *ok_msg };
433 handler_.handle(evt_inner);
434 } else {
435 nostr::events::incoming::unknown_protocol evt_inner{ json_str };
436 handler_.handle(evt_inner);
437 }
438 } else if (msg_type == "EOSE") {
439 auto eose_msg = nostr::protocol::eose::deserialize(json_str);
440 if (eose_msg) {
441 tracker_->resolve(eose_msg->subscription_id, *eose_msg);
442 nostr::events::incoming::eose evt_inner{ *eose_msg };
443 handler_.handle(evt_inner);
444 } else {
445 nostr::events::incoming::unknown_protocol evt_inner{ json_str };
446 handler_.handle(evt_inner);
447 }
448 } else if (msg_type == "EVENT" and parsed.size() >= 3) {
449 auto event_data = parsed[2];
450 auto kind_value = event_data["kind"].get<std::uint32_t>();
451
452 switch (static_cast<nostr::protocol::kind>(kind_value)) {
454 nostr::events::incoming::encrypted_message evt_inner{ nostr::protocol::event_data{ .id = event_data["id"],
455 .pubkey = event_data["pubkey"],
456 .created_at = event_data["created_at"],
457 .kind = event_data["kind"],
458 .tags = event_data["tags"],
459 .content = event_data["content"],
460 .sig = event_data["sig"] } };
461
462 if (auto result = handler_.handle(evt_inner)) {
463 emit_presentation_event(*result);
464 if (result->should_republish_bundle) { handle(core::events::publish_identity{}); }
465 }
466 break;
467 }
469 nostr::events::incoming::bundle_announcement evt_inner{ nostr::protocol::event_data{ .id = event_data["id"],
470 .pubkey = event_data["pubkey"],
471 .created_at = event_data["created_at"],
472 .kind = event_data["kind"],
473 .tags = event_data["tags"],
474 .content = event_data["content"],
475 .sig = event_data["sig"] } };
476
477 if (auto result = handler_.handle(evt_inner)) {
478 std::visit(
479 [this](auto &&inner_evt) {
480 handle(inner_evt);
481 emit_presentation_event(inner_evt);
482 },
483 *result);
484 }
485 break;
486 }
488 nostr::events::incoming::identity_announcement evt_inner{ nostr::protocol::event_data{
489 .id = event_data["id"],
490 .pubkey = event_data["pubkey"],
491 .created_at = event_data["created_at"],
492 .kind = event_data["kind"],
493 .tags = event_data["tags"],
494 .content = event_data["content"],
495 .sig = event_data["sig"] } };
496
497 handler_.handle(evt_inner);
498 break;
499 }
501 nostr::events::incoming::session_request evt_inner{ nostr::protocol::event_data{ .id = event_data["id"],
502 .pubkey = event_data["pubkey"],
503 .created_at = event_data["created_at"],
504 .kind = event_data["kind"],
505 .tags = event_data["tags"],
506 .content = event_data["content"],
507 .sig = event_data["sig"] } };
508
509 handler_.handle(evt_inner);
510 break;
511 }
513 nostr::events::incoming::node_status evt_inner{ nostr::protocol::event_data{ .id = event_data["id"],
514 .pubkey = event_data["pubkey"],
515 .created_at = event_data["created_at"],
516 .kind = event_data["kind"],
517 .tags = event_data["tags"],
518 .content = event_data["content"],
519 .sig = event_data["sig"] } };
520
521 handler_.handle(evt_inner);
522 break;
523 }
524 default: {
525 nostr::events::incoming::unknown_message evt_inner{ nostr::protocol::event_data{ .id = event_data["id"],
526 .pubkey = event_data["pubkey"],
527 .created_at = event_data["created_at"],
528 .kind = event_data["kind"],
529 .tags = event_data["tags"],
530 .content = event_data["content"],
531 .sig = event_data["sig"] } };
532
533 handler_.handle(evt_inner);
534 break;
535 }
536 }
537 } else {
538 nostr::events::incoming::unknown_protocol evt_inner{ json_str };
539 handler_.handle(evt_inner);
540 }
541 } catch (const std::exception &e) {
542 std::string error_msg(e.what());
543 if (error_msg.find("old counter") != std::string::npos
544 or error_msg.find("message with old") != std::string::npos) {
545 try {
546 auto parsed = nlohmann::json::parse(json_str);
547 if (parsed.is_array() and parsed.size() >= 3 and parsed[2].contains("pubkey")
548 and parsed[2].contains("id")) {
549 spdlog::debug("[session_orchestrator] Ignored duplicate message from {} (event: {})",
550 parsed[2]["pubkey"].get<std::string>().substr(0, 16),
551 parsed[2]["id"].get<std::string>().substr(0, 16));
552 } else {
553 spdlog::debug("[session_orchestrator] Ignored duplicate message: {}", error_msg);
554 }
555 } catch (...) {
556 spdlog::debug("[session_orchestrator] Ignored duplicate message: {}", error_msg);
557 }
558 } else {
559 spdlog::warn("[session_orchestrator] Failed to parse message: {} - Raw: {}", error_msg, json_str);
560 nostr::events::incoming::unknown_protocol evt_inner{ json_str };
561 handler_.handle(evt_inner);
562 }
563 }
564 } catch (const std::bad_alloc &e) {
565 spdlog::error("[session_orchestrator] Failed to process bytes_received event: {}", e.what());
566 }
567 }
568
574 auto handle(const core::events::connect &evt) -> void
575 {
576 spdlog::info("[session_orchestrator] Connecting to relay: {}", evt.relay);
577 emit_transport_event(core::events::transport::connect{ .url = evt.relay });
578 }
579
585 auto handle(const core::events::transport::connected &evt) -> void
586 {
587 emit_connection_monitor_event(evt);
588
589 spdlog::info("[session_orchestrator] Transport connected, performing key maintenance");
590 auto maintenance_result = bridge_->perform_key_maintenance();
591
592 if (maintenance_result.signed_pre_key_rotated or maintenance_result.kyber_pre_key_rotated) {
593 spdlog::info("[session_orchestrator] Keys rotated, republishing bundle");
594 handle(core::events::publish_identity{});
595 }
596
597 spdlog::info("[session_orchestrator] Subscribing to identities and messages");
598 handle(core::events::subscribe_identities{});
599 handle(core::events::subscribe_messages{});
600 }
601
607 auto handle(const core::events::transport::connect_failed &evt) -> void
608 {
609 emit_connection_monitor_event(evt);
610
611 spdlog::error("[session_orchestrator] Transport connect failed: {}", evt.error_message);
612 std::ignore = bridge_;
613 }
614
620 auto handle(const core::events::transport::sent & /*evt*/) -> void
621 {
622 spdlog::debug("[session_orchestrator] Transport sent");
623 std::ignore = bridge_;
624 }
625
631 auto handle(const core::events::transport::send_failed &evt) -> void
632 {
633 emit_connection_monitor_event(evt);
634
635 spdlog::error("[session_orchestrator] Transport send failed: {}", evt.error_message);
636 std::ignore = bridge_;
637 }
638
644 auto handle(const core::events::transport::disconnected &evt) -> void
645 {
646 emit_connection_monitor_event(evt);
647
648 spdlog::info("[session_orchestrator] Transport disconnected");
649 std::ignore = bridge_;
650 }
651};
652
653}// namespace radix_relay::nostr
Thread-safe asynchronous queue for message passing between coroutines.
static auto generate() -> std::string
Generates a new UUID string.
Handles processing of incoming and outgoing Nostr messages.
auto handle(const nostr::events::incoming::encrypted_message &event) -> std::optional< core::events::message_received >
Handles an incoming encrypted message event.
std::variant< transport::connected, transport::connect_failed, transport::disconnected, transport::send_failed, query_status > in_t
Variant type for connection monitor input events.
Definition events.hpp:302
std::variant< connect, send, disconnect > in_t
Variant type for transport input events.
Definition events.hpp:287
std::variant< message_received, session_established, bundle_announcement_received, bundle_announcement_removed, message_sent, bundle_published, subscription_established, identities_listed > presentation_event_variant_t
Variant type for presentation events.
Definition events.hpp:333
auto validate_subscription_id(const std::string &subscription_id) -> void
Validates a subscription ID.
Definition protocol.hpp:250
kind
Nostr event kind identifiers.
Definition protocol.hpp:21
@ identity_announcement
Radix: Node identity announcement.
@ node_status
Radix: Node status update.
@ bundle_announcement
Radix: Signal Protocol prekey bundle.
@ session_request
Radix: Session establishment request.
@ encrypted_message
Radix: Encrypted message via Signal Protocol.
Information about a discovered prekey bundle.
std::string bundle_base64
Base64-encoded bundle data.
std::string nostr_pubkey
Nostr public key.
std::string rdx_fingerprint
RDX fingerprint from bundle.
static auto deserialize(const std::string &json) -> std::optional< eose >
Deserializes EOSE message from JSON.
static auto deserialize(const std::string &json) -> std::optional< ok >
Deserializes OK message from JSON.
Orchestrates Nostr sessions, message handling, and bundle management.
auto run(std::shared_ptr< boost::asio::cancellation_slot > cancel_slot=nullptr) -> boost::asio::awaitable< void >
Continuously processes events until cancelled.
auto run_once(std::shared_ptr< boost::asio::cancellation_slot > cancel_slot=nullptr) -> boost::asio::awaitable< void >
Processes a single event from the queue.
session_orchestrator(const std::shared_ptr< Bridge > &bridge, const std::shared_ptr< Tracker > &tracker, const std::shared_ptr< boost::asio::io_context > &io_context, const std::shared_ptr< async::async_queue< core::events::session_orchestrator::in_t > > &in_queue, const std::shared_ptr< async::async_queue< core::events::transport::in_t > > &transport_out_queue, const std::shared_ptr< async::async_queue< core::events::presentation_event_variant_t > > &presentation_out_queue, const std::shared_ptr< async::async_queue< core::events::connection_monitor::in_t > > &connection_monitor_out_queue, std::chrono::milliseconds timeout=std::chrono::seconds(15))
Constructs a session orchestrator.