From b4933a5645f296c519f12a8f78bdeb7cfd3f17b9 Mon Sep 17 00:00:00 2001
From: Philipp Heckel <pheckel@datto.com>
Date: Tue, 15 Nov 2022 14:24:56 -0500
Subject: [PATCH 1/7] WIP: Batch message INSERTs

---
 server/message_cache.go     | 28 +++++++++++++++----
 server/server.go            |  6 ++--
 util/batching_queue.go      | 56 +++++++++++++++++++++++++++++++++++++
 util/batching_queue_test.go | 25 +++++++++++++++++
 4 files changed, 107 insertions(+), 8 deletions(-)
 create mode 100644 util/batching_queue.go
 create mode 100644 util/batching_queue_test.go

diff --git a/server/message_cache.go b/server/message_cache.go
index f4433399..ec710e4f 100644
--- a/server/message_cache.go
+++ b/server/message_cache.go
@@ -188,8 +188,9 @@ const (
 )
 
 type messageCache struct {
-	db  *sql.DB
-	nop bool
+	db    *sql.DB
+	queue *util.BatchingQueue[*message]
+	nop   bool
 }
 
 // newSqliteCache creates a SQLite file-backed cache
@@ -201,10 +202,21 @@ func newSqliteCache(filename, startupQueries string, nop bool) (*messageCache, e
 	if err := setupCacheDB(db, startupQueries); err != nil {
 		return nil, err
 	}
-	return &messageCache{
-		db:  db,
-		nop: nop,
-	}, nil
+	queue := util.NewBatchingQueue[*message](20, 500*time.Millisecond)
+	cache := &messageCache{
+		db:    db,
+		queue: queue,
+		nop:   nop,
+	}
+	go func() {
+		for messages := range queue.Pop() {
+			log.Debug("Adding %d messages to cache", len(messages))
+			if err := cache.addMessages(messages); err != nil {
+				log.Error("error: %s", err.Error())
+			}
+		}
+	}()
+	return cache, nil
 }
 
 // newMemCache creates an in-memory cache
@@ -232,6 +244,10 @@ func (c *messageCache) AddMessage(m *message) error {
 	return c.addMessages([]*message{m})
 }
 
+func (c *messageCache) QueueMessage(m *message) {
+	c.queue.Push(m)
+}
+
 func (c *messageCache) addMessages(ms []*message) error {
 	if c.nop {
 		return nil
diff --git a/server/server.go b/server/server.go
index ef09100d..b90b7630 100644
--- a/server/server.go
+++ b/server/server.go
@@ -491,9 +491,11 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
 		log.Debug("%s Message delayed, will process later", logMessagePrefix(v, m))
 	}
 	if cache {
-		if err := s.messageCache.AddMessage(m); err != nil {
+		log.Trace("%s Queuing for cache", logMessagePrefix(v, m))
+		s.messageCache.QueueMessage(m)
+		/*if err := s.messageCache.AddMessage(m); err != nil {
 			return nil, err
-		}
+		}*/
 	}
 	s.mu.Lock()
 	s.messages++
diff --git a/util/batching_queue.go b/util/batching_queue.go
new file mode 100644
index 00000000..78116470
--- /dev/null
+++ b/util/batching_queue.go
@@ -0,0 +1,56 @@
+package util
+
+import (
+	"sync"
+	"time"
+)
+
+type BatchingQueue[T any] struct {
+	batchSize int
+	timeout   time.Duration
+	in        []T
+	out       chan []T
+	mu        sync.Mutex
+}
+
+func NewBatchingQueue[T any](batchSize int, timeout time.Duration) *BatchingQueue[T] {
+	q := &BatchingQueue[T]{
+		batchSize: batchSize,
+		timeout:   timeout,
+		in:        make([]T, 0),
+		out:       make(chan []T),
+	}
+	ticker := time.NewTicker(timeout)
+	go func() {
+		for range ticker.C {
+			elements := q.popAll()
+			if len(elements) > 0 {
+				q.out <- elements
+			}
+		}
+	}()
+	return q
+}
+
+func (c *BatchingQueue[T]) Push(element T) {
+	c.mu.Lock()
+	c.in = append(c.in, element)
+	limitReached := len(c.in) == c.batchSize
+	c.mu.Unlock()
+	if limitReached {
+		c.out <- c.popAll()
+	}
+}
+
+func (c *BatchingQueue[T]) Pop() <-chan []T {
+	return c.out
+}
+
+func (c *BatchingQueue[T]) popAll() []T {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	elements := make([]T, len(c.in))
+	copy(elements, c.in)
+	c.in = c.in[:0]
+	return elements
+}
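
For orientation, here is a minimal, self-contained sketch of how a consumer drains the queue introduced in this patch (using the `Push`/`Pop` names from this revision; they are renamed to `Enqueue`/`Dequeue` in the next patch). The batch size of 3 and the 200ms timeout are illustrative values:

``` go
package main

import (
	"fmt"
	"time"

	"heckel.io/ntfy/util"
)

func main() {
	// Flush a batch at 3 elements, or after 200ms, whichever comes first
	q := util.NewBatchingQueue[string](3, 200*time.Millisecond)
	go func() {
		for batch := range q.Pop() {
			fmt.Printf("flushed batch: %v\n", batch)
		}
	}()
	for _, s := range []string{"a", "b", "c", "d"} {
		q.Push(s) // "c" completes a full batch; "d" is later flushed by the ticker
	}
	time.Sleep(500 * time.Millisecond) // wait for the timeout flush of ["d"]
}
```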
diff --git a/util/batching_queue_test.go b/util/batching_queue_test.go
new file mode 100644
index 00000000..46bc06b8
--- /dev/null
+++ b/util/batching_queue_test.go
@@ -0,0 +1,25 @@
+package util_test
+
+import (
+	"fmt"
+	"heckel.io/ntfy/util"
+	"math/rand"
+	"testing"
+	"time"
+)
+
+func TestConcurrentQueue_Next(t *testing.T) {
+	q := util.NewBatchingQueue[int](25, 200*time.Millisecond)
+	go func() {
+		for batch := range q.Pop() {
+			fmt.Printf("Batch of %d items\n", len(batch))
+		}
+	}()
+	for i := 0; i < 1000; i++ {
+		go func(i int) {
+			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
+			q.Push(i)
+		}(i)
+	}
+	time.Sleep(2 * time.Second)
+}

From ad860afb8b3715794d7a6d6d6bd3e25440895c12 Mon Sep 17 00:00:00 2001
From: Philipp Heckel <pheckel@datto.com>
Date: Wed, 16 Nov 2022 10:28:20 -0500
Subject: [PATCH 2/7] Polish async batching

---
 cmd/serve.go                 |  6 +++
 server/config.go             |  4 ++
 server/message_cache.go      | 80 +++++++++++++++++++++++++++---------
 server/message_cache_test.go | 10 ++---
 server/server.go             |  9 ++--
 server/server.yml            |  2 +
 util/batching_queue.go       | 73 +++++++++++++++++++++-----------
 util/batching_queue_test.go  | 43 +++++++++++++++----
 8 files changed, 166 insertions(+), 61 deletions(-)

diff --git a/cmd/serve.go b/cmd/serve.go
index aff7c7c8..ecc4d4a1 100644
--- a/cmd/serve.go
+++ b/cmd/serve.go
@@ -44,6 +44,8 @@ var flagsServe = append(
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "firebase-key-file", Aliases: []string{"firebase_key_file", "F"}, EnvVars: []string{"NTFY_FIREBASE_KEY_FILE"}, Usage: "Firebase credentials file; if set additionally publish to FCM topic"}),
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-file", Aliases: []string{"cache_file", "C"}, EnvVars: []string{"NTFY_CACHE_FILE"}, Usage: "cache file used for message caching"}),
 	altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-duration", Aliases: []string{"cache_duration", "b"}, EnvVars: []string{"NTFY_CACHE_DURATION"}, Value: server.DefaultCacheDuration, Usage: "buffer messages for this time to allow `since` requests"}),
	altsrc.NewIntFlag(&cli.IntFlag{Name: "cache-batch-size", Aliases: []string{"cache_batch_size"}, EnvVars: []string{"NTFY_CACHE_BATCH_SIZE"}, Usage: "max number of messages to batch together when writing to the message cache (if zero, writes are synchronous)"}),
+	altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-batch-timeout", Aliases: []string{"cache_batch_timeout"}, EnvVars: []string{"NTFY_CACHE_BATCH_TIMEOUT"}, Usage: "timeout for batched async writes to the message cache (if zero, writes are synchronous)"}),
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-startup-queries", Aliases: []string{"cache_startup_queries"}, EnvVars: []string{"NTFY_CACHE_STARTUP_QUERIES"}, Usage: "queries run when the cache database is initialized"}),
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-file", Aliases: []string{"auth_file", "H"}, EnvVars: []string{"NTFY_AUTH_FILE"}, Usage: "auth database file used for access control"}),
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-default-access", Aliases: []string{"auth_default_access", "p"}, EnvVars: []string{"NTFY_AUTH_DEFAULT_ACCESS"}, Value: "read-write", Usage: "default permissions if no matching entries in the auth database are found"}),
@@ -110,6 +112,8 @@ func execServe(c *cli.Context) error {
 	cacheFile := c.String("cache-file")
 	cacheDuration := c.Duration("cache-duration")
 	cacheStartupQueries := c.String("cache-startup-queries")
+	cacheBatchSize := c.Int("cache-batch-size")
+	cacheBatchTimeout := c.Duration("cache-batch-timeout")
 	authFile := c.String("auth-file")
 	authDefaultAccess := c.String("auth-default-access")
 	attachmentCacheDir := c.String("attachment-cache-dir")
@@ -233,6 +237,8 @@ func execServe(c *cli.Context) error {
 	conf.CacheFile = cacheFile
 	conf.CacheDuration = cacheDuration
 	conf.CacheStartupQueries = cacheStartupQueries
+	conf.CacheBatchSize = cacheBatchSize
+	conf.CacheBatchTimeout = cacheBatchTimeout
 	conf.AuthFile = authFile
 	conf.AuthDefaultRead = authDefaultRead
 	conf.AuthDefaultWrite = authDefaultWrite
diff --git a/server/config.go b/server/config.go
index d8fd429e..1e2b517c 100644
--- a/server/config.go
+++ b/server/config.go
@@ -61,6 +61,8 @@ type Config struct {
 	CacheFile                            string
 	CacheDuration                        time.Duration
 	CacheStartupQueries                  string
+	CacheBatchSize                       int
+	CacheBatchTimeout                    time.Duration
 	AuthFile                             string
 	AuthDefaultRead                      bool
 	AuthDefaultWrite                     bool
@@ -114,6 +116,8 @@ func NewConfig() *Config {
 		FirebaseKeyFile:                      "",
 		CacheFile:                            "",
 		CacheDuration:                        DefaultCacheDuration,
+		CacheBatchSize:                       0,
+		CacheBatchTimeout:                    0,
 		AuthFile:                             "",
 		AuthDefaultRead:                      true,
 		AuthDefaultWrite:                     true,
diff --git a/server/message_cache.go b/server/message_cache.go
index ec710e4f..7eb37cf9 100644
--- a/server/message_cache.go
+++ b/server/message_cache.go
@@ -44,6 +44,7 @@ const (
 			published INT NOT NULL
 		);
 		CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
+		CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
 		CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
 		COMMIT;
 	`
@@ -92,7 +93,7 @@ const (
 
 // Schema management queries
 const (
-	currentSchemaVersion          = 8
+	currentSchemaVersion          = 9
 	createSchemaVersionTableQuery = `
 		CREATE TABLE IF NOT EXISTS schemaVersion (
 			id INT PRIMARY KEY,
@@ -185,6 +186,11 @@ const (
 	migrate7To8AlterMessagesTableQuery = `
 		ALTER TABLE messages ADD COLUMN icon TEXT NOT NULL DEFAULT('');
 	`
+
+	// 8 -> 9
+	migrate8To9AlterMessagesTableQuery = `
+		CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
+	`
 )
 
 type messageCache struct {
@@ -194,7 +200,7 @@ type messageCache struct {
 }
 
 // newSqliteCache creates a SQLite file-backed cache
-func newSqliteCache(filename, startupQueries string, nop bool) (*messageCache, error) {
+func newSqliteCache(filename, startupQueries string, batchSize int, batchTimeout time.Duration, nop bool) (*messageCache, error) {
 	db, err := sql.Open("sqlite3", filename)
 	if err != nil {
 		return nil, err
@@ -202,32 +208,28 @@ func newSqliteCache(filename, startupQueries string, nop bool) (*messageCache, e
 	if err := setupCacheDB(db, startupQueries); err != nil {
 		return nil, err
 	}
-	queue := util.NewBatchingQueue[*message](20, 500*time.Millisecond)
+	var queue *util.BatchingQueue[*message]
+	if batchSize > 0 || batchTimeout > 0 {
+		queue = util.NewBatchingQueue[*message](batchSize, batchTimeout)
+	}
 	cache := &messageCache{
 		db:    db,
 		queue: queue,
 		nop:   nop,
 	}
-	go func() {
-		for messages := range queue.Pop() {
-			log.Debug("Adding %d messages to cache", len(messages))
-			if err := cache.addMessages(messages); err != nil {
-				log.Error("error: %s", err.Error())
-			}
-		}
-	}()
+	go cache.processMessageBatches()
 	return cache, nil
 }
 
 // newMemCache creates an in-memory cache
 func newMemCache() (*messageCache, error) {
-	return newSqliteCache(createMemoryFilename(), "", false)
+	return newSqliteCache(createMemoryFilename(), "", 0, 0, false)
 }
 
 // newNopCache creates an in-memory cache that discards all messages;
 // it is always empty and can be used if caching is entirely disabled
 func newNopCache() (*messageCache, error) {
-	return newSqliteCache(createMemoryFilename(), "", true)
+	return newSqliteCache(createMemoryFilename(), "", 0, 0, true)
 }
 
 // createMemoryFilename creates a unique memory filename to use for the SQLite backend.
@@ -240,18 +242,23 @@ func createMemoryFilename() string {
 	return fmt.Sprintf("file:%s?mode=memory&cache=shared", util.RandomString(10))
 }
 
+// AddMessage stores a message in the message cache synchronously, or queues it to be written asynchronously at a
+// later time. The message is queued only if a non-zero "batchSize" or "batchTimeout" was passed to the constructor.
 func (c *messageCache) AddMessage(m *message) error {
+	if c.queue != nil {
+		c.queue.Enqueue(m)
+		return nil
+	}
 	return c.addMessages([]*message{m})
 }
 
-func (c *messageCache) QueueMessage(m *message) {
-	c.queue.Push(m)
-}
-
+// addMessages synchronously stores a batch of messages. If the database is locked, the transaction waits until
+// SQLite's busy_timeout is exceeded before erroring out.
 func (c *messageCache) addMessages(ms []*message) error {
 	if c.nop {
 		return nil
 	}
+	start := time.Now()
 	tx, err := c.db.Begin()
 	if err != nil {
 		return err
@@ -305,7 +312,12 @@ func (c *messageCache) addMessages(ms []*message) error {
 			return err
 		}
 	}
-	return tx.Commit()
+	if err := tx.Commit(); err != nil {
+		log.Warn("Cache: Writing %d message(s) failed (took %v)", len(ms), time.Since(start))
+		return err
+	}
+	log.Debug("Cache: Wrote %d message(s) in %v", len(ms), time.Since(start))
+	return nil
 }
 
 func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
@@ -411,8 +423,13 @@ func (c *messageCache) Topics() (map[string]*topic, error) {
 }
 
 func (c *messageCache) Prune(olderThan time.Time) error {
-	_, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
-	return err
+	start := time.Now()
+	if _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix()); err != nil {
+		log.Warn("Cache: Pruning failed (after %v): %s", time.Since(start), err.Error())
+		return err
+	}
+	log.Debug("Cache: Pruning successful (took %v)", time.Since(start))
+	return nil
 }
 
 func (c *messageCache) AttachmentBytesUsed(sender string) (int64, error) {
@@ -433,6 +449,17 @@ func (c *messageCache) AttachmentBytesUsed(sender string) (int64, error) {
 	return size, nil
 }
 
+func (c *messageCache) processMessageBatches() {
+	if c.queue == nil {
+		return
+	}
+	for messages := range c.queue.Dequeue() {
+		if err := c.addMessages(messages); err != nil {
+			log.Error("Cache: %s", err.Error())
+		}
+	}
+}
+
 func readMessages(rows *sql.Rows) ([]*message, error) {
 	defer rows.Close()
 	messages := make([]*message, 0)
@@ -558,6 +585,8 @@ func setupCacheDB(db *sql.DB, startupQueries string) error {
 		return migrateFrom6(db)
 	} else if schemaVersion == 7 {
 		return migrateFrom7(db)
+	} else if schemaVersion == 8 {
+		return migrateFrom8(db)
 	}
 	return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
 }
@@ -663,5 +692,16 @@ func migrateFrom7(db *sql.DB) error {
 	if _, err := db.Exec(updateSchemaVersion, 8); err != nil {
 		return err
 	}
+	return migrateFrom8(db)
+}
+
+func migrateFrom8(db *sql.DB) error {
+	log.Info("Migrating cache database schema: from 8 to 9")
+	if _, err := db.Exec(migrate8To9AlterMessagesTableQuery); err != nil {
+		return err
+	}
+	if _, err := db.Exec(updateSchemaVersion, 9); err != nil {
+		return err
+	}
 	return nil // Update this when a new version is added
 }
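
As an aside, the schema migrations follow a simple chain: `setupCacheDB` dispatches on the stored version, and each `migrateFromN` applies its DDL, bumps the version, and delegates to the next step, so a database at any older version is walked forward one step at a time (here, 7 → 8 → 9). A distilled sketch of the pattern, with an illustrative version-bump query rather than ntfy's actual `updateSchemaVersion` statement:

``` go
package cache // sketch only; not the actual server package

import (
	"database/sql"
	"fmt"
)

const currentSchemaVersion = 9

// migrate dispatches on the stored schema version, mirroring setupCacheDB above.
func migrate(db *sql.DB, schemaVersion int) error {
	switch schemaVersion {
	case currentSchemaVersion:
		return nil // already up to date
	case 7:
		return migrateFrom7(db)
	case 8:
		return migrateFrom8(db)
	}
	return fmt.Errorf("unexpected schema version: %d", schemaVersion)
}

func migrateFrom7(db *sql.DB) error {
	// ... apply the 7->8 DDL and record version 8 here, then chain forward:
	return migrateFrom8(db)
}

func migrateFrom8(db *sql.DB) error {
	if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_time ON messages (time)`); err != nil {
		return err
	}
	_, err := db.Exec(`UPDATE schemaVersion SET version = 9`) // illustrative version-bump query
	return err
}
```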
diff --git a/server/message_cache_test.go b/server/message_cache_test.go
index c72debca..c3b7305e 100644
--- a/server/message_cache_test.go
+++ b/server/message_cache_test.go
@@ -450,7 +450,7 @@ func TestSqliteCache_StartupQueries_WAL(t *testing.T) {
 	startupQueries := `pragma journal_mode = WAL; 
 pragma synchronous = normal; 
 pragma temp_store = memory;`
-	db, err := newSqliteCache(filename, startupQueries, false)
+	db, err := newSqliteCache(filename, startupQueries, 0, 0, false)
 	require.Nil(t, err)
 	require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
 	require.FileExists(t, filename)
@@ -461,7 +461,7 @@ pragma temp_store = memory;`
 func TestSqliteCache_StartupQueries_None(t *testing.T) {
 	filename := newSqliteTestCacheFile(t)
 	startupQueries := ""
-	db, err := newSqliteCache(filename, startupQueries, false)
+	db, err := newSqliteCache(filename, startupQueries, 0, 0, false)
 	require.Nil(t, err)
 	require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
 	require.FileExists(t, filename)
@@ -472,7 +472,7 @@ func TestSqliteCache_StartupQueries_None(t *testing.T) {
 func TestSqliteCache_StartupQueries_Fail(t *testing.T) {
 	filename := newSqliteTestCacheFile(t)
 	startupQueries := `xx error`
-	_, err := newSqliteCache(filename, startupQueries, false)
+	_, err := newSqliteCache(filename, startupQueries, 0, 0, false)
 	require.Error(t, err)
 }
 
@@ -501,7 +501,7 @@ func TestMemCache_NopCache(t *testing.T) {
 }
 
 func newSqliteTestCache(t *testing.T) *messageCache {
-	c, err := newSqliteCache(newSqliteTestCacheFile(t), "", false)
+	c, err := newSqliteCache(newSqliteTestCacheFile(t), "", 0, 0, false)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -513,7 +513,7 @@ func newSqliteTestCacheFile(t *testing.T) string {
 }
 
 func newSqliteTestCacheFromFile(t *testing.T, filename, startupQueries string) *messageCache {
-	c, err := newSqliteCache(filename, startupQueries, false)
+	c, err := newSqliteCache(filename, startupQueries, 0, 0, false)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/server/server.go b/server/server.go
index b90b7630..fe729b1b 100644
--- a/server/server.go
+++ b/server/server.go
@@ -159,7 +159,7 @@ func createMessageCache(conf *Config) (*messageCache, error) {
 	if conf.CacheDuration == 0 {
 		return newNopCache()
 	} else if conf.CacheFile != "" {
-		return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, false)
+		return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
 	}
 	return newMemCache()
 }
@@ -491,11 +491,10 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes
 		log.Debug("%s Message delayed, will process later", logMessagePrefix(v, m))
 	}
 	if cache {
-		log.Trace("%s Queuing for cache", logMessagePrefix(v, m))
-		s.messageCache.QueueMessage(m)
-		/*if err := s.messageCache.AddMessage(m); err != nil {
+		log.Debug("%s Adding message to cache", logMessagePrefix(v, m))
+		if err := s.messageCache.AddMessage(m); err != nil {
 			return nil, err
-		}*/
+		}
 	}
 	s.mu.Lock()
 	s.messages++
diff --git a/server/server.yml b/server/server.yml
index 9476478f..4b08129b 100644
--- a/server/server.yml
+++ b/server/server.yml
@@ -65,6 +65,8 @@
 # cache-file: <filename>
 # cache-duration: "12h"
 # cache-startup-queries:
+# cache-batch-size: 0
+# cache-batch-timeout: "0ms"
 
 # If set, access to the ntfy server and API can be controlled on a granular level using
 # the 'ntfy user' and 'ntfy access' commands. See the --help pages for details, or check the docs.
diff --git a/util/batching_queue.go b/util/batching_queue.go
index 78116470..86901bcd 100644
--- a/util/batching_queue.go
+++ b/util/batching_queue.go
@@ -5,6 +5,24 @@ import (
 	"time"
 )
 
+// BatchingQueue is a queue that creates batches of the enqueued elements based on a
+// max batch size and a batch timeout.
+//
+// Example:
+//
+//	q := NewBatchingQueue[int](2, 500 * time.Millisecond)
+//	go func() {
+//	  for batch := range q.Dequeue() {
+//	    fmt.Println(batch)
+//	  }
+//	}()
+//	q.Enqueue(1)
+//	q.Enqueue(2)
+//	q.Enqueue(3)
+//	time.Sleep(time.Second)
+//
+// This example will emit batch [1, 2] immediately (because the batch size is 2), and
+// a batch [3] after 500ms.
 type BatchingQueue[T any] struct {
 	batchSize int
 	timeout   time.Duration
@@ -13,6 +31,7 @@ type BatchingQueue[T any] struct {
 	mu        sync.Mutex
 }
 
+// NewBatchingQueue creates a new BatchingQueue
 func NewBatchingQueue[T any](batchSize int, timeout time.Duration) *BatchingQueue[T] {
 	q := &BatchingQueue[T]{
 		batchSize: batchSize,
@@ -20,37 +39,45 @@ func NewBatchingQueue[T any](batchSize int, timeout time.Duration) *BatchingQueu
 		in:        make([]T, 0),
 		out:       make(chan []T),
 	}
-	ticker := time.NewTicker(timeout)
-	go func() {
-		for range ticker.C {
-			elements := q.popAll()
-			if len(elements) > 0 {
-				q.out <- elements
-			}
-		}
-	}()
+	go q.timeoutTicker()
 	return q
 }
 
-func (c *BatchingQueue[T]) Push(element T) {
-	c.mu.Lock()
-	c.in = append(c.in, element)
-	limitReached := len(c.in) == c.batchSize
-	c.mu.Unlock()
+// Enqueue enqueues an element to the queue. If the configured batch size is reached,
+// the batch will be emitted immediately.
+func (q *BatchingQueue[T]) Enqueue(element T) {
+	q.mu.Lock()
+	q.in = append(q.in, element)
+	limitReached := len(q.in) == q.batchSize
+	q.mu.Unlock()
 	if limitReached {
-		c.out <- c.popAll()
+		q.out <- q.dequeueAll()
 	}
 }
 
-func (c *BatchingQueue[T]) Pop() <-chan []T {
-	return c.out
+// Dequeue returns a channel emitting batches of elements
+func (q *BatchingQueue[T]) Dequeue() <-chan []T {
+	return q.out
 }
 
-func (c *BatchingQueue[T]) popAll() []T {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	elements := make([]T, len(c.in))
-	copy(elements, c.in)
-	c.in = c.in[:0]
+func (q *BatchingQueue[T]) dequeueAll() []T {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	elements := make([]T, len(q.in))
+	copy(elements, q.in)
+	q.in = q.in[:0]
 	return elements
 }
+
+func (q *BatchingQueue[T]) timeoutTicker() {
+	if q.timeout == 0 {
+		return
+	}
+	ticker := time.NewTicker(q.timeout)
+	for range ticker.C {
+		elements := q.dequeueAll()
+		if len(elements) > 0 {
+			q.out <- elements
+		}
+	}
+}
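
One behavioral detail worth noting: `out` is created as an unbuffered channel, so the enqueuer that completes a batch (and the ticker goroutine) blocks until someone receives from `Dequeue()`. That provides natural back-pressure, but it also means a consumer loop must be running before batches fill up. A minimal sketch, assuming the `heckel.io/ntfy/util` package as defined in this patch:

``` go
package main

import (
	"fmt"
	"time"

	"heckel.io/ntfy/util"
)

func main() {
	q := util.NewBatchingQueue[int](2, time.Second)
	// Without this consumer, the second Enqueue below would block forever:
	// completing a batch hands it off synchronously on the unbuffered channel.
	go func() {
		for batch := range q.Dequeue() {
			fmt.Println("got batch:", batch)
		}
	}()
	q.Enqueue(1)
	q.Enqueue(2) // completes the batch of 2 and hands it to the consumer
	time.Sleep(100 * time.Millisecond)
}
```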
diff --git a/util/batching_queue_test.go b/util/batching_queue_test.go
index 46bc06b8..28764f18 100644
--- a/util/batching_queue_test.go
+++ b/util/batching_queue_test.go
@@ -2,24 +2,51 @@ package util_test
 
 import (
 	"fmt"
+	"github.com/stretchr/testify/require"
 	"heckel.io/ntfy/util"
 	"math/rand"
 	"testing"
 	"time"
 )
 
-func TestConcurrentQueue_Next(t *testing.T) {
-	q := util.NewBatchingQueue[int](25, 200*time.Millisecond)
+func TestBatchingQueue_InfTimeout(t *testing.T) {
+	q := util.NewBatchingQueue[int](25, 1*time.Hour)
+	batches := make([][]int, 0)
+	total := 0
 	go func() {
-		for batch := range q.Pop() {
-			fmt.Printf("Batch of %d items\n", len(batch))
+		for batch := range q.Dequeue() {
+			batches = append(batches, batch)
+			total += len(batch)
 		}
 	}()
-	for i := 0; i < 1000; i++ {
+	for i := 0; i < 101; i++ {
+		go q.Enqueue(i)
+	}
+	time.Sleep(500 * time.Millisecond)
+	require.Equal(t, 100, total) // One is missing, stuck in the last batch!
+	require.Equal(t, 4, len(batches))
+}
+
+func TestBatchingQueue_WithTimeout(t *testing.T) {
+	q := util.NewBatchingQueue[int](25, 100*time.Millisecond)
+	batches := make([][]int, 0)
+	total := 0
+	go func() {
+		for batch := range q.Dequeue() {
+			batches = append(batches, batch)
+			total += len(batch)
+		}
+	}()
+	for i := 0; i < 101; i++ {
 		go func(i int) {
-			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
-			q.Push(i)
+			time.Sleep(time.Duration(rand.Intn(700)) * time.Millisecond)
+			q.Enqueue(i)
 		}(i)
 	}
-	time.Sleep(2 * time.Second)
+	time.Sleep(time.Second)
+	fmt.Println(len(batches))
+	fmt.Println(batches)
+	require.Equal(t, 101, total)
+	require.True(t, len(batches) > 4) // 101/25
+	require.True(t, len(batches) < 21)
 }
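
Worth spelling out what this patch changes semantically: with batching enabled, `AddMessage` enqueues and returns `nil` immediately, so a publish can succeed before the message is durably written, and write errors surface only in the log (via `processMessageBatches`); without batching, the write stays synchronous and the caller sees the error. A self-contained sketch of that control flow, using a stand-in `cache` type rather than ntfy's actual `messageCache`:

``` go
package main

import (
	"errors"
	"fmt"
	"time"

	"heckel.io/ntfy/util"
)

// cache is a stand-in for messageCache, showing the two write modes.
type cache struct {
	queue *util.BatchingQueue[string] // nil means batching is disabled
}

func (c *cache) AddMessage(m string) error {
	if c.queue != nil {
		c.queue.Enqueue(m) // async: the caller never sees write errors
		return nil
	}
	return c.write([]string{m}) // sync: errors propagate to the caller
}

func (c *cache) write(ms []string) error {
	if len(ms) == 0 {
		return errors.New("empty batch")
	}
	fmt.Println("wrote", len(ms), "message(s)")
	return nil
}

func main() {
	c := &cache{queue: util.NewBatchingQueue[string](2, 100*time.Millisecond)}
	go func() {
		for batch := range c.queue.Dequeue() {
			if err := c.write(batch); err != nil {
				fmt.Println("batch write failed:", err) // logged, not returned
			}
		}
	}()
	_ = c.AddMessage("hello") // returns nil immediately; durability is deferred
	time.Sleep(200 * time.Millisecond)
}
```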

From 497f871447a9b0f8f9e802831e8a4eb8c47401a7 Mon Sep 17 00:00:00 2001
From: Philipp Heckel <pheckel@datto.com>
Date: Wed, 16 Nov 2022 10:33:12 -0500
Subject: [PATCH 3/7] Docs

---
 server/message_cache.go | 2 +-
 server/server.yml       | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/server/message_cache.go b/server/message_cache.go
index 7eb37cf9..bce94220 100644
--- a/server/message_cache.go
+++ b/server/message_cache.go
@@ -313,7 +313,7 @@ func (c *messageCache) addMessages(ms []*message) error {
 		}
 	}
 	if err := tx.Commit(); err != nil {
-		log.Warn("Cache: Writing %d message(s) failed (took %v)", len(ms), time.Since(start))
+		log.Error("Cache: Writing %d message(s) failed (took %v)", len(ms), time.Since(start))
 		return err
 	}
 	log.Debug("Cache: Wrote %d message(s) in %v", len(ms), time.Since(start))
diff --git a/server/server.yml b/server/server.yml
index 4b08129b..1b268995 100644
--- a/server/server.yml
+++ b/server/server.yml
@@ -53,6 +53,12 @@
 #       pragma journal_mode = WAL;
 #       pragma synchronous = normal;
 #       pragma temp_store = memory;
+#       pragma busy_timeout = 15000;
+#       vacuum;
+#
+# The "cache-batch-size" and "cache-batch-timeout" parameter allow enabling async batch writing
+# of messages. If set, messages will be queued and written to the database in batches of the given
+# size, or after the given timeout. This is only required for high volume servers.
 #
 # Debian/RPM package users:
 #   Use /var/cache/ntfy/cache.db as cache file to avoid permission issues. The package

From e147a41f928241f8497ead40d6aada293cd6255d Mon Sep 17 00:00:00 2001
From: Philipp Heckel <pheckel@datto.com>
Date: Wed, 16 Nov 2022 10:44:10 -0500
Subject: [PATCH 4/7] Fix race in tests

---
 util/batching_queue_test.go | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/util/batching_queue_test.go b/util/batching_queue_test.go
index 28764f18..f7dccfab 100644
--- a/util/batching_queue_test.go
+++ b/util/batching_queue_test.go
@@ -1,40 +1,46 @@
 package util_test
 
 import (
-	"fmt"
 	"github.com/stretchr/testify/require"
 	"heckel.io/ntfy/util"
 	"math/rand"
+	"sync"
 	"testing"
 	"time"
 )
 
 func TestBatchingQueue_InfTimeout(t *testing.T) {
 	q := util.NewBatchingQueue[int](25, 1*time.Hour)
-	batches := make([][]int, 0)
-	total := 0
+	batches, total := make([][]int, 0), 0
+	var mu sync.Mutex
 	go func() {
 		for batch := range q.Dequeue() {
+			mu.Lock()
 			batches = append(batches, batch)
 			total += len(batch)
+			mu.Unlock()
 		}
 	}()
 	for i := 0; i < 101; i++ {
 		go q.Enqueue(i)
 	}
 	time.Sleep(500 * time.Millisecond)
+	mu.Lock()
 	require.Equal(t, 100, total) // One is missing, stuck in the last batch!
 	require.Equal(t, 4, len(batches))
+	mu.Unlock()
 }
 
 func TestBatchingQueue_WithTimeout(t *testing.T) {
 	q := util.NewBatchingQueue[int](25, 100*time.Millisecond)
-	batches := make([][]int, 0)
-	total := 0
+	batches, total := make([][]int, 0), 0
+	var mu sync.Mutex
 	go func() {
 		for batch := range q.Dequeue() {
+			mu.Lock()
 			batches = append(batches, batch)
 			total += len(batch)
+			mu.Unlock()
 		}
 	}()
 	for i := 0; i < 101; i++ {
@@ -44,9 +50,9 @@ func TestBatchingQueue_WithTimeout(t *testing.T) {
 		}(i)
 	}
 	time.Sleep(time.Second)
-	fmt.Println(len(batches))
-	fmt.Println(batches)
+	mu.Lock()
 	require.Equal(t, 101, total)
 	require.True(t, len(batches) > 4) // 101/25
 	require.True(t, len(batches) < 21)
+	mu.Unlock()
 }

From db9ca80b69978e33154dc0c22b5631fe62ac8348 Mon Sep 17 00:00:00 2001
From: Philipp Heckel <pheckel@datto.com>
Date: Wed, 16 Nov 2022 11:16:07 -0500
Subject: [PATCH 5/7] Fix race condition making it possible for batches to be
 >batchSize

---
 util/batching_queue.go      | 13 ++++++++-----
 util/batching_queue_test.go |  2 +-
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/util/batching_queue.go b/util/batching_queue.go
index 86901bcd..85ba9be9 100644
--- a/util/batching_queue.go
+++ b/util/batching_queue.go
@@ -48,10 +48,13 @@ func NewBatchingQueue[T any](batchSize int, timeout time.Duration) *BatchingQueu
 func (q *BatchingQueue[T]) Enqueue(element T) {
 	q.mu.Lock()
 	q.in = append(q.in, element)
-	limitReached := len(q.in) == q.batchSize
+	var elements []T
+	if len(q.in) == q.batchSize {
+		elements = q.dequeueAll()
+	}
 	q.mu.Unlock()
-	if limitReached {
-		q.out <- q.dequeueAll()
+	if len(elements) > 0 {
+		q.out <- elements
 	}
 }
 
@@ -61,8 +64,6 @@ func (q *BatchingQueue[T]) Dequeue() <-chan []T {
 }
 
 func (q *BatchingQueue[T]) dequeueAll() []T {
-	q.mu.Lock()
-	defer q.mu.Unlock()
 	elements := make([]T, len(q.in))
 	copy(elements, q.in)
 	q.in = q.in[:0]
@@ -75,7 +76,9 @@ func (q *BatchingQueue[T]) timeoutTicker() {
 	}
 	ticker := time.NewTicker(q.timeout)
 	for range ticker.C {
+		q.mu.Lock()
 		elements := q.dequeueAll()
+		q.mu.Unlock()
 		if len(elements) > 0 {
 			q.out <- elements
 		}
diff --git a/util/batching_queue_test.go b/util/batching_queue_test.go
index f7dccfab..b3c41a4c 100644
--- a/util/batching_queue_test.go
+++ b/util/batching_queue_test.go
@@ -24,7 +24,7 @@ func TestBatchingQueue_InfTimeout(t *testing.T) {
 	for i := 0; i < 101; i++ {
 		go q.Enqueue(i)
 	}
-	time.Sleep(500 * time.Millisecond)
+	time.Sleep(time.Second)
 	mu.Lock()
 	require.Equal(t, 100, total) // One is missing, stuck in the last batch!
 	require.Equal(t, 4, len(batches))
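
The race this fixes: in the old `Enqueue`, the size check happened under the lock, but the mutex was released before `dequeueAll()` re-acquired it, so other goroutines could append in that window and the emitted batch could grow past `batchSize`. With `dequeueAll` now called inside the same critical section as the append (and the ticker taking the lock itself), batches are capped at `batchSize`. A sketch of a regression test one could add for this (not part of the patch):

``` go
package util_test

import (
	"sync"
	"testing"
	"time"

	"heckel.io/ntfy/util"
)

func TestBatchingQueue_BatchSizeNeverExceeded(t *testing.T) {
	q := util.NewBatchingQueue[int](10, 50*time.Millisecond)
	go func() {
		for batch := range q.Dequeue() {
			if len(batch) > 10 {
				t.Errorf("batch of %d elements exceeds batch size 10", len(batch))
			}
		}
	}()
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			q.Enqueue(i)
		}(i)
	}
	wg.Wait()
	time.Sleep(200 * time.Millisecond) // allow the ticker to flush the tail
}
```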

From 4a91da60dd3373c791575025901c659d0bf19203 Mon Sep 17 00:00:00 2001
From: Philipp Heckel <pheckel@datto.com>
Date: Wed, 16 Nov 2022 11:27:46 -0500
Subject: [PATCH 6/7] Docs

---
 docs/config.md | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/docs/config.md b/docs/config.md
index 655b56cf..a127a5ac 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -825,19 +825,27 @@ out [this discussion on Reddit](https://www.reddit.com/r/golang/comments/r9u4ee/
 
 Depending on *how you run it*, here are a few limits that are relevant:
 
-### WAL for message cache
+### Message cache
 By default, the [message cache](#message-cache) (defined by `cache-file`) uses the SQLite default settings, which means it
 syncs to disk on every write. For personal servers, this is perfectly adequate. For larger installations, such as ntfy.sh,
 the [write-ahead log (WAL)](https://sqlite.org/wal.html) should be enabled, and the sync mode should be adjusted. 
 See [this article](https://phiresky.github.io/blog/2020/sqlite-performance-tuning/) for details.
 
+In addition to that, for very high load servers (such as ntfy.sh), it may be beneficial to write messages to the cache
+in batches, and asynchronously. This can be enabled with the `cache-batch-size` and `cache-batch-timeout` options. If you
+start seeing `database locked` messages in the logs, you should probably enable batching.
+
 Here's how ntfy.sh has been tuned in the `server.yml` file:
 
 ``` yaml
+cache-batch-size: 25
+cache-batch-timeout: "1s"
 cache-startup-queries: |
     pragma journal_mode = WAL;
     pragma synchronous = normal;
     pragma temp_store = memory;
+    pragma busy_timeout = 15000;
+    vacuum;
 ```
 
 ### For systemd services
@@ -990,6 +998,8 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`).
 | `cache-file`                               | `NTFY_CACHE_FILE`                               | *filename*                                          | -                 | If set, messages are cached in a local SQLite database instead of only in-memory. This allows for service restarts without losing messages in support of the since= parameter. See [message cache](#message-cache).             |
 | `cache-duration`                           | `NTFY_CACHE_DURATION`                           | *duration*                                          | 12h               | Duration for which messages will be buffered before they are deleted. This is required to support the `since=...` and `poll=1` parameter. Set this to `0` to disable the cache entirely.                                        |
 | `cache-startup-queries`                    | `NTFY_CACHE_STARTUP_QUERIES`                    | *string (SQL queries)*                              | -                 | SQL queries to run during database startup; this is useful for tuning and [enabling WAL mode](#wal-for-message-cache)                                                                                                           |
+| `cache-batch-size`                         | `NTFY_CACHE_BATCH_SIZE`                         | *int*                                               | 0                 | Max number of messages to batch together when writing to the message cache (if zero, writes are synchronous)                                                                                                              |
+| `cache-batch-timeout`                      | `NTFY_CACHE_BATCH_TIMEOUT`                      | *duration*                                          | 0s                | Timeout for batched async writes to the message cache (if zero, writes are synchronous)                                                                                                                                         |
 | `auth-file`                                | `NTFY_AUTH_FILE`                                | *filename*                                          | -                 | Auth database file used for access control. If set, enables authentication and access control. See [access control](#access-control).                                                                                           |
 | `auth-default-access`                      | `NTFY_AUTH_DEFAULT_ACCESS`                      | `read-write`, `read-only`, `write-only`, `deny-all` | `read-write`      | Default permissions if no matching entries in the auth database are found. Default is `read-write`.                                                                                                                             |
 | `behind-proxy`                             | `NTFY_BEHIND_PROXY`                             | *bool*                                              | false             | If set, the X-Forwarded-For header is used to determine the visitor IP address instead of the remote address of the connection.                                                                                                 |
@@ -1054,6 +1064,8 @@ OPTIONS:
    --behind-proxy, --behind_proxy, -P                                                                  if set, use X-Forwarded-For header to determine visitor IP address (for rate limiting) (default: false) [$NTFY_BEHIND_PROXY]
    --cache-duration since, --cache_duration since, -b since                                            buffer messages for this time to allow since requests (default: 12h0m0s) [$NTFY_CACHE_DURATION]
    --cache-file value, --cache_file value, -C value                                                    cache file used for message caching [$NTFY_CACHE_FILE]
+   --cache-batch-size value, --cache_batch_size value                                                  max number of messages to batch together when writing to the message cache (if zero, writes are synchronous) (default: 0) [$NTFY_CACHE_BATCH_SIZE]
+   --cache-batch-timeout value, --cache_batch_timeout value                                            timeout for batched async writes to the message cache (if zero, writes are synchronous) (default: 0s) [$NTFY_CACHE_BATCH_TIMEOUT]
    --cache-startup-queries value, --cache_startup_queries value                                        queries run when the cache database is initialized [$NTFY_CACHE_STARTUP_QUERIES]
    --cert-file value, --cert_file value, -E value                                                      certificate file, if listen-https is set [$NTFY_CERT_FILE]
    --config value, -c value                                                                            config file (default: /etc/ntfy/server.yml) [$NTFY_CONFIG_FILE]

From 978118a4007456b1393c68da902ab254a0764f5e Mon Sep 17 00:00:00 2001
From: Philipp Heckel <pheckel@datto.com>
Date: Wed, 16 Nov 2022 11:31:29 -0500
Subject: [PATCH 7/7] Release notes

---
 docs/releases.md | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/docs/releases.md b/docs/releases.md
index f5fc9a49..e662faec 100644
--- a/docs/releases.md
+++ b/docs/releases.md
@@ -14,6 +14,10 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
 
 ## ntfy server v1.30.0 (UNRELEASED)
 
+**Features:**
+
+* High-load servers: Allow asynchronous batch-writing of messages to the cache via `cache-batch-*` options ([#498](https://github.com/binwiederhier/ntfy/issues/498)/[#502](https://github.com/binwiederhier/ntfy/pull/502))
+
 **Documentation:**
 
 * GitHub Actions example ([#492](https://github.com/binwiederhier/ntfy/pull/492), thanks to [@ksurl](https://github.com/ksurl))
@@ -22,6 +26,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
 **Other things:**
 
 * Put ntfy.sh docs on GitHub pages to reduce AWS outbound traffic cost ([#491](https://github.com/binwiederhier/ntfy/issues/491))
+* The ntfy.sh server hardware was upgraded to a bigger box. If you'd like to help out carrying the server cost, **[sponsorships and donations](https://github.com/sponsors/binwiederhier)** 💸 would be very much appreciated
 
 ## ntfy server v1.29.0
 Released November 12, 2022