Radix Relay
Hybrid mesh communications with Signal Protocol encryption
Loading...
Searching...
No Matches
processor.hpp
Go to the documentation of this file.
#pragma once

#include <algorithm>
#include <atomic>
#include <chrono>
#include <iterator>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>

#include <replxx.hxx>

#include <core/events.hpp>
#include <core/overload.hpp>

16
25template<concepts::signal_bridge Bridge> struct processor
26{
36 processor(std::string node_id,
37 std::string mode,
38 const std::shared_ptr<Bridge> &bridge,
39 const std::shared_ptr<async::async_queue<core::events::raw_command>> &command_queue,
40 const std::shared_ptr<async::async_queue<core::events::ui_event_t>> &ui_event_queue)
41 : node_id_(std::move(node_id)), mode_(std::move(mode)), bridge_(bridge), command_queue_(command_queue),
42 ui_event_queue_(ui_event_queue), prompt_(std::string(GREEN) + "[⇌] " + RESET)
43 {}
44
46
47 processor(const processor &) = delete;
48 auto operator=(const processor &) -> processor & = delete;
49 processor(processor &&) = delete;
50 auto operator=(processor &&) -> processor & = delete;
51
57 auto run() -> void
58 {
59 setup_replxx();
60
61 print_message("Radix Relay - Interactive Mode");
62 print_message("Node: " + node_id_);
63 print_message("Mode: " + mode_);
64 print_message("");
65 print_message("Type '/help' for available commands, '/quit' to exit");
66 print_message("");
67
68 running_.store(true);
69
70 while (running_.load()) {
71 // Process any pending UI events before showing the prompt
72 while (auto event = ui_event_queue_->try_pop()) { process_ui_event(*event); }
73
74 const char *input = rx_.input(prompt_);
75
76 if (input == nullptr) {
77 running_.store(false);
78 break;
79 }
80
81 std::string command(input);
82
83 command.erase(0, command.find_first_not_of(" \t\r\n"));
84 command.erase(command.find_last_not_of(" \t\r\n") + 1);
85
86 if (command.empty()) { continue; }
87
88 if (command == "/quit" or command == "/exit" or command == "/q") {
89 print_message("Goodbye!");
90 running_.store(false);
91 break;
92 }
93
94 rx_.history_add(command);
95 process_command(command);
96
97 // Poll for UI events with timeout to allow pipeline to process
98 constexpr auto max_wait_ms = 100;
99 constexpr auto poll_interval_ms = 5;
100 for (int waited = 0; waited < max_wait_ms; waited += poll_interval_ms) {
101 if (auto event = ui_event_queue_->try_pop()) {
102 // Found an event, process it and all other pending events
103 process_ui_event(*event);
104 while (auto next_event = ui_event_queue_->try_pop()) { process_ui_event(*next_event); }
105 break;
106 }
107 std::this_thread::sleep_for(std::chrono::milliseconds(poll_interval_ms));
108 }
109 }
110
111 stop();
112 save_history();
113 }
114
118 auto stop() -> void
119 {
120 if (running_.exchange(false)) {
121 if (message_poller_.joinable()) { message_poller_.join(); }
122 }
123 }
124
130 [[nodiscard]] auto get_mode() const -> const std::string & { return mode_; }
131
137 auto update_chat_context(std::string contact_name) -> void
138 {
139 active_chat_context_ = std::move(contact_name);
140 update_prompt();
141 }
142
146 auto clear_chat_context() -> void
147 {
148 active_chat_context_.reset();
149 update_prompt();
150 }
151
157 [[nodiscard]] auto get_chat_context() const -> std::optional<std::string> { return active_chat_context_; }
158
164 [[nodiscard]] auto get_prompt() const -> const std::string & { return prompt_; }
165
166private:
170 auto setup_replxx() -> void
171 {
172 prompt_ = std::string(GREEN) + "[⇌] " + RESET;
173
174 rx_.history_load(HISTORY_FILE);
175
176 rx_.set_max_history_size(MAX_HISTORY_SIZE);
177 rx_.set_max_hint_rows(MAX_HINT_ROWS);
178
179 rx_.set_completion_callback([](const std::string &input, int & /*context*/) {
180 replxx::Replxx::completions_t completions;
181
182 std::vector<std::string> commands = { "/connect",
183 "/send",
184 "/trust",
185 "/list",
186 "/publish",
187 "/unpublish",
188 "/subscribe",
189 "/help",
190 "/mode",
191 "/quit",
192 "/exit",
193 "/status" };
194
195 std::copy_if(commands.begin(), commands.end(), std::back_inserter(completions), [&input](const std::string &cmd) {
196 return cmd.starts_with(input);
197 });
198
199 return completions;
200 });
201 }
202
208 auto print_message(const std::string &message) -> void
209 {
210 auto msg = message;
211 if (not msg.empty() and msg.back() == '\n') { msg.pop_back(); }
212 auto formatted = std::string(GREEN) + msg + RESET + "\n";
213 rx_.write(formatted.c_str(), static_cast<int>(formatted.size()));
214 }
215
221 auto process_command(const std::string &input) -> void
222 {
223 constexpr auto mode_cmd = "/mode ";
224 if (input.starts_with(mode_cmd)) {
225 const auto new_mode = input.substr(std::string_view(mode_cmd).length());
226 if (new_mode == "internet" or new_mode == "mesh" or new_mode == "hybrid") {
227 mode_ = std::string(new_mode);
228 print_message("Switched to " + std::string(new_mode) + " mode");
229 } else {
230 print_message("Invalid mode. Use: internet, mesh, or hybrid");
231 }
232 return;
233 }
234
235 command_queue_->push(core::events::raw_command{ .input = std::string(input) });
236 }
237
241 auto save_history() -> void { rx_.history_save(HISTORY_FILE); }
242
246 auto update_prompt() -> void
247 {
248 if (active_chat_context_.has_value()) {
249 prompt_ = std::string(GREEN) + "[⇌ @" + active_chat_context_.value() + "] " + RESET;
250 } else {
251 prompt_ = std::string(GREEN) + "[⇌] " + RESET;
252 }
253 }
254
260 auto process_ui_event(const core::events::ui_event_t &event) -> void
261 {
262 std::visit(core::overload{ [this](const core::events::display_message &evt) { print_message(evt.message); },
263 [this](const core::events::enter_chat_mode &evt) { update_chat_context(evt.display_name); },
264 [this](const core::events::exit_chat_mode &) { clear_chat_context(); } },
265 event);
266 }
267
268 static constexpr const char *GREEN = "\033[32m";
269 static constexpr const char *RESET = "\033[0m";
270 static constexpr const char *HISTORY_FILE = ".radix_relay_history";
271 static constexpr auto message_poll_interval_ms = 16;
272 static constexpr auto MAX_HISTORY_SIZE = 1000;
273 static constexpr auto MAX_HINT_ROWS = 3;
274
275 std::string node_id_;
276 std::string mode_;
277 std::shared_ptr<Bridge> bridge_;
278 std::shared_ptr<async::async_queue<core::events::raw_command>> command_queue_;
279 std::shared_ptr<async::async_queue<core::events::ui_event_t>> ui_event_queue_;
280
281 std::atomic<bool> running_{ false };
282 std::thread message_poller_;
283 replxx::Replxx rx_;
284 std::string prompt_;
285 std::optional<std::string> active_chat_context_;
286};
287
288}// namespace radix_relay::tui
Thread-safe asynchronous queue for message passing between coroutines.
std::variant< display_message, enter_chat_mode, exit_chat_mode > ui_event_t
UI events: unified event stream for UI layers (replaces separate display + control queues)
Definition events.hpp:413
Text-based user interface processor with REPL.
Definition processor.hpp:26
auto get_mode() const -> const std::string &
Returns the current network mode.
auto get_chat_context() const -> std::optional< std::string >
Returns the current chat context.
processor(std::string node_id, std::string mode, const std::shared_ptr< Bridge > &bridge, const std::shared_ptr< async::async_queue< core::events::raw_command > > &command_queue, const std::shared_ptr< async::async_queue< core::events::ui_event_t > > &ui_event_queue)
Constructs a TUI processor.
Definition processor.hpp:36
processor(processor &&)=delete
processor(const processor &)=delete
auto operator=(processor &&) -> processor &=delete
auto run() -> void
Runs the interactive REPL loop.
Definition processor.hpp:57
auto stop() -> void
Stops the TUI processor and message polling thread.
auto get_prompt() const -> const std::string &
Returns the current prompt string.
auto clear_chat_context() -> void
Clears the active chat context, returning to default mode.
auto operator=(const processor &) -> processor &=delete
auto update_chat_context(std::string contact_name) -> void
Updates the active chat context with a contact name.