From ab955d4d1c7bfbc8f492f63bd0a0d610c697e1b7 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Wed, 1 Jun 2022 16:57:35 -0400 Subject: [PATCH] Logging --- cmd/access.go | 2 +- cmd/app.go | 24 +++++-------- cmd/config_loader.go | 12 +++++-- cmd/publish.go | 2 +- cmd/serve.go | 45 +++++++++++++++++++++--- cmd/subscribe.go | 8 ++--- cmd/subscribe_darwin.go | 4 +-- cmd/subscribe_linux.go | 4 +-- cmd/subscribe_windows.go | 2 +- cmd/user.go | 2 +- log/log.go | 26 ++++++++++++-- server/ntfy.service | 1 + server/server.go | 76 +++++++++++++++++++++++++++------------- server/server.yml | 5 +++ server/topic.go | 13 ++++--- 15 files changed, 161 insertions(+), 65 deletions(-) diff --git a/cmd/access.go b/cmd/access.go index b36dc38b..e1f61bc1 100644 --- a/cmd/access.go +++ b/cmd/access.go @@ -28,7 +28,7 @@ var cmdAccess = &cli.Command{ Usage: "Grant/revoke access to a topic, or show access", UsageText: "ntfy access [USERNAME [TOPIC [PERMISSION]]]", Flags: flagsAccess, - Before: initLogFunc(initConfigFileInputSourceFunc("config", flagsAccess)), + Before: initConfigFileInputSourceFunc("config", flagsAccess, initLogFunc), Action: execUserAccess, Category: categoryServer, Description: `Manage the access control list for the ntfy server. diff --git a/cmd/app.go b/cmd/app.go index 89634a8b..adac9d73 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -3,6 +3,7 @@ package cmd import ( "github.com/urfave/cli/v2" + "github.com/urfave/cli/v2/altsrc" "heckel.io/ntfy/log" "os" ) @@ -16,7 +17,7 @@ var commands = make([]*cli.Command, 0) var flagsDefault = []cli.Flag{ &cli.BoolFlag{Name: "debug", Aliases: []string{"d"}, EnvVars: []string{"NTFY_DEBUG"}, Usage: "enable debug logging"}, - &cli.StringFlag{Name: "log-level", Aliases: []string{"log_level"}, Value: log.InfoLevel.String(), EnvVars: []string{"NTFY_LOG_LEVEL"}, Usage: "set log level"}, + altsrc.NewStringFlag(&cli.StringFlag{Name: "log-level", Aliases: []string{"log_level"}, Value: log.InfoLevel.String(), EnvVars: []string{"NTFY_LOG_LEVEL"}, Usage: "set log level"}), } // New creates a new CLI application @@ -32,22 +33,15 @@ func New() *cli.App { ErrWriter: os.Stderr, Commands: commands, Flags: flagsDefault, - Before: initLogFunc(nil), + Before: initLogFunc, } } -func initLogFunc(next cli.BeforeFunc) cli.BeforeFunc { - return func(c *cli.Context) error { - if c.Bool("debug") { - log.SetLevel(log.DebugLevel) - } else { - log.SetLevel(log.ToLevel(c.String("log-level"))) - } - if next != nil { - if err := next(c); err != nil { - return err - } - } - return nil +func initLogFunc(c *cli.Context) error { + if c.Bool("debug") { + log.SetLevel(log.DebugLevel) + } else { + log.SetLevel(log.ToLevel(c.String("log-level"))) } + return nil } diff --git a/cmd/config_loader.go b/cmd/config_loader.go index 7840c6e7..6d984840 100644 --- a/cmd/config_loader.go +++ b/cmd/config_loader.go @@ -11,7 +11,7 @@ import ( // initConfigFileInputSourceFunc is like altsrc.InitInputSourceWithContext and altsrc.NewYamlSourceFromFlagFunc, but checks // if the config flag is exists and only loads it if it does. If the flag is set and the file exists, it fails. -func initConfigFileInputSourceFunc(configFlag string, flags []cli.Flag) cli.BeforeFunc { +func initConfigFileInputSourceFunc(configFlag string, flags []cli.Flag, next cli.BeforeFunc) cli.BeforeFunc { return func(context *cli.Context) error { configFile := context.String(configFlag) if context.IsSet(configFlag) && !util.FileExists(configFile) { @@ -23,7 +23,15 @@ func initConfigFileInputSourceFunc(configFlag string, flags []cli.Flag) cli.Befo if err != nil { return err } - return altsrc.ApplyInputSourceValues(context, inputSource, flags) + if err := altsrc.ApplyInputSourceValues(context, inputSource, flags); err != nil { + return err + } + if next != nil { + if err := next(context); err != nil { + return err + } + } + return nil } } diff --git a/cmd/publish.go b/cmd/publish.go index 51d30b6a..c56aecad 100644 --- a/cmd/publish.go +++ b/cmd/publish.go @@ -44,7 +44,7 @@ var cmdPublish = &cli.Command{ Action: execPublish, Category: categoryClient, Flags: flagsPublish, - Before: initLogFunc(nil), + Before: initLogFunc, Description: `Publish a message to a ntfy server. Examples: diff --git a/cmd/serve.go b/cmd/serve.go index df1f5798..50969e03 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -8,7 +8,10 @@ import ( "heckel.io/ntfy/log" "math" "net" + "os" + "os/signal" "strings" + "syscall" "time" "github.com/urfave/cli/v2" @@ -21,9 +24,13 @@ func init() { commands = append(commands, cmdServe) } +const ( + defaultServerConfigFile = "/etc/ntfy/server.yml" +) + var flagsServe = append( flagsDefault, - &cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG_FILE"}, Value: "/etc/ntfy/server.yml", DefaultText: "/etc/ntfy/server.yml", Usage: "config file"}, + &cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG_FILE"}, Value: defaultServerConfigFile, DefaultText: defaultServerConfigFile, Usage: "config file"}, altsrc.NewStringFlag(&cli.StringFlag{Name: "base-url", Aliases: []string{"base_url", "B"}, EnvVars: []string{"NTFY_BASE_URL"}, Usage: "externally visible base URL for this host (e.g. https://ntfy.sh)"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-http", Aliases: []string{"listen_http", "l"}, EnvVars: []string{"NTFY_LISTEN_HTTP"}, Value: server.DefaultListenHTTP, Usage: "ip:port used to as HTTP listen address"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-https", Aliases: []string{"listen_https", "L"}, EnvVars: []string{"NTFY_LISTEN_HTTPS"}, Usage: "ip:port used to as HTTPS listen address"}), @@ -69,7 +76,7 @@ var cmdServe = &cli.Command{ Action: execServe, Category: categoryServer, Flags: flagsServe, - Before: initLogFunc(initConfigFileInputSourceFunc("config", flagsServe)), + Before: initConfigFileInputSourceFunc("config", flagsServe, initLogFunc), Description: `Run the ntfy server and listen for incoming requests The command will load the configuration from /etc/ntfy/server.yml. Config options can @@ -86,6 +93,7 @@ func execServe(c *cli.Context) error { } // Read all the options + config := c.String("config") baseURL := c.String("base-url") listenHTTP := c.String("listen-http") listenHTTPS := c.String("listen-https") @@ -241,11 +249,15 @@ func execServe(c *cli.Context) error { conf.VisitorEmailLimitReplenish = visitorEmailLimitReplenish conf.BehindProxy = behindProxy conf.EnableWeb = enableWeb + + // Set up hot-reloading of config + go sigHandlerConfigReload(config) + + // Run server s, err := server.New(conf) if err != nil { log.Fatal(err) - } - if err := s.Run(); err != nil { + } else if err := s.Run(); err != nil { log.Fatal(err) } log.Info("Exiting.") @@ -262,3 +274,28 @@ func parseSize(s string, defaultValue int64) (v int64, err error) { } return v, nil } + +func sigHandlerConfigReload(config string) { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGHUP) + for range sigs { + log.Info("Partially hot reloading configuration ...") + inputSource, err := newYamlSourceFromFile(config, flagsServe) + if err != nil { + log.Warn("Hot reload failed: %s", err.Error()) + continue + } + reloadLogLevel(inputSource) + } +} + +func reloadLogLevel(inputSource altsrc.InputSourceContext) { + newLevelStr, err := inputSource.String("log-level") + if err != nil { + log.Warn("Cannot load log level: %s", err.Error()) + return + } + newLevel := log.ToLevel(newLevelStr) + log.SetLevel(newLevel) + log.Info("Log level is %s", newLevel.String()) +} diff --git a/cmd/subscribe.go b/cmd/subscribe.go index 618cdb9b..3b469344 100644 --- a/cmd/subscribe.go +++ b/cmd/subscribe.go @@ -43,7 +43,7 @@ var cmdSubscribe = &cli.Command{ Action: execSubscribe, Category: categoryClient, Flags: flagsSubscribe, - Before: initLogFunc(nil), + Before: initLogFunc, Description: `Subscribe to a topic from a ntfy server, and either print or execute a command for every arriving message. There are 3 modes in which the command can be run: @@ -253,7 +253,7 @@ func loadConfig(c *cli.Context) (*client.Config, error) { if filename != "" { return client.LoadConfig(filename) } - configFile := defaultConfigFile() + configFile := defaultClientConfigFile() if s, _ := os.Stat(configFile); s != nil { return client.LoadConfig(configFile) } @@ -261,7 +261,7 @@ func loadConfig(c *cli.Context) (*client.Config, error) { } //lint:ignore U1000 Conditionally used in different builds -func defaultConfigFileUnix() string { +func defaultClientConfigFileUnix() string { u, _ := user.Current() configFile := clientRootConfigFileUnixAbsolute if u.Uid != "0" { @@ -272,7 +272,7 @@ func defaultConfigFileUnix() string { } //lint:ignore U1000 Conditionally used in different builds -func defaultConfigFileWindows() string { +func defaultClientConfigFileWindows() string { homeDir, _ := os.UserConfigDir() return filepath.Join(homeDir, clientUserConfigFileWindowsRelative) } diff --git a/cmd/subscribe_darwin.go b/cmd/subscribe_darwin.go index e4f44ed6..0372a79f 100644 --- a/cmd/subscribe_darwin.go +++ b/cmd/subscribe_darwin.go @@ -11,6 +11,6 @@ var ( scriptLauncher = []string{"sh", "-c"} ) -func defaultConfigFile() string { - return defaultConfigFileUnix() +func defaultClientConfigFile() string { + return defaultClientConfigFileUnix() } diff --git a/cmd/subscribe_linux.go b/cmd/subscribe_linux.go index c57660e8..346606bd 100644 --- a/cmd/subscribe_linux.go +++ b/cmd/subscribe_linux.go @@ -11,6 +11,6 @@ var ( scriptLauncher = []string{"sh", "-c"} ) -func defaultConfigFile() string { - return defaultConfigFileUnix() +func defaultClientConfigFile() string { + return defaultClientConfigFileUnix() } diff --git a/cmd/subscribe_windows.go b/cmd/subscribe_windows.go index 1d5c6655..129e8f52 100644 --- a/cmd/subscribe_windows.go +++ b/cmd/subscribe_windows.go @@ -11,5 +11,5 @@ var ( ) func defaultConfigFile() string { - return defaultConfigFileWindows() + return defaultClientConfigFileWindows() } diff --git a/cmd/user.go b/cmd/user.go index 921aeda1..acc06d4c 100644 --- a/cmd/user.go +++ b/cmd/user.go @@ -29,7 +29,7 @@ var cmdUser = &cli.Command{ Usage: "Manage/show users", UsageText: "ntfy user [list|add|remove|change-pass|change-role] ...", Flags: flagsUser, - Before: initLogFunc(initConfigFileInputSourceFunc("config", flagsUser)), + Before: initConfigFileInputSourceFunc("config", flagsUser, initLogFunc), Category: categoryServer, Subcommands: []*cli.Command{ { diff --git a/log/log.go b/log/log.go index 8c134508..36abc0e3 100644 --- a/log/log.go +++ b/log/log.go @@ -3,10 +3,13 @@ package log import ( "log" "strings" + "sync" ) +// Level is a well-known log level, as defined below type Level int +// Well known log levels const ( DebugLevel Level = iota InfoLevel @@ -30,32 +33,50 @@ func (l Level) String() string { var ( level = InfoLevel + mu = &sync.Mutex{} ) +// Debug prints the given message, if the current log level is DEBUG func Debug(message string, v ...interface{}) { logIf(DebugLevel, message, v...) } +// Info prints the given message, if the current log level is INFO or lower func Info(message string, v ...interface{}) { logIf(InfoLevel, message, v...) } +// Warn prints the given message, if the current log level is WARN or lower func Warn(message string, v ...interface{}) { logIf(WarnLevel, message, v...) } +// Error prints the given message, if the current log level is ERROR or lower func Error(message string, v ...interface{}) { logIf(ErrorLevel, message, v...) } +// Fatal prints the given message, and exits the program func Fatal(v ...interface{}) { log.Fatalln(v...) } +// CurrentLevel returns the current log level +func CurrentLevel() Level { + mu.Lock() + defer mu.Unlock() + return level +} + +// SetLevel sets a new log level func SetLevel(newLevel Level) { + mu.Lock() + defer mu.Unlock() level = newLevel } +// ToLevel converts a string to a Level. It returns InfoLevel if the string +// does not match any known log levels. func ToLevel(s string) Level { switch strings.ToLower(s) { case "debug": @@ -67,13 +88,12 @@ func ToLevel(s string) Level { case "error": return ErrorLevel default: - log.Fatalf("unknown log level: %s", s) - return 0 + return InfoLevel } } func logIf(l Level, message string, v ...interface{}) { - if level <= l { + if CurrentLevel() <= l { log.Printf(l.String()+" "+message, v...) } } diff --git a/server/ntfy.service b/server/ntfy.service index 6645b21f..f32ed898 100644 --- a/server/ntfy.service +++ b/server/ntfy.service @@ -6,6 +6,7 @@ After=network.target User=ntfy Group=ntfy ExecStart=/usr/bin/ntfy serve +ExecReload=/bin/kill --signal HUP $MAINPID Restart=on-failure AmbientCapabilities=CAP_NET_BIND_SERVICE LimitNOFILE=10000 diff --git a/server/server.go b/server/server.go index 8ed5795c..4688a03f 100644 --- a/server/server.go +++ b/server/server.go @@ -179,7 +179,7 @@ func (s *Server) Run() error { if s.config.SMTPServerListen != "" { listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen) } - log.Info("Listening on%s", listenStr) + log.Info("Listening on%s, log level is %s", listenStr, log.CurrentLevel().String()) mux := http.NewServeMux() mux.HandleFunc("/", s.handle) errChan := make(chan error) @@ -246,18 +246,28 @@ func (s *Server) Stop() { func (s *Server) handle(w http.ResponseWriter, r *http.Request) { v := s.visitor(r) - log.Debug("[%s] %s %s", v.ip, r.Method, r.URL.Path) - + log.Debug("%s HTTP %s %s", v.ip, r.Method, r.URL.Path) if err := s.handleInternal(w, r, v); err != nil { if websocket.IsWebSocketUpgrade(r) { - log.Info("[%s] WS %s %s - %s", v.ip, r.Method, r.URL.Path, err.Error()) + isNormalError := websocket.IsCloseError(err, websocket.CloseAbnormalClosure) || strings.Contains(err.Error(), "i/o timeout") + if isNormalError { + log.Debug("%s WS %s %s - %s", v.ip, r.Method, r.URL.Path, err.Error()) + } else { + log.Warn("%s WS %s %s - %s", v.ip, r.Method, r.URL.Path, err.Error()) + } return // Do not attempt to write to upgraded connection } httpErr, ok := err.(*errHTTP) if !ok { httpErr = errHTTPInternalError } - log.Info("[%s] HTTP %s %s - %d - %d - %s", v.ip, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error()) + isNormalError := httpErr.Code == 404 + if isNormalError { + log.Debug("%s HTTP %s %s - %d - %d - %s", v.ip, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error()) + } else { + log.Info("%s HTTP %s %s - %d - %d - %s", v.ip, r.Method, r.URL.Path, httpErr.HTTPCode, httpErr.Code, err.Error()) + } + w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests w.WriteHeader(httpErr.HTTPCode) @@ -434,21 +444,23 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito m.Message = emptyMessageBody } delayed := m.Time > time.Now().Unix() - log.Debug("[%s] %s %s: ev=%s, body=%d bytes, delayed=%t, fb=%t, cache=%t, up=%t, email=%s", - v.ip, r.Method, r.URL.Path, m.Event, len(body.PeekedBytes), delayed, firebase, cache, unifiedpush, email) + log.Debug("%s Received message: ev=%s, body=%d bytes, delayed=%t, fb=%t, cache=%t, up=%t, email=%s", + logPrefix(v, m), m.Event, len(body.PeekedBytes), delayed, firebase, cache, unifiedpush, email) if !delayed { if err := t.Publish(v, m); err != nil { return err } - } - if s.firebaseClient != nil && firebase && !delayed { - go s.sendToFirebase(v, m) - } - if s.mailer != nil && email != "" && !delayed { - go s.sendEmail(v, m, email) - } - if s.config.UpstreamBaseURL != "" && !delayed { - go s.forwardPollRequest(v, m) + if s.firebaseClient != nil && firebase { + go s.sendToFirebase(v, m) + } + if s.mailer != nil && email != "" { + go s.sendEmail(v, m, email) + } + if s.config.UpstreamBaseURL != "" { + go s.forwardPollRequest(v, m) + } + } else { + log.Debug("%s Message delayed, will process later", logPrefix(v, m)) } if cache { if err := s.messageCache.AddMessage(m); err != nil { @@ -467,14 +479,16 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito } func (s *Server) sendToFirebase(v *visitor, m *message) { + log.Debug("%s Publishing to Firebase", logPrefix(v, m)) if err := s.firebaseClient.Send(v, m); err != nil { - log.Warn("[%s] FB - Unable to publish to Firebase: %v", v.ip, err.Error()) + log.Warn("%s Unable to publish to Firebase: %v", logPrefix(v, m), err.Error()) } } func (s *Server) sendEmail(v *visitor, m *message, email string) { + log.Debug("%s Sending email to %s", logPrefix(v, m), email) if err := s.mailer.Send(v.ip, email, m); err != nil { - log.Warn("[%s] MAIL - Unable to send email: %v", v.ip, err.Error()) + log.Warn("%s Unable to send email: %v", logPrefix(v, m), err.Error()) } } @@ -482,9 +496,10 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) { topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic) topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL))) forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash) + log.Debug("%s Publishing poll request to %s", logPrefix(v, m), forwardURL) req, err := http.NewRequest("POST", forwardURL, strings.NewReader("")) if err != nil { - log.Warn("[%s] FWD - Unable to forward poll request: %v", v.ip, err.Error()) + log.Warn("%s Unable to publish poll request: %v", logPrefix(v, m), err.Error()) return } req.Header.Set("X-Poll-ID", m.ID) @@ -493,10 +508,10 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) { } response, err := httpClient.Do(req) if err != nil { - log.Warn("[%s] FWD - Unable to forward poll request: %v", v.ip, err.Error()) + log.Warn("%s Unable to publish poll request: %v", logPrefix(v, m), err.Error()) return } else if response.StatusCode != http.StatusOK { - log.Warn("[%s] FWD - Unable to forward poll request, unexpected status: %d", v.ip, response.StatusCode) + log.Warn("%s Unable to publish poll request, unexpected HTTP status: %d", logPrefix(v, m), response.StatusCode) return } } @@ -1012,6 +1027,7 @@ func (s *Server) updateStatsAndPrune() { // Expire visitors from rate visitors map for ip, v := range s.visitors { if v.Stale() { + log.Debug("Deleting stale visitor %s", v.ip) delete(s.visitors, ip) } } @@ -1019,17 +1035,21 @@ func (s *Server) updateStatsAndPrune() { // Delete expired attachments if s.fileCache != nil { ids, err := s.messageCache.AttachmentsExpired() - if err == nil { + if err != nil { + log.Warn("Error retrieving expired attachments: %s", err.Error()) + } else if len(ids) > 0 { + log.Debug("Deleting expired attachments: %v", ids) if err := s.fileCache.Remove(ids...); err != nil { log.Warn("Error deleting attachments: %s", err.Error()) } } else { - log.Warn("Error retrieving expired attachments: %s", err.Error()) + log.Debug("No expired attachments to delete") } } // Prune message cache olderThan := time.Now().Add(-1 * s.config.CacheDuration) + log.Debug("Pruning messages older tha %v", olderThan) if err := s.messageCache.Prune(olderThan); err != nil { log.Warn("Error pruning cache: %s", err.Error()) } @@ -1079,6 +1099,7 @@ func (s *Server) runManager() { for { select { case <-time.After(s.config.ManagerInterval): + log.Debug("Running manager") s.updateStatsAndPrune() case <-s.closeChan: return @@ -1124,7 +1145,7 @@ func (s *Server) sendDelayedMessages() error { for _, m := range messages { v := s.visitorFromIP(m.Sender) if err := s.sendDelayedMessage(v, m); err != nil { - log.Warn("error sending delayed message: %s", err.Error()) + log.Warn("%s Error sending delayed message: %s", logPrefix(v, m), err.Error()) } } return nil @@ -1133,12 +1154,13 @@ func (s *Server) sendDelayedMessages() error { func (s *Server) sendDelayedMessage(v *visitor, m *message) error { s.mu.Lock() defer s.mu.Unlock() + log.Debug("%s Sending delayed message", logPrefix(v, m)) t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published if ok { go func() { // We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler if err := t.Publish(v, m); err != nil { - log.Warn("unable to publish message %s to topic %s: %v", m.ID, m.Topic, err.Error()) + log.Warn("%s Unable to publish message: %v", logPrefix(v, m), err.Error()) } }() } @@ -1311,3 +1333,7 @@ func (s *Server) visitorFromIP(ip string) *visitor { v.Keepalive() return v } + +func logPrefix(v *visitor, m *message) string { + return fmt.Sprintf("%s/%s/%s", v.ip, m.Topic, m.ID) +} diff --git a/server/server.yml b/server/server.yml index ce7b1c75..f6c14a64 100644 --- a/server/server.yml +++ b/server/server.yml @@ -178,3 +178,8 @@ # # visitor-attachment-total-size-limit: "100M" # visitor-attachment-daily-bandwidth-limit: "500M" + +# Log level, can be DEBUG, INFO, WARN or ERROR +# This option can be hot-reloaded by calling "kill -HUP $pid" or "systemctl reload ntfy". +# +# log-level: INFO diff --git a/server/topic.go b/server/topic.go index eb53225b..3cb11394 100644 --- a/server/topic.go +++ b/server/topic.go @@ -1,7 +1,7 @@ package server import ( - "log" + "heckel.io/ntfy/log" "math/rand" "sync" ) @@ -46,10 +46,15 @@ func (t *topic) Publish(v *visitor, m *message) error { go func() { t.mu.Lock() defer t.mu.Unlock() - for _, s := range t.subscribers { - if err := s(v, m); err != nil { - log.Printf("error publishing message to subscriber") + if len(t.subscribers) > 0 { + log.Debug("%s Forwarding to %d subscriber(s)", logPrefix(v, m), len(t.subscribers)) + for _, s := range t.subscribers { + if err := s(v, m); err != nil { + log.Warn("%s Error forwarding to subscriber", logPrefix(v, m)) + } } + } else { + log.Debug("%s No subscribers, not forwarding", logPrefix(v, m)) } }() return nil