From 1e8421e8ce90def541c5ddb097b2ad475a6b4729 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Thu, 16 Dec 2021 20:33:01 -0500 Subject: [PATCH] WIP: CLI, relates to #46 --- client/client.go | 135 ++++++++++++++++++++++++++++++++++++++++++++ client/options.go | 71 +++++++++++++++++++++++ cmd/app.go | 111 +++++++++--------------------------- cmd/publish.go | 70 +++++++++++++++++++++++ cmd/serve.go | 108 +++++++++++++++++++++++++++++++++++ cmd/subscribe.go | 132 +++++++++++++++++++++++++++++++++++++++++++ config/ntfy.service | 2 +- main.go | 2 +- server/server.go | 19 +------ util/util.go | 23 ++++++++ 10 files changed, 571 insertions(+), 102 deletions(-) create mode 100644 client/client.go create mode 100644 client/options.go create mode 100644 cmd/publish.go create mode 100644 cmd/serve.go create mode 100644 cmd/subscribe.go diff --git a/client/client.go b/client/client.go new file mode 100644 index 00000000..a670760b --- /dev/null +++ b/client/client.go @@ -0,0 +1,135 @@ +package client + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "strings" + "sync" + "time" +) + +const ( + DefaultBaseURL = "https://ntfy.sh" +) + +const ( + MessageEvent = "message" + KeepaliveEvent = "keepalive" + OpenEvent = "open" +) + +type Client struct { + BaseURL string + Messages chan *Message + subscriptions map[string]*subscription + mu sync.Mutex +} + +type Message struct { + ID string + Event string + Time int64 + Topic string + Message string + Title string + Priority int + Tags []string + BaseURL string + TopicURL string + Raw string +} + +type subscription struct { + cancel context.CancelFunc +} + +var DefaultClient = New() + +func New() *Client { + return &Client{ + Messages: make(chan *Message), + subscriptions: make(map[string]*subscription), + } +} + +func (c *Client) Publish(topicURL, message string, options ...PublishOption) error { + req, _ := http.NewRequest("POST", topicURL, strings.NewReader(message)) + for _, option := range options { + if err := option(req); err != nil { + return err + } + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected response %d from server", resp.StatusCode) + } + return err +} + +func (c *Client) Subscribe(topicURL string) { + c.mu.Lock() + defer c.mu.Unlock() + if _, ok := c.subscriptions[topicURL]; ok { + return + } + ctx, cancel := context.WithCancel(context.Background()) + c.subscriptions[topicURL] = &subscription{cancel} + go handleConnectionLoop(ctx, c.Messages, topicURL) +} + +func (c *Client) Unsubscribe(topicURL string) { + c.mu.Lock() + defer c.mu.Unlock() + sub, ok := c.subscriptions[topicURL] + if !ok { + return + } + sub.cancel() + return +} + +func handleConnectionLoop(ctx context.Context, msgChan chan *Message, topicURL string) { + for { + if err := handleConnection(ctx, msgChan, topicURL); err != nil { + log.Printf("connection to %s failed: %s", topicURL, err.Error()) + } + select { + case <-ctx.Done(): + log.Printf("connection to %s exited", topicURL) + return + case <-time.After(5 * time.Second): + } + } +} + +func handleConnection(ctx context.Context, msgChan chan *Message, topicURL string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/json", topicURL), nil) + if err != nil { + return err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + var m *Message + line := scanner.Text() + if err := json.NewDecoder(strings.NewReader(line)).Decode(&m); err != nil { + return err + } + m.BaseURL = strings.TrimSuffix(topicURL, "/"+m.Topic) // FIXME hack! + m.TopicURL = topicURL + m.Raw = line + msgChan <- m + } + return nil +} diff --git a/client/options.go b/client/options.go new file mode 100644 index 00000000..8812e4df --- /dev/null +++ b/client/options.go @@ -0,0 +1,71 @@ +package client + +import ( + "net/http" +) + +type PublishOption func(r *http.Request) error + +func WithTitle(title string) PublishOption { + return func(r *http.Request) error { + if title != "" { + r.Header.Set("X-Title", title) + } + return nil + } +} + +func WithPriority(priority string) PublishOption { + return func(r *http.Request) error { + if priority != "" { + r.Header.Set("X-Priority", priority) + } + return nil + } +} + +func WithTags(tags string) PublishOption { + return func(r *http.Request) error { + if tags != "" { + r.Header.Set("X-Tags", tags) + } + return nil + } +} + +func WithDelay(delay string) PublishOption { + return func(r *http.Request) error { + if delay != "" { + r.Header.Set("X-Delay", delay) + } + return nil + } +} + +func WithNoCache() PublishOption { + return WithHeader("X-Cache", "no") +} + +func WithNoFirebase() PublishOption { + return WithHeader("X-Firebase", "no") +} + +func WithHeader(header, value string) PublishOption { + return func(r *http.Request) error { + r.Header.Set(header, value) + return nil + } +} + +type SubscribeOption func(r *http.Request) error + +func WithSince(since string) PublishOption { + return func(r *http.Request) error { + if since != "" { + q := r.URL.Query() + q.Add("since", since) + r.URL.RawQuery = q.Encode() + } + return nil + } +} diff --git a/cmd/app.go b/cmd/app.go index 379a58a0..e7234e01 100644 --- a/cmd/app.go +++ b/cmd/app.go @@ -2,112 +2,42 @@ package cmd import ( - "errors" "fmt" "github.com/urfave/cli/v2" "github.com/urfave/cli/v2/altsrc" - "heckel.io/ntfy/config" - "heckel.io/ntfy/server" + "heckel.io/ntfy/client" "heckel.io/ntfy/util" "log" "os" - "time" + "strings" ) // New creates a new CLI application func New() *cli.App { - flags := []cli.Flag{ - &cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG_FILE"}, Value: "/etc/ntfy/config.yml", DefaultText: "/etc/ntfy/config.yml", Usage: "config file"}, - altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-http", Aliases: []string{"l"}, EnvVars: []string{"NTFY_LISTEN_HTTP"}, Value: config.DefaultListenHTTP, Usage: "ip:port used to as HTTP listen address"}), - altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-https", Aliases: []string{"L"}, EnvVars: []string{"NTFY_LISTEN_HTTPS"}, Usage: "ip:port used to as HTTPS listen address"}), - altsrc.NewStringFlag(&cli.StringFlag{Name: "key-file", Aliases: []string{"K"}, EnvVars: []string{"NTFY_KEY_FILE"}, Usage: "private key file, if listen-https is set"}), - altsrc.NewStringFlag(&cli.StringFlag{Name: "cert-file", Aliases: []string{"E"}, EnvVars: []string{"NTFY_CERT_FILE"}, Usage: "certificate file, if listen-https is set"}), - altsrc.NewStringFlag(&cli.StringFlag{Name: "firebase-key-file", Aliases: []string{"F"}, EnvVars: []string{"NTFY_FIREBASE_KEY_FILE"}, Usage: "Firebase credentials file; if set additionally publish to FCM topic"}), - altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-file", Aliases: []string{"C"}, EnvVars: []string{"NTFY_CACHE_FILE"}, Usage: "cache file used for message caching"}), - altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-duration", Aliases: []string{"b"}, EnvVars: []string{"NTFY_CACHE_DURATION"}, Value: config.DefaultCacheDuration, Usage: "buffer messages for this time to allow `since` requests"}), - altsrc.NewDurationFlag(&cli.DurationFlag{Name: "keepalive-interval", Aliases: []string{"k"}, EnvVars: []string{"NTFY_KEEPALIVE_INTERVAL"}, Value: config.DefaultKeepaliveInterval, Usage: "interval of keepalive messages"}), - altsrc.NewDurationFlag(&cli.DurationFlag{Name: "manager-interval", Aliases: []string{"m"}, EnvVars: []string{"NTFY_MANAGER_INTERVAL"}, Value: config.DefaultManagerInterval, Usage: "interval of for message pruning and stats printing"}), - altsrc.NewIntFlag(&cli.IntFlag{Name: "global-topic-limit", Aliases: []string{"T"}, EnvVars: []string{"NTFY_GLOBAL_TOPIC_LIMIT"}, Value: config.DefaultGlobalTopicLimit, Usage: "total number of topics allowed"}), - altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-subscription-limit", Aliases: []string{"V"}, EnvVars: []string{"NTFY_VISITOR_SUBSCRIPTION_LIMIT"}, Value: config.DefaultVisitorSubscriptionLimit, Usage: "number of subscriptions per visitor"}), - altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-request-limit-burst", Aliases: []string{"B"}, EnvVars: []string{"NTFY_VISITOR_REQUEST_LIMIT_BURST"}, Value: config.DefaultVisitorRequestLimitBurst, Usage: "initial limit of requests per visitor"}), - altsrc.NewDurationFlag(&cli.DurationFlag{Name: "visitor-request-limit-replenish", Aliases: []string{"R"}, EnvVars: []string{"NTFY_VISITOR_REQUEST_LIMIT_REPLENISH"}, Value: config.DefaultVisitorRequestLimitReplenish, Usage: "interval at which burst limit is replenished (one per x)"}), - altsrc.NewBoolFlag(&cli.BoolFlag{Name: "behind-proxy", Aliases: []string{"P"}, EnvVars: []string{"NTFY_BEHIND_PROXY"}, Value: false, Usage: "if set, use X-Forwarded-For header to determine visitor IP address (for rate limiting)"}), - } return &cli.App{ Name: "ntfy", Usage: "Simple pub-sub notification service", UsageText: "ntfy [OPTION..]", - HideHelp: true, HideVersion: true, - EnableBashCompletion: true, UseShortOptionHandling: true, Reader: os.Stdin, Writer: os.Stdout, ErrWriter: os.Stderr, - Action: execRun, - Before: initConfigFileInputSource("config", flags), - Flags: flags, + Action: execMainApp, + Before: initConfigFileInputSource("config", flagsServe), // DEPRECATED, see deprecation notice + Flags: flagsServe, // DEPRECATED, see deprecation notice + Commands: []*cli.Command{ + cmdServe, + cmdPublish, + cmdSubscribe, + }, } } -func execRun(c *cli.Context) error { - // Read all the options - listenHTTP := c.String("listen-http") - listenHTTPS := c.String("listen-https") - keyFile := c.String("key-file") - certFile := c.String("cert-file") - firebaseKeyFile := c.String("firebase-key-file") - cacheFile := c.String("cache-file") - cacheDuration := c.Duration("cache-duration") - keepaliveInterval := c.Duration("keepalive-interval") - managerInterval := c.Duration("manager-interval") - globalTopicLimit := c.Int("global-topic-limit") - visitorSubscriptionLimit := c.Int("visitor-subscription-limit") - visitorRequestLimitBurst := c.Int("visitor-request-limit-burst") - visitorRequestLimitReplenish := c.Duration("visitor-request-limit-replenish") - behindProxy := c.Bool("behind-proxy") - - // Check values - if firebaseKeyFile != "" && !util.FileExists(firebaseKeyFile) { - return errors.New("if set, FCM key file must exist") - } else if keepaliveInterval < 5*time.Second { - return errors.New("keepalive interval cannot be lower than five seconds") - } else if managerInterval < 5*time.Second { - return errors.New("manager interval cannot be lower than five seconds") - } else if cacheDuration > 0 && cacheDuration < managerInterval { - return errors.New("cache duration cannot be lower than manager interval") - } else if keyFile != "" && !util.FileExists(keyFile) { - return errors.New("if set, key file must exist") - } else if certFile != "" && !util.FileExists(certFile) { - return errors.New("if set, certificate file must exist") - } else if listenHTTPS != "" && (keyFile == "" || certFile == "") { - return errors.New("if listen-https is set, both key-file and cert-file must be set") - } - - // Run server - conf := config.New(listenHTTP) - conf.ListenHTTPS = listenHTTPS - conf.KeyFile = keyFile - conf.CertFile = certFile - conf.FirebaseKeyFile = firebaseKeyFile - conf.CacheFile = cacheFile - conf.CacheDuration = cacheDuration - conf.KeepaliveInterval = keepaliveInterval - conf.ManagerInterval = managerInterval - conf.GlobalTopicLimit = globalTopicLimit - conf.VisitorSubscriptionLimit = visitorSubscriptionLimit - conf.VisitorRequestLimitBurst = visitorRequestLimitBurst - conf.VisitorRequestLimitReplenish = visitorRequestLimitReplenish - conf.BehindProxy = behindProxy - s, err := server.New(conf) - if err != nil { - log.Fatalln(err) - } - if err := s.Run(); err != nil { - log.Fatalln(err) - } - log.Printf("Exiting.") - return nil +func execMainApp(c *cli.Context) error { + log.Printf("\x1b[1;33mDeprecation notice: Please run the server using 'ntfy serve'; see 'ntfy -h' for help.\x1b[0m") + log.Printf("\x1b[1;33mThis way of running the server will be removed Feb 2022.\x1b[0m") + return execServe(c) } // initConfigFileInputSource is like altsrc.InitInputSourceWithContext and altsrc.NewYamlSourceFromFlagFunc, but checks @@ -127,3 +57,16 @@ func initConfigFileInputSource(configFlag string, flags []cli.Flag) cli.BeforeFu return altsrc.ApplyInputSourceValues(context, inputSource, flags) } } + +func expandTopicURL(s string) string { + if strings.HasPrefix(s, "http://") || strings.HasPrefix(s, "https://") { + return s + } else if strings.Contains(s, "/") { + return fmt.Sprintf("https://%s", s) + } + return fmt.Sprintf("%s/%s", client.DefaultBaseURL, s) +} + +func collapseTopicURL(s string) string { + return strings.TrimPrefix(strings.TrimPrefix(s, "https://"), "http://") +} diff --git a/cmd/publish.go b/cmd/publish.go new file mode 100644 index 00000000..274fee09 --- /dev/null +++ b/cmd/publish.go @@ -0,0 +1,70 @@ +package cmd + +import ( + "errors" + "github.com/urfave/cli/v2" + "heckel.io/ntfy/client" + "strings" +) + +var cmdPublish = &cli.Command{ + Name: "publish", + Aliases: []string{"pub", "send"}, + Usage: "Send message via a ntfy server", + UsageText: "ntfy send [OPTIONS..] TOPIC MESSAGE", + Action: execPublish, + Flags: []cli.Flag{ + &cli.StringFlag{Name: "title", Aliases: []string{"t"}, Usage: "message title"}, + &cli.StringFlag{Name: "priority", Aliases: []string{"p"}, Usage: "priority of the message (1=min, 2=low, 3=default, 4=high, 5=max)"}, + &cli.StringFlag{Name: "tags", Aliases: []string{"ta"}, Usage: "comma separated list of tags and emojis"}, + &cli.StringFlag{Name: "delay", Aliases: []string{"at", "in"}, Usage: "delay/schedule message"}, + &cli.BoolFlag{Name: "no-cache", Aliases: []string{"C"}, Usage: "do not cache message server-side"}, + &cli.BoolFlag{Name: "no-firebase", Aliases: []string{"F"}, Usage: "do not forward message to Firebase"}, + }, + Description: `Publish a message to a ntfy server. + +Examples: + ntfy publish mytopic This is my message # Send simple message + ntfy send myserver.com/mytopic "This is my message" # Send message to different default host + ntfy pub -p high backups "Backups failed" # Send high priority message + ntfy pub --tags=warning,skull backups "Backups failed" # Add tags/emojis to message + ntfy pub --delay=10s delayed_topic Laterzz # Delay message by 10s + ntfy pub --at=8:30am delayed_topic Laterzz # Send message at 8:30am + +Please also check out the docs on publishing messages. Especially for the --tags and --delay options, +it has incredibly useful information: https://ntfy.sh/docs/publish/.`, +} + +func execPublish(c *cli.Context) error { + if c.NArg() < 2 { + return errors.New("topic/message missing") + } + title := c.String("title") + priority := c.String("priority") + tags := c.String("tags") + delay := c.String("delay") + noCache := c.Bool("no-cache") + noFirebase := c.Bool("no-firebase") + topicURL := expandTopicURL(c.Args().Get(0)) + message := strings.Join(c.Args().Slice()[1:], " ") + var options []client.PublishOption + if title != "" { + options = append(options, client.WithTitle(title)) + } + if priority != "" { + options = append(options, client.WithPriority(priority)) + } + if tags != "" { + options = append(options, client.WithTags(tags)) + } + if delay != "" { + options = append(options, client.WithDelay(delay)) + } + if noCache { + options = append(options, client.WithNoCache()) + } + if noFirebase { + options = append(options, client.WithNoFirebase()) + } + return client.DefaultClient.Publish(topicURL, message, options...) +} diff --git a/cmd/serve.go b/cmd/serve.go new file mode 100644 index 00000000..2cdc8425 --- /dev/null +++ b/cmd/serve.go @@ -0,0 +1,108 @@ +// Package cmd provides the ntfy CLI application +package cmd + +import ( + "errors" + "github.com/urfave/cli/v2" + "github.com/urfave/cli/v2/altsrc" + "heckel.io/ntfy/config" + "heckel.io/ntfy/server" + "heckel.io/ntfy/util" + "log" + "time" +) + +var flagsServe = []cli.Flag{ + &cli.StringFlag{Name: "config", Aliases: []string{"c"}, EnvVars: []string{"NTFY_CONFIG_FILE"}, Value: "/etc/ntfy/config.yml", DefaultText: "/etc/ntfy/config.yml", Usage: "config file"}, + altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-http", Aliases: []string{"l"}, EnvVars: []string{"NTFY_LISTEN_HTTP"}, Value: config.DefaultListenHTTP, Usage: "ip:port used to as HTTP listen address"}), + altsrc.NewStringFlag(&cli.StringFlag{Name: "listen-https", Aliases: []string{"L"}, EnvVars: []string{"NTFY_LISTEN_HTTPS"}, Usage: "ip:port used to as HTTPS listen address"}), + altsrc.NewStringFlag(&cli.StringFlag{Name: "key-file", Aliases: []string{"K"}, EnvVars: []string{"NTFY_KEY_FILE"}, Usage: "private key file, if listen-https is set"}), + altsrc.NewStringFlag(&cli.StringFlag{Name: "cert-file", Aliases: []string{"E"}, EnvVars: []string{"NTFY_CERT_FILE"}, Usage: "certificate file, if listen-https is set"}), + altsrc.NewStringFlag(&cli.StringFlag{Name: "firebase-key-file", Aliases: []string{"F"}, EnvVars: []string{"NTFY_FIREBASE_KEY_FILE"}, Usage: "Firebase credentials file; if set additionally publish to FCM topic"}), + altsrc.NewStringFlag(&cli.StringFlag{Name: "cache-file", Aliases: []string{"C"}, EnvVars: []string{"NTFY_CACHE_FILE"}, Usage: "cache file used for message caching"}), + altsrc.NewDurationFlag(&cli.DurationFlag{Name: "cache-duration", Aliases: []string{"b"}, EnvVars: []string{"NTFY_CACHE_DURATION"}, Value: config.DefaultCacheDuration, Usage: "buffer messages for this time to allow `since` requests"}), + altsrc.NewDurationFlag(&cli.DurationFlag{Name: "keepalive-interval", Aliases: []string{"k"}, EnvVars: []string{"NTFY_KEEPALIVE_INTERVAL"}, Value: config.DefaultKeepaliveInterval, Usage: "interval of keepalive messages"}), + altsrc.NewDurationFlag(&cli.DurationFlag{Name: "manager-interval", Aliases: []string{"m"}, EnvVars: []string{"NTFY_MANAGER_INTERVAL"}, Value: config.DefaultManagerInterval, Usage: "interval of for message pruning and stats printing"}), + altsrc.NewIntFlag(&cli.IntFlag{Name: "global-topic-limit", Aliases: []string{"T"}, EnvVars: []string{"NTFY_GLOBAL_TOPIC_LIMIT"}, Value: config.DefaultGlobalTopicLimit, Usage: "total number of topics allowed"}), + altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-subscription-limit", Aliases: []string{"V"}, EnvVars: []string{"NTFY_VISITOR_SUBSCRIPTION_LIMIT"}, Value: config.DefaultVisitorSubscriptionLimit, Usage: "number of subscriptions per visitor"}), + altsrc.NewIntFlag(&cli.IntFlag{Name: "visitor-request-limit-burst", Aliases: []string{"B"}, EnvVars: []string{"NTFY_VISITOR_REQUEST_LIMIT_BURST"}, Value: config.DefaultVisitorRequestLimitBurst, Usage: "initial limit of requests per visitor"}), + altsrc.NewDurationFlag(&cli.DurationFlag{Name: "visitor-request-limit-replenish", Aliases: []string{"R"}, EnvVars: []string{"NTFY_VISITOR_REQUEST_LIMIT_REPLENISH"}, Value: config.DefaultVisitorRequestLimitReplenish, Usage: "interval at which burst limit is replenished (one per x)"}), + altsrc.NewBoolFlag(&cli.BoolFlag{Name: "behind-proxy", Aliases: []string{"P"}, EnvVars: []string{"NTFY_BEHIND_PROXY"}, Value: false, Usage: "if set, use X-Forwarded-For header to determine visitor IP address (for rate limiting)"}), +} + +var cmdServe = &cli.Command{ + Name: "serve", + Usage: "Run the ntfy server", + UsageText: "ntfy serve [OPTIONS..]", + Action: execServe, + Flags: flagsServe, + Before: initConfigFileInputSource("config", flagsServe), + Description: `Run the ntfy server and listen for incoming requests + +The command will load the configuration from /etc/ntfy/config.yml. Config options can +be overridden using the command line options. + +Examples: + ntfy serve # Starts server in the foreground (on port 80) + ntfy serve --listen-http :8080 # Starts server with alternate port`, +} + +func execServe(c *cli.Context) error { + // Read all the options + listenHTTP := c.String("listen-http") + listenHTTPS := c.String("listen-https") + keyFile := c.String("key-file") + certFile := c.String("cert-file") + firebaseKeyFile := c.String("firebase-key-file") + cacheFile := c.String("cache-file") + cacheDuration := c.Duration("cache-duration") + keepaliveInterval := c.Duration("keepalive-interval") + managerInterval := c.Duration("manager-interval") + globalTopicLimit := c.Int("global-topic-limit") + visitorSubscriptionLimit := c.Int("visitor-subscription-limit") + visitorRequestLimitBurst := c.Int("visitor-request-limit-burst") + visitorRequestLimitReplenish := c.Duration("visitor-request-limit-replenish") + behindProxy := c.Bool("behind-proxy") + + // Check values + if firebaseKeyFile != "" && !util.FileExists(firebaseKeyFile) { + return errors.New("if set, FCM key file must exist") + } else if keepaliveInterval < 5*time.Second { + return errors.New("keepalive interval cannot be lower than five seconds") + } else if managerInterval < 5*time.Second { + return errors.New("manager interval cannot be lower than five seconds") + } else if cacheDuration > 0 && cacheDuration < managerInterval { + return errors.New("cache duration cannot be lower than manager interval") + } else if keyFile != "" && !util.FileExists(keyFile) { + return errors.New("if set, key file must exist") + } else if certFile != "" && !util.FileExists(certFile) { + return errors.New("if set, certificate file must exist") + } else if listenHTTPS != "" && (keyFile == "" || certFile == "") { + return errors.New("if listen-https is set, both key-file and cert-file must be set") + } + + // Run server + conf := config.New(listenHTTP) + conf.ListenHTTPS = listenHTTPS + conf.KeyFile = keyFile + conf.CertFile = certFile + conf.FirebaseKeyFile = firebaseKeyFile + conf.CacheFile = cacheFile + conf.CacheDuration = cacheDuration + conf.KeepaliveInterval = keepaliveInterval + conf.ManagerInterval = managerInterval + conf.GlobalTopicLimit = globalTopicLimit + conf.VisitorSubscriptionLimit = visitorSubscriptionLimit + conf.VisitorRequestLimitBurst = visitorRequestLimitBurst + conf.VisitorRequestLimitReplenish = visitorRequestLimitReplenish + conf.BehindProxy = behindProxy + s, err := server.New(conf) + if err != nil { + log.Fatalln(err) + } + if err := s.Run(); err != nil { + log.Fatalln(err) + } + log.Printf("Exiting.") + return nil +} diff --git a/cmd/subscribe.go b/cmd/subscribe.go new file mode 100644 index 00000000..0b03b69d --- /dev/null +++ b/cmd/subscribe.go @@ -0,0 +1,132 @@ +package cmd + +import ( + "errors" + "fmt" + "github.com/urfave/cli/v2" + "heckel.io/ntfy/client" + "heckel.io/ntfy/util" + "log" + "os" + "os/exec" + "strings" +) + +var cmdSubscribe = &cli.Command{ + Name: "subscribe", + Aliases: []string{"sub"}, + Usage: "Subscribe to one or more topics on a ntfy server", + UsageText: "ntfy subscribe [OPTIONS..] TOPIC", + Action: execSubscribe, + Flags: []cli.Flag{ + &cli.StringFlag{Name: "exec", Aliases: []string{"e"}, Usage: "execute command for each message event"}, + &cli.StringFlag{Name: "since", Aliases: []string{"s"}, Usage: "return events since (Unix timestamp, or all)"}, + }, + Description: `(THIS COMMAND IS INCUBATING. IT MAY CHANGE WITHOUT NOTICE.) + +Subscribe to one or more topics on a ntfy server, and either print +or execute commands for every arriving message. + +By default, the subscribe command just prints the JSON representation of a message. +When --exec is passed, each incoming message will execute a command. The message fields +are passed to the command as environment variables: + + Variable Aliases Description + --------------- --------------- ----------------------------------- + $NTFY_MESSAGE $message, $m Message body + $NTFY_TITLE $title, $t Message title + $NTFY_PRIORITY $priority, $p Message priority (1=min, 5=max) + $NTFY_TAGS $tags, $ta Message tags (comma separated list) + $NTFY_ID $id Unique message ID + $NTFY_TIME $time Unix timestamp of the message delivery + $NTFY_TOPIC $topic Topic name + $NTFY_EVENT $event, $ev Event identifier (always "message") + +Examples: + ntfy subscribe mytopic # Prints JSON for incoming messages to stdout + ntfy sub home.lan/backups alerts # Subscribe to two different topics + ntfy sub --exec='notify-send "$m"' mytopic # Execute command for incoming messages' +`, +} + +func execSubscribe(c *cli.Context) error { + if c.NArg() < 1 { + return errors.New("topic missing") + } + log.Printf("\x1b[1;33mThis command is incubating. The interface may change without notice.\x1b[0m") + cl := client.DefaultClient + command := c.String("exec") + for _, topic := range c.Args().Slice() { + cl.Subscribe(expandTopicURL(topic)) + } + for m := range cl.Messages { + _ = dispatchMessage(c, command, m) + } + return nil +} + +func dispatchMessage(c *cli.Context, command string, m *client.Message) error { + if command != "" { + return execCommand(c, command, m) + } + fmt.Println(m.Raw) + return nil +} + +func execCommand(c *cli.Context, command string, m *client.Message) error { + if m.Event == client.OpenEvent { + log.Printf("[%s] Connection opened, subscribed to topic", collapseTopicURL(m.TopicURL)) + } else if m.Event == client.MessageEvent { + go func() { + if err := runCommandInternal(c, command, m); err != nil { + log.Printf("[%s] Command failed: %s", collapseTopicURL(m.TopicURL), err.Error()) + } + }() + } + return nil +} + +func runCommandInternal(c *cli.Context, command string, m *client.Message) error { + scriptFile, err := createTmpScript(command) + if err != nil { + return err + } + defer os.Remove(scriptFile) + log.Printf("[%s] Executing: %s (for message: %s)", collapseTopicURL(m.TopicURL), command, m.Raw) + cmd := exec.Command("sh", "-c", scriptFile) + cmd.Stdin = c.App.Reader + cmd.Stdout = c.App.Writer + cmd.Stderr = c.App.ErrWriter + cmd.Env = envVars(m) + return cmd.Run() +} + +func createTmpScript(command string) (string, error) { + scriptFile := fmt.Sprintf("%s/ntfy-subscribe-%s.sh.tmp", os.TempDir(), util.RandomString(10)) + script := fmt.Sprintf("#!/bin/sh\n%s", command) + if err := os.WriteFile(scriptFile, []byte(script), 0700); err != nil { + return "", err + } + return scriptFile, nil +} + +func envVars(m *client.Message) []string { + env := os.Environ() + env = append(env, envVar(m.ID, "NTFY_ID", "id")...) + env = append(env, envVar(m.Event, "NTFY_EVENT", "event", "ev")...) + env = append(env, envVar(m.Topic, "NTFY_TOPIC", "topic")...) + env = append(env, envVar(fmt.Sprintf("%d", m.Time), "NTFY_TIME", "time")...) + env = append(env, envVar(m.Message, "NTFY_MESSAGE", "message", "m")...) + env = append(env, envVar(m.Title, "NTFY_TITLE", "title", "t")...) + env = append(env, envVar(fmt.Sprintf("%d", m.Priority), "NTFY_PRIORITY", "priority", "prio", "p")...) + env = append(env, envVar(strings.Join(m.Tags, ","), "NTFY_TAGS", "tags", "ta")...) + return env +} + +func envVar(value string, vars ...string) []string { + env := make([]string, 0) + for _, v := range vars { + env = append(env, fmt.Sprintf("%s=%s", v, value)) + } + return env +} diff --git a/config/ntfy.service b/config/ntfy.service index 77899517..6645b21f 100644 --- a/config/ntfy.service +++ b/config/ntfy.service @@ -5,7 +5,7 @@ After=network.target [Service] User=ntfy Group=ntfy -ExecStart=/usr/bin/ntfy +ExecStart=/usr/bin/ntfy serve Restart=on-failure AmbientCapabilities=CAP_NET_BIND_SERVICE LimitNOFILE=10000 diff --git a/main.go b/main.go index cecae09d..f6cdbdce 100644 --- a/main.go +++ b/main.go @@ -19,7 +19,7 @@ func main() { Try 'ntfy COMMAND --help' for more information. ntfy %s (%s), runtime %s, built at %s -Copyright (C) 2021 Philipp C. Heckel, distributed under the Apache License 2.0 +Copyright (C) 2021 Philipp C. Heckel, licensed under Apache License 2.0 & GPLv2 `, version, commit[:7], runtime.Version(), date) app := cmd.New() diff --git a/server/server.go b/server/server.go index 1cb6edad..30342098 100644 --- a/server/server.go +++ b/server/server.go @@ -328,22 +328,9 @@ func (s *Server) parseParams(r *http.Request, m *message) (cache bool, firebase if messageStr != "" { m.Message = messageStr } - priorityStr := readParam(r, "x-priority", "priority", "prio", "p") - if priorityStr != "" { - switch strings.ToLower(priorityStr) { - case "1", "min": - m.Priority = 1 - case "2", "low": - m.Priority = 2 - case "3", "default": - m.Priority = 3 - case "4", "high": - m.Priority = 4 - case "5", "max", "urgent": - m.Priority = 5 - default: - return false, false, errHTTPBadRequest - } + m.Priority, err = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p")) + if err != nil { + return false, false, errHTTPBadRequest } tagsStr := readParam(r, "x-tags", "tag", "tags", "ta") if tagsStr != "" { diff --git a/util/util.go b/util/util.go index 331f8c5e..665166df 100644 --- a/util/util.go +++ b/util/util.go @@ -1,9 +1,11 @@ package util import ( + "errors" "fmt" "math/rand" "os" + "strings" "sync" "time" ) @@ -15,6 +17,8 @@ const ( var ( random = rand.New(rand.NewSource(time.Now().UnixNano())) randomMutex = sync.Mutex{} + + errInvalidPriority = errors.New("unknown priority") ) // FileExists checks if a file exists, and returns true if it does @@ -75,3 +79,22 @@ func DurationToHuman(d time.Duration) (str string) { } return } + +func ParsePriority(priority string) (int, error) { + switch strings.ToLower(priority) { + case "": + return 0, nil + case "1", "min": + return 1, nil + case "2", "low": + return 2, nil + case "3", "default": + return 3, nil + case "4", "high": + return 4, nil + case "5", "max", "urgent": + return 5, nil + default: + return 0, errInvalidPriority + } +}