diff --git a/docs/config.md b/docs/config.md index 2f1c390d..70a48fd0 100644 --- a/docs/config.md +++ b/docs/config.md @@ -549,7 +549,7 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`). | `smtp-server-listen` | `NTFY_SMTP_SERVER_LISTEN` | `[ip]:port` | - | Defines the IP address and port the SMTP server will listen on, e.g. `:25` or `1.2.3.4:25` | | `smtp-server-domain` | `NTFY_SMTP_SERVER_DOMAIN` | *domain name* | - | SMTP server e-mail domain, e.g. `ntfy.sh` | | `smtp-server-addr-prefix` | `NTFY_SMTP_SERVER_ADDR_PREFIX` | `[ip]:port` | - | Optional prefix for the e-mail addresses to prevent spam, e.g. `ntfy-` | -| `keepalive-interval` | `NTFY_KEEPALIVE_INTERVAL` | *duration* | 55s | Interval in which keepalive messages are sent to the client. This is to prevent intermediaries closing the connection for inactivity. Note that the Android app has a hardcoded timeout at 77s, so it should be less than that. | +| `keepalive-interval` | `NTFY_KEEPALIVE_INTERVAL` | *duration* | 45s | Interval in which keepalive messages are sent to the client. This is to prevent intermediaries closing the connection for inactivity. Note that the Android app has a hardcoded timeout at 77s, so it should be less than that. | | `manager-interval` | `$NTFY_MANAGER_INTERVAL` | *duration* | 1m | Interval in which the manager prunes old messages, deletes topics and prints the stats. | | `global-topic-limit` | `NTFY_GLOBAL_TOPIC_LIMIT` | *number* | 15,000 | Rate limiting: Total number of topics before the server rejects new topics. | | `visitor-subscription-limit` | `NTFY_VISITOR_SUBSCRIPTION_LIMIT` | *number* | 30 | Rate limiting: Number of subscriptions per visitor (IP address) | @@ -597,7 +597,7 @@ OPTIONS: --attachment-total-size-limit value, -A value limit of the on-disk attachment cache (default: 5G) [$NTFY_ATTACHMENT_TOTAL_SIZE_LIMIT] --attachment-file-size-limit value, -Y value per-file attachment size limit (e.g. 300k, 2M, 100M) (default: 15M) [$NTFY_ATTACHMENT_FILE_SIZE_LIMIT] --attachment-expiry-duration value, -X value duration after which uploaded attachments will be deleted (e.g. 3h, 20h) (default: 3h) [$NTFY_ATTACHMENT_EXPIRY_DURATION] - --keepalive-interval value, -k value interval of keepalive messages (default: 55s) [$NTFY_KEEPALIVE_INTERVAL] + --keepalive-interval value, -k value interval of keepalive messages (default: 45s) [$NTFY_KEEPALIVE_INTERVAL] --manager-interval value, -m value interval of for message pruning and stats printing (default: 1m0s) [$NTFY_MANAGER_INTERVAL] --smtp-sender-addr value SMTP server address (host:port) for outgoing emails [$NTFY_SMTP_SENDER_ADDR] --smtp-sender-user value SMTP user (if e-mail sending is enabled) [$NTFY_SMTP_SENDER_USER] diff --git a/go.mod b/go.mod index 816766c2..05f44d6e 100644 --- a/go.mod +++ b/go.mod @@ -35,11 +35,13 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.6 // indirect github.com/googleapis/gax-go/v2 v2.1.1 // indirect + github.com/gorilla/websocket v1.4.2 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect go.opencensus.io v0.23.0 // indirect golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20211210111614-af8b64212486 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect diff --git a/go.sum b/go.sum index ef752ff8..a9127297 100644 --- a/go.sum +++ b/go.sum @@ -189,6 +189,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= github.com/googleapis/gax-go/v2 v2.1.1 h1:dp3bWCh+PPO1zjRRiCSczJav13sBvG4UhNyVTa1KqdU= github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -356,6 +358,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/server/config.go b/server/config.go index 4f0c1adb..1e2b31c0 100644 --- a/server/config.go +++ b/server/config.go @@ -8,7 +8,7 @@ import ( const ( DefaultListenHTTP = ":80" DefaultCacheDuration = 12 * time.Hour - DefaultKeepaliveInterval = 55 * time.Second // Not too frequently to save battery (Android read timeout is 77s!) + DefaultKeepaliveInterval = 45 * time.Second // Not too frequently to save battery (Android read timeout used to be 77s!) DefaultManagerInterval = time.Minute DefaultAtSenderInterval = 10 * time.Second DefaultMinDelay = 10 * time.Second diff --git a/server/server.go b/server/server.go index 14581daf..1168c628 100644 --- a/server/server.go +++ b/server/server.go @@ -10,6 +10,8 @@ import ( "firebase.google.com/go/messaging" "fmt" "github.com/emersion/go-smtp" + "github.com/gorilla/websocket" + "golang.org/x/sync/errgroup" "google.golang.org/api/option" "heckel.io/ntfy/util" "html/template" @@ -99,6 +101,7 @@ var ( jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`) ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`) rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`) + wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`) publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/(publish|send|trigger)$`) staticRegex = regexp.MustCompile(`^/static/.+`) @@ -156,6 +159,10 @@ const ( emptyMessageBody = "triggered" // Used if message body is empty defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment fcmMessageLimit = 4000 // see maybeTruncateFCMMessage for details + wsWriteWait = 2 * time.Second + wsBufferSize = 1024 + wsReadLimit = 64 // We only ever receive PINGs + wsPongWait = 15 * time.Second ) // New instantiates a new Server. It creates the cache and adds a Firebase @@ -404,6 +411,8 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error { return s.withRateLimit(w, r, s.handleSubscribeSSE) } else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) { return s.withRateLimit(w, r, s.handleSubscribeRaw) + } else if r.Method == http.MethodGet && wsPathRegex.MatchString(r.URL.Path) { + return s.withRateLimit(w, r, s.handleSubscribeWS) } return errHTTPNotFound } @@ -805,6 +814,106 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi } } +func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *visitor) error { + if err := v.SubscriptionAllowed(); err != nil { + return errHTTPTooManyRequestsLimitSubscriptions + } + defer v.RemoveSubscription() + topicsStr := strings.TrimSuffix(r.URL.Path[1:], "/ws") // Hack + topicIDs := util.SplitNoEmpty(topicsStr, ",") + topics, err := s.topicsFromIDs(topicIDs...) + if err != nil { + return err + } + poll := readParam(r, "x-poll", "poll", "po") == "1" + scheduled := readParam(r, "x-scheduled", "scheduled", "sched") == "1" + since, err := parseSince(r, poll) + if err != nil { + return err + } + messageFilter, titleFilter, priorityFilter, tagsFilter, err := parseQueryFilters(r) + if err != nil { + return err + } + upgrader := &websocket.Upgrader{ + ReadBufferSize: wsBufferSize, + WriteBufferSize: wsBufferSize, + CheckOrigin: func(r *http.Request) bool { + return true // We're open for business! + }, + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return err + } + defer conn.Close() + g, ctx := errgroup.WithContext(context.Background()) + g.Go(func() error { + pongWait := s.config.KeepaliveInterval + wsPongWait + conn.SetReadLimit(wsReadLimit) + if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil { + return err + } + conn.SetPongHandler(func(appData string) error { + return conn.SetReadDeadline(time.Now().Add(pongWait)) + }) + for { + _, _, err := conn.NextReader() + if err != nil { + return err + } + } + }) + g.Go(func() error { + ping := func() error { + if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil { + return err + } + return conn.WriteMessage(websocket.PingMessage, nil) + } + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(s.config.KeepaliveInterval): + v.Keepalive() + if err := ping(); err != nil { + return err + } + } + } + }) + sub := func(msg *message) error { + if !passesQueryFilter(msg, messageFilter, titleFilter, priorityFilter, tagsFilter) { + return nil + } + if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil { + return err + } + return conn.WriteJSON(msg) + } + w.Header().Set("Access-Control-Allow-Origin", "*") // CORS, allow cross-origin requests + if poll { + return s.sendOldMessages(topics, since, scheduled, sub) + } + subscriberIDs := make([]int, 0) + for _, t := range topics { + subscriberIDs = append(subscriberIDs, t.Subscribe(sub)) + } + defer func() { + for i, subscriberID := range subscriberIDs { + topics[i].Unsubscribe(subscriberID) // Order! + } + }() + if err := sub(newOpenMessage(topicsStr)); err != nil { // Send out open message + return err + } + if err := s.sendOldMessages(topics, since, scheduled, sub); err != nil { + return err + } + return g.Wait() +} + func parseQueryFilters(r *http.Request) (messageFilter string, titleFilter string, priorityFilter []int, tagsFilter []string, err error) { messageFilter = readParam(r, "x-message", "message", "m") titleFilter = readParam(r, "x-title", "title", "t") diff --git a/server/server.yml b/server/server.yml index c167d039..c65abd7d 100644 --- a/server/server.yml +++ b/server/server.yml @@ -98,7 +98,7 @@ # # Note that the Android app has a hardcoded timeout at 77s, so it should be less than that. # -# keepalive-interval: "30s" +# keepalive-interval: "45s" # Interval in which the manager prunes old messages, deletes topics # and prints the stats.