diff --git a/README.md b/README.md index 93232bc8..af79562e 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,44 @@ +# ntfy +ntfy is a super simple pub-sub notification service. It allows you to send desktop and (soon) phone notifications +via scripts. I run a free version of it on *[ntfy.sh](https://ntfy.sh)*. No signups or cost. -echo "mychan:long process is done" | nc -N ntfy.sh 9999 -curl -d "long process is done" ntfy.sh/mychan - publish on channel +## Usage -curl ntfy.sh/mychan - subscribe to channel +### Subscribe to a topic -ntfy.sh/mychan/ws +You can subscribe to a topic either in a web UI, or in your own app by subscribing to an SSE/EventSource +or JSON feed. + +Here's how to do it via curl see the SSE stream in `curl`: + +``` +curl -s localhost:9997/mytopic/sse +``` + +You can easily script it to execute any command when a message arrives: +``` +while read json; do + msg="$(echo "$json" | jq -r .message)" + notify-send "$msg" +done < <(stdbuf -i0 -o0 curl -s localhost:9997/mytopic/json) +``` + +### Publish messages + +Publishing messages can be done via PUT or POST using. Here's an example using `curl`: +``` +curl -d "long process is done" ntfy.sh/mytopic +``` + +## TODO +- /raw endpoint +- netcat usage +- rate limiting / abuse protection +- release/packaging + +## Contributing +I welcome any and all contributions. Just create a PR or an issue. + +## License +Made with ❤️ by [Philipp C. Heckel](https://heckel.io), distributed under the [Apache License 2.0](LICENSE). diff --git a/server/index.html b/server/index.html index eaa4d5f0..50d99944 100644 --- a/server/index.html +++ b/server/index.html @@ -10,22 +10,22 @@

ntfy.sh is a super simple pub-sub notification service. It allows you to send desktop and (soon) phone notifications - via scripts, without signup or cost. It's entirely free and open source. + via scripts, without signup or cost. It's entirely free and open source. You can find the source code on GitHub.

- Usage: You can subscribe to a topic either in this web UI, or in your own app by subscribing to an SSE/EventSource + You can subscribe to a topic either in this web UI, or in your own app by subscribing to an SSE/EventSource or JSON feed. Once subscribed, you can publish messages via PUT or POST.

-
+

-Topics: +

Topics:

@@ -38,50 +38,68 @@ Topics: const subscribeForm = document.getElementById("subscribeForm"); const errorField = document.getElementById("error"); - const subscribe = function (topic) { + const subscribe = (topic) => { if (Notification.permission !== "granted") { - Notification.requestPermission().then(function (permission) { + Notification.requestPermission().then((permission) => { if (permission === "granted") { - subscribeInternal(topic); + subscribeInternal(topic, 0); } }); } else { - subscribeInternal(topic); + subscribeInternal(topic, 0); } }; - const subscribeInternal = function (topic) { - let eventSource = new EventSource(`${topic}/sse`); - eventSource.onerror = function (e) { - console.log(e); - errorField.innerHTML = "Error " + e; - }; - eventSource.onmessage = function (e) { - const event = JSON.parse(e.data); - new Notification(event.message); - }; - topics[topic] = eventSource; + const subscribeInternal = (topic, delaySec) => { + setTimeout(() => { + // Render list entry + let topicEntry = document.getElementById(`topic-${topic}`); + if (!topicEntry) { + topicEntry = document.createElement('li'); + topicEntry.id = `topic-${topic}`; + topicEntry.innerHTML = `${topic} `; + topicsList.appendChild(topicEntry); + } - let topicEntry = document.createElement('li'); - topicEntry.id = `topic-${topic}`; - topicEntry.innerHTML = `${topic} `; - topicsList.appendChild(topicEntry); + // Open event source + let eventSource = new EventSource(`${topic}/sse`); + eventSource.onopen = () => { + topicEntry.innerHTML = `${topic} `; + delaySec = 0; // Reset on successful connection + }; + eventSource.onerror = (e) => { + console.log("onerror") + const newDelaySec = (delaySec + 5 <= 30) ? delaySec + 5 : 30; + topicEntry.innerHTML = `${topic} (Reconnecting in ${newDelaySec}s ...) `; + eventSource.close() + subscribeInternal(topic, newDelaySec); + }; + eventSource.onmessage = (e) => { + const event = JSON.parse(e.data); + new Notification(event.message); + }; + topics[topic] = eventSource; + localStorage.setItem('topics', JSON.stringify(Object.keys(topics))); + }, delaySec * 1000); }; - const unsubscribe = function(topic) { + const unsubscribe = (topic) => { topics[topic].close(); + delete topics[topic]; + localStorage.setItem('topics', JSON.stringify(Object.keys(topics))); document.getElementById(`topic-${topic}`).remove(); }; subscribeForm.onsubmit = function () { - alert("hi") if (!topicField.value) { return false; } subscribe(topicField.value); + topicField.value = ""; return false; }; + // Disable Web UI if notifications of EventSource are not available if (!window["Notification"] || !window["EventSource"]) { errorField.innerHTML = "Your browser is not compatible to use the web-based desktop notifications."; topicField.disabled = true; @@ -91,6 +109,17 @@ Topics: topicField.disabled = true; subscribeButton.disabled = true; } + + // Reset UI + topicField.value = ""; + + // Restore topics + const storedTopics = localStorage.getItem('topics'); + if (storedTopics) { + JSON.parse(storedTopics).forEach((topic) => { + subscribeInternal(topic, 0); + }); + } diff --git a/server/server.go b/server/server.go index a78019c5..f620114e 100644 --- a/server/server.go +++ b/server/server.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/gorilla/websocket" "io" "log" "net/http" @@ -18,11 +17,11 @@ import ( type Server struct { topics map[string]*topic - mu sync.Mutex + mu sync.Mutex } type message struct { - Time int64 `json:"time"` + Time int64 `json:"time"` Message string `json:"message"` } @@ -31,14 +30,9 @@ const ( ) var ( - topicRegex = regexp.MustCompile(`^/[^/]+$`) - jsonRegex = regexp.MustCompile(`^/[^/]+/json$`) - sseRegex = regexp.MustCompile(`^/[^/]+/sse$`) - wsRegex = regexp.MustCompile(`^/[^/]+/ws$`) - wsUpgrader = websocket.Upgrader{ - ReadBufferSize: messageLimit, - WriteBufferSize: messageLimit, - } + topicRegex = regexp.MustCompile(`^/[^/]+$`) + jsonRegex = regexp.MustCompile(`^/[^/]+/json$`) + sseRegex = regexp.MustCompile(`^/[^/]+/sse$`) //go:embed "index.html" indexSource string @@ -51,26 +45,32 @@ func New() *Server { } func (s *Server) Run() error { - go func() { - for { - time.Sleep(5 * time.Second) - s.mu.Lock() - log.Printf("topics: %d", len(s.topics)) - for _, t := range s.topics { - t.mu.Lock() - log.Printf("- %s: %d subscriber(s), %d message(s) sent, last active = %s", - t.id, len(t.subscribers), t.messages, t.last.String()) - t.mu.Unlock() - } - // TODO kill dead topics - s.mu.Unlock() - } - }() + go s.runMonitor() + return s.listenAndServe() +} + +func (s *Server) listenAndServe() error { log.Printf("Listening on :9997") http.HandleFunc("/", s.handle) return http.ListenAndServe(":9997", nil) } +func (s *Server) runMonitor() { + for { + time.Sleep(5 * time.Second) + s.mu.Lock() + log.Printf("topics: %d", len(s.topics)) + for _, t := range s.topics { + t.mu.Lock() + log.Printf("- %s: %d subscriber(s), %d message(s) sent, last active = %s", + t.id, len(t.subscribers), t.messages, t.last.String()) + t.mu.Unlock() + } + // TODO kill dead topics + s.mu.Unlock() + } +} + func (s *Server) handle(w http.ResponseWriter, r *http.Request) { if err := s.handleInternal(w, r); err != nil { w.WriteHeader(http.StatusInternalServerError) @@ -81,8 +81,6 @@ func (s *Server) handle(w http.ResponseWriter, r *http.Request) { func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request) error { if r.Method == http.MethodGet && r.URL.Path == "/" { return s.handleHome(w, r) - } else if r.Method == http.MethodGet && wsRegex.MatchString(r.URL.Path) { - return s.handleSubscribeWS(w, r) } else if r.Method == http.MethodGet && jsonRegex.MatchString(r.URL.Path) { return s.handleSubscribeJSON(w, r) } else if r.Method == http.MethodGet && sseRegex.MatchString(r.URL.Path) { @@ -118,7 +116,7 @@ func (s *Server) handlePublishHTTP(w http.ResponseWriter, r *http.Request) error func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request) error { t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/json")) // Hack - subscriberID := t.Subscribe(func (msg *message) error { + subscriberID := t.Subscribe(func(msg *message) error { if err := json.NewEncoder(w).Encode(&msg); err != nil { return err } @@ -137,12 +135,12 @@ func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request) err func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) error { t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/sse")) // Hack - subscriberID := t.Subscribe(func (msg *message) error { + subscriberID := t.Subscribe(func(msg *message) error { var buf bytes.Buffer if err := json.NewEncoder(&buf).Encode(&msg); err != nil { return err } - m := fmt.Sprintf("data: %s\n\n", buf.String()) + m := fmt.Sprintf("data: %s\n", buf.String()) if _, err := io.WriteString(w, m); err != nil { return err } @@ -154,6 +152,12 @@ func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) erro defer t.Unsubscribe(subscriberID) w.Header().Set("Content-Type", "text/event-stream") w.WriteHeader(http.StatusOK) + if _, err := io.WriteString(w, "event: open\n\n"); err != nil { + return err + } + if fl, ok := w.(http.Flusher); ok { + fl.Flush() + } select { case <-t.ctx.Done(): case <-r.Context().Done(): @@ -161,40 +165,6 @@ func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request) erro return nil } -func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request) error { - conn, err := wsUpgrader.Upgrade(w, r, nil) - if err != nil { - return err - } - t := s.createTopic(strings.TrimSuffix(r.URL.Path[1:], "/ws")) // Hack - t.Subscribe(func (msg *message) error { - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(&msg); err != nil { - return err - } - defer conn.Close() - /*conn.SetWriteDeadline(time.Now().Add(writeWait)) - if !ok { - // The hub closed the channel. - c.conn.WriteMessage(websocket.CloseMessage, []byte{}) - return - }*/ - - w, err := conn.NextWriter(websocket.TextMessage) - if err != nil { - return err - } - if _, err := w.Write([]byte(msg.Message)); err != nil { - return err - } - if err := w.Close(); err != nil { - return err - } - return nil - }) - return nil -} - func (s *Server) createTopic(id string) *topic { s.mu.Lock() defer s.mu.Unlock() diff --git a/server/topic.go b/server/topic.go index 65dd7415..283b6da4 100644 --- a/server/topic.go +++ b/server/topic.go @@ -12,11 +12,11 @@ import ( type topic struct { id string subscribers map[int]subscriber - messages int + messages int last time.Time - ctx context.Context - cancel context.CancelFunc - mu sync.Mutex + ctx context.Context + cancel context.CancelFunc + mu sync.Mutex } type subscriber func(msg *message) error