signal_bridge/
message_history.rs

1//! Message history storage and retrieval
2//!
3//! This module provides persistent storage for message history using SQLCipher-encrypted
4//! SQLite database. Messages are stored as plaintext (already decrypted by Signal Protocol)
5//! and protected by full database encryption.
6
7use rusqlite::Connection;
8use std::sync::{Arc, Mutex};
9use thiserror::Error;
10
11#[derive(Error, Debug)]
12pub enum MessageHistoryError {
13    #[error("Database error: {0}")]
14    Database(#[from] rusqlite::Error),
15
16    #[error("Conversation not found: {0}")]
17    ConversationNotFound(String),
18
19    #[error("Message not found: {0}")]
20    MessageNotFound(i64),
21
22    #[error("UTF-8 conversion error: {0}")]
23    Utf8Error(#[from] std::string::FromUtf8Error),
24}
25
/// Direction of a stored message.
///
/// The discriminants are the integer codes bound to the `direction` column when a
/// message is inserted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum MessageDirection {
    /// Code `0`.
    Incoming = 0,
    /// Code `1`.
    Outgoing = 1,
}

impl From<i64> for MessageDirection {
    /// Decodes an integer code. Only `1` yields `Outgoing`; every other value,
    /// including unknown codes, decodes as `Incoming`.
    fn from(value: i64) -> Self {
        if value == 1 {
            Self::Outgoing
        } else {
            Self::Incoming
        }
    }
}
43
/// Kind of payload a stored message carries.
///
/// The discriminants are the integer codes bound to the `message_type` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum MessageType {
    /// Code `0`.
    Text = 0,
    /// Code `1`.
    BundleAnnouncement = 1,
    /// Code `2`.
    System = 2,
}

impl From<i64> for MessageType {
    /// Decodes an integer code. `1` and `2` map to their variants; `0` and any
    /// unrecognised value decode as `Text`.
    fn from(value: i64) -> Self {
        match value {
            1 => Self::BundleAnnouncement,
            2 => Self::System,
            _ => Self::Text,
        }
    }
}
63
/// Delivery state recorded for a message.
///
/// The discriminants are the integer codes bound to the `delivery_status` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DeliveryStatus {
    /// Code `0`.
    Pending = 0,
    /// Code `1`.
    Sent = 1,
    /// Code `2`.
    Delivered = 2,
    /// Code `3`.
    Failed = 3,
}

impl From<i64> for DeliveryStatus {
    /// Decodes an integer code. `1`..=`3` map to their variants; `0` and any
    /// unrecognised value decode as `Pending`.
    fn from(value: i64) -> Self {
        match value {
            1 => Self::Sent,
            2 => Self::Delivered,
            3 => Self::Failed,
            _ => Self::Pending,
        }
    }
}
85
/// One row of the `messages` table, decoded into Rust types.
#[derive(Debug, Clone)]
pub struct StoredMessage {
    /// Row id of the message.
    pub id: i64,
    /// Row id of the conversation this message belongs to.
    pub conversation_id: i64,
    /// Decoded from the integer `direction` column.
    pub direction: MessageDirection,
    /// Stored as a signed 64-bit integer and cast back to `u64` on read.
    /// Units are whatever the caller supplied when storing (not fixed by this module).
    pub timestamp: u64,
    /// Decoded from the integer `message_type` column.
    pub message_type: MessageType,
    /// Message body, decoded from the stored bytes as UTF-8.
    pub content: String,
    /// Decoded from the integer `delivery_status` column. Only the outgoing insert
    /// sets this column explicitly; incoming rows get the column's default.
    pub delivery_status: DeliveryStatus,
    /// Value of the `was_prekey_message` column.
    pub was_prekey_message: bool,
    /// Value of the `session_established` column.
    pub session_established: bool,
}
99
/// One row of the `conversations` table, decoded into Rust types.
#[derive(Debug, Clone)]
pub struct Conversation {
    /// Row id of the conversation.
    pub id: i64,
    /// Fingerprint of the contact; conversations are looked up by this value.
    pub rdx_fingerprint: String,
    /// Timestamp written by the most recent message store; `0` for a freshly
    /// created conversation.
    pub last_message_timestamp: u64,
    /// Counter incremented per stored incoming message and reset to zero when the
    /// conversation is marked read.
    pub unread_count: u32,
    /// Value of the `archived` column; archived rows can be filtered out when listing.
    pub archived: bool,
}
109
/// Message history storage and retrieval on top of a shared SQLite connection.
pub struct MessageHistory {
    /// Shared database handle; every method locks this mutex before running queries.
    connection: Arc<Mutex<Connection>>,
}
114
115impl MessageHistory {
116    pub fn new(connection: Arc<Mutex<Connection>>) -> Self {
117        Self { connection }
118    }
119
120    /// Get or create a conversation for a contact
121    fn get_or_create_conversation(
122        &self,
123        rdx_fingerprint: &str,
124    ) -> Result<i64, MessageHistoryError> {
125        let conn = self.connection.lock().unwrap();
126
127        let result: Result<i64, rusqlite::Error> = conn.query_row(
128            "SELECT id FROM conversations WHERE rdx_fingerprint = ?1",
129            [rdx_fingerprint],
130            |row| row.get(0),
131        );
132
133        match result {
134            Ok(id) => Ok(id),
135            Err(rusqlite::Error::QueryReturnedNoRows) => {
136                let nostr_pubkey: Option<String> = conn
137                    .query_row(
138                        "SELECT nostr_pubkey FROM contacts WHERE rdx_fingerprint = ?1",
139                        [rdx_fingerprint],
140                        |row| row.get(0),
141                    )
142                    .ok();
143
144                conn.execute(
145                    "INSERT INTO conversations (rdx_fingerprint, nostr_pubkey, last_message_timestamp)
146                     VALUES (?1, ?2, 0)",
147                    rusqlite::params![rdx_fingerprint, nostr_pubkey],
148                )?;
149
150                Ok(conn.last_insert_rowid())
151            }
152            Err(e) => Err(e.into()),
153        }
154    }
155
156    /// Update conversation timestamp and unread count
157    fn update_conversation(
158        &self,
159        conversation_id: i64,
160        timestamp: u64,
161        increment_unread: bool,
162    ) -> Result<(), MessageHistoryError> {
163        let conn = self.connection.lock().unwrap();
164
165        if increment_unread {
166            conn.execute(
167                "UPDATE conversations
168                 SET last_message_timestamp = ?1, unread_count = unread_count + 1
169                 WHERE id = ?2",
170                rusqlite::params![timestamp as i64, conversation_id],
171            )?;
172        } else {
173            conn.execute(
174                "UPDATE conversations
175                 SET last_message_timestamp = ?1
176                 WHERE id = ?2",
177                rusqlite::params![timestamp as i64, conversation_id],
178            )?;
179        }
180
181        Ok(())
182    }
183
184    /// Store an incoming message
185    pub fn store_incoming_message(
186        &self,
187        rdx_fingerprint: &str,
188        timestamp: u64,
189        plaintext: &[u8],
190        was_prekey_message: bool,
191        session_established: bool,
192    ) -> Result<i64, MessageHistoryError> {
193        let conversation_id = self.get_or_create_conversation(rdx_fingerprint)?;
194
195        let conn = self.connection.lock().unwrap();
196        conn.execute(
197            "INSERT INTO messages
198             (conversation_id, direction, timestamp, message_type, content,
199              was_prekey_message, session_established)
200             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
201            rusqlite::params![
202                conversation_id,
203                MessageDirection::Incoming as i64,
204                timestamp as i64,
205                MessageType::Text as i64,
206                plaintext,
207                was_prekey_message,
208                session_established,
209            ],
210        )?;
211
212        let message_id = conn.last_insert_rowid();
213        drop(conn);
214
215        self.update_conversation(conversation_id, timestamp, true)?;
216
217        Ok(message_id)
218    }
219
220    /// Store an outgoing message
221    pub fn store_outgoing_message(
222        &self,
223        rdx_fingerprint: &str,
224        timestamp: u64,
225        plaintext: &[u8],
226    ) -> Result<i64, MessageHistoryError> {
227        let conversation_id = self.get_or_create_conversation(rdx_fingerprint)?;
228
229        let conn = self.connection.lock().unwrap();
230        conn.execute(
231            "INSERT INTO messages
232             (conversation_id, direction, timestamp, message_type, content, delivery_status)
233             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
234            rusqlite::params![
235                conversation_id,
236                MessageDirection::Outgoing as i64,
237                timestamp as i64,
238                MessageType::Text as i64,
239                plaintext,
240                DeliveryStatus::Pending as i64,
241            ],
242        )?;
243
244        let message_id = conn.last_insert_rowid();
245        drop(conn);
246
247        self.update_conversation(conversation_id, timestamp, false)?;
248
249        Ok(message_id)
250    }
251
252    /// Update message delivery status
253    pub fn update_delivery_status(
254        &self,
255        message_id: i64,
256        status: DeliveryStatus,
257    ) -> Result<(), MessageHistoryError> {
258        let conn = self.connection.lock().unwrap();
259        conn.execute(
260            "UPDATE messages SET delivery_status = ?1 WHERE id = ?2",
261            rusqlite::params![status as i64, message_id],
262        )?;
263        Ok(())
264    }
265
266    /// Retrieve message by ID
267    pub fn get_message(&self, message_id: i64) -> Result<StoredMessage, MessageHistoryError> {
268        let conn = self.connection.lock().unwrap();
269        let result = conn.query_row(
270            "SELECT id, conversation_id, direction, timestamp, message_type, content,
271                    delivery_status, was_prekey_message, session_established
272             FROM messages WHERE id = ?1",
273            [message_id],
274            |row| {
275                let content_bytes: Vec<u8> = row.get(5)?;
276                Ok(StoredMessage {
277                    id: row.get(0)?,
278                    conversation_id: row.get(1)?,
279                    direction: row.get::<_, i64>(2)?.into(),
280                    timestamp: row.get::<_, i64>(3)? as u64,
281                    message_type: row.get::<_, i64>(4)?.into(),
282                    content: String::from_utf8(content_bytes).unwrap_or_default(),
283                    delivery_status: row.get::<_, i64>(6)?.into(),
284                    was_prekey_message: row.get(7)?,
285                    session_established: row.get(8)?,
286                })
287            },
288        );
289
290        match result {
291            Ok(msg) => Ok(msg),
292            Err(rusqlite::Error::QueryReturnedNoRows) => {
293                Err(MessageHistoryError::MessageNotFound(message_id))
294            }
295            Err(e) => Err(e.into()),
296        }
297    }
298
299    /// Get messages for a conversation (paginated, newest first)
300    pub fn get_conversation_messages(
301        &self,
302        rdx_fingerprint: &str,
303        limit: u32,
304        offset: u32,
305    ) -> Result<Vec<StoredMessage>, MessageHistoryError> {
306        let conn = self.connection.lock().unwrap();
307
308        let conversation_id: Result<i64, rusqlite::Error> = conn.query_row(
309            "SELECT id FROM conversations WHERE rdx_fingerprint = ?1",
310            [rdx_fingerprint],
311            |row| row.get(0),
312        );
313
314        let conversation_id = match conversation_id {
315            Ok(id) => id,
316            Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(Vec::new()),
317            Err(e) => return Err(e.into()),
318        };
319
320        let mut stmt = conn.prepare(
321            "SELECT id, conversation_id, direction, timestamp, message_type, content,
322                    delivery_status, was_prekey_message, session_established
323             FROM messages
324             WHERE conversation_id = ?1
325             ORDER BY timestamp DESC
326             LIMIT ?2 OFFSET ?3",
327        )?;
328
329        let messages = stmt
330            .query_map(rusqlite::params![conversation_id, limit, offset], |row| {
331                let content_bytes: Vec<u8> = row.get(5)?;
332                Ok(StoredMessage {
333                    id: row.get(0)?,
334                    conversation_id: row.get(1)?,
335                    direction: row.get::<_, i64>(2)?.into(),
336                    timestamp: row.get::<_, i64>(3)? as u64,
337                    message_type: row.get::<_, i64>(4)?.into(),
338                    content: String::from_utf8(content_bytes).unwrap_or_default(),
339                    delivery_status: row.get::<_, i64>(6)?.into(),
340                    was_prekey_message: row.get(7)?,
341                    session_established: row.get(8)?,
342                })
343            })?
344            .collect::<Result<Vec<_>, _>>()?;
345
346        Ok(messages)
347    }
348
349    /// Get all conversations ordered by recent activity
350    pub fn get_conversations(
351        &self,
352        include_archived: bool,
353    ) -> Result<Vec<Conversation>, MessageHistoryError> {
354        let conn = self.connection.lock().unwrap();
355
356        let query = if include_archived {
357            "SELECT id, rdx_fingerprint, last_message_timestamp, unread_count, archived
358             FROM conversations
359             ORDER BY last_message_timestamp DESC"
360        } else {
361            "SELECT id, rdx_fingerprint, last_message_timestamp, unread_count, archived
362             FROM conversations
363             WHERE archived = 0
364             ORDER BY last_message_timestamp DESC"
365        };
366
367        let mut stmt = conn.prepare(query)?;
368
369        let conversations = stmt
370            .query_map([], |row| {
371                Ok(Conversation {
372                    id: row.get(0)?,
373                    rdx_fingerprint: row.get(1)?,
374                    last_message_timestamp: row.get::<_, i64>(2)? as u64,
375                    unread_count: row.get::<_, i64>(3)? as u32,
376                    archived: row.get(4)?,
377                })
378            })?
379            .collect::<Result<Vec<_>, _>>()?;
380
381        Ok(conversations)
382    }
383
384    /// Get unread message count for a conversation
385    pub fn get_unread_count(&self, rdx_fingerprint: &str) -> Result<u32, MessageHistoryError> {
386        let conn = self.connection.lock().unwrap();
387
388        let result: Result<i64, rusqlite::Error> = conn.query_row(
389            "SELECT unread_count FROM conversations WHERE rdx_fingerprint = ?1",
390            [rdx_fingerprint],
391            |row| row.get(0),
392        );
393
394        match result {
395            Ok(count) => Ok(count as u32),
396            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
397            Err(e) => Err(e.into()),
398        }
399    }
400
401    /// Mark conversation as read
402    pub fn mark_conversation_read(&self, rdx_fingerprint: &str) -> Result<(), MessageHistoryError> {
403        let conn = self.connection.lock().unwrap();
404        conn.execute(
405            "UPDATE conversations SET unread_count = 0 WHERE rdx_fingerprint = ?1",
406            [rdx_fingerprint],
407        )?;
408        Ok(())
409    }
410
411    /// Delete message
412    pub fn delete_message(&self, message_id: i64) -> Result<(), MessageHistoryError> {
413        let conn = self.connection.lock().unwrap();
414        conn.execute("DELETE FROM messages WHERE id = ?1", [message_id])?;
415        Ok(())
416    }
417
418    /// Delete entire conversation
419    pub fn delete_conversation(&self, rdx_fingerprint: &str) -> Result<(), MessageHistoryError> {
420        let conn = self.connection.lock().unwrap();
421
422        let conversation_id: Result<i64, rusqlite::Error> = conn.query_row(
423            "SELECT id FROM conversations WHERE rdx_fingerprint = ?1",
424            [rdx_fingerprint],
425            |row| row.get(0),
426        );
427
428        if let Ok(id) = conversation_id {
429            conn.execute("DELETE FROM messages WHERE conversation_id = ?1", [id])?;
430            conn.execute("DELETE FROM conversations WHERE id = ?1", [id])?;
431        }
432
433        Ok(())
434    }
435}