diff --git a/docs/releases.md b/docs/releases.md index 83e46cb4..4ef99022 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -1169,6 +1169,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release **Features:** * [ntfy CLI](subscribe/cli.md) (`ntfy publish` and `ntfy subscribe` only) can now be installed via Homebrew (thanks to [@Moulick](https://github.com/Moulick)) +* Added `v1/stats` endpoint to expose messages stats (no ticket) **Bug fixes + maintenance:** diff --git a/server/server.go b/server/server.go index e545e94d..ebbe147e 100644 --- a/server/server.go +++ b/server/server.go @@ -573,13 +573,15 @@ func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request, _ *visitor) // handleStats returns the publicly available server stats func (s *Server) handleStats(w http.ResponseWriter, _ *http.Request, _ *visitor) error { s.mu.RLock() - n := len(s.messagesHistory) - rate := float64(s.messagesHistory[n-1]-s.messagesHistory[0]) / (float64(n-1) * s.config.ManagerInterval.Seconds()) - response := &apiStatsResponse{ - Messages: s.messages, - MessagesRate: rate, + messages, n, rate := s.messages, len(s.messagesHistory), float64(0) + if n > 1 { + rate = float64(s.messagesHistory[n-1]-s.messagesHistory[0]) / (float64(n-1) * s.config.ManagerInterval.Seconds()) } s.mu.RUnlock() + response := &apiStatsResponse{ + Messages: messages, + MessagesRate: rate, + } return s.writeJSON(w, response) } @@ -1847,3 +1849,17 @@ func (s *Server) writeJSON(w http.ResponseWriter, v any) error { } return nil } + +func (s *Server) updateAndWriteStats(messagesCount int64) { + s.mu.Lock() + s.messagesHistory = append(s.messagesHistory, messagesCount) + if len(s.messagesHistory) > messagesHistoryMax { + s.messagesHistory = s.messagesHistory[1:] + } + s.mu.Unlock() + go func() { + if err := s.messageCache.UpdateStats(messagesCount); err != nil { + log.Tag(tagManager).Err(err).Warn("Cannot write messages stats") + } + }() +} diff --git a/server/server_manager.go b/server/server_manager.go index 445f830f..52e3621e 100644 --- a/server/server_manager.go +++ b/server/server_manager.go @@ -76,6 +76,11 @@ func (s *Server) execManager() { s.mu.RLock() messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors) s.mu.RUnlock() + + // Update stats + s.updateAndWriteStats(messagesCount) + + // Log stats log. Tag(tagManager). Fields(log.Context{ @@ -98,19 +103,6 @@ func (s *Server) execManager() { mset(metricUsers, usersCount) mset(metricSubscribers, subscribers) mset(metricTopics, topicsCount) - - // Write stats - s.mu.Lock() - s.messagesHistory = append(s.messagesHistory, messagesCount) - if len(s.messagesHistory) > messagesHistoryMax { - s.messagesHistory = s.messagesHistory[1:] - } - s.mu.Unlock() - go func() { - if err := s.messageCache.UpdateStats(messagesCount); err != nil { - log.Tag(tagManager).Err(err).Warn("Cannot write messages stats") - } - }() } func (s *Server) pruneVisitors() { diff --git a/server/server_test.go b/server/server_test.go index 943fc3a8..ceee187e 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -2399,6 +2399,76 @@ func TestServer_SubscriberRateLimiting_ProtectedTopics_WithDefaultReadWrite(t *t require.Nil(t, s.topics["announcements"].rateVisitor) } +func TestServer_MessageHistoryAndStatsEndpoint(t *testing.T) { + c := newTestConfig(t) + c.ManagerInterval = 2 * time.Second + s := newTestServer(t, c) + + // Publish some messages, and get stats + for i := 0; i < 5; i++ { + response := request(t, s, "POST", "/mytopic", "some message", nil) + require.Equal(t, 200, response.Code) + } + require.Equal(t, int64(5), s.messages) + require.Equal(t, []int64{0}, s.messagesHistory) + + response := request(t, s, "GET", "/v1/stats", "", nil) + require.Equal(t, 200, response.Code) + require.Equal(t, `{"messages":5,"messages_rate":0}`+"\n", response.Body.String()) + + // Run manager and see message history update + s.execManager() + require.Equal(t, []int64{0, 5}, s.messagesHistory) + + response = request(t, s, "GET", "/v1/stats", "", nil) + require.Equal(t, 200, response.Code) + require.Equal(t, `{"messages":5,"messages_rate":2.5}`+"\n", response.Body.String()) // 5 messages in 2 seconds = 2.5 messages per second + + // Publish some more messages + for i := 0; i < 10; i++ { + response := request(t, s, "POST", "/mytopic", "some message", nil) + require.Equal(t, 200, response.Code) + } + require.Equal(t, int64(15), s.messages) + require.Equal(t, []int64{0, 5}, s.messagesHistory) + + response = request(t, s, "GET", "/v1/stats", "", nil) + require.Equal(t, 200, response.Code) + require.Equal(t, `{"messages":15,"messages_rate":2.5}`+"\n", response.Body.String()) // Rate did not update yet + + // Run manager and see message history update + s.execManager() + require.Equal(t, []int64{0, 5, 15}, s.messagesHistory) + + response = request(t, s, "GET", "/v1/stats", "", nil) + require.Equal(t, 200, response.Code) + require.Equal(t, `{"messages":15,"messages_rate":3.75}`+"\n", response.Body.String()) // 15 messages in 4 seconds = 3.75 messages per second +} + +func TestServer_MessageHistoryMaxSize(t *testing.T) { + s := newTestServer(t, newTestConfig(t)) + for i := 0; i < 20; i++ { + s.messages = int64(i) + s.execManager() + } + require.Equal(t, []int64{10, 11, 12, 13, 14, 15, 16, 17, 18, 19}, s.messagesHistory) +} + +func TestServer_MessageCountPersistence(t *testing.T) { + c := newTestConfig(t) + s := newTestServer(t, c) + s.messages = 1234 + s.execManager() + waitFor(t, func() bool { + messages, err := s.messageCache.Stats() + require.Nil(t, err) + return messages == 1234 + }) + + s = newTestServer(t, c) + require.Equal(t, int64(1234), s.messages) +} + func newTestConfig(t *testing.T) *Config { conf := NewConfig() conf.BaseURL = "http://127.0.0.1:12345"