diff --git a/server/log.go b/server/log.go index 0148be72..93dc0834 100644 --- a/server/log.go +++ b/server/log.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/emersion/go-smtp" "github.com/gorilla/websocket" - "golang.org/x/time/rate" "heckel.io/ntfy/log" "heckel.io/ntfy/util" "net/http" @@ -24,9 +23,7 @@ func logv(v *visitor) *log.Event { // logr creates a new log event with HTTP request and visitor fields func logvr(v *visitor, r *http.Request) *log.Event { - return logv(v). - Fields(httpContext(r)). - Fields(requestLimiterFields(v.RequestLimiter())) + return logv(v).Fields(httpContext(r)) } // logvrm creates a new log event with HTTP request, visitor fields and message fields @@ -73,13 +70,6 @@ func websocketErrorContext(err error) log.Context { } } -func requestLimiterFields(limiter *rate.Limiter) map[string]any { - return map[string]any{ - "visitor_request_limiter_limit": limiter.Limit(), - "visitor_request_limiter_tokens": limiter.Tokens(), - } -} - func renderHTTPRequest(r *http.Request) string { peekLimit := 4096 lines := fmt.Sprintf("%s %s %s\n", r.Method, r.URL.RequestURI(), r.Proto) diff --git a/server/server.go b/server/server.go index aadc637a..81b4b78b 100644 --- a/server/server.go +++ b/server/server.go @@ -37,7 +37,6 @@ import ( - HIGH Rate limiting: Sensitive endpoints (account/login/change-password/...) - HIGH Account limit creation triggers when account is taken! - HIGH Docs -- HIGH make request limit independent of message limit again - HIGH Self-review - MEDIUM: Test for expiring messages after reservation removal - MEDIUM: Test new token endpoints & never-expiring token @@ -138,6 +137,7 @@ const ( // Log tags const ( + tagStartup = "startup" tagPublish = "publish" tagFirebase = "firebase" tagEmail = "email" // Send email @@ -233,7 +233,7 @@ func (s *Server) Run() error { if s.config.SMTPServerListen != "" { listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen) } - log.Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.Version, log.CurrentLevel().String()) + log.Tag(tagStartup).Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.Version, log.CurrentLevel().String()) if log.IsFile() { fmt.Fprintf(os.Stderr, "Listening on%s, ntfy %s\n", listenStr, s.config.Version) fmt.Fprintf(os.Stderr, "Logs are written to %s\n", log.File()) @@ -347,15 +347,15 @@ func (s *Server) handle(w http.ResponseWriter, r *http.Request) { if !ok { httpErr = errHTTPInternalError } - isNormalError := httpErr.HTTPCode == http.StatusNotFound || httpErr.HTTPCode == http.StatusBadRequest + isNormalError := httpErr.HTTPCode == http.StatusNotFound || httpErr.HTTPCode == http.StatusBadRequest || httpErr.HTTPCode == http.StatusTooManyRequests if isNormalError { logvr(v, r). Err(httpErr). - Debug("Connection closed with HTTP %d (ntfy error %d): %s", httpErr.HTTPCode, httpErr.Code, err.Error()) + Debug("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code) } else { logvr(v, r). Err(httpErr). - Info("Connection closed with HTTP %d (ntfy error %d): %s", httpErr.HTTPCode, httpErr.Code, err.Error()) + Info("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code) } w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests @@ -1294,21 +1294,21 @@ func (s *Server) execManager() { staleVisitors := 0 for ip, v := range s.visitors { if v.Stale() { - log.Trace("Deleting stale visitor %s", v.ip) + log.Tag(tagManager).With(v).Trace("Deleting stale visitor") delete(s.visitors, ip) staleVisitors++ } } s.mu.Unlock() - log.Debug("Manager: Deleted %d stale visitor(s)", staleVisitors) + log.Tag(tagManager).Field("stale_visitors", staleVisitors).Debug("Deleted %d stale visitor(s)", staleVisitors) // Delete expired user tokens and users if s.userManager != nil { if err := s.userManager.RemoveExpiredTokens(); err != nil { - log.Warn("Error expiring user tokens: %s", err.Error()) + log.Tag(tagManager).Err(err).Warn("Error expiring user tokens") } if err := s.userManager.RemoveDeletedUsers(); err != nil { - log.Warn("Error deleting soft-deleted users: %s", err.Error()) + log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users") } } @@ -1316,47 +1316,47 @@ func (s *Server) execManager() { if s.fileCache != nil { ids, err := s.messageCache.AttachmentsExpired() if err != nil { - log.Warn("Manager: Error retrieving expired attachments: %s", err.Error()) + log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments") } else if len(ids) > 0 { - if log.IsDebug() { - log.Debug("Manager: Deleting attachments %s", strings.Join(ids, ", ")) + if log.Tag(tagManager).IsDebug() { + log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", ")) } if err := s.fileCache.Remove(ids...); err != nil { - log.Warn("Manager: Error deleting attachments: %s", err.Error()) + log.Tag(tagManager).Err(err).Warn("Error deleting attachments") } if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil { - log.Warn("Manager: Error marking attachments deleted: %s", err.Error()) + log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted") } } else { - log.Debug("Manager: No expired attachments to delete") + log.Tag(tagManager).Debug("No expired attachments to delete") } } // Prune messages - log.Debug("Manager: Pruning messages") + log.Tag(tagManager).Debug("Manager: Pruning messages") expiredMessageIDs, err := s.messageCache.MessagesExpired() if err != nil { - log.Warn("Manager: Error retrieving expired messages: %s", err.Error()) + log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages") } else if len(expiredMessageIDs) > 0 { if err := s.fileCache.Remove(expiredMessageIDs...); err != nil { - log.Warn("Manager: Error deleting attachments for expired messages: %s", err.Error()) + log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages") } if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil { - log.Warn("Manager: Error marking attachments deleted: %s", err.Error()) + log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted") } } else { - log.Debug("Manager: No expired messages to delete") + log.Tag(tagManager).Debug("No expired messages to delete") } // Message count per topic - var messages int + var messagesCached int messageCounts, err := s.messageCache.MessageCounts() if err != nil { - log.Warn("Manager: Cannot get message counts: %s", err.Error()) + log.Tag(tagManager).Err(err).Warn("Cannot get message counts") messageCounts = make(map[string]int) // Empty, so we can continue } for _, count := range messageCounts { - messages += count + messagesCached += count } // Remove subscriptions without subscribers @@ -1364,10 +1364,10 @@ func (s *Server) execManager() { var subscribers int for _, t := range s.topics { subs := t.SubscribersCount() - log.Trace("- topic %s: %d subscribers", t.ID, subs) + log.Tag(tagManager).Trace("- topic %s: %d subscribers", t.ID, subs) msgs, exists := messageCounts[t.ID] if subs == 0 && (!exists || msgs == 0) { - log.Trace("Deleting empty topic %s", t.ID) + log.Tag(tagManager).Trace("Deleting empty topic %s", t.ID) delete(s.topics, t.ID) continue } @@ -1389,10 +1389,22 @@ func (s *Server) execManager() { s.mu.Lock() messagesCount, topicsCount, visitorsCount := s.messages, len(s.topics), len(s.visitors) s.mu.Unlock() - log.Info("Stats: %d messages published, %d in cache, %d topic(s) active, %d subscriber(s), %d visitor(s), %d mails received (%d successful, %d failed), %d mails sent (%d successful, %d failed)", - messagesCount, messages, topicsCount, subscribers, visitorsCount, - receivedMailTotal, receivedMailSuccess, receivedMailFailure, - sentMailTotal, sentMailSuccess, sentMailFailure) + log. + Tag(tagManager). + Fields(log.Context{ + "messages_published": messagesCount, + "messages_cached": messagesCached, + "topics_active": topicsCount, + "subscribers": subscribers, + "visitors": visitorsCount, + "emails_received": receivedMailTotal, + "emails_received_success": receivedMailSuccess, + "emails_received_failure": receivedMailFailure, + "emails_sent": sentMailTotal, + "emails_sent_success": sentMailSuccess, + "emails_sent_failure": sentMailFailure, + }). + Info("Server stats") } func (s *Server) runSMTPServer() error { @@ -1534,7 +1546,7 @@ func (s *Server) limitRequests(next handleFunc) handleFunc { if util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) { return next(w, r, v) } else if err := v.RequestAllowed(); err != nil { - logvr(v, r).Err(err).Fields(requestLimiterFields(v.RequestLimiter())).Trace("Request not allowed by rate limiter") + logvr(v, r).Err(err).Trace("Request not allowed by rate limiter") return errHTTPTooManyRequestsLimitRequests } return next(w, r, v) diff --git a/server/server_account.go b/server/server_account.go index 924a6096..48f5a0f6 100644 --- a/server/server_account.go +++ b/server/server_account.go @@ -42,11 +42,12 @@ func (s *Server) handleAccountCreate(w http.ResponseWriter, r *http.Request, v * return s.writeJSON(w, newSuccessResponse()) } -func (s *Server) handleAccountGet(w http.ResponseWriter, _ *http.Request, v *visitor) error { +func (s *Server) handleAccountGet(w http.ResponseWriter, r *http.Request, v *visitor) error { info, err := v.Info() if err != nil { return err } + logvr(v, r).Tag(tagAccount).Fields(visitorExtendedInfoContext(info)).Debug("Retrieving account stats") limits, stats := info.Limits, info.Stats response := &apiAccountResponse{ Limits: &apiAccountLimits{ diff --git a/server/visitor.go b/server/visitor.go index 1a945c4a..34c598a6 100644 --- a/server/visitor.go +++ b/server/visitor.go @@ -142,8 +142,17 @@ func (v *visitor) Context() log.Context { } func (v *visitor) contextNoLock() log.Context { + info := v.infoLightNoLock() fields := log.Context{ - "visitor_ip": v.ip.String(), + "visitor_ip": v.ip.String(), + "visitor_messages": info.Stats.Messages, + "visitor_messages_limit": info.Limits.MessageLimit, + "visitor_messages_remaining": info.Stats.MessagesRemaining, + "visitor_emails": info.Stats.Emails, + "visitor_emails_limit": info.Limits.EmailLimit, + "visitor_emails_remaining": info.Stats.EmailsRemaining, + "visitor_request_limiter_limit": v.requestLimiter.Limit(), + "visitor_request_limiter_tokens": v.requestLimiter.Tokens(), } if v.user != nil { fields["user_id"] = v.user.ID @@ -162,6 +171,17 @@ func (v *visitor) contextNoLock() log.Context { return fields } +func visitorExtendedInfoContext(info *visitorInfo) log.Context { + return log.Context{ + "visitor_reservations": info.Stats.Reservations, + "visitor_reservations_limit": info.Limits.ReservationsLimit, + "visitor_reservations_remaining": info.Stats.ReservationsRemaining, + "visitor_attachment_total_size": info.Stats.AttachmentTotalSize, + "visitor_attachment_total_size_limit": info.Limits.AttachmentTotalSizeLimit, + "visitor_attachment_total_size_remaining": info.Stats.AttachmentTotalSizeRemaining, + } + +} func (v *visitor) RequestAllowed() error { v.mu.Lock() // limiters could be replaced! defer v.mu.Unlock() @@ -309,7 +329,6 @@ func (v *visitor) MaybeUserID() string { } func (v *visitor) resetLimitersNoLock(messages, emails int64, enqueueUpdate bool) { - log.Fields(v.contextNoLock()).Debug("Resetting limiters for visitor") limits := v.limitsNoLock() v.requestLimiter = rate.NewLimiter(limits.RequestLimitReplenish, limits.RequestLimitBurst) v.messagesLimiter = util.NewFixedLimiterWithValue(limits.MessageLimit, messages) @@ -326,6 +345,7 @@ func (v *visitor) resetLimitersNoLock(messages, emails int64, enqueueUpdate bool Emails: emails, }) } + log.Fields(v.contextNoLock()).Debug("Rate limiters reset for visitor") // Must be after function, because contextNoLock() describes rate limiters } func (v *visitor) Limits() *visitorLimits { @@ -345,7 +365,7 @@ func tierBasedVisitorLimits(conf *Config, tier *user.Tier) *visitorLimits { return &visitorLimits{ Basis: visitorLimitBasisTier, RequestLimitBurst: util.MinMax(int(float64(tier.MessageLimit)*visitorMessageToRequestLimitBurstRate), conf.VisitorRequestLimitBurst, visitorMessageToRequestLimitBurstMax), - RequestLimitReplenish: dailyLimitToRate(tier.MessageLimit * visitorMessageToRequestLimitReplenishFactor), + RequestLimitReplenish: util.Max(rate.Every(conf.VisitorRequestLimitReplenish), dailyLimitToRate(tier.MessageLimit*visitorMessageToRequestLimitReplenishFactor)), MessageLimit: tier.MessageLimit, MessageExpiryDuration: tier.MessageExpiryDuration, EmailLimit: tier.EmailLimit, @@ -383,9 +403,10 @@ func configBasedVisitorLimits(conf *Config) *visitorLimits { func (v *visitor) Info() (*visitorInfo, error) { v.mu.Lock() - messages := v.messagesLimiter.Value() - emails := v.emailsLimiter.Value() + info := v.infoLightNoLock() v.mu.Unlock() + + // Attachment stats from database var attachmentsBytesUsed int64 var err error u := v.User() @@ -397,6 +418,10 @@ func (v *visitor) Info() (*visitorInfo, error) { if err != nil { return nil, err } + info.Stats.AttachmentTotalSize = attachmentsBytesUsed + info.Stats.AttachmentTotalSizeRemaining = zeroIfNegative(info.Limits.AttachmentTotalSizeLimit - attachmentsBytesUsed) + + // Reservation stats from database var reservations int64 if v.userManager != nil && u != nil { reservations, err = v.userManager.ReservationsCount(u.Name) @@ -404,23 +429,27 @@ func (v *visitor) Info() (*visitorInfo, error) { return nil, err } } - limits := v.Limits() + info.Stats.Reservations = reservations + info.Stats.ReservationsRemaining = zeroIfNegative(info.Limits.ReservationsLimit - reservations) + + return info, nil +} + +func (v *visitor) infoLightNoLock() *visitorInfo { + messages := v.messagesLimiter.Value() + emails := v.emailsLimiter.Value() + limits := v.limitsNoLock() stats := &visitorStats{ - Messages: messages, - MessagesRemaining: zeroIfNegative(limits.MessageLimit - messages), - Emails: emails, - EmailsRemaining: zeroIfNegative(limits.EmailLimit - emails), - Reservations: reservations, - ReservationsRemaining: zeroIfNegative(limits.ReservationsLimit - reservations), - AttachmentTotalSize: attachmentsBytesUsed, - AttachmentTotalSizeRemaining: zeroIfNegative(limits.AttachmentTotalSizeLimit - attachmentsBytesUsed), + Messages: messages, + MessagesRemaining: zeroIfNegative(limits.MessageLimit - messages), + Emails: emails, + EmailsRemaining: zeroIfNegative(limits.EmailLimit - emails), } return &visitorInfo{ Limits: limits, Stats: stats, - }, nil + } } - func zeroIfNegative(value int64) int64 { if value < 0 { return 0 diff --git a/util/util.go b/util/util.go index 3e0c1064..0ff0491f 100644 --- a/util/util.go +++ b/util/util.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "golang.org/x/time/rate" "io" "math/rand" "net/netip" @@ -365,6 +366,14 @@ func MinMax[T int | int64](value, min, max T) T { return value } +// Max returns the maximum value of the two given values +func Max[T int | int64 | rate.Limit](a, b T) T { + if a > b { + return a + } + return b +} + // String turns a string into a pointer of a string func String(v string) *string { return &v