Radix Relay
Hybrid mesh communications with Signal Protocol encryption
Loading...
Searching...
No Matches
processor.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <algorithm>
5#include <atomic>
7#include <core/events.hpp>
8#include <memory>
9#include <replxx.hxx>
10#include <string>
11#include <thread>
12
14
23template<concepts::signal_bridge Bridge> struct processor
24{
34 processor(std::string node_id,
35 std::string mode,
36 const std::shared_ptr<Bridge> &bridge,
37 const std::shared_ptr<async::async_queue<core::events::raw_command>> &command_queue,
38 const std::shared_ptr<async::async_queue<core::events::display_message>> &display_queue)
39 : node_id_(std::move(node_id)), mode_(std::move(mode)), bridge_(bridge), command_queue_(command_queue),
40 display_queue_(display_queue)
41 {}
42
44
45 processor(const processor &) = delete;
46 auto operator=(const processor &) -> processor & = delete;
47 processor(processor &&) = delete;
48 auto operator=(processor &&) -> processor & = delete;
49
55 auto run() -> void
56 {
57 setup_replxx();
58
59 print_message("Radix Relay - Interactive Mode");
60 print_message("Node: " + node_id_);
61 print_message("Mode: " + mode_);
62 print_message("");
63 print_message("Type '/help' for available commands, '/quit' to exit");
64 print_message("");
65
66 running_.store(true);
67
68 message_poller_ = std::thread([this] {
69 while (running_.load()) {
70 while (auto msg = display_queue_->try_pop()) { print_message(msg->message); }
71 std::this_thread::sleep_for(std::chrono::milliseconds(message_poll_interval_ms));
72 }
73 });
74
75 while (running_.load()) {
76 const char *input = rx_.input(prompt_);
77
78 if (input == nullptr) {
79 running_.store(false);
80 break;
81 }
82
83 std::string command(input);
84
85 command.erase(0, command.find_first_not_of(" \t\r\n"));
86 command.erase(command.find_last_not_of(" \t\r\n") + 1);
87
88 if (command.empty()) { continue; }
89
90 if (command == "/quit" or command == "/exit" or command == "/q") {
91 print_message("Goodbye!");
92 running_.store(false);
93 break;
94 }
95
96 rx_.history_add(command);
97 process_command(command);
98 }
99
100 stop();
101 save_history();
102 }
103
107 auto stop() -> void
108 {
109 if (running_.exchange(false)) {
110 if (message_poller_.joinable()) { message_poller_.join(); }
111 }
112 }
113
119 [[nodiscard]] auto get_mode() const -> const std::string & { return mode_; }
120
121private:
125 auto setup_replxx() -> void
126 {
127 prompt_ = std::string(GREEN) + "[⇌] " + RESET;
128
129 rx_.history_load(HISTORY_FILE);
130
131 rx_.set_max_history_size(MAX_HISTORY_SIZE);
132 rx_.set_max_hint_rows(MAX_HINT_ROWS);
133
134 rx_.set_completion_callback([](const std::string &input, int & /*context*/) {
135 replxx::Replxx::completions_t completions;
136
137 std::vector<std::string> commands = { "/connect",
138 "/send",
139 "/trust",
140 "/list",
141 "/publish",
142 "/unpublish",
143 "/subscribe",
144 "/help",
145 "/mode",
146 "/quit",
147 "/exit",
148 "/status" };
149
150 std::copy_if(commands.begin(), commands.end(), std::back_inserter(completions), [&input](const std::string &cmd) {
151 return cmd.starts_with(input);
152 });
153
154 return completions;
155 });
156 }
157
163 auto print_message(const std::string &message) -> void
164 {
165 auto msg = message;
166 if (not msg.empty() and msg.back() == '\n') { msg.pop_back(); }
167 auto formatted = std::string(GREEN) + msg + RESET + "\n";
168 rx_.write(formatted.c_str(), static_cast<int>(formatted.size()));
169 }
170
176 auto process_command(const std::string &input) -> void
177 {
178 constexpr auto mode_cmd = "/mode ";
179 if (input.starts_with(mode_cmd)) {
180 const auto new_mode = input.substr(std::string_view(mode_cmd).length());
181 if (new_mode == "internet" or new_mode == "mesh" or new_mode == "hybrid") {
182 mode_ = std::string(new_mode);
183 print_message("Switched to " + std::string(new_mode) + " mode");
184 } else {
185 print_message("Invalid mode. Use: internet, mesh, or hybrid");
186 }
187 return;
188 }
189
190 command_queue_->push(core::events::raw_command{ .input = std::string(input) });
191 }
192
196 auto save_history() -> void { rx_.history_save(HISTORY_FILE); }
197
198 static constexpr const char *GREEN = "\033[32m";
199 static constexpr const char *RESET = "\033[0m";
200 static constexpr const char *HISTORY_FILE = ".radix_relay_history";
201 static constexpr auto message_poll_interval_ms = 16;
202 static constexpr auto MAX_HISTORY_SIZE = 1000;
203 static constexpr auto MAX_HINT_ROWS = 3;
204
205 std::string node_id_;
206 std::string mode_;
207 std::shared_ptr<Bridge> bridge_;
208 std::shared_ptr<async::async_queue<core::events::raw_command>> command_queue_;
209 std::shared_ptr<async::async_queue<core::events::display_message>> display_queue_;
210
211 std::atomic<bool> running_{ false };
212 std::thread message_poller_;
213 replxx::Replxx rx_;
214 std::string prompt_;
215};
216
217}// namespace radix_relay::tui
Thread-safe asynchronous queue for message passing between coroutines.
Text-based user interface processor with REPL.
Definition processor.hpp:24
auto get_mode() const -> const std::string &
Returns the current network mode.
processor(processor &&)=delete
processor(const processor &)=delete
auto operator=(processor &&) -> processor &=delete
processor(std::string node_id, std::string mode, const std::shared_ptr< Bridge > &bridge, const std::shared_ptr< async::async_queue< core::events::raw_command > > &command_queue, const std::shared_ptr< async::async_queue< core::events::display_message > > &display_queue)
Constructs a TUI processor.
Definition processor.hpp:34
auto run() -> void
Runs the interactive REPL loop.
Definition processor.hpp:55
auto stop() -> void
Stops the TUI processor and message polling thread.
auto operator=(const processor &) -> processor &=delete