1
0
Fork 0
mirror of https://github.com/binwiederhier/ntfy.git synced 2025-05-24 15:57:35 +02:00
This commit is contained in:
Philipp Heckel 2022-06-01 16:57:35 -04:00
parent bd865fd55d
commit ab955d4d1c
15 changed files with 161 additions and 65 deletions
server

View file

@ -179,7 +179,7 @@ func (s *Server) Run() error {
if s.config.SMTPServerListen != "" {
listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen)
}
log.Info("Listening on%s", listenStr)
log.Info("Listening on%s, log level is %s", listenStr, log.CurrentLevel().String())
mux := http.NewServeMux()
mux.HandleFunc("/", s.handle)
errChan := make(chan error)
@ -246,18 +246,28 @@ func (s *Server) Stop() {
func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
v := s.visitor(r)
log.Debug("[%s] %s %s", v.ip, r.Method, r.URL.Path)
log.Debug("%s HTTP %s %s", v.ip, r.Method, r.URL.Path)
if err := s.handleInternal(w, r, v); err != nil {
if websocket.IsWebSocketUpgrade(r) {
log.Info("[%s] WS %s %s - %s", v.ip, r.Method, r.URL.Path, err.Error())
isNormalError := websocket.IsCloseError(err, websocket.CloseAbnormalClosure) || strings.Contains(err.Error(), "i/o timeout")
if isNormalError {
log.Debug("%s WS %s %s - %s", v.ip, r.Method, r.URL.Path, err.Error())
} else {
log.Warn("%s WS %s %s - %s", v.ip, r.Method, r.URL.Path, err.Error())
}
return // Do not attempt to write to upgraded connection
}
httpErr, ok := err.(*errHTTP)
if !ok {
httpErr = errHTTPInternalError
}
log.Info("[%s] HTTP %s %s - %d - %d - %s", v.ip, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error())
isNormalError := httpErr.Code == 404
if isNormalError {
log.Debug("%s HTTP %s %s - %d - %d - %s", v.ip, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error())
} else {
log.Info("%s HTTP %s %s - %d - %d - %s", v.ip, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error())
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests
w.WriteHeader(httpErr.HTTPCode)
@ -434,21 +444,23 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
m.Message = emptyMessageBody
}
delayed := m.Time > time.Now().Unix()
log.Debug("[%s] %s %s: ev=%s, body=%d bytes, delayed=%t, fb=%t, cache=%t, up=%t, email=%s",
v.ip, r.Method, r.URL.Path, m.Event, len(body.PeekedBytes), delayed, firebase, cache, unifiedpush, email)
log.Debug("%s Received message: ev=%s, body=%d bytes, delayed=%t, fb=%t, cache=%t, up=%t, email=%s",
logPrefix(v, m), m.Event, len(body.PeekedBytes), delayed, firebase, cache, unifiedpush, email)
if !delayed {
if err := t.Publish(v, m); err != nil {
return err
}
}
if s.firebaseClient != nil && firebase && !delayed {
go s.sendToFirebase(v, m)
}
if s.mailer != nil && email != "" && !delayed {
go s.sendEmail(v, m, email)
}
if s.config.UpstreamBaseURL != "" && !delayed {
go s.forwardPollRequest(v, m)
if s.firebaseClient != nil && firebase {
go s.sendToFirebase(v, m)
}
if s.mailer != nil && email != "" {
go s.sendEmail(v, m, email)
}
if s.config.UpstreamBaseURL != "" {
go s.forwardPollRequest(v, m)
}
} else {
log.Debug("%s Message delayed, will process later", logPrefix(v, m))
}
if cache {
if err := s.messageCache.AddMessage(m); err != nil {
@ -467,14 +479,16 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito
}
func (s *Server) sendToFirebase(v *visitor, m *message) {
log.Debug("%s Publishing to Firebase", logPrefix(v, m))
if err := s.firebaseClient.Send(v, m); err != nil {
log.Warn("[%s] FB - Unable to publish to Firebase: %v", v.ip, err.Error())
log.Warn("%s Unable to publish to Firebase: %v", logPrefix(v, m), err.Error())
}
}
func (s *Server) sendEmail(v *visitor, m *message, email string) {
log.Debug("%s Sending email to %s", logPrefix(v, m), email)
if err := s.mailer.Send(v.ip, email, m); err != nil {
log.Warn("[%s] MAIL - Unable to send email: %v", v.ip, err.Error())
log.Warn("%s Unable to send email: %v", logPrefix(v, m), err.Error())
}
}
@ -482,9 +496,10 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) {
topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic)
topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL)))
forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash)
log.Debug("%s Publishing poll request to %s", logPrefix(v, m), forwardURL)
req, err := http.NewRequest("POST", forwardURL, strings.NewReader(""))
if err != nil {
log.Warn("[%s] FWD - Unable to forward poll request: %v", v.ip, err.Error())
log.Warn("%s Unable to publish poll request: %v", logPrefix(v, m), err.Error())
return
}
req.Header.Set("X-Poll-ID", m.ID)
@ -493,10 +508,10 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) {
}
response, err := httpClient.Do(req)
if err != nil {
log.Warn("[%s] FWD - Unable to forward poll request: %v", v.ip, err.Error())
log.Warn("%s Unable to publish poll request: %v", logPrefix(v, m), err.Error())
return
} else if response.StatusCode != http.StatusOK {
log.Warn("[%s] FWD - Unable to forward poll request, unexpected status: %d", v.ip, response.StatusCode)
log.Warn("%s Unable to publish poll request, unexpected HTTP status: %d", logPrefix(v, m), response.StatusCode)
return
}
}
@ -1012,6 +1027,7 @@ func (s *Server) updateStatsAndPrune() {
// Expire visitors from rate visitors map
for ip, v := range s.visitors {
if v.Stale() {
log.Debug("Deleting stale visitor %s", v.ip)
delete(s.visitors, ip)
}
}
@ -1019,17 +1035,21 @@ func (s *Server) updateStatsAndPrune() {
// Delete expired attachments
if s.fileCache != nil {
ids, err := s.messageCache.AttachmentsExpired()
if err == nil {
if err != nil {
log.Warn("Error retrieving expired attachments: %s", err.Error())
} else if len(ids) > 0 {
log.Debug("Deleting expired attachments: %v", ids)
if err := s.fileCache.Remove(ids...); err != nil {
log.Warn("Error deleting attachments: %s", err.Error())
}
} else {
log.Warn("Error retrieving expired attachments: %s", err.Error())
log.Debug("No expired attachments to delete")
}
}
// Prune message cache
olderThan := time.Now().Add(-1 * s.config.CacheDuration)
log.Debug("Pruning messages older tha %v", olderThan)
if err := s.messageCache.Prune(olderThan); err != nil {
log.Warn("Error pruning cache: %s", err.Error())
}
@ -1079,6 +1099,7 @@ func (s *Server) runManager() {
for {
select {
case <-time.After(s.config.ManagerInterval):
log.Debug("Running manager")
s.updateStatsAndPrune()
case <-s.closeChan:
return
@ -1124,7 +1145,7 @@ func (s *Server) sendDelayedMessages() error {
for _, m := range messages {
v := s.visitorFromIP(m.Sender)
if err := s.sendDelayedMessage(v, m); err != nil {
log.Warn("error sending delayed message: %s", err.Error())
log.Warn("%s Error sending delayed message: %s", logPrefix(v, m), err.Error())
}
}
return nil
@ -1133,12 +1154,13 @@ func (s *Server) sendDelayedMessages() error {
func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
s.mu.Lock()
defer s.mu.Unlock()
log.Debug("%s Sending delayed message", logPrefix(v, m))
t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
if ok {
go func() {
// We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
if err := t.Publish(v, m); err != nil {
log.Warn("unable to publish message %s to topic %s: %v", m.ID, m.Topic, err.Error())
log.Warn("%s Unable to publish message: %v", logPrefix(v, m), err.Error())
}
}()
}
@ -1311,3 +1333,7 @@ func (s *Server) visitorFromIP(ip string) *visitor {
v.Keepalive()
return v
}
func logPrefix(v *visitor, m *message) string {
return fmt.Sprintf("%s/%s/%s", v.ip, m.Topic, m.ID)
}