ntfy/server/cache_sqlite.go

285 lines
7.2 KiB
Go
Raw Normal View History

2021-11-03 02:09:49 +01:00
package server
import (
"database/sql"
"errors"
"fmt"
2021-11-03 02:09:49 +01:00
_ "github.com/mattn/go-sqlite3" // SQLite driver
2021-11-29 21:34:29 +01:00
"log"
"strings"
2021-11-03 02:09:49 +01:00
"time"
)
// Messages cache
2021-11-03 02:09:49 +01:00
const (
createMessagesTableQuery = `
2021-11-03 02:09:49 +01:00
BEGIN;
CREATE TABLE IF NOT EXISTS messages (
id VARCHAR(20) PRIMARY KEY,
time INT NOT NULL,
topic VARCHAR(64) NOT NULL,
message VARCHAR(512) NOT NULL,
title VARCHAR(256) NOT NULL,
priority INT NOT NULL,
tags VARCHAR(256) NOT NULL,
published INT NOT NULL
2021-11-03 02:09:49 +01:00
);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
COMMIT;
`
insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
2021-12-11 04:57:01 +01:00
pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
2021-11-03 02:09:49 +01:00
selectMessagesSinceTimeQuery = `
SELECT id, time, topic, message, title, priority, tags
FROM messages
WHERE topic = ? AND time >= ? AND published = 1
ORDER BY time ASC
`
selectMessagesSinceTimeIncludeScheduledQuery = `
SELECT id, time, topic, message, title, priority, tags
2021-11-03 02:09:49 +01:00
FROM messages
WHERE topic = ? AND time >= ?
ORDER BY time ASC
`
selectMessagesDueQuery = `
SELECT id, time, topic, message, title, priority, tags
FROM messages
WHERE time <= ? AND published = 0
`
updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE id = ?`
selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
2021-11-29 17:48:34 +01:00
selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
2021-12-09 04:57:31 +01:00
selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
)
// Schema management queries
const (
currentSchemaVersion = 2
createSchemaVersionTableQuery = `
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
version INT NOT NULL
);
`
insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
// 0 -> 1
migrate0To1AlterMessagesTableQuery = `
BEGIN;
ALTER TABLE messages ADD COLUMN title VARCHAR(256) NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
ALTER TABLE messages ADD COLUMN tags VARCHAR(256) NOT NULL DEFAULT('');
COMMIT;
`
// 1 -> 2
migrate1To2AlterMessagesTableQuery = `
ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1);
`
2021-11-03 02:09:49 +01:00
)
type sqliteCache struct {
db *sql.DB
}
var _ cache = (*sqliteCache)(nil)
func newSqliteCache(filename string) (*sqliteCache, error) {
db, err := sql.Open("sqlite3", filename)
if err != nil {
return nil, err
}
if err := setupDB(db); err != nil {
2021-11-03 02:09:49 +01:00
return nil, err
}
return &sqliteCache{
2021-11-03 16:33:34 +01:00
db: db,
2021-11-03 02:09:49 +01:00
}, nil
}
func (c *sqliteCache) AddMessage(m *message) error {
2021-12-07 17:45:15 +01:00
if m.Event != messageEvent {
return errUnexpectedMessageType
}
published := m.Time <= time.Now().Unix()
_, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","), published)
2021-11-03 02:09:49 +01:00
return err
}
func (c *sqliteCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
2021-12-07 17:45:15 +01:00
if since.IsNone() {
return make([]*message, 0), nil
}
var rows *sql.Rows
var err error
if scheduled {
rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
} else {
rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
}
2021-11-03 02:09:49 +01:00
if err != nil {
return nil, err
}
return readMessages(rows)
}
func (c *sqliteCache) MessagesDue() ([]*message, error) {
rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
if err != nil {
2021-11-03 02:09:49 +01:00
return nil, err
}
return readMessages(rows)
}
func (c *sqliteCache) MarkPublished(m *message) error {
_, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
return err
2021-11-03 02:09:49 +01:00
}
func (c *sqliteCache) MessageCount(topic string) (int, error) {
rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
2021-11-03 02:09:49 +01:00
if err != nil {
return 0, err
}
defer rows.Close()
var count int
if !rows.Next() {
return 0, errors.New("no rows found")
}
if err := rows.Scan(&count); err != nil {
return 0, err
} else if err := rows.Err(); err != nil {
return 0, err
}
return count, nil
}
2021-12-07 17:45:15 +01:00
func (c *sqliteCache) Topics() (map[string]*topic, error) {
rows, err := c.db.Query(selectTopicsQuery)
2021-11-03 02:09:49 +01:00
if err != nil {
return nil, err
}
defer rows.Close()
2021-12-07 17:45:15 +01:00
topics := make(map[string]*topic)
2021-11-03 02:09:49 +01:00
for rows.Next() {
var id string
2021-12-09 04:57:31 +01:00
if err := rows.Scan(&id); err != nil {
2021-11-03 02:09:49 +01:00
return nil, err
}
2021-12-09 04:57:31 +01:00
topics[id] = newTopic(id)
2021-11-03 02:09:49 +01:00
}
if err := rows.Err(); err != nil {
return nil, err
}
return topics, nil
}
2021-12-09 04:57:31 +01:00
func (c *sqliteCache) Prune(olderThan time.Time) error {
_, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
2021-11-03 02:09:49 +01:00
return err
}
func readMessages(rows *sql.Rows) ([]*message, error) {
defer rows.Close()
messages := make([]*message, 0)
for rows.Next() {
var timestamp int64
var priority int
var id, topic, msg, title, tagsStr string
if err := rows.Scan(&id, &timestamp, &topic, &msg, &title, &priority, &tagsStr); err != nil {
return nil, err
}
var tags []string
if tagsStr != "" {
tags = strings.Split(tagsStr, ",")
}
messages = append(messages, &message{
ID: id,
Time: timestamp,
Event: messageEvent,
Topic: topic,
Message: msg,
Title: title,
Priority: priority,
Tags: tags,
})
}
if err := rows.Err(); err != nil {
return nil, err
}
return messages, nil
}
func setupDB(db *sql.DB) error {
// If 'messages' table does not exist, this must be a new database
rowsMC, err := db.Query(selectMessagesCountQuery)
if err != nil {
return setupNewDB(db)
}
2021-12-11 02:28:56 +01:00
rowsMC.Close()
// If 'messages' table exists, check 'schemaVersion' table
schemaVersion := 0
rowsSV, err := db.Query(selectSchemaVersionQuery)
if err == nil {
defer rowsSV.Close()
if !rowsSV.Next() {
return errors.New("cannot determine schema version: cache file may be corrupt")
}
if err := rowsSV.Scan(&schemaVersion); err != nil {
return err
}
2021-12-11 02:28:56 +01:00
rowsSV.Close()
}
// Do migrations
if schemaVersion == currentSchemaVersion {
return nil
} else if schemaVersion == 0 {
return migrateFrom0(db)
} else if schemaVersion == 1 {
return migrateFrom1(db)
}
return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
}
func setupNewDB(db *sql.DB) error {
if _, err := db.Exec(createMessagesTableQuery); err != nil {
return err
}
if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
return err
}
if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
return err
}
return nil
}
func migrateFrom0(db *sql.DB) error {
2021-11-29 21:34:29 +01:00
log.Print("Migrating cache database schema: from 0 to 1")
if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
return err
}
if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
return err
}
if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
return err
}
return migrateFrom1(db)
}
func migrateFrom1(db *sql.DB) error {
log.Print("Migrating cache database schema: from 1 to 2")
if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
return err
}
if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
return err
}
return nil // Update this when a new version is added
}