diff --git a/server/errors.go b/server/errors.go index 6300ce41..8e565197 100644 --- a/server/errors.go +++ b/server/errors.go @@ -127,5 +127,5 @@ var ( errHTTPInternalError = &errHTTP{50001, http.StatusInternalServerError, "internal server error", "", nil} errHTTPInternalErrorInvalidPath = &errHTTP{50002, http.StatusInternalServerError, "internal server error: invalid path", "", nil} errHTTPInternalErrorMissingBaseURL = &errHTTP{50003, http.StatusInternalServerError, "internal server error: base-url must be be configured for this feature", "https://ntfy.sh/docs/config/", nil} - errHTTPInsufficientStorage = &errHTTP{50701, http.StatusInsufficientStorage, "internal server error: cannot publish to UnifiedPush topic without previously active subscriber", "", nil} + errHTTPInsufficientStorageUnifiedPush = &errHTTP{50701, http.StatusInsufficientStorage, "cannot publish to UnifiedPush topic without previously active subscriber", "", nil} ) diff --git a/server/server.go b/server/server.go index c1eab9b5..99df6c83 100644 --- a/server/server.go +++ b/server/server.go @@ -602,7 +602,7 @@ func (s *Server) handlePublishWithoutResponse(r *http.Request, v *visitor) (*mes // Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove // the subscription as invalid if any 400-499 code (except 429/408) is returned. // See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46 - return nil, errHTTPInsufficientStorage.With(t) + return nil, errHTTPInsufficientStorageUnifiedPush.With(t) } else if !util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) && !vrate.MessageAllowed() { return nil, errHTTPTooManyRequestsLimitMessages.With(t) } else if email != "" && !vrate.EmailAllowed() { @@ -680,6 +680,9 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *visitor) error { _, err := s.handlePublishWithoutResponse(r, v) if err != nil { + if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode { + return writeMatrixResponse(w, e.rejectedPushKey) + } return err } return writeMatrixSuccess(w) @@ -1036,6 +1039,9 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v * case <-time.After(s.config.KeepaliveInterval): logvr(v, r).Tag(tagSubscribe).Trace("Sending keepalive message") v.Keepalive() + for _, t := range topics { + t.Keepalive() + } if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message return err } @@ -1123,6 +1129,9 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"} case <-time.After(s.config.KeepaliveInterval): v.Keepalive() + for _, t := range topics { + t.Keepalive() + } if err := ping(); err != nil { return err } diff --git a/server/server_manager.go b/server/server_manager.go index 04e3f327..7deab25f 100644 --- a/server/server_manager.go +++ b/server/server_manager.go @@ -3,6 +3,7 @@ package server import ( "heckel.io/ntfy/log" "strings" + "time" ) func (s *Server) execManager() { @@ -34,16 +35,20 @@ func (s *Server) execManager() { s.mu.Lock() defer s.mu.Unlock() for _, t := range s.topics { - subs := t.SubscribersCount() - log.Tag(tagManager).With(t).Trace("- topic %s: %d subscribers", t.ID, subs) - msgs, exists := messageCounts[t.ID] - if t.Stale() && (!exists || msgs == 0) { - log.Tag(tagManager).With(t).Trace("Deleting empty topic %s", t.ID) + subs, lastAccess := t.Stats() + ev := log.Tag(tagManager).With(t) + if t.Stale() { + if ev.IsTrace() { + ev.Trace("- topic %s: Deleting stale topic (%d subscribers, accessed %s)", t.ID, subs, lastAccess.Format(time.RFC822)) + } emptyTopics++ delete(s.topics, t.ID) - continue + } else { + if ev.IsTrace() { + ev.Trace("- topic %s: %d subscribers, accessed %s", t.ID, subs, lastAccess.Format(time.RFC822)) + } + subscribers += subs } - subscribers += subs } }). Debug("Removed %d empty topic(s)", emptyTopics) diff --git a/server/server_matrix.go b/server/server_matrix.go index 5355b6dd..bd96f43c 100644 --- a/server/server_matrix.go +++ b/server/server_matrix.go @@ -126,6 +126,7 @@ func newRequestFromMatrixJSON(r *http.Request, baseURL string, messageLimit int) if r.Header.Get("X-Forwarded-For") != "" { newRequest.Header.Set("X-Forwarded-For", r.Header.Get("X-Forwarded-For")) } + newRequest.Header.Set("X-Matrix-Pushkey", pushKey) return newRequest, nil } diff --git a/server/topic.go b/server/topic.go index d3637f83..23613683 100644 --- a/server/topic.go +++ b/server/topic.go @@ -4,6 +4,11 @@ import ( "heckel.io/ntfy/log" "math/rand" "sync" + "time" +) + +const ( + topicExpiryDuration = 6 * time.Hour ) // topic represents a channel to which subscribers can subscribe, and publishers @@ -12,6 +17,7 @@ type topic struct { ID string subscribers map[int]*topicSubscriber rateVisitor *visitor + lastAccess time.Time mu sync.RWMutex } @@ -29,6 +35,7 @@ func newTopic(id string) *topic { return &topic{ ID: id, subscribers: make(map[int]*topicSubscriber), + lastAccess: time.Now(), } } @@ -42,6 +49,7 @@ func (t *topic) Subscribe(s subscriber, userID string, cancel func()) int { subscriber: s, cancel: cancel, } + t.lastAccess = time.Now() return subscriberID } @@ -51,13 +59,14 @@ func (t *topic) Stale() bool { if t.rateVisitor != nil && !t.rateVisitor.Stale() { return false } - return len(t.subscribers) == 0 + return len(t.subscribers) == 0 && time.Since(t.lastAccess) > topicExpiryDuration } func (t *topic) SetRateVisitor(v *visitor) { t.mu.Lock() defer t.mu.Unlock() t.rateVisitor = v + t.lastAccess = time.Now() } func (t *topic) RateVisitor() *visitor { @@ -96,15 +105,23 @@ func (t *topic) Publish(v *visitor, m *message) error { } else { logvm(v, m).Tag(tagPublish).Trace("No stream or WebSocket subscribers, not forwarding") } + t.Keepalive() }() return nil } -// SubscribersCount returns the number of subscribers to this topic -func (t *topic) SubscribersCount() int { +// Stats returns the number of subscribers and last access to this topic +func (t *topic) Stats() (int, time.Time) { t.mu.RLock() defer t.mu.RUnlock() - return len(t.subscribers) + return len(t.subscribers), t.lastAccess +} + +// Keepalive sets the last access time and ensures that Stale does not return true +func (t *topic) Keepalive() { + t.mu.Lock() + defer t.mu.Unlock() + t.lastAccess = time.Now() } // CancelSubscribers calls the cancel function for all subscribers, forcing