diff --git a/server/message_cache.go b/server/message_cache.go index 1d7302af..140271fe 100644 --- a/server/message_cache.go +++ b/server/message_cache.go @@ -270,7 +270,7 @@ func newSqliteCache(filename, startupQueries string, cacheDuration time.Duration if err != nil { return nil, err } - if err := setupDB(db, startupQueries, cacheDuration); err != nil { + if err := setupMessagesDB(db, startupQueries, cacheDuration); err != nil { return nil, err } var queue *util.BatchingQueue[*message] @@ -749,7 +749,7 @@ func (c *messageCache) Close() error { return c.db.Close() } -func setupDB(db *sql.DB, startupQueries string, cacheDuration time.Duration) error { +func setupMessagesDB(db *sql.DB, startupQueries string, cacheDuration time.Duration) error { // Run startup queries if startupQueries != "" { if _, err := db.Exec(startupQueries); err != nil { diff --git a/server/server.go b/server/server.go index 72e9c19a..ebb8b70e 100644 --- a/server/server.go +++ b/server/server.go @@ -55,7 +55,7 @@ type Server struct { messagesHistory []int64 // Last n values of the messages counter, used to determine rate userManager *user.Manager // Might be nil! messageCache *messageCache // Database that stores the messages - webPushSubscriptionStore *webPushSubscriptionStore // Database that stores web push subscriptions + webPushSubscriptionStore *webPushStore // Database that stores web push subscriptions fileCache *fileCache // File system based cache that stores attachments stripe stripeAPI // Stripe API, can be replaced with a mock priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!) @@ -227,12 +227,12 @@ func createMessageCache(conf *Config) (*messageCache, error) { return newMemCache() } -func createWebPushSubscriptionStore(conf *Config) (*webPushSubscriptionStore, error) { +func createWebPushSubscriptionStore(conf *Config) (*webPushStore, error) { if !conf.WebPushEnabled { return nil, nil } - return newWebPushSubscriptionStore(conf.WebPushSubscriptionsFile) + return newWebPushStore(conf.WebPushSubscriptionsFile) } // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts @@ -979,18 +979,12 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) { func (s *Server) publishToWebPushEndpoints(v *visitor, m *message) { subscriptions, err := s.webPushSubscriptionStore.GetSubscriptionsForTopic(m.Topic) - if err != nil { logvm(v, m).Err(err).Warn("Unable to publish web push messages") return } - - totalCount := len(subscriptions) - - wg := &sync.WaitGroup{} - wg.Add(totalCount) - - ctx := log.Context{"topic": m.Topic, "message_id": m.ID, "total_count": totalCount} + + ctx := log.Context{"topic": m.Topic, "message_id": m.ID, "total_count": len(subscriptions)} // Importing the emojis in the service worker would add unnecessary complexity, // simply do it here for web push notifications instead @@ -1017,7 +1011,6 @@ func (s *Server) publishToWebPushEndpoints(v *visitor, m *message) { for i, xi := range subscriptions { go func(i int, sub webPushSubscription) { - defer wg.Done() ctx := log.Context{"endpoint": sub.BrowserSubscription.Endpoint, "username": sub.Username, "topic": m.Topic, "message_id": m.ID} payload := &webPushPayload{ diff --git a/server/web_push.go b/server/web_push.go index fe9f5149..2fafb2a8 100644 --- a/server/web_push.go +++ b/server/web_push.go @@ -6,11 +6,10 @@ import ( _ "github.com/mattn/go-sqlite3" // SQLite driver ) -// Messages cache const ( createWebPushSubscriptionsTableQuery = ` BEGIN; - CREATE TABLE IF NOT EXISTS web_push_subscriptions ( + CREATE TABLE IF NOT EXISTS subscriptions ( id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT NOT NULL, username TEXT, @@ -19,60 +18,58 @@ const ( key_p256dh TEXT NOT NULL, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); - CREATE INDEX IF NOT EXISTS idx_topic ON web_push_subscriptions (topic); - CREATE INDEX IF NOT EXISTS idx_endpoint ON web_push_subscriptions (endpoint); - CREATE UNIQUE INDEX IF NOT EXISTS idx_topic_endpoint ON web_push_subscriptions (topic, endpoint); + CREATE INDEX IF NOT EXISTS idx_topic ON subscriptions (topic); + CREATE INDEX IF NOT EXISTS idx_endpoint ON subscriptions (endpoint); + CREATE UNIQUE INDEX IF NOT EXISTS idx_topic_endpoint ON subscriptions (topic, endpoint); COMMIT; ` insertWebPushSubscriptionQuery = ` - INSERT OR REPLACE INTO web_push_subscriptions (topic, username, endpoint, key_auth, key_p256dh) - VALUES (?, ?, ?, ?, ?); + INSERT OR REPLACE INTO subscriptions (topic, username, endpoint, key_auth, key_p256dh) + VALUES (?, ?, ?, ?, ?) ` - deleteWebPushSubscriptionByEndpointQuery = `DELETE FROM web_push_subscriptions WHERE endpoint = ?` - deleteWebPushSubscriptionByUsernameQuery = `DELETE FROM web_push_subscriptions WHERE username = ?` - deleteWebPushSubscriptionByTopicAndEndpointQuery = `DELETE FROM web_push_subscriptions WHERE topic = ? AND endpoint = ?` + deleteWebPushSubscriptionByEndpointQuery = `DELETE FROM subscriptions WHERE endpoint = ?` + deleteWebPushSubscriptionByUsernameQuery = `DELETE FROM subscriptions WHERE username = ?` + deleteWebPushSubscriptionByTopicAndEndpointQuery = `DELETE FROM subscriptions WHERE topic = ? AND endpoint = ?` - selectWebPushSubscriptionsForTopicQuery = `SELECT endpoint, key_auth, key_p256dh, username FROM web_push_subscriptions WHERE topic = ?` + selectWebPushSubscriptionsForTopicQuery = `SELECT endpoint, key_auth, key_p256dh, username FROM subscriptions WHERE topic = ?` - selectWebPushSubscriptionsCountQuery = `SELECT COUNT(*) FROM web_push_subscriptions` + selectWebPushSubscriptionsCountQuery = `SELECT COUNT(*) FROM subscriptions` ) -type webPushSubscriptionStore struct { +type webPushStore struct { db *sql.DB } -func newWebPushSubscriptionStore(filename string) (*webPushSubscriptionStore, error) { +func newWebPushStore(filename string) (*webPushStore, error) { db, err := sql.Open("sqlite3", filename) if err != nil { return nil, err } - if err := setupSubscriptionDb(db); err != nil { + if err := setupSubscriptionsDB(db); err != nil { return nil, err } - webPushSubscriptionStore := &webPushSubscriptionStore{ + return &webPushStore{ db: db, - } - return webPushSubscriptionStore, nil + }, nil } -func setupSubscriptionDb(db *sql.DB) error { - // If 'messages' table does not exist, this must be a new database +func setupSubscriptionsDB(db *sql.DB) error { + // If 'subscriptions' table does not exist, this must be a new database rowsMC, err := db.Query(selectWebPushSubscriptionsCountQuery) if err != nil { - return setupNewSubscriptionDb(db) + return setupNewSubscriptionsDB(db) } - rowsMC.Close() - return nil + return rowsMC.Close() } -func setupNewSubscriptionDb(db *sql.DB) error { +func setupNewSubscriptionsDB(db *sql.DB) error { if _, err := db.Exec(createWebPushSubscriptionsTableQuery); err != nil { return err } return nil } -func (c *webPushSubscriptionStore) AddSubscription(topic string, username string, subscription webPushSubscribePayload) error { +func (c *webPushStore) AddSubscription(topic string, username string, subscription webPushSubscribePayload) error { _, err := c.db.Exec( insertWebPushSubscriptionQuery, topic, @@ -84,7 +81,7 @@ func (c *webPushSubscriptionStore) AddSubscription(topic string, username string return err } -func (c *webPushSubscriptionStore) RemoveSubscription(topic string, endpoint string) error { +func (c *webPushStore) RemoveSubscription(topic string, endpoint string) error { _, err := c.db.Exec( deleteWebPushSubscriptionByTopicAndEndpointQuery, topic, @@ -93,14 +90,14 @@ func (c *webPushSubscriptionStore) RemoveSubscription(topic string, endpoint str return err } -func (c *webPushSubscriptionStore) GetSubscriptionsForTopic(topic string) (subscriptions []webPushSubscription, err error) { +func (c *webPushStore) GetSubscriptionsForTopic(topic string) (subscriptions []webPushSubscription, err error) { rows, err := c.db.Query(selectWebPushSubscriptionsForTopicQuery, topic) if err != nil { return nil, err } defer rows.Close() - data := []webPushSubscription{} + var data []webPushSubscription for rows.Next() { i := webPushSubscription{} err = rows.Scan(&i.BrowserSubscription.Endpoint, &i.BrowserSubscription.Keys.Auth, &i.BrowserSubscription.Keys.P256dh, &i.Username) @@ -112,7 +109,7 @@ func (c *webPushSubscriptionStore) GetSubscriptionsForTopic(topic string) (subsc return data, nil } -func (c *webPushSubscriptionStore) ExpireWebPushEndpoint(endpoint string) error { +func (c *webPushStore) ExpireWebPushEndpoint(endpoint string) error { _, err := c.db.Exec( deleteWebPushSubscriptionByEndpointQuery, endpoint, @@ -120,13 +117,13 @@ func (c *webPushSubscriptionStore) ExpireWebPushEndpoint(endpoint string) error return err } -func (c *webPushSubscriptionStore) ExpireWebPushForUser(username string) error { +func (c *webPushStore) ExpireWebPushForUser(username string) error { _, err := c.db.Exec( deleteWebPushSubscriptionByUsernameQuery, username, ) return err } -func (c *webPushSubscriptionStore) Close() error { +func (c *webPushStore) Close() error { return c.db.Close() }