1
0
Fork 0
mirror of https://github.com/binwiederhier/ntfy.git synced 2025-06-27 06:40:36 +02:00
This commit is contained in:
Philipp Heckel 2021-10-23 13:21:33 -04:00
parent 5c2b6d18ec
commit 630ecd351f
2 changed files with 108 additions and 60 deletions
server

View file

@ -5,6 +5,7 @@ import (
_ "embed" // required for go:embed
"encoding/json"
"errors"
"fmt"
"github.com/gorilla/websocket"
"io"
"log"
@ -31,8 +32,9 @@ const (
var (
topicRegex = regexp.MustCompile(`^/[^/]+$`)
wsRegex = regexp.MustCompile(`^/[^/]+/ws$`)
jsonRegex = regexp.MustCompile(`^/[^/]+/json$`)
sseRegex = regexp.MustCompile(`^/[^/]+/sse$`)
wsRegex = regexp.MustCompile(`^/[^/]+/ws$`)
wsUpgrader = websocket.Upgrader{
ReadBufferSize: messageLimit,
WriteBufferSize: messageLimit,
@ -82,7 +84,9 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error {
} else if r.Method == http.MethodGet && wsRegex.MatchString(r.URL.Path) {
return s.handleSubscribeWS(w, r)
} else if r.Method == http.MethodGet && jsonRegex.MatchString(r.URL.Path) {
return s.handleSubscribeHTTP(w, r)
return s.handleSubscribeJSON(w, r)
} else if r.Method == http.MethodGet && sseRegex.MatchString(r.URL.Path) {
return s.handleSubscribeSSE(w, r)
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicRegex.MatchString(r.URL.Path) {
return s.handlePublishHTTP(w, r)
}
@ -112,7 +116,7 @@ func (s *Server) handlePublishHTTP(w http.ResponseWriter, r *http.Request) error
return t.Publish(msg)
}
func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request) error {
func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request) error {
t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/json")) // Hack
subscriberID := t.Subscribe(func (msg *message) error {
if err := json.NewEncoder(w).Encode(&msg); err != nil {
@ -131,6 +135,32 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request) err
return nil
}
func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) error {
t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/sse")) // Hack
subscriberID := t.Subscribe(func (msg *message) error {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
return err
}
m := fmt.Sprintf("data: %s\n\n", buf.String())
if _, err := io.WriteString(w, m); err != nil {
return err
}
if fl, ok := w.(http.Flusher); ok {
fl.Flush()
}
return nil
})
defer t.Unsubscribe(subscriberID)
w.Header().Set("Content-Type", "text/event-stream")
w.WriteHeader(http.StatusOK)
select {
case <-t.ctx.Done():
case <-r.Context().Done():
}
return nil
}
func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request) error {
conn, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {