1use rusqlite::Connection;
8use std::sync::{Arc, Mutex};
9use thiserror::Error;
10
11#[derive(Error, Debug)]
12pub enum MessageHistoryError {
13 #[error("Database error: {0}")]
14 Database(#[from] rusqlite::Error),
15
16 #[error("Conversation not found: {0}")]
17 ConversationNotFound(String),
18
19 #[error("Message not found: {0}")]
20 MessageNotFound(i64),
21
22 #[error("UTF-8 conversion error: {0}")]
23 Utf8Error(#[from] std::string::FromUtf8Error),
24}
25
/// Whether a stored message was received or sent.
///
/// The discriminants are the integer values written to the database.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum MessageDirection {
    /// Message received from the remote party (stored as `0`).
    Incoming = 0,
    /// Message sent by the local user (stored as `1`).
    Outgoing = 1,
}

impl From<i64> for MessageDirection {
    /// Decodes a stored integer into a direction.
    ///
    /// `1` maps to [`MessageDirection::Outgoing`]; `0` and every unrecognised
    /// value map to [`MessageDirection::Incoming`].
    fn from(raw: i64) -> Self {
        if raw == 1 {
            Self::Outgoing
        } else {
            Self::Incoming
        }
    }
}
43
/// Kind of payload a stored message carries.
///
/// The discriminants are the integer values written to the database.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum MessageType {
    /// Ordinary text message (stored as `0`).
    Text = 0,
    /// Bundle announcement (stored as `1`).
    BundleAnnouncement = 1,
    /// System message (stored as `2`).
    System = 2,
}

impl From<i64> for MessageType {
    /// Decodes a stored integer into a message type.
    ///
    /// `1` and `2` map to their variants; `0` and every unrecognised value
    /// map to [`MessageType::Text`].
    fn from(raw: i64) -> Self {
        match raw {
            1 => Self::BundleAnnouncement,
            2 => Self::System,
            _ => Self::Text,
        }
    }
}
63
/// Delivery state recorded for a stored message.
///
/// The discriminants are the integer values written to the database.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum DeliveryStatus {
    /// Not yet sent (stored as `0`).
    Pending = 0,
    /// Sent (stored as `1`).
    Sent = 1,
    /// Delivered (stored as `2`).
    Delivered = 2,
    /// Sending failed (stored as `3`).
    Failed = 3,
}

impl From<i64> for DeliveryStatus {
    /// Decodes a stored integer into a delivery status.
    ///
    /// `1`, `2` and `3` map to their variants; `0` and every unrecognised
    /// value map to [`DeliveryStatus::Pending`].
    fn from(raw: i64) -> Self {
        match raw {
            1 => Self::Sent,
            2 => Self::Delivered,
            3 => Self::Failed,
            _ => Self::Pending,
        }
    }
}
85
86#[derive(Debug, Clone)]
88pub struct StoredMessage {
89 pub id: i64,
90 pub conversation_id: i64,
91 pub direction: MessageDirection,
92 pub timestamp: u64,
93 pub message_type: MessageType,
94 pub content: String,
95 pub delivery_status: DeliveryStatus,
96 pub was_prekey_message: bool,
97 pub session_established: bool,
98}
99
/// Summary of one row of the `conversations` table.
///
/// Derives `PartialEq`/`Eq` so that conversations can be compared directly
/// (for example in tests); every field type already supports equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Conversation {
    /// Row id of the conversation.
    pub id: i64,
    /// RDX fingerprint identifying the remote party of the conversation.
    pub rdx_fingerprint: String,
    /// Timestamp of the most recently stored message (`0` for a conversation
    /// that was just created).
    pub last_message_timestamp: u64,
    /// Number of incoming messages stored since the conversation was last
    /// marked as read.
    pub unread_count: u32,
    /// Whether the conversation is archived; archived conversations are only
    /// listed when explicitly requested.
    pub archived: bool,
}
109
110pub struct MessageHistory {
112 connection: Arc<Mutex<Connection>>,
113}
114
115impl MessageHistory {
116 pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
117 Self { connection }
118 }
119
120 fn get_or_create_conversation(
122 &self,
123 rdx_fingerprint: &str,
124 ) -> Result<i64, MessageHistoryError> {
125 let conn = self.connection.lock().unwrap();
126
127 let result: Result<i64, rusqlite::Error> = conn.query_row(
128 "SELECT id FROM conversations WHERE rdx_fingerprint = ?1",
129 [rdx_fingerprint],
130 |row| row.get(0),
131 );
132
133 match result {
134 Ok(id) => Ok(id),
135 Err(rusqlite::Error::QueryReturnedNoRows) => {
136 let nostr_pubkey: Option<String> = conn
137 .query_row(
138 "SELECT nostr_pubkey FROM contacts WHERE rdx_fingerprint = ?1",
139 [rdx_fingerprint],
140 |row| row.get(0),
141 )
142 .ok();
143
144 conn.execute(
145 "INSERT INTO conversations (rdx_fingerprint, nostr_pubkey, last_message_timestamp)
146 VALUES (?1, ?2, 0)",
147 rusqlite::params![rdx_fingerprint, nostr_pubkey],
148 )?;
149
150 Ok(conn.last_insert_rowid())
151 }
152 Err(e) => Err(e.into()),
153 }
154 }
155
156 fn update_conversation(
158 &self,
159 conversation_id: i64,
160 timestamp: u64,
161 increment_unread: bool,
162 ) -> Result<(), MessageHistoryError> {
163 let conn = self.connection.lock().unwrap();
164
165 if increment_unread {
166 conn.execute(
167 "UPDATE conversations
168 SET last_message_timestamp = ?1, unread_count = unread_count + 1
169 WHERE id = ?2",
170 rusqlite::params![timestamp as i64, conversation_id],
171 )?;
172 } else {
173 conn.execute(
174 "UPDATE conversations
175 SET last_message_timestamp = ?1
176 WHERE id = ?2",
177 rusqlite::params![timestamp as i64, conversation_id],
178 )?;
179 }
180
181 Ok(())
182 }
183
184 pub fn store_incoming_message(
186 &self,
187 rdx_fingerprint: &str,
188 timestamp: u64,
189 plaintext: &[u8],
190 was_prekey_message: bool,
191 session_established: bool,
192 ) -> Result<i64, MessageHistoryError> {
193 let conversation_id = self.get_or_create_conversation(rdx_fingerprint)?;
194
195 let conn = self.connection.lock().unwrap();
196 conn.execute(
197 "INSERT INTO messages
198 (conversation_id, direction, timestamp, message_type, content,
199 was_prekey_message, session_established)
200 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
201 rusqlite::params![
202 conversation_id,
203 MessageDirection::Incoming as i64,
204 timestamp as i64,
205 MessageType::Text as i64,
206 plaintext,
207 was_prekey_message,
208 session_established,
209 ],
210 )?;
211
212 let message_id = conn.last_insert_rowid();
213 drop(conn);
214
215 self.update_conversation(conversation_id, timestamp, true)?;
216
217 Ok(message_id)
218 }
219
220 pub fn store_outgoing_message(
222 &self,
223 rdx_fingerprint: &str,
224 timestamp: u64,
225 plaintext: &[u8],
226 ) -> Result<i64, MessageHistoryError> {
227 let conversation_id = self.get_or_create_conversation(rdx_fingerprint)?;
228
229 let conn = self.connection.lock().unwrap();
230 conn.execute(
231 "INSERT INTO messages
232 (conversation_id, direction, timestamp, message_type, content, delivery_status)
233 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
234 rusqlite::params![
235 conversation_id,
236 MessageDirection::Outgoing as i64,
237 timestamp as i64,
238 MessageType::Text as i64,
239 plaintext,
240 DeliveryStatus::Pending as i64,
241 ],
242 )?;
243
244 let message_id = conn.last_insert_rowid();
245 drop(conn);
246
247 self.update_conversation(conversation_id, timestamp, false)?;
248
249 Ok(message_id)
250 }
251
252 pub fn update_delivery_status(
254 &self,
255 message_id: i64,
256 status: DeliveryStatus,
257 ) -> Result<(), MessageHistoryError> {
258 let conn = self.connection.lock().unwrap();
259 conn.execute(
260 "UPDATE messages SET delivery_status = ?1 WHERE id = ?2",
261 rusqlite::params![status as i64, message_id],
262 )?;
263 Ok(())
264 }
265
266 pub fn get_message(&self, message_id: i64) -> Result<StoredMessage, MessageHistoryError> {
268 let conn = self.connection.lock().unwrap();
269 let result = conn.query_row(
270 "SELECT id, conversation_id, direction, timestamp, message_type, content,
271 delivery_status, was_prekey_message, session_established
272 FROM messages WHERE id = ?1",
273 [message_id],
274 |row| {
275 let content_bytes: Vec<u8> = row.get(5)?;
276 Ok(StoredMessage {
277 id: row.get(0)?,
278 conversation_id: row.get(1)?,
279 direction: row.get::<_, i64>(2)?.into(),
280 timestamp: row.get::<_, i64>(3)? as u64,
281 message_type: row.get::<_, i64>(4)?.into(),
282 content: String::from_utf8(content_bytes).unwrap_or_default(),
283 delivery_status: row.get::<_, i64>(6)?.into(),
284 was_prekey_message: row.get(7)?,
285 session_established: row.get(8)?,
286 })
287 },
288 );
289
290 match result {
291 Ok(msg) => Ok(msg),
292 Err(rusqlite::Error::QueryReturnedNoRows) => {
293 Err(MessageHistoryError::MessageNotFound(message_id))
294 }
295 Err(e) => Err(e.into()),
296 }
297 }
298
299 pub fn get_conversation_messages(
301 &self,
302 rdx_fingerprint: &str,
303 limit: u32,
304 offset: u32,
305 ) -> Result<Vec<StoredMessage>, MessageHistoryError> {
306 let conn = self.connection.lock().unwrap();
307
308 let conversation_id: Result<i64, rusqlite::Error> = conn.query_row(
309 "SELECT id FROM conversations WHERE rdx_fingerprint = ?1",
310 [rdx_fingerprint],
311 |row| row.get(0),
312 );
313
314 let conversation_id = match conversation_id {
315 Ok(id) => id,
316 Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(Vec::new()),
317 Err(e) => return Err(e.into()),
318 };
319
320 let mut stmt = conn.prepare(
321 "SELECT id, conversation_id, direction, timestamp, message_type, content,
322 delivery_status, was_prekey_message, session_established
323 FROM messages
324 WHERE conversation_id = ?1
325 ORDER BY timestamp DESC
326 LIMIT ?2 OFFSET ?3",
327 )?;
328
329 let messages = stmt
330 .query_map(rusqlite::params![conversation_id, limit, offset], |row| {
331 let content_bytes: Vec<u8> = row.get(5)?;
332 Ok(StoredMessage {
333 id: row.get(0)?,
334 conversation_id: row.get(1)?,
335 direction: row.get::<_, i64>(2)?.into(),
336 timestamp: row.get::<_, i64>(3)? as u64,
337 message_type: row.get::<_, i64>(4)?.into(),
338 content: String::from_utf8(content_bytes).unwrap_or_default(),
339 delivery_status: row.get::<_, i64>(6)?.into(),
340 was_prekey_message: row.get(7)?,
341 session_established: row.get(8)?,
342 })
343 })?
344 .collect::<Result<Vec<_>, _>>()?;
345
346 Ok(messages)
347 }
348
349 pub fn get_conversations(
351 &self,
352 include_archived: bool,
353 ) -> Result<Vec<Conversation>, MessageHistoryError> {
354 let conn = self.connection.lock().unwrap();
355
356 let query = if include_archived {
357 "SELECT id, rdx_fingerprint, last_message_timestamp, unread_count, archived
358 FROM conversations
359 ORDER BY last_message_timestamp DESC"
360 } else {
361 "SELECT id, rdx_fingerprint, last_message_timestamp, unread_count, archived
362 FROM conversations
363 WHERE archived = 0
364 ORDER BY last_message_timestamp DESC"
365 };
366
367 let mut stmt = conn.prepare(query)?;
368
369 let conversations = stmt
370 .query_map([], |row| {
371 Ok(Conversation {
372 id: row.get(0)?,
373 rdx_fingerprint: row.get(1)?,
374 last_message_timestamp: row.get::<_, i64>(2)? as u64,
375 unread_count: row.get::<_, i64>(3)? as u32,
376 archived: row.get(4)?,
377 })
378 })?
379 .collect::<Result<Vec<_>, _>>()?;
380
381 Ok(conversations)
382 }
383
384 pub fn get_unread_count(&self, rdx_fingerprint: &str) -> Result<u32, MessageHistoryError> {
386 let conn = self.connection.lock().unwrap();
387
388 let result: Result<i64, rusqlite::Error> = conn.query_row(
389 "SELECT unread_count FROM conversations WHERE rdx_fingerprint = ?1",
390 [rdx_fingerprint],
391 |row| row.get(0),
392 );
393
394 match result {
395 Ok(count) => Ok(count as u32),
396 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
397 Err(e) => Err(e.into()),
398 }
399 }
400
401 pub fn mark_conversation_read(&self, rdx_fingerprint: &str) -> Result<(), MessageHistoryError> {
403 let conn = self.connection.lock().unwrap();
404 conn.execute(
405 "UPDATE conversations SET unread_count = 0 WHERE rdx_fingerprint = ?1",
406 [rdx_fingerprint],
407 )?;
408 Ok(())
409 }
410
411 pub fn delete_message(&self, message_id: i64) -> Result<(), MessageHistoryError> {
413 let conn = self.connection.lock().unwrap();
414 conn.execute("DELETE FROM messages WHERE id = ?1", [message_id])?;
415 Ok(())
416 }
417
418 pub fn delete_conversation(&self, rdx_fingerprint: &str) -> Result<(), MessageHistoryError> {
420 let conn = self.connection.lock().unwrap();
421
422 let conversation_id: Result<i64, rusqlite::Error> = conn.query_row(
423 "SELECT id FROM conversations WHERE rdx_fingerprint = ?1",
424 [rdx_fingerprint],
425 |row| row.get(0),
426 );
427
428 if let Ok(id) = conversation_id {
429 conn.execute("DELETE FROM messages WHERE conversation_id = ?1", [id])?;
430 conn.execute("DELETE FROM conversations WHERE id = ?1", [id])?;
431 }
432
433 Ok(())
434 }
435}