package server import ( "database/sql" "errors" "fmt" _ "github.com/mattn/go-sqlite3" // SQLite driver "log" "strings" "time" ) // Messages cache const ( createMessagesTableQuery = ` BEGIN; CREATE TABLE IF NOT EXISTS messages ( id TEXT PRIMARY KEY, time INT NOT NULL, topic TEXT NOT NULL, message TEXT NOT NULL, title TEXT NOT NULL, priority INT NOT NULL, tags TEXT NOT NULL, attachment_name TEXT NOT NULL, attachment_type TEXT NOT NULL, attachment_size INT NOT NULL, attachment_expires INT NOT NULL, attachment_url TEXT NOT NULL, published INT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic); COMMIT; ` insertMessageQuery = ` INSERT INTO messages (id, time, topic, message, title, priority, tags, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1` selectMessagesSinceTimeQuery = ` SELECT id, time, topic, message, title, priority, tags, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url FROM messages WHERE topic = ? AND time >= ? AND published = 1 ORDER BY time ASC ` selectMessagesSinceTimeIncludeScheduledQuery = ` SELECT id, time, topic, message, title, priority, tags, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url FROM messages WHERE topic = ? AND time >= ? ORDER BY time ASC ` selectMessagesDueQuery = ` SELECT id, time, topic, message, title, priority, tags, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url FROM messages WHERE time <= ? AND published = 0 ` updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE id = ?` selectMessagesCountQuery = `SELECT COUNT(*) FROM messages` selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?` selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic` ) // Schema management queries const ( currentSchemaVersion = 3 createSchemaVersionTableQuery = ` CREATE TABLE IF NOT EXISTS schemaVersion ( id INT PRIMARY KEY, version INT NOT NULL ); ` insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)` updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1` selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1` // 0 -> 1 migrate0To1AlterMessagesTableQuery = ` BEGIN; ALTER TABLE messages ADD COLUMN title TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0); ALTER TABLE messages ADD COLUMN tags TEXT NOT NULL DEFAULT(''); COMMIT; ` // 1 -> 2 migrate1To2AlterMessagesTableQuery = ` ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1); ` // 2 -> 3 migrate2To3AlterMessagesTableQuery = ` BEGIN; ALTER TABLE messages ADD COLUMN attachment_name TEXT NOT NULL; ALTER TABLE messages ADD COLUMN attachment_type TEXT NOT NULL; ALTER TABLE messages ADD COLUMN attachment_size INT NOT NULL; ALTER TABLE messages ADD COLUMN attachment_expires INT NOT NULL; ALTER TABLE messages ADD COLUMN attachment_url TEXT NOT NULL; COMMIT; ` ) type sqliteCache struct { db *sql.DB } var _ cache = (*sqliteCache)(nil) func newSqliteCache(filename string) (*sqliteCache, error) { db, err := sql.Open("sqlite3", filename) if err != nil { return nil, err } if err := setupDB(db); err != nil { return nil, err } return &sqliteCache{ db: db, }, nil } func (c *sqliteCache) AddMessage(m *message) error { if m.Event != messageEvent { return errUnexpectedMessageType } published := m.Time <= time.Now().Unix() tags := strings.Join(m.Tags, ",") var attachmentName, attachmentType, attachmentURL string var attachmentSize, attachmentExpires int64 if m.Attachment != nil { attachmentName = m.Attachment.Name attachmentType = m.Attachment.Type attachmentSize = m.Attachment.Size attachmentExpires = m.Attachment.Expires attachmentURL = m.Attachment.URL } _, err := c.db.Exec( insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, tags, attachmentName, attachmentType, attachmentSize, attachmentExpires, attachmentURL, published, ) return err } func (c *sqliteCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) { if since.IsNone() { return make([]*message, 0), nil } var rows *sql.Rows var err error if scheduled { rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix()) } else { rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix()) } if err != nil { return nil, err } return readMessages(rows) } func (c *sqliteCache) MessagesDue() ([]*message, error) { rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix()) if err != nil { return nil, err } return readMessages(rows) } func (c *sqliteCache) MarkPublished(m *message) error { _, err := c.db.Exec(updateMessagePublishedQuery, m.ID) return err } func (c *sqliteCache) MessageCount(topic string) (int, error) { rows, err := c.db.Query(selectMessageCountForTopicQuery, topic) if err != nil { return 0, err } defer rows.Close() var count int if !rows.Next() { return 0, errors.New("no rows found") } if err := rows.Scan(&count); err != nil { return 0, err } else if err := rows.Err(); err != nil { return 0, err } return count, nil } func (c *sqliteCache) Topics() (map[string]*topic, error) { rows, err := c.db.Query(selectTopicsQuery) if err != nil { return nil, err } defer rows.Close() topics := make(map[string]*topic) for rows.Next() { var id string if err := rows.Scan(&id); err != nil { return nil, err } topics[id] = newTopic(id) } if err := rows.Err(); err != nil { return nil, err } return topics, nil } func (c *sqliteCache) Prune(olderThan time.Time) error { _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix()) return err } func readMessages(rows *sql.Rows) ([]*message, error) { defer rows.Close() messages := make([]*message, 0) for rows.Next() { var timestamp, attachmentSize, attachmentExpires int64 var priority int var id, topic, msg, title, tagsStr, attachmentName, attachmentType, attachmentURL string if err := rows.Scan(&id, ×tamp, &topic, &msg, &title, &priority, &tagsStr, &attachmentName, &attachmentType, &attachmentSize, &attachmentExpires, &attachmentURL); err != nil { return nil, err } var tags []string if tagsStr != "" { tags = strings.Split(tagsStr, ",") } var att *attachment if attachmentName != "" && attachmentURL != "" { att = &attachment{ Name: attachmentName, Type: attachmentType, Size: attachmentSize, Expires: attachmentExpires, URL: attachmentURL, } } messages = append(messages, &message{ ID: id, Time: timestamp, Event: messageEvent, Topic: topic, Message: msg, Title: title, Priority: priority, Tags: tags, Attachment: att, }) } if err := rows.Err(); err != nil { return nil, err } return messages, nil } func setupDB(db *sql.DB) error { // If 'messages' table does not exist, this must be a new database rowsMC, err := db.Query(selectMessagesCountQuery) if err != nil { return setupNewDB(db) } rowsMC.Close() // If 'messages' table exists, check 'schemaVersion' table schemaVersion := 0 rowsSV, err := db.Query(selectSchemaVersionQuery) if err == nil { defer rowsSV.Close() if !rowsSV.Next() { return errors.New("cannot determine schema version: cache file may be corrupt") } if err := rowsSV.Scan(&schemaVersion); err != nil { return err } rowsSV.Close() } // Do migrations if schemaVersion == currentSchemaVersion { return nil } else if schemaVersion == 0 { return migrateFrom0(db) } else if schemaVersion == 1 { return migrateFrom1(db) } else if schemaVersion == 2 { return migrateFrom2(db) } return fmt.Errorf("unexpected schema version found: %d", schemaVersion) } func setupNewDB(db *sql.DB) error { if _, err := db.Exec(createMessagesTableQuery); err != nil { return err } if _, err := db.Exec(createSchemaVersionTableQuery); err != nil { return err } if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil { return err } return nil } func migrateFrom0(db *sql.DB) error { log.Print("Migrating cache database schema: from 0 to 1") if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil { return err } if _, err := db.Exec(createSchemaVersionTableQuery); err != nil { return err } if _, err := db.Exec(insertSchemaVersion, 1); err != nil { return err } return migrateFrom1(db) } func migrateFrom1(db *sql.DB) error { log.Print("Migrating cache database schema: from 1 to 2") if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil { return err } if _, err := db.Exec(updateSchemaVersion, 2); err != nil { return err } return migrateFrom2(db) } func migrateFrom2(db *sql.DB) error { log.Print("Migrating cache database schema: from 2 to 3") if _, err := db.Exec(migrate2To3AlterMessagesTableQuery); err != nil { return err } if _, err := db.Exec(updateSchemaVersion, 3); err != nil { return err } return nil // Update this when a new version is added }