66 const std::shared_ptr<Tracker> &tracker,
67 const std::shared_ptr<boost::asio::io_context> &io_context,
72 std::chrono::milliseconds timeout = std::chrono::seconds(15))
73 : bridge_(bridge), handler_(bridge_), tracker_(tracker), request_timeout_(timeout), io_context_(io_context),
74 in_queue_(in_queue), transport_out_queue_(transport_out_queue), presentation_out_queue_(presentation_out_queue),
75 connection_monitor_out_queue_(connection_monitor_out_queue)
// Process a single event: await the next item popped from in_queue_ (the
// optional cancel_slot is forwarded to pop), then dispatch it with std::visit
// to whichever handle() overload matches the active alternative.
85 auto run_once(std::shared_ptr<boost::asio::cancellation_slot> cancel_slot =
nullptr) -> boost::asio::awaitable<void>
87 auto evt =
co_await in_queue_->pop(cancel_slot);
// Perfect-forward the visited alternative so the matching overload is chosen.
88 std::visit([&](
auto &&event) { handle(std::forward<
decltype(event)>(event)); }, evt);
// Event loop: awaits run_once(cancel_slot) forever. The loop is only left by
// an exception; boost::system::system_error is caught here.
// A system_error whose code is operation_aborted, channel_cancelled or
// channel_closed is logged at debug level as a cancellation.
// NOTE(review): the branch separating the debug log from the error log is not
// visible in this excerpt; presumably the error log is the "any other code"
// path - confirm, and confirm whether that path rethrows.
99 auto run(std::shared_ptr<boost::asio::cancellation_slot> cancel_slot =
nullptr) -> boost::asio::awaitable<void>
102 while (
true) {
co_await run_once(cancel_slot); }
103 }
catch (
const boost::system::system_error &e) {
104 if (e.code() == boost::asio::error::operation_aborted
105 or e.code() == boost::asio::experimental::error::channel_cancelled
106 or e.code() == boost::asio::experimental::error::channel_closed) {
107 spdlog::debug(
"[session_orchestrator] Cancelled, exiting run loop");
110 spdlog::error(
"[session_orchestrator] Unexpected error in run loop: {}", e.what());
// Read-only access to the discovered-bundle list. Returns a reference to the
// discovered_bundles_ member itself (no copy is made), so the result reflects
// later modifications made by the bundle_announcement_* handlers.
122 [[nodiscard]]
auto get_discovered_bundles() const -> const std::vector<
discovered_bundle> &
124 return discovered_bundles_;
// Bridge used for contact lookup, bundle parsing, key maintenance and
// recording published bundles (see the handle() overloads).
127 std::shared_ptr<Bridge> bridge_;
// Tracker that correlates outgoing requests with relay replies: awaited via
// async_track<>() and completed via resolve() in the bytes_received handler.
129 std::shared_ptr<Tracker> tracker_;
// Timeout handed to every tracker_->async_track<>() call.
130 std::chrono::milliseconds request_timeout_;
// NOTE(review): no use of io_context_ is visible in this excerpt; presumably
// it supplies the executor for the co_spawn calls - confirm.
131 std::shared_ptr<boost::asio::io_context> io_context_;
// Inbound event queue drained by run_once().
132 std::shared_ptr<async::async_queue<core::events::session_orchestrator::in_t>> in_queue_;
// Outbound queue for transport events.
133 std::shared_ptr<async::async_queue<core::events::transport::in_t>> transport_out_queue_;
// Outbound queue for presentation events.
134 std::shared_ptr<async::async_queue<core::events::presentation_event_variant_t>> presentation_out_queue_;
// Outbound queue for connection-monitor events.
135 std::shared_ptr<async::async_queue<core::events::connection_monitor::in_t>> connection_monitor_out_queue_;
// Bundles learned from announcements; maintained by the
// bundle_announcement_received / bundle_announcement_removed handlers.
136 std::vector<discovered_bundle> discovered_bundles_;
152 presentation_out_queue_->push(std::move(evt));
162 connection_monitor_out_queue_->push(std::move(evt));
// Handle a `send` command. Spawns a detached coroutine that keeps this object
// alive through shared_from_this() and owns a copy of cmd. The coroutine:
//   1. asks handler_ for the event id and encoded bytes,
//   2. emits a transport event carrying those bytes,
//   3. awaits the relay's OK for event_id (bounded by request_timeout_),
//   4. emits message_sent with the peer, event id and accepted flag.
// On any std::exception it emits message_sent with accepted = false.
// NOTE(review): the failure event carries an empty event_id rather than the
// id obtained in step 1 - confirm consumers do not need it to correlate.
170 auto handle(
const core::events::send &cmd) ->
void
172 boost::asio::co_spawn(
175 [self = this->shared_from_this(), cmd]() -> boost::asio::awaitable<void> {
176 auto [event_id, bytes] = self->handler_.handle(cmd);
179 .bytes = std::move(bytes) };
180 self->emit_transport_event(transport_cmd);
184 co_await self->tracker_->template async_track<nostr::protocol::ok>(event_id, self->request_timeout_);
185 self->emit_presentation_event(core::events::message_sent{ cmd.peer, event_id, ok_response.accepted });
186 }
catch (
const std::exception &) {
187 self->emit_presentation_event(
188 core::events::message_sent{ .peer = cmd.peer, .event_id =
"", .accepted =
false });
191 boost::asio::detached);
// Handle a `publish_identity` command. Spawns a detached coroutine (holding
// shared_from_this() and a copy of cmd) that builds the event through
// handler_, emits its bytes to the transport, and awaits the relay's OK for
// result.event_id within request_timeout_.
// If the relay accepted the event, the pre-key ids from the result are
// recorded on the bridge. A bundle_published event is emitted with the event
// id and accepted flag.
// On any std::exception a warning is logged and bundle_published is emitted
// with an empty event id and accepted = false.
// NOTE(review): the warning text says "OK timeout" but the handler catches
// every std::exception - confirm the wording is acceptable for other errors.
199 auto handle(
const core::events::publish_identity &cmd) ->
void
201 boost::asio::co_spawn(
204 [self = this->shared_from_this(), cmd]() -> boost::asio::awaitable<void> {
205 auto result = self->handler_.handle(cmd);
208 .bytes = std::move(result.bytes) };
209 self->emit_transport_event(transport_cmd);
213 co_await self->tracker_->template async_track<nostr::protocol::ok>(result.event_id, self->request_timeout_);
// Only record the published key ids once the relay has accepted the event.
214 if (ok_response.accepted) {
215 self->bridge_->record_published_bundle(
216 result.pre_key_id, result.signed_pre_key_id, result.kyber_pre_key_id);
218 self->emit_presentation_event(
219 core::events::bundle_published{ .event_id = result.event_id, .accepted = ok_response.accepted });
220 }
catch (
const std::exception &e) {
221 spdlog::warn(
"[session_orchestrator] OK timeout for event: {} - {}", result.event_id, e.what());
222 self->emit_presentation_event(core::events::bundle_published{ .event_id =
"", .accepted =
false });
225 boost::asio::detached);
// Handle an `unpublish_identity` command. Same shape as the publish path:
// a detached coroutine builds the event via handler_, emits the bytes to the
// transport, awaits the relay's OK for event_id within request_timeout_, and
// reports the outcome to the presentation layer.
// On any std::exception a warning is logged and a failure event (empty event
// id, accepted = false) is emitted.
// NOTE(review): both outcomes are reported with core::events::bundle_published
// even though this is the unpublish path - confirm that is the intended event.
233 auto handle(
const core::events::unpublish_identity &cmd) ->
void
235 boost::asio::co_spawn(
238 [self = this->shared_from_this(), cmd]() -> boost::asio::awaitable<void> {
239 auto [event_id, bytes] = self->handler_.handle(cmd);
242 .bytes = std::move(bytes) };
243 self->emit_transport_event(transport_cmd);
247 co_await self->tracker_->template async_track<nostr::protocol::ok>(event_id, self->request_timeout_);
248 self->emit_presentation_event(
249 core::events::bundle_published{ .event_id = event_id, .accepted = ok_response.accepted });
250 }
catch (
const std::exception &e) {
251 spdlog::warn(
"[session_orchestrator] OK timeout for event: {} - {}", event_id, e.what());
252 self->emit_presentation_event(core::events::bundle_published{ .event_id =
"", .accepted =
false });
255 boost::asio::detached);
// Handle a `trust` command for cmd.peer.
// First probes the bridge for an existing contact; lookup_contact signals
// "not found" by throwing, which is caught and logged at debug level.
//  - Existing contact: when an alias is supplied, an "Updated alias" message
//    is logged (the call performing the update is not visible in this excerpt).
//  - Otherwise: searches discovered_bundles_ for an entry whose rdx_fingerprint
//    equals cmd.peer and, if found, asks handler_ to establish a session from
//    that bundle. On success the alias (if any) is applied via handler_ and
//    the session result is emitted to the presentation layer.
//  - If no bundle matches, the "Cannot establish session" message is emitted.
263 auto handle(
const core::events::trust &cmd) ->
void
265 bool contact_exists =
false;
// Only the success/failure of the lookup matters; its result is discarded.
267 std::ignore = bridge_->lookup_contact(cmd.peer);
268 contact_exists =
true;
269 }
catch (
const std::exception &e) {
270 spdlog::debug(
"Contact {} not found, will check discovered bundles: {}", cmd.peer, e.what());
273 if (contact_exists) {
274 if (not cmd.alias.empty()) {
276 spdlog::info(
"Updated alias for existing contact: {}", cmd.peer);
// No existing contact: look for a discovered bundle with this fingerprint.
281 auto bundle_iter = std::ranges::find_if(
282 discovered_bundles_, [&cmd](
const discovered_bundle &bundle) {
return bundle.rdx_fingerprint == cmd.peer; });
284 if (bundle_iter != discovered_bundles_.end()) {
285 auto session_result =
286 handler_.
handle(core::events::establish_session{ .bundle_data = bundle_iter->bundle_base64 });
// Apply the alias only after the session was established successfully.
287 if (session_result and not cmd.alias.empty()) { handler_.
handle(cmd); }
288 if (session_result) { emit_presentation_event(*session_result); }
291 "Cannot establish session with {}: identity not found in discovered bundles and no existing contact", cmd.peer);
// Handle a `subscribe` command. Spawns a detached coroutine (holding
// shared_from_this() and a copy of cmd) that obtains the subscription id and
// request bytes from handler_, emits the bytes to the transport, and awaits
// the relay's EOSE for that subscription within request_timeout_.
// On success it emits subscription_established with the id from the EOSE.
// On any std::exception it logs a warning and emits subscription_established
// with an empty id.
// NOTE(review): success and failure use the same event type, distinguished
// only by the empty id - confirm consumers check for that.
300 auto handle(
const core::events::subscribe &cmd) ->
void
302 boost::asio::co_spawn(
305 [self = this->shared_from_this(), cmd]() -> boost::asio::awaitable<void> {
306 auto [subscription_id, bytes] = self->handler_.handle(cmd);
309 .bytes = std::move(bytes) };
310 self->emit_transport_event(transport_cmd);
313 auto eose =
co_await self->tracker_->template async_track<nostr::protocol::eose>(
314 subscription_id, self->request_timeout_);
315 self->emit_presentation_event(core::events::subscription_established{ eose.subscription_id });
316 }
catch (
const std::exception &e) {
317 spdlog::warn(
"[session_orchestrator] EOSE timeout for subscription: {} - {}", subscription_id, e.what());
318 self->emit_presentation_event(core::events::subscription_established{
"" });
321 boost::asio::detached);
// Handle `subscribe_identities`: builds a ["REQ", <id>, {filter}] JSON request
// whose filter selects one event kind and the "#d" tag value
// "radix_prekey_bundle_v1", then delegates to the `subscribe` handler.
// NOTE(review): the definitions of subscription_id and of the kind passed to
// std::to_string are not visible in this excerpt.
329 auto handle(
const core::events::subscribe_identities & ) ->
void
334 const auto kind_value = std::to_string(
336 const std::string subscription_json =
337 R
"(["REQ",")" + subscription_id + R"(",{"kinds":[)" + kind_value + R"(],"#d":["radix_prekey_bundle_v1"]}])";
338 handle(core::events::subscribe{ .subscription_json = subscription_json });
// Handle a received bundle announcement. Extracts the RDX fingerprint from
// the bundle content via the bridge, then upserts into discovered_bundles_
// keyed by the announcer's nostr pubkey: an existing entry has its
// fingerprint, bundle content and event id overwritten; otherwise a new
// discovered_bundle is appended.
346 auto handle(
const core::events::bundle_announcement_received &event) ->
void
348 auto rdx_fingerprint = bridge_->extract_rdx_from_bundle_base64(event.bundle_content);
// Entries are matched by nostr pubkey.
350 auto existing = std::ranges::find_if(
351 discovered_bundles_, [&event](
const auto &bundle) {
return bundle.nostr_pubkey ==
event.pubkey; });
353 if (existing != discovered_bundles_.end()) {
354 existing->rdx_fingerprint = rdx_fingerprint;
355 existing->bundle_base64 =
event.bundle_content;
356 existing->event_id =
event.event_id;
358 discovered_bundles_.push_back(discovered_bundle{ .rdx_fingerprint = rdx_fingerprint,
359 .nostr_pubkey =
event.pubkey,
360 .bundle_base64 =
event.bundle_content,
361 .event_id =
event.event_id });
// Handle a bundle removal: erases every entry of discovered_bundles_ whose
// nostr_pubkey equals event.pubkey. A pubkey with no entry is a no-op.
370 auto handle(
const core::events::bundle_announcement_removed &event) ->
void
372 std::erase_if(discovered_bundles_, [&event](
const auto &bundle) {
return bundle.nostr_pubkey ==
event.pubkey; });
// Handle `list_identities`: projects each discovered bundle onto a
// discovered_identity (fingerprint, nostr pubkey, event id - the bundle
// payload itself is not included) and emits the resulting list to the
// presentation layer as identities_listed.
380 auto handle(
const core::events::list_identities & ) ->
void
382 std::vector<core::events::discovered_identity> identities;
// Reserve up front: exactly one output element per discovered bundle.
383 identities.reserve(discovered_bundles_.size());
384 std::ranges::transform(discovered_bundles_, std::back_inserter(identities), [](
const auto &bundle) {
385 return core::events::discovered_identity{
386 .rdx_fingerprint = bundle.rdx_fingerprint, .nostr_pubkey = bundle.nostr_pubkey, .event_id = bundle.event_id
389 emit_presentation_event(core::events::identities_listed{ .identities = std::move(identities) });
// Handle `subscribe_messages`: asks the bridge to create the subscription
// request for this node and delegates to the `subscribe` handler.
// NOTE(review): the definition of subscription_id is not visible here, and
// the meaning of the second argument (0) to create_subscription_for_self is
// not established by this excerpt - confirm against the bridge API.
397 auto handle(
const core::events::subscribe_messages & ) ->
void
402 auto subscription_json = bridge_->create_subscription_for_self(subscription_id, 0);
403 handle(core::events::subscribe{ .subscription_json = subscription_json });
// Handle raw bytes received from the transport. Declared noexcept.
//
// The bytes are converted to a std::string and parsed as JSON. The message is
// expected to be an array whose first element is a string naming the message
// type; anything else is forwarded to handler_ as unknown_protocol.
//   "OK"    -> resolves the pending tracker entry for the event id and
//              forwards an incoming::ok to handler_.
//   "EOSE"  -> resolves the pending tracker entry for the subscription id and
//              forwards an incoming::eose to handler_.
//   "EVENT" -> (needs at least 3 elements) reads element 2 as the event
//              object, reads its "kind", and forwards a typed incoming event
//              to handler_.
// NOTE(review): the conditions selecting between the typed EVENT cases, and
// the parsing that produces ok_msg / eose_msg, are not visible in this excerpt.
// NOTE(review): the same nostr::protocol::event_data construction is repeated
// for every EVENT case; a single shared construction would remove duplication.
//
// Exceptions derived from std::exception are classified by message text:
// messages containing "old counter" or "message with old" are logged at debug
// level as ignored duplicates; others are logged as a warning and the raw
// text is forwarded as unknown_protocol. A std::bad_alloc handler logs an
// error.
// NOTE(review): because the function is noexcept, any exception escaping a
// handler body would terminate the program. The exception handler re-parses
// the JSON and calls handler_, both of which could throw - confirm each is
// guarded by code outside this excerpt.
411 auto handle(
const core::events::transport::bytes_received &evt)
noexcept ->
void
// Reinterpret the received std::byte sequence as characters.
414 std::string json_str;
415 json_str.resize(evt.bytes.size());
416 std::ranges::transform(evt.bytes, json_str.begin(), [](std::byte
byte) { return std::bit_cast<char>(byte); });
419 auto parsed = nlohmann::json::parse(json_str);
// Reject anything that is not a non-empty array led by a string tag.
420 if (not parsed.is_array() or parsed.empty() or not parsed[0].is_string()) {
421 nostr::events::incoming::unknown_protocol evt_inner{ json_str };
422 handler_.
handle(evt_inner);
426 const auto msg_type = parsed[0].get<std::string>();
428 if (msg_type ==
"OK") {
// Wake the coroutine awaiting this event id before notifying handler_.
431 tracker_->resolve(ok_msg->event_id, *ok_msg);
432 nostr::events::incoming::ok evt_inner{ *ok_msg };
433 handler_.
handle(evt_inner);
435 nostr::events::incoming::unknown_protocol evt_inner{ json_str };
436 handler_.
handle(evt_inner);
438 }
else if (msg_type ==
"EOSE") {
// Wake the coroutine awaiting this subscription id before notifying handler_.
441 tracker_->resolve(eose_msg->subscription_id, *eose_msg);
442 nostr::events::incoming::eose evt_inner{ *eose_msg };
443 handler_.
handle(evt_inner);
445 nostr::events::incoming::unknown_protocol evt_inner{ json_str };
446 handler_.
handle(evt_inner);
448 }
else if (msg_type ==
"EVENT" and parsed.size() >= 3) {
// NOTE(review): `auto` copies the JSON sub-object here; a const reference
// would avoid the copy.
449 auto event_data = parsed[2];
450 auto kind_value = event_data[
"kind"].get<std::uint32_t>();
// Encrypted message: a successful result is surfaced to the presentation
// layer and may request that the identity bundle be republished.
454 nostr::events::incoming::encrypted_message evt_inner{ nostr::protocol::event_data{ .id = event_data[
"id"],
455 .pubkey = event_data[
"pubkey"],
456 .created_at = event_data[
"created_at"],
457 .kind = event_data[
"kind"],
458 .tags = event_data[
"tags"],
459 .content = event_data[
"content"],
460 .sig = event_data[
"sig"] } };
462 if (
auto result = handler_.
handle(evt_inner)) {
463 emit_presentation_event(*result);
464 if (result->should_republish_bundle) { handle(core::events::publish_identity{}); }
// Bundle announcement: a successful result is passed on to the presentation
// layer.
469 nostr::events::incoming::bundle_announcement evt_inner{ nostr::protocol::event_data{ .id = event_data[
"id"],
470 .pubkey = event_data[
"pubkey"],
471 .created_at = event_data[
"created_at"],
472 .kind = event_data[
"kind"],
473 .tags = event_data[
"tags"],
474 .content = event_data[
"content"],
475 .sig = event_data[
"sig"] } };
477 if (
auto result = handler_.
handle(evt_inner)) {
479 [
this](
auto &&inner_evt) {
481 emit_presentation_event(inner_evt);
// Identity announcement: forwarded to handler_; no result is used here.
488 nostr::events::incoming::identity_announcement evt_inner{ nostr::protocol::event_data{
489 .id = event_data[
"id"],
490 .pubkey = event_data[
"pubkey"],
491 .created_at = event_data[
"created_at"],
492 .kind = event_data[
"kind"],
493 .tags = event_data[
"tags"],
494 .content = event_data[
"content"],
495 .sig = event_data[
"sig"] } };
497 handler_.
handle(evt_inner);
// Session request: forwarded to handler_; no result is used here.
501 nostr::events::incoming::session_request evt_inner{ nostr::protocol::event_data{ .id = event_data[
"id"],
502 .pubkey = event_data[
"pubkey"],
503 .created_at = event_data[
"created_at"],
504 .kind = event_data[
"kind"],
505 .tags = event_data[
"tags"],
506 .content = event_data[
"content"],
507 .sig = event_data[
"sig"] } };
509 handler_.
handle(evt_inner);
// Node status: forwarded to handler_; no result is used here.
513 nostr::events::incoming::node_status evt_inner{ nostr::protocol::event_data{ .id = event_data[
"id"],
514 .pubkey = event_data[
"pubkey"],
515 .created_at = event_data[
"created_at"],
516 .kind = event_data[
"kind"],
517 .tags = event_data[
"tags"],
518 .content = event_data[
"content"],
519 .sig = event_data[
"sig"] } };
521 handler_.
handle(evt_inner);
// Any other kind: forwarded to handler_ as unknown_message.
525 nostr::events::incoming::unknown_message evt_inner{ nostr::protocol::event_data{ .id = event_data[
"id"],
526 .pubkey = event_data[
"pubkey"],
527 .created_at = event_data[
"created_at"],
528 .kind = event_data[
"kind"],
529 .tags = event_data[
"tags"],
530 .content = event_data[
"content"],
531 .sig = event_data[
"sig"] } };
533 handler_.
handle(evt_inner);
// Unrecognised message type: forwarded to handler_ as unknown_protocol.
538 nostr::events::incoming::unknown_protocol evt_inner{ json_str };
539 handler_.
handle(evt_inner);
541 }
catch (
const std::exception &e) {
// Classify by message text: these substrings mark an already-seen message.
542 std::string error_msg(e.what());
543 if (error_msg.find(
"old counter") != std::string::npos
544 or error_msg.find(
"message with old") != std::string::npos) {
// Re-parse only to enrich the debug log with sender and event id prefixes.
546 auto parsed = nlohmann::json::parse(json_str);
547 if (parsed.is_array() and parsed.size() >= 3 and parsed[2].contains(
"pubkey")
548 and parsed[2].contains(
"id")) {
549 spdlog::debug(
"[session_orchestrator] Ignored duplicate message from {} (event: {})",
550 parsed[2][
"pubkey"].get<std::string>().substr(0, 16),
551 parsed[2][
"id"].get<std::string>().substr(0, 16));
553 spdlog::debug(
"[session_orchestrator] Ignored duplicate message: {}", error_msg);
556 spdlog::debug(
"[session_orchestrator] Ignored duplicate message: {}", error_msg);
// Not a duplicate: report the failure and hand the raw text to handler_.
559 spdlog::warn(
"[session_orchestrator] Failed to parse message: {} - Raw: {}", error_msg, json_str);
560 nostr::events::incoming::unknown_protocol evt_inner{ json_str };
561 handler_.
handle(evt_inner);
564 }
catch (
const std::bad_alloc &e) {
565 spdlog::error(
"[session_orchestrator] Failed to process bytes_received event: {}", e.what());
// Handle a `connect` command: logs the target relay and emits a transport
// connect event whose url is evt.relay.
574 auto handle(
const core::events::connect &evt) ->
void
576 spdlog::info(
"[session_orchestrator] Connecting to relay: {}", evt.relay);
577 emit_transport_event(core::events::transport::connect{ .url = evt.relay });
// Handle the transport's `connected` notification. Forwards the event to the
// connection monitor, runs key maintenance on the bridge, republishes the
// identity bundle when the signed or Kyber pre-key was rotated, and issues
// the identity and message subscriptions.
// NOTE(review): the lines between the republish and the subscriptions are not
// visible; the subscriptions are presumably unconditional - confirm.
585 auto handle(
const core::events::transport::connected &evt) ->
void
587 emit_connection_monitor_event(evt);
589 spdlog::info(
"[session_orchestrator] Transport connected, performing key maintenance");
590 auto maintenance_result = bridge_->perform_key_maintenance();
// A rotated key makes the previously published bundle stale.
592 if (maintenance_result.signed_pre_key_rotated or maintenance_result.kyber_pre_key_rotated) {
593 spdlog::info(
"[session_orchestrator] Keys rotated, republishing bundle");
594 handle(core::events::publish_identity{});
597 spdlog::info(
"[session_orchestrator] Subscribing to identities and messages");
598 handle(core::events::subscribe_identities{});
599 handle(core::events::subscribe_messages{});
// Handle the transport's `connect_failed` notification: forwards the event to
// the connection monitor and logs the error message.
607 auto handle(
const core::events::transport::connect_failed &evt) ->
void
609 emit_connection_monitor_event(evt);
611 spdlog::error(
"[session_orchestrator] Transport connect failed: {}", evt.error_message);
// No-op read of a member. NOTE(review): presumably present to keep a linter
// from suggesting the method be static - confirm before removing.
612 std::ignore = bridge_;
// Handle the transport's `sent` notification: only logs at debug level. The
// event is not forwarded to the connection monitor.
620 auto handle(
const core::events::transport::sent & ) ->
void
622 spdlog::debug(
"[session_orchestrator] Transport sent");
// No-op read of a member. NOTE(review): presumably present to keep a linter
// from suggesting the method be static - confirm before removing.
623 std::ignore = bridge_;
// Handle the transport's `send_failed` notification: forwards the event to
// the connection monitor and logs the error message.
631 auto handle(
const core::events::transport::send_failed &evt) ->
void
633 emit_connection_monitor_event(evt);
635 spdlog::error(
"[session_orchestrator] Transport send failed: {}", evt.error_message);
// No-op read of a member. NOTE(review): presumably present to keep a linter
// from suggesting the method be static - confirm before removing.
636 std::ignore = bridge_;
// Handle the transport's `disconnected` notification: forwards the event to
// the connection monitor and logs the disconnect.
644 auto handle(
const core::events::transport::disconnected &evt) ->
void
646 emit_connection_monitor_event(evt);
648 spdlog::info(
"[session_orchestrator] Transport disconnected");
// No-op read of a member. NOTE(review): presumably present to keep a linter
// from suggesting the method be static - confirm before removing.
649 std::ignore = bridge_;