From bed60b71ffab41840a18824bd6f5002721253d2f Mon Sep 17 00:00:00 2001 From: binwiederhier Date: Sun, 12 Feb 2023 21:05:24 -0500 Subject: [PATCH] Tester feedback --- server/server.go | 147 ------------------------------ server/server_account.go | 1 + server/server_account_test.go | 21 ++++- server/server_manager.go | 163 ++++++++++++++++++++++++++++++++++ server/visitor.go | 7 +- 5 files changed, 187 insertions(+), 152 deletions(-) create mode 100644 server/server_manager.go diff --git a/server/server.go b/server/server.go index 288b1388..41eccecb 100644 --- a/server/server.go +++ b/server/server.go @@ -1300,153 +1300,6 @@ func (s *Server) topicFromID(id string) (*topic, error) { return topics[0], nil } -func (s *Server) execManager() { - // WARNING: Make sure to only selectively lock with the mutex, and be aware that this - // there is no mutex for the entire function. - - // Expire visitors from rate visitors map - staleVisitors := 0 - log. - Tag(tagManager). - Timing(func() { - s.mu.Lock() - defer s.mu.Unlock() - for ip, v := range s.visitors { - if v.Stale() { - log.Tag(tagManager).With(v).Trace("Deleting stale visitor") - delete(s.visitors, ip) - staleVisitors++ - } - } - }). - Field("stale_visitors", staleVisitors). - Debug("Deleted %d stale visitor(s)", staleVisitors) - - // Delete expired user tokens and users - if s.userManager != nil { - log. - Tag(tagManager). - Timing(func() { - if err := s.userManager.RemoveExpiredTokens(); err != nil { - log.Tag(tagManager).Err(err).Warn("Error expiring user tokens") - } - if err := s.userManager.RemoveDeletedUsers(); err != nil { - log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users") - } - }). - Debug("Removed expired tokens and users") - } - - // Delete expired attachments - if s.fileCache != nil { - log. - Tag(tagManager). - Timing(func() { - ids, err := s.messageCache.AttachmentsExpired() - if err != nil { - log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments") - } else if len(ids) > 0 { - if log.Tag(tagManager).IsDebug() { - log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", ")) - } - if err := s.fileCache.Remove(ids...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error deleting attachments") - } - if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted") - } - } else { - log.Tag(tagManager).Debug("No expired attachments to delete") - } - }). - Debug("Deleted expired attachments") - } - - // Prune messages - log. - Tag(tagManager). - Timing(func() { - expiredMessageIDs, err := s.messageCache.MessagesExpired() - if err != nil { - log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages") - } else if len(expiredMessageIDs) > 0 { - if err := s.fileCache.Remove(expiredMessageIDs...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages") - } - if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted") - } - } else { - log.Tag(tagManager).Debug("No expired messages to delete") - } - }). - Debug("Pruned messages") - - // Message count per topic - var messagesCached int - messageCounts, err := s.messageCache.MessageCounts() - if err != nil { - log.Tag(tagManager).Err(err).Warn("Cannot get message counts") - messageCounts = make(map[string]int) // Empty, so we can continue - } - for _, count := range messageCounts { - messagesCached += count - } - - // Remove subscriptions without subscribers - var emptyTopics, subscribers int - log. - Tag(tagManager). - Timing(func() { - s.mu.Lock() - defer s.mu.Unlock() - for _, t := range s.topics { - subs := t.SubscribersCount() - log.Tag(tagManager).Trace("- topic %s: %d subscribers", t.ID, subs) - msgs, exists := messageCounts[t.ID] - if subs == 0 && (!exists || msgs == 0) { - log.Tag(tagManager).Trace("Deleting empty topic %s", t.ID) - emptyTopics++ - delete(s.topics, t.ID) - continue - } - subscribers += subs - } - }). - Debug("Removed %d empty topic(s)", emptyTopics) - - // Mail stats - var receivedMailTotal, receivedMailSuccess, receivedMailFailure int64 - if s.smtpServerBackend != nil { - receivedMailTotal, receivedMailSuccess, receivedMailFailure = s.smtpServerBackend.Counts() - } - var sentMailTotal, sentMailSuccess, sentMailFailure int64 - if s.smtpSender != nil { - sentMailTotal, sentMailSuccess, sentMailFailure = s.smtpSender.Counts() - } - - // Print stats - s.mu.Lock() - messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors) - s.mu.Unlock() - log. - Tag(tagManager). - Fields(log.Context{ - "messages_published": messagesCount, - "messages_cached": messagesCached, - "topics_active": topicsCount, - "subscribers": subscribers, - "visitors": visitorsCount, - "emails_received": receivedMailTotal, - "emails_received_success": receivedMailSuccess, - "emails_received_failure": receivedMailFailure, - "emails_sent": sentMailTotal, - "emails_sent_success": sentMailSuccess, - "emails_sent_failure": sentMailFailure, - }). - Info("Server stats") -} - func (s *Server) runSMTPServer() error { s.smtpServerBackend = newMailBackend(s.config, s.handle) s.smtpServer = smtp.NewServer(s.smtpServerBackend) diff --git a/server/server_account.go b/server/server_account.go index d7dbd982..aff9f1b1 100644 --- a/server/server_account.go +++ b/server/server_account.go @@ -479,6 +479,7 @@ func (s *Server) handleAccountReservationDelete(w http.ResponseWriter, r *http.R if err := s.messageCache.ExpireMessages(topic); err != nil { return err } + s.pruneMessages() } return s.writeJSON(w, newSuccessResponse()) } diff --git a/server/server_account_test.go b/server/server_account_test.go index cc4f4cd5..0290303a 100644 --- a/server/server_account_test.go +++ b/server/server_account_test.go @@ -669,8 +669,8 @@ func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) { require.Equal(t, 200, rr.Code) // Verify that messages and attachments were deleted + // This does not explicitly call the manager! time.Sleep(time.Second) - s.execManager() ms, err := s.messageCache.Messages("mytopic1", sinceAllMessages, false) require.Nil(t, err) @@ -804,10 +804,27 @@ func TestAccount_Persist_UserStats_After_Tier_Change(t *testing.T) { "Authorization": util.BasicAuth("phil", "phil"), }) require.Equal(t, 200, rr.Code) + account, _ := util.UnmarshalJSON[apiAccountResponse](io.NopCloser(rr.Body)) + require.Equal(t, int64(1), account.Stats.Messages) // Is not reset! + + // Publish another message + rr = request(t, s, "POST", "/mytopic", "hi", map[string]string{ + "Authorization": util.BasicAuth("phil", "phil"), + }) + require.Equal(t, 200, rr.Code) // Verify that message stats were persisted time.Sleep(300 * time.Millisecond) u, err = s.userManager.User("phil") require.Nil(t, err) - require.Equal(t, int64(0), u.Stats.Messages) // v.EnqueueUserStats had run! + require.Equal(t, int64(2), u.Stats.Messages) // v.EnqueueUserStats had run! + + // Stats keep counting + rr = request(t, s, "GET", "/v1/account", "", map[string]string{ + "Authorization": util.BasicAuth("phil", "phil"), + }) + require.Equal(t, 200, rr.Code) + account, _ = util.UnmarshalJSON[apiAccountResponse](io.NopCloser(rr.Body)) + require.Equal(t, int64(2), account.Stats.Messages) // Is not reset! + } diff --git a/server/server_manager.go b/server/server_manager.go new file mode 100644 index 00000000..2b80ae18 --- /dev/null +++ b/server/server_manager.go @@ -0,0 +1,163 @@ +package server + +import ( + "heckel.io/ntfy/log" + "strings" +) + +func (s *Server) execManager() { + // WARNING: Make sure to only selectively lock with the mutex, and be aware that this + // there is no mutex for the entire function. + + // Prune all the things + s.pruneVisitors() + s.pruneTokens() + s.pruneAttachments() + s.pruneMessages() + + // Message count per topic + var messagesCached int + messageCounts, err := s.messageCache.MessageCounts() + if err != nil { + log.Tag(tagManager).Err(err).Warn("Cannot get message counts") + messageCounts = make(map[string]int) // Empty, so we can continue + } + for _, count := range messageCounts { + messagesCached += count + } + + // Remove subscriptions without subscribers + var emptyTopics, subscribers int + log. + Tag(tagManager). + Timing(func() { + s.mu.Lock() + defer s.mu.Unlock() + for _, t := range s.topics { + subs := t.SubscribersCount() + log.Tag(tagManager).Trace("- topic %s: %d subscribers", t.ID, subs) + msgs, exists := messageCounts[t.ID] + if subs == 0 && (!exists || msgs == 0) { + log.Tag(tagManager).Trace("Deleting empty topic %s", t.ID) + emptyTopics++ + delete(s.topics, t.ID) + continue + } + subscribers += subs + } + }). + Debug("Removed %d empty topic(s)", emptyTopics) + + // Mail stats + var receivedMailTotal, receivedMailSuccess, receivedMailFailure int64 + if s.smtpServerBackend != nil { + receivedMailTotal, receivedMailSuccess, receivedMailFailure = s.smtpServerBackend.Counts() + } + var sentMailTotal, sentMailSuccess, sentMailFailure int64 + if s.smtpSender != nil { + sentMailTotal, sentMailSuccess, sentMailFailure = s.smtpSender.Counts() + } + + // Print stats + s.mu.Lock() + messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors) + s.mu.Unlock() + log. + Tag(tagManager). + Fields(log.Context{ + "messages_published": messagesCount, + "messages_cached": messagesCached, + "topics_active": topicsCount, + "subscribers": subscribers, + "visitors": visitorsCount, + "emails_received": receivedMailTotal, + "emails_received_success": receivedMailSuccess, + "emails_received_failure": receivedMailFailure, + "emails_sent": sentMailTotal, + "emails_sent_success": sentMailSuccess, + "emails_sent_failure": sentMailFailure, + }). + Info("Server stats") +} + +func (s *Server) pruneVisitors() { + staleVisitors := 0 + log. + Tag(tagManager). + Timing(func() { + s.mu.Lock() + defer s.mu.Unlock() + for ip, v := range s.visitors { + if v.Stale() { + log.Tag(tagManager).With(v).Trace("Deleting stale visitor") + delete(s.visitors, ip) + staleVisitors++ + } + } + }). + Field("stale_visitors", staleVisitors). + Debug("Deleted %d stale visitor(s)", staleVisitors) +} + +func (s *Server) pruneTokens() { + if s.userManager != nil { + log. + Tag(tagManager). + Timing(func() { + if err := s.userManager.RemoveExpiredTokens(); err != nil { + log.Tag(tagManager).Err(err).Warn("Error expiring user tokens") + } + if err := s.userManager.RemoveDeletedUsers(); err != nil { + log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users") + } + }). + Debug("Removed expired tokens and users") + } +} + +func (s *Server) pruneAttachments() { + if s.fileCache != nil { + log. + Tag(tagManager). + Timing(func() { + ids, err := s.messageCache.AttachmentsExpired() + if err != nil { + log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments") + } else if len(ids) > 0 { + if log.Tag(tagManager).IsDebug() { + log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", ")) + } + if err := s.fileCache.Remove(ids...); err != nil { + log.Tag(tagManager).Err(err).Warn("Error deleting attachments") + } + if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil { + log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted") + } + } else { + log.Tag(tagManager).Debug("No expired attachments to delete") + } + }). + Debug("Deleted expired attachments") + } +} + +func (s *Server) pruneMessages() { + log. + Tag(tagManager). + Timing(func() { + expiredMessageIDs, err := s.messageCache.MessagesExpired() + if err != nil { + log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages") + } else if len(expiredMessageIDs) > 0 { + if err := s.fileCache.Remove(expiredMessageIDs...); err != nil { + log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages") + } + if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil { + log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted") + } + } else { + log.Tag(tagManager).Debug("No expired messages to delete") + } + }). + Debug("Pruned messages") +} diff --git a/server/visitor.go b/server/visitor.go index 0ef06c75..04bd8222 100644 --- a/server/visitor.go +++ b/server/visitor.go @@ -159,8 +159,9 @@ func (v *visitor) contextNoLock() log.Context { fields["user_id"] = v.user.ID fields["user_name"] = v.user.Name if v.user.Tier != nil { - fields["tier_id"] = v.user.Tier.ID - fields["tier_name"] = v.user.Tier.Name + for field, value := range v.user.Tier.Context() { + fields[field] = value + } } if v.user.Billing.StripeCustomerID != "" { fields["stripe_customer_id"] = v.user.Billing.StripeCustomerID @@ -331,7 +332,7 @@ func (v *visitor) SetUser(u *user.User) { shouldResetLimiters := v.user.TierID() != u.TierID() // TierID works with nil receiver v.user = u if shouldResetLimiters { - v.resetLimitersNoLock(0, 0, true) + v.resetLimitersNoLock(u.Stats.Messages, u.Stats.Emails, true) } }