ntfy/server/server.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

1936 lines
71 KiB
Go
Raw Normal View History

2021-10-23 03:26:01 +02:00
package server
import (
"bytes"
2021-10-29 05:50:38 +02:00
"context"
2022-05-27 13:55:57 +02:00
"crypto/sha256"
2021-12-24 00:03:04 +01:00
"embed"
2022-01-17 19:28:07 +01:00
"encoding/base64"
2021-10-23 03:26:01 +02:00
"encoding/json"
2022-12-03 21:20:59 +01:00
"errors"
2021-10-23 19:21:33 +02:00
"fmt"
2023-02-23 03:00:56 +01:00
"github.com/emersion/go-smtp"
"github.com/gorilla/websocket"
2023-03-07 04:16:10 +01:00
"github.com/prometheus/client_golang/prometheus/promhttp"
2023-02-23 03:00:56 +01:00
"golang.org/x/sync/errgroup"
"heckel.io/ntfy/log"
"heckel.io/ntfy/user"
"heckel.io/ntfy/util"
2021-10-23 03:26:01 +02:00
"io"
2021-10-24 04:49:50 +02:00
"net"
2021-10-23 03:26:01 +02:00
"net/http"
2023-03-28 20:41:16 +02:00
"net/http/pprof"
"net/netip"
2022-01-14 18:13:14 +01:00
"net/url"
2022-01-10 22:28:13 +01:00
"os"
2022-01-14 18:13:14 +01:00
"path"
2022-01-02 23:56:12 +01:00
"path/filepath"
2021-10-23 03:26:01 +02:00
"regexp"
"sort"
2021-10-29 19:58:14 +02:00
"strconv"
2021-10-23 03:26:01 +02:00
"strings"
"sync"
"time"
2022-01-02 23:56:12 +01:00
"unicode/utf8"
2021-10-23 03:26:01 +02:00
)
2021-12-07 17:45:15 +01:00
// Server is the main server, providing the UI and API for ntfy
2021-10-23 03:26:01 +02:00
type Server struct {
2022-06-02 05:24:44 +02:00
config *Config
httpServer *http.Server
httpsServer *http.Server
2023-03-07 04:16:10 +01:00
httpMetricsServer *http.Server
2023-03-28 20:41:16 +02:00
httpProfileServer *http.Server
2022-06-02 05:24:44 +02:00
unixListener net.Listener
smtpServer *smtp.Server
smtpServerBackend *smtpBackend
smtpSender mailer
topics map[string]*topic
2022-12-02 21:37:48 +01:00
visitors map[string]*visitor // ip:<ip> or user:<user>
2022-06-02 05:24:44 +02:00
firebaseClient *firebaseClient
2023-04-21 04:04:11 +02:00
messages int64 // Total number of messages (persisted if messageCache enabled)
messagesHistory []int64 // Last n values of the messages counter, used to determine rate
2023-02-22 04:44:30 +01:00
userManager *user.Manager // Might be nil!
messageCache *messageCache // Database that stores the messages
fileCache *fileCache // File system based cache that stores attachments
stripe stripeAPI // Stripe API, can be replaced with a mock
priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!)
metricsHandler http.Handler // Handles /metrics if enable-metrics set, and listen-metrics-http not set
2022-06-02 05:24:44 +02:00
closeChan chan bool
2023-04-21 04:04:11 +02:00
mu sync.RWMutex
2021-10-23 03:26:01 +02:00
}
2022-01-22 04:22:27 +01:00
// handleFunc extends the normal http.HandlerFunc to be able to easily return errors
type handleFunc func(http.ResponseWriter, *http.Request, *visitor) error
2021-10-23 03:26:01 +02:00
var (
2022-01-31 17:44:58 +01:00
// If changed, don't forget to update Android App and auth_sqlite.go
topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // No /!
topicPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}$`) // Regex must match JS & Android app!
externalTopicPathRegex = regexp.MustCompile(`^/[^/]+\.[^/]+/[-_A-Za-z0-9]{1,64}$`) // Extended topic path, for web-app, e.g. /example.com/mytopic
jsonPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/json$`)
ssePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/sse$`)
rawPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/raw$`)
wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`)
authPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/auth$`)
publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/(publish|send|trigger)$`)
2021-10-29 19:58:14 +02:00
2023-01-17 16:09:37 +01:00
webConfigPath = "/config.js"
accountPath = "/account"
matrixPushPath = "/_matrix/push/v1/notify"
metricsPath = "/metrics"
2023-01-17 16:09:37 +01:00
apiHealthPath = "/v1/health"
2023-04-21 04:04:11 +02:00
apiStatsPath = "/v1/stats"
apiTiersPath = "/v1/tiers"
apiUsersPath = "/v1/users"
apiUsersAccessPath = "/v1/users/access"
2023-01-17 16:09:37 +01:00
apiAccountPath = "/v1/account"
apiAccountTokenPath = "/v1/account/token"
apiAccountPasswordPath = "/v1/account/password"
apiAccountSettingsPath = "/v1/account/settings"
apiAccountSubscriptionPath = "/v1/account/subscription"
apiAccountReservationPath = "/v1/account/reservation"
2023-05-11 19:50:10 +02:00
apiAccountPhonePath = "/v1/account/phone"
2023-05-16 20:15:58 +02:00
apiAccountPhoneVerifyPath = "/v1/account/phone/verify"
2023-01-17 16:09:37 +01:00
apiAccountBillingPortalPath = "/v1/account/billing/portal"
apiAccountBillingWebhookPath = "/v1/account/billing/webhook"
apiAccountBillingSubscriptionPath = "/v1/account/billing/subscription"
apiAccountBillingSubscriptionCheckoutSuccessTemplate = "/v1/account/billing/subscription/success/{CHECKOUT_SESSION_ID}"
apiAccountBillingSubscriptionCheckoutSuccessRegex = regexp.MustCompile(`/v1/account/billing/subscription/success/(.+)$`)
apiAccountReservationSingleRegex = regexp.MustCompile(`/v1/account/reservation/([-_A-Za-z0-9]{1,64})$`)
staticRegex = regexp.MustCompile(`^/static/.+`)
docsRegex = regexp.MustCompile(`^/docs(|/.*)$`)
fileRegex = regexp.MustCompile(`^/file/([-_A-Za-z0-9]{1,64})(?:\.[A-Za-z0-9]{1,16})?$`)
urlRegex = regexp.MustCompile(`^https?://`)
2023-05-06 20:23:48 +02:00
phoneNumberRegex = regexp.MustCompile(`^\+\d{1,100}$`)
2021-10-23 03:26:01 +02:00
2022-03-06 02:24:10 +01:00
//go:embed site
2023-05-01 17:58:49 +02:00
webFs embed.FS
webFsCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: webFs}
webSiteDir = "/site"
webAppIndex = "/app.html" // React app
2021-10-24 20:22:53 +02:00
2021-12-02 23:27:31 +01:00
//go:embed docs
2021-12-07 16:38:58 +01:00
docsStaticFs embed.FS
2021-12-02 23:27:31 +01:00
docsStaticCached = &util.CachingEmbedFS{ModTime: time.Now(), FS: docsStaticFs}
2021-10-23 03:26:01 +02:00
)
2021-12-14 04:30:28 +01:00
const (
2023-02-23 04:26:43 +01:00
firebaseControlTopic = "~control" // See Android if changed
firebasePollTopic = "~poll" // See iOS if changed
emptyMessageBody = "triggered" // Used if message body is empty
newMessageBody = "New message" // Used in poll requests as generic message
defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment
encodingBase64 = "base64" // Used mainly for binary UnifiedPush messages
2023-04-21 04:04:11 +02:00
jsonBodyBytesLimit = 16384 // Max number of bytes for a JSON request body
unifiedPushTopicPrefix = "up" // Temporarily, we rate limit all "up*" topics based on the subscriber
unifiedPushTopicLength = 14 // Length of UnifiedPush topics, including the "up" part
messagesHistoryMax = 10 // Number of message count values to keep in memory
2022-01-16 04:33:35 +01:00
)
// WebSocket constants
const (
wsWriteWait = 2 * time.Second
wsBufferSize = 1024
wsReadLimit = 64 // We only ever receive PINGs
wsPongWait = 15 * time.Second
2021-12-14 04:30:28 +01:00
)
2021-12-07 17:45:15 +01:00
// New instantiates a new Server. It creates the cache and adds a Firebase
// subscriber (if configured).
2021-12-19 04:02:36 +01:00
func New(conf *Config) (*Server, error) {
2021-12-24 00:03:04 +01:00
var mailer mailer
2021-12-27 16:39:28 +01:00
if conf.SMTPSenderAddr != "" {
mailer = &smtpSender{config: conf}
2021-12-24 00:03:04 +01:00
}
2023-01-19 05:01:26 +01:00
var stripe stripeAPI
if conf.StripeSecretKey != "" {
stripe = newStripeAPI()
}
2022-02-27 20:47:28 +01:00
messageCache, err := createMessageCache(conf)
2021-11-02 19:08:21 +01:00
if err != nil {
return nil, err
}
2022-02-27 20:47:28 +01:00
topics, err := messageCache.Topics()
2021-11-03 02:09:49 +01:00
if err != nil {
return nil, err
2021-11-02 19:08:21 +01:00
}
2023-04-21 04:04:11 +02:00
messages, err := messageCache.Stats()
if err != nil {
return nil, err
}
var fileCache *fileCache
2022-01-02 23:56:12 +01:00
if conf.AttachmentCacheDir != "" {
2022-12-21 19:19:07 +01:00
fileCache, err = newFileCache(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit)
if err != nil {
2022-01-02 23:56:12 +01:00
return nil, err
}
}
2022-12-28 04:14:14 +01:00
var userManager *user.Manager
2022-01-23 05:01:20 +01:00
if conf.AuthFile != "" {
2023-01-29 02:29:06 +01:00
userManager, err = user.NewManager(conf.AuthFile, conf.AuthStartupQueries, conf.AuthDefault, conf.AuthBcryptCost, conf.AuthStatsQueueWriterInterval)
2022-01-23 05:01:20 +01:00
if err != nil {
return nil, err
}
2022-01-22 20:47:27 +01:00
}
2022-06-01 05:16:44 +02:00
var firebaseClient *firebaseClient
if conf.FirebaseKeyFile != "" {
2022-06-01 05:16:44 +02:00
sender, err := newFirebaseSender(conf.FirebaseKeyFile)
if err != nil {
return nil, err
}
// This awkward logic is required because Go is weird about nil types and interfaces.
// See issue #641, and https://go.dev/play/p/uur1flrv1t3 for an example
var auther user.Auther
if userManager != nil {
auther = userManager
}
firebaseClient = newFirebaseClient(sender, auther)
}
2023-01-19 05:01:26 +01:00
s := &Server{
2023-04-21 04:04:11 +02:00
config: conf,
messageCache: messageCache,
fileCache: fileCache,
firebaseClient: firebaseClient,
smtpSender: mailer,
topics: topics,
userManager: userManager,
messages: messages,
messagesHistory: []int64{messages},
visitors: make(map[string]*visitor),
stripe: stripe,
2023-01-19 05:01:26 +01:00
}
s.priceCache = util.NewLookupCache(s.fetchStripePrices, conf.StripePriceCacheDuration)
return s, nil
2021-10-23 03:26:01 +02:00
}
2022-02-27 20:47:28 +01:00
func createMessageCache(conf *Config) (*messageCache, error) {
2021-12-09 16:23:17 +01:00
if conf.CacheDuration == 0 {
return newNopCache()
2021-12-09 16:23:17 +01:00
} else if conf.CacheFile != "" {
return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheDuration, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
2021-11-02 19:08:21 +01:00
}
return newMemCache()
2021-11-02 19:08:21 +01:00
}
2021-12-07 17:45:15 +01:00
// Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts
// a manager go routine to print stats and prune messages.
2021-10-23 03:26:01 +02:00
func (s *Server) Run() error {
2022-01-10 22:28:13 +01:00
var listenStr string
if s.config.ListenHTTP != "" {
listenStr += fmt.Sprintf(" %s[http]", s.config.ListenHTTP)
}
2021-12-02 14:52:48 +01:00
if s.config.ListenHTTPS != "" {
2022-01-10 22:28:13 +01:00
listenStr += fmt.Sprintf(" %s[https]", s.config.ListenHTTPS)
}
if s.config.ListenUnix != "" {
2022-07-04 01:33:01 +02:00
listenStr += fmt.Sprintf(" %s[unix]", s.config.ListenUnix)
2021-12-02 14:52:48 +01:00
}
2021-12-27 22:06:40 +01:00
if s.config.SMTPServerListen != "" {
2022-01-10 22:28:13 +01:00
listenStr += fmt.Sprintf(" %s[smtp]", s.config.SMTPServerListen)
2021-12-27 22:06:40 +01:00
}
if s.config.MetricsListenHTTP != "" {
listenStr += fmt.Sprintf(" %s[http/metrics]", s.config.MetricsListenHTTP)
}
2023-03-28 20:41:16 +02:00
if s.config.ProfileListenHTTP != "" {
listenStr += fmt.Sprintf(" %s[http/profile]", s.config.ProfileListenHTTP)
}
2023-02-07 22:20:49 +01:00
log.Tag(tagStartup).Info("Listening on%s, ntfy %s, log level is %s", listenStr, s.config.Version, log.CurrentLevel().String())
2023-02-06 05:34:27 +01:00
if log.IsFile() {
2023-02-07 18:02:25 +01:00
fmt.Fprintf(os.Stderr, "Listening on%s, ntfy %s\n", listenStr, s.config.Version)
fmt.Fprintf(os.Stderr, "Logs are written to %s\n", log.File())
2023-02-06 05:34:27 +01:00
}
2021-12-22 23:45:19 +01:00
mux := http.NewServeMux()
mux.HandleFunc("/", s.handle)
2021-12-02 14:52:48 +01:00
errChan := make(chan error)
2021-12-22 14:17:50 +01:00
s.mu.Lock()
s.closeChan = make(chan bool)
2022-01-10 22:28:13 +01:00
if s.config.ListenHTTP != "" {
s.httpServer = &http.Server{Addr: s.config.ListenHTTP, Handler: mux}
go func() {
errChan <- s.httpServer.ListenAndServe()
}()
}
2021-12-02 14:52:48 +01:00
if s.config.ListenHTTPS != "" {
s.httpsServer = &http.Server{Addr: s.config.ListenHTTPS, Handler: mux}
2021-12-02 14:52:48 +01:00
go func() {
2021-12-22 14:17:50 +01:00
errChan <- s.httpsServer.ListenAndServeTLS(s.config.CertFile, s.config.KeyFile)
2021-12-02 14:52:48 +01:00
}()
}
2022-01-10 22:28:13 +01:00
if s.config.ListenUnix != "" {
go func() {
var err error
s.mu.Lock()
os.Remove(s.config.ListenUnix)
s.unixListener, err = net.Listen("unix", s.config.ListenUnix)
if err != nil {
2022-07-04 01:33:01 +02:00
s.mu.Unlock()
2022-01-10 22:28:13 +01:00
errChan <- err
return
}
2022-07-04 01:33:01 +02:00
defer s.unixListener.Close()
if s.config.ListenUnixMode > 0 {
if err := os.Chmod(s.config.ListenUnix, s.config.ListenUnixMode); err != nil {
s.mu.Unlock()
errChan <- err
return
}
2022-07-03 21:27:36 +02:00
}
2022-01-10 22:28:13 +01:00
s.mu.Unlock()
httpServer := &http.Server{Handler: mux}
errChan <- httpServer.Serve(s.unixListener)
}()
}
if s.config.MetricsListenHTTP != "" {
initMetrics()
s.httpMetricsServer = &http.Server{Addr: s.config.MetricsListenHTTP, Handler: promhttp.Handler()}
2023-03-07 04:16:10 +01:00
go func() {
errChan <- s.httpMetricsServer.ListenAndServe()
}()
} else if s.config.EnableMetrics {
initMetrics()
s.metricsHandler = promhttp.Handler()
2023-03-07 04:16:10 +01:00
}
2023-03-28 20:41:16 +02:00
if s.config.ProfileListenHTTP != "" {
profileMux := http.NewServeMux()
profileMux.HandleFunc("/debug/pprof/", pprof.Index)
profileMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
profileMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
profileMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
profileMux.HandleFunc("/debug/pprof/trace", pprof.Trace)
s.httpProfileServer = &http.Server{Addr: s.config.ProfileListenHTTP, Handler: profileMux}
go func() {
errChan <- s.httpProfileServer.ListenAndServe()
}()
}
2021-12-27 16:39:28 +01:00
if s.config.SMTPServerListen != "" {
2021-12-27 15:48:09 +01:00
go func() {
2021-12-27 22:06:40 +01:00
errChan <- s.runSMTPServer()
2021-12-27 15:48:09 +01:00
}()
}
2021-12-22 14:17:50 +01:00
s.mu.Unlock()
2021-12-22 23:20:43 +01:00
go s.runManager()
2023-01-11 04:51:51 +01:00
go s.runStatsResetter()
2022-06-01 02:38:56 +02:00
go s.runDelayedSender()
2022-01-21 20:17:59 +01:00
go s.runFirebaseKeepaliver()
2021-12-27 15:48:09 +01:00
2021-12-02 14:52:48 +01:00
return <-errChan
2021-10-23 03:26:01 +02:00
}
2021-12-22 14:17:50 +01:00
// Stop stops HTTP (+HTTPS) server and all managers
func (s *Server) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
if s.httpServer != nil {
s.httpServer.Close()
}
if s.httpsServer != nil {
s.httpsServer.Close()
}
2022-01-10 22:28:13 +01:00
if s.unixListener != nil {
s.unixListener.Close()
}
2021-12-27 22:06:40 +01:00
if s.smtpServer != nil {
s.smtpServer.Close()
}
2023-01-28 15:03:14 +01:00
s.closeDatabases()
2021-12-22 14:17:50 +01:00
close(s.closeChan)
}
2023-01-28 15:03:14 +01:00
func (s *Server) closeDatabases() {
if s.userManager != nil {
s.userManager.Close()
}
s.messageCache.Close()
}
2023-02-10 02:49:45 +01:00
// handle is the main entry point for all HTTP requests
2021-10-23 03:26:01 +02:00
func (s *Server) handle(w http.ResponseWriter, r *http.Request) {
2023-01-27 04:57:18 +01:00
v, err := s.maybeAuthenticate(r) // Note: Always returns v, even when error is returned
2022-12-02 21:37:48 +01:00
if err != nil {
2023-02-10 02:49:45 +01:00
s.handleError(w, r, v, err)
return
}
2023-02-15 16:55:01 +01:00
ev := logvr(v, r)
if ev.IsTrace() {
ev.Field("http_request", renderHTTPRequest(r)).Trace("HTTP request started")
} else if logvr(v, r).IsDebug() {
ev.Debug("HTTP request started")
2023-02-10 02:49:45 +01:00
}
logvr(v, r).
Timing(func() {
if err := s.handleInternal(w, r, v); err != nil {
s.handleError(w, r, v, err)
return
2022-06-01 22:57:35 +02:00
}
if metricHTTPRequests != nil {
metricHTTPRequests.WithLabelValues("200", "20000", r.Method).Inc()
}
2023-02-10 02:49:45 +01:00
}).
Debug("HTTP request finished")
}
func (s *Server) handleError(w http.ResponseWriter, r *http.Request, v *visitor, err error) {
httpErr, ok := err.(*errHTTP)
if !ok {
httpErr = errHTTPInternalError
}
if metricHTTPRequests != nil {
metricHTTPRequests.WithLabelValues(fmt.Sprintf("%d", httpErr.HTTPCode), fmt.Sprintf("%d", httpErr.Code), r.Method).Inc()
}
2023-02-28 03:13:15 +01:00
isRateLimiting := util.Contains(rateLimitingErrorCodes, httpErr.HTTPCode)
2023-02-25 21:31:12 +01:00
isNormalError := strings.Contains(err.Error(), "i/o timeout") || util.Contains(normalErrorCodes, httpErr.HTTPCode)
ev := logvr(v, r).Err(err)
2023-02-10 02:49:45 +01:00
if websocket.IsWebSocketUpgrade(r) {
2023-02-25 21:31:12 +01:00
ev.Tag(tagWebsocket).Fields(websocketErrorContext(err))
2022-06-01 22:57:35 +02:00
if isNormalError {
2023-02-25 21:31:12 +01:00
ev.Debug("WebSocket error (this error is okay, it happens a lot): %s", err.Error())
2022-06-01 22:57:35 +02:00
} else {
2023-02-25 21:31:12 +01:00
ev.Info("WebSocket error: %s", err.Error())
2022-06-01 22:57:35 +02:00
}
2023-02-10 02:49:45 +01:00
return // Do not attempt to write to upgraded connection
}
if isNormalError {
2023-02-25 21:31:12 +01:00
ev.Debug("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
2023-02-10 02:49:45 +01:00
} else {
2023-02-25 21:31:12 +01:00
ev.Info("Connection closed with HTTP %d (ntfy error %d)", httpErr.HTTPCode, httpErr.Code)
2021-10-23 03:26:01 +02:00
}
2023-02-28 03:13:15 +01:00
if isRateLimiting && s.config.StripeSecretKey != "" {
u := v.User()
if u == nil || u.Tier == nil {
httpErr = httpErr.Wrap("increase your limits with a paid plan, see %s", s.config.BaseURL)
}
}
2023-02-10 02:49:45 +01:00
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
w.WriteHeader(httpErr.HTTPCode)
io.WriteString(w, httpErr.JSON()+"\n")
2021-10-23 03:26:01 +02:00
}
2022-02-14 22:09:59 +01:00
func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visitor) error {
2023-05-01 17:58:49 +02:00
if r.Method == http.MethodGet && r.URL.Path == "/" && s.config.WebRoot == "/" {
return s.ensureWebEnabled(s.handleRoot)(w, r, v)
} else if r.Method == http.MethodHead && r.URL.Path == "/" {
2022-05-13 20:42:25 +02:00
return s.ensureWebEnabled(s.handleEmpty)(w, r, v)
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodGet && r.URL.Path == apiHealthPath {
2022-12-24 18:22:54 +01:00
return s.handleHealth(w, r, v)
2022-05-13 20:42:25 +02:00
} else if r.Method == http.MethodGet && r.URL.Path == webConfigPath {
return s.ensureWebEnabled(s.handleWebConfig)(w, r, v)
} else if r.Method == http.MethodGet && r.URL.Path == apiUsersPath {
return s.ensureAdmin(s.handleUsersGet)(w, r, v)
} else if r.Method == http.MethodPut && r.URL.Path == apiUsersPath {
return s.ensureAdmin(s.handleUsersAdd)(w, r, v)
} else if r.Method == http.MethodDelete && r.URL.Path == apiUsersPath {
return s.ensureAdmin(s.handleUsersDelete)(w, r, v)
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == apiUsersAccessPath {
2023-05-13 20:39:31 +02:00
return s.ensureAdmin(s.handleAccessAllow)(w, r, v)
} else if r.Method == http.MethodDelete && r.URL.Path == apiUsersAccessPath {
2023-05-13 20:39:31 +02:00
return s.ensureAdmin(s.handleAccessReset)(w, r, v)
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodPost && r.URL.Path == apiAccountPath {
return s.ensureUserManager(s.handleAccountCreate)(w, r, v)
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodGet && r.URL.Path == apiAccountPath {
2022-12-28 04:14:14 +01:00
return s.handleAccountGet(w, r, v) // Allowed by anonymous
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPath {
return s.ensureUser(s.withAccountSync(s.handleAccountDelete))(w, r, v)
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodPost && r.URL.Path == apiAccountPasswordPath {
return s.ensureUser(s.handleAccountPasswordChange)(w, r, v)
2023-01-28 05:10:59 +01:00
} else if r.Method == http.MethodPost && r.URL.Path == apiAccountTokenPath {
return s.ensureUser(s.withAccountSync(s.handleAccountTokenCreate))(w, r, v)
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodPatch && r.URL.Path == apiAccountTokenPath {
2023-01-28 05:10:59 +01:00
return s.ensureUser(s.withAccountSync(s.handleAccountTokenUpdate))(w, r, v)
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodDelete && r.URL.Path == apiAccountTokenPath {
2023-01-28 05:10:59 +01:00
return s.ensureUser(s.withAccountSync(s.handleAccountTokenDelete))(w, r, v)
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSettingsPath {
return s.ensureUser(s.withAccountSync(s.handleAccountSettingsChange))(w, r, v)
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodPost && r.URL.Path == apiAccountSubscriptionPath {
return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionAdd))(w, r, v)
2023-02-12 20:09:44 +01:00
} else if r.Method == http.MethodPatch && r.URL.Path == apiAccountSubscriptionPath {
return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionChange))(w, r, v)
2023-02-12 20:09:44 +01:00
} else if r.Method == http.MethodDelete && r.URL.Path == apiAccountSubscriptionPath {
return s.ensureUser(s.withAccountSync(s.handleAccountSubscriptionDelete))(w, r, v)
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodPost && r.URL.Path == apiAccountReservationPath {
return s.ensureUser(s.withAccountSync(s.handleAccountReservationAdd))(w, r, v)
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodDelete && apiAccountReservationSingleRegex.MatchString(r.URL.Path) {
return s.ensureUser(s.withAccountSync(s.handleAccountReservationDelete))(w, r, v)
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingSubscriptionPath {
return s.ensurePaymentsEnabled(s.ensureUser(s.handleAccountBillingSubscriptionCreate))(w, r, v) // Account sync via incoming Stripe webhook
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodGet && apiAccountBillingSubscriptionCheckoutSuccessRegex.MatchString(r.URL.Path) {
return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingSubscriptionCreateSuccess))(w, r, v) // No user context!
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodPut && r.URL.Path == apiAccountBillingSubscriptionPath {
return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionUpdate))(w, r, v) // Account sync via incoming Stripe webhook
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodDelete && r.URL.Path == apiAccountBillingSubscriptionPath {
return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingSubscriptionDelete))(w, r, v) // Account sync via incoming Stripe webhook
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingPortalPath {
return s.ensurePaymentsEnabled(s.ensureStripeCustomer(s.handleAccountBillingPortalSessionCreate))(w, r, v)
2023-01-17 16:09:37 +01:00
} else if r.Method == http.MethodPost && r.URL.Path == apiAccountBillingWebhookPath {
return s.ensurePaymentsEnabled(s.ensureUserManager(s.handleAccountBillingWebhook))(w, r, v) // This request comes from Stripe!
2023-05-16 20:15:58 +02:00
} else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhoneVerifyPath {
return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberVerify)))(w, r, v)
2023-05-11 19:50:10 +02:00
} else if r.Method == http.MethodPut && r.URL.Path == apiAccountPhonePath {
2023-05-16 20:15:58 +02:00
return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberAdd)))(w, r, v)
2023-05-13 03:47:41 +02:00
} else if r.Method == http.MethodDelete && r.URL.Path == apiAccountPhonePath {
2023-05-16 20:15:58 +02:00
return s.ensureUser(s.ensureCallsEnabled(s.withAccountSync(s.handleAccountPhoneNumberDelete)))(w, r, v)
2023-04-21 04:04:11 +02:00
} else if r.Method == http.MethodGet && r.URL.Path == apiStatsPath {
return s.handleStats(w, r, v)
} else if r.Method == http.MethodGet && r.URL.Path == apiTiersPath {
return s.ensurePaymentsEnabled(s.handleBillingTiersGet)(w, r, v)
2022-06-15 22:03:12 +02:00
} else if r.Method == http.MethodGet && r.URL.Path == matrixPushPath {
return s.handleMatrixDiscovery(w)
} else if r.Method == http.MethodGet && r.URL.Path == metricsPath && s.metricsHandler != nil {
return s.handleMetrics(w, r, v)
2022-05-13 20:42:25 +02:00
} else if r.Method == http.MethodGet && staticRegex.MatchString(r.URL.Path) {
return s.ensureWebEnabled(s.handleStatic)(w, r, v)
} else if r.Method == http.MethodGet && docsRegex.MatchString(r.URL.Path) {
return s.ensureWebEnabled(s.handleDocs)(w, r, v)
} else if (r.Method == http.MethodGet || r.Method == http.MethodHead) && fileRegex.MatchString(r.URL.Path) && s.config.AttachmentCacheDir != "" {
2022-01-22 04:22:27 +01:00
return s.limitRequests(s.handleFile)(w, r, v)
} else if r.Method == http.MethodOptions {
2023-02-03 01:07:16 +01:00
return s.limitRequests(s.handleOptions)(w, r, v) // Should work even if the web app is not enabled, see #598
2022-03-16 19:16:54 +01:00
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && r.URL.Path == "/" {
2023-02-22 05:40:15 +01:00
return s.transformBodyJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish)))(w, r, v)
2022-06-14 04:07:30 +02:00
} else if r.Method == http.MethodPost && r.URL.Path == matrixPushPath {
2023-02-22 05:40:15 +01:00
return s.transformMatrixJSON(s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublishMatrix)))(w, r, v)
2021-12-27 22:06:40 +01:00
} else if (r.Method == http.MethodPut || r.Method == http.MethodPost) && topicPathRegex.MatchString(r.URL.Path) {
2023-02-22 05:40:15 +01:00
return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
2021-12-27 22:06:40 +01:00
} else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
2023-02-22 05:40:15 +01:00
return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
2021-12-27 22:06:40 +01:00
} else if r.Method == http.MethodGet && jsonPathRegex.MatchString(r.URL.Path) {
2022-12-03 21:20:59 +01:00
return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeJSON))(w, r, v)
2021-12-27 22:06:40 +01:00
} else if r.Method == http.MethodGet && ssePathRegex.MatchString(r.URL.Path) {
2022-12-03 21:20:59 +01:00
return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeSSE))(w, r, v)
2021-12-27 22:06:40 +01:00
} else if r.Method == http.MethodGet && rawPathRegex.MatchString(r.URL.Path) {
2022-12-03 21:20:59 +01:00
return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeRaw))(w, r, v)
2022-01-15 19:23:35 +01:00
} else if r.Method == http.MethodGet && wsPathRegex.MatchString(r.URL.Path) {
2022-12-03 21:20:59 +01:00
return s.limitRequests(s.authorizeTopicRead(s.handleSubscribeWS))(w, r, v)
2022-01-26 05:04:09 +01:00
} else if r.Method == http.MethodGet && authPathRegex.MatchString(r.URL.Path) {
2022-12-03 21:20:59 +01:00
return s.limitRequests(s.authorizeTopicRead(s.handleTopicAuth))(w, r, v)
} else if r.Method == http.MethodGet && (topicPathRegex.MatchString(r.URL.Path) || externalTopicPathRegex.MatchString(r.URL.Path)) {
2022-05-13 20:42:25 +02:00
return s.ensureWebEnabled(s.handleTopic)(w, r, v)
2021-10-23 03:26:01 +02:00
}
2021-10-24 04:49:50 +02:00
return errHTTPNotFound
2021-10-23 03:26:01 +02:00
}
2023-05-01 17:58:49 +02:00
func (s *Server) handleRoot(w http.ResponseWriter, r *http.Request, v *visitor) error {
r.URL.Path = webAppIndex
2022-05-13 20:42:25 +02:00
return s.handleStatic(w, r, v)
2021-10-23 03:26:01 +02:00
}
2022-05-13 20:42:25 +02:00
func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request, v *visitor) error {
2022-01-16 05:17:46 +01:00
unifiedpush := readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see PUT/POST too!
2021-12-25 22:07:55 +01:00
if unifiedpush {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
2021-12-25 22:07:55 +01:00
_, err := io.WriteString(w, `{"unifiedpush":{"version":1}}`+"\n")
return err
}
2022-03-06 02:24:10 +01:00
r.URL.Path = webAppIndex
2022-05-13 20:42:25 +02:00
return s.handleStatic(w, r, v)
2021-12-25 22:07:55 +01:00
}
2022-01-26 05:04:09 +01:00
func (s *Server) handleEmpty(_ http.ResponseWriter, _ *http.Request, _ *visitor) error {
return nil
}
2022-01-26 05:04:09 +01:00
func (s *Server) handleTopicAuth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
2023-01-18 21:50:06 +01:00
return s.writeJSON(w, newSuccessResponse())
2022-01-26 05:04:09 +01:00
}
2022-12-24 18:22:54 +01:00
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
response := &apiHealthResponse{
Healthy: true,
}
2023-01-18 21:50:06 +01:00
return s.writeJSON(w, response)
2022-12-24 18:22:54 +01:00
}
2022-05-13 20:42:25 +02:00
func (s *Server) handleWebConfig(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
2023-01-05 02:34:22 +01:00
response := &apiConfigResponse{
2023-01-11 04:51:51 +01:00
BaseURL: "", // Will translate to window.location.origin
2023-05-01 17:58:49 +02:00
AppRoot: s.config.WebRoot,
2023-01-11 04:51:51 +01:00
EnableLogin: s.config.EnableLogin,
EnableSignup: s.config.EnableSignup,
EnablePayments: s.config.StripeSecretKey != "",
EnableCalls: s.config.TwilioAccount != "",
2023-05-17 16:58:28 +02:00
EnableEmails: s.config.SMTPSenderFrom != "",
2023-01-11 04:51:51 +01:00
EnableReservations: s.config.EnableReservations,
2023-02-28 20:38:31 +01:00
BillingContact: s.config.BillingContact,
2023-02-09 14:32:51 +01:00
DisallowedTopics: s.config.DisallowedTopics,
2023-01-05 02:34:22 +01:00
}
2023-01-10 02:37:13 +01:00
b, err := json.MarshalIndent(response, "", " ")
2023-01-05 02:34:22 +01:00
if err != nil {
return err
}
2022-03-11 21:56:54 +01:00
w.Header().Set("Content-Type", "text/javascript")
2023-01-05 02:34:22 +01:00
_, err = io.WriteString(w, fmt.Sprintf("// Generated server configuration\nvar config = %s;\n", string(b)))
return err
}
// handleMetrics returns Prometheus metrics. This endpoint is only called if enable-metrics is set,
// and listen-metrics-http is not set.
func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request, _ *visitor) error {
s.metricsHandler.ServeHTTP(w, r)
return nil
}
2023-04-21 04:04:11 +02:00
// handleStatic returns all static resources (excluding the docs), including the web app
2022-05-13 20:42:25 +02:00
func (s *Server) handleStatic(w http.ResponseWriter, r *http.Request, _ *visitor) error {
2022-03-06 02:24:10 +01:00
r.URL.Path = webSiteDir + r.URL.Path
2022-03-11 03:55:56 +01:00
util.Gzip(http.FileServer(http.FS(webFsCached))).ServeHTTP(w, r)
2021-10-29 19:58:14 +02:00
return nil
}
2023-04-21 04:04:11 +02:00
// handleDocs returns static resources related to the docs
2022-05-13 20:42:25 +02:00
func (s *Server) handleDocs(w http.ResponseWriter, r *http.Request, _ *visitor) error {
2022-03-11 03:55:56 +01:00
util.Gzip(http.FileServer(http.FS(docsStaticCached))).ServeHTTP(w, r)
2021-12-02 23:27:31 +01:00
return nil
}
2023-04-21 04:04:11 +02:00
// handleStats returns the publicly available server stats
func (s *Server) handleStats(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
s.mu.RLock()
2023-04-21 17:09:13 +02:00
messages, n, rate := s.messages, len(s.messagesHistory), float64(0)
if n > 1 {
rate = float64(s.messagesHistory[n-1]-s.messagesHistory[0]) / (float64(n-1) * s.config.ManagerInterval.Seconds())
}
s.mu.RUnlock()
2023-04-21 04:04:11 +02:00
response := &apiStatsResponse{
2023-04-21 17:09:13 +02:00
Messages: messages,
2023-04-21 04:04:11 +02:00
MessagesRate: rate,
}
return s.writeJSON(w, response)
}
2023-01-29 21:11:26 +01:00
// handleFile processes the download of attachment files. The method handles GET and HEAD requests against a file.
// Before streaming the file to a client, it locates uploader (m.Sender or m.User) in the message cache, so it
// can associate the download bandwidth with the uploader.
2022-01-13 00:52:07 +01:00
func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) error {
2022-01-02 23:56:12 +01:00
if s.config.AttachmentCacheDir == "" {
2022-01-04 00:55:08 +01:00
return errHTTPInternalError
2022-01-02 23:56:12 +01:00
}
matches := fileRegex.FindStringSubmatch(r.URL.Path)
if len(matches) != 2 {
2022-12-29 15:57:42 +01:00
return errHTTPInternalErrorInvalidPath
2022-01-02 23:56:12 +01:00
}
messageID := matches[1]
file := filepath.Join(s.config.AttachmentCacheDir, messageID)
stat, err := os.Stat(file)
if err != nil {
2023-02-28 03:13:15 +01:00
return errHTTPNotFound.Fields(log.Context{
"message_id": messageID,
"error_context": "filesystem",
})
2022-01-02 23:56:12 +01:00
}
w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
2023-01-29 21:11:26 +01:00
w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
if r.Method == http.MethodHead {
return nil
}
// Find message in database, and associate bandwidth to the uploader user
// This is an easy way to
// - avoid abuse (e.g. 1 uploader, 1k downloaders)
// - and also uses the higher bandwidth limits of a paying user
m, err := s.messageCache.Message(messageID)
if err == errMessageNotFound {
if s.config.CacheBatchTimeout > 0 {
// Strange edge case: If we immediately after upload request the file (the web app does this for images),
// and messages are persisted asynchronously, retry fetching from the database
m, err = util.Retry(func() (*message, error) {
return s.messageCache.Message(messageID)
}, s.config.CacheBatchTimeout, 100*time.Millisecond, 300*time.Millisecond, 600*time.Millisecond)
}
if err != nil {
2023-02-28 03:13:15 +01:00
return errHTTPNotFound.Fields(log.Context{
"message_id": messageID,
"error_context": "message_cache",
})
}
2023-01-29 21:11:26 +01:00
} else if err != nil {
return err
}
bandwidthVisitor := v
if s.userManager != nil && m.User != "" {
u, err := s.userManager.UserByID(m.User)
if err != nil {
return err
}
2023-01-29 21:11:26 +01:00
bandwidthVisitor = s.visitor(v.IP(), u)
2023-02-08 04:10:51 +01:00
} else if m.Sender.IsValid() {
2023-01-29 21:11:26 +01:00
bandwidthVisitor = s.visitor(m.Sender, nil)
}
if !bandwidthVisitor.BandwidthAllowed(stat.Size()) {
2023-02-28 03:13:15 +01:00
return errHTTPTooManyRequestsLimitAttachmentBandwidth.With(m)
2023-01-29 21:11:26 +01:00
}
// Actually send file
f, err := os.Open(file)
if err != nil {
2022-01-02 23:56:12 +01:00
return err
}
2023-01-29 21:11:26 +01:00
defer f.Close()
if m.Attachment.Name != "" {
w.Header().Set("Content-Disposition", "attachment; filename="+strconv.Quote(m.Attachment.Name))
}
2023-01-29 21:11:26 +01:00
_, err = io.Copy(util.NewContentTypeWriter(w, r.URL.Path), f)
return err
2022-01-02 23:56:12 +01:00
}
2022-06-15 22:03:12 +02:00
func (s *Server) handleMatrixDiscovery(w http.ResponseWriter) error {
if s.config.BaseURL == "" {
return errHTTPInternalErrorMissingBaseURL
}
2022-06-16 17:40:56 +02:00
return writeMatrixDiscoveryResponse(w)
2022-06-15 22:03:12 +02:00
}
2023-03-04 04:22:07 +01:00
func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, error) {
2023-03-17 03:19:20 +01:00
start := time.Now()
2023-03-14 15:19:15 +01:00
t, err := fromContext[*topic](r, contextTopic)
if err != nil {
return nil, err
}
vrate, err := fromContext[*visitor](r, contextRateVisitor)
if err != nil {
return nil, err
}
2022-04-03 18:39:52 +02:00
body, err := util.Peek(r.Body, s.config.MessageLimit)
2021-10-23 03:26:01 +02:00
if err != nil {
2022-06-15 02:43:17 +02:00
return nil, err
2021-10-23 03:26:01 +02:00
}
2022-01-02 23:56:12 +01:00
m := newDefaultMessage(t.ID, "")
2023-05-13 02:01:12 +02:00
cache, firebase, email, call, unifiedpush, e := s.parsePublishParams(r, m)
if e != nil {
return nil, e.With(t)
2021-10-29 05:50:38 +02:00
}
2023-03-04 02:23:18 +01:00
if unifiedpush && s.config.VisitorSubscriberRateLimiting && t.RateVisitor() == nil {
// UnifiedPush clients must subscribe before publishing to allow proper subscriber-based rate limiting (see
// Rate-Topics header). The 5xx response is because some app servers (in particular Mastodon) will remove
// the subscription as invalid if any 400-499 code (except 429/408) is returned.
// See https://github.com/mastodon/mastodon/blob/730bb3e211a84a2f30e3e2bbeae3f77149824a68/app/workers/web/push_notification_worker.rb#L35-L46
2023-03-01 04:25:13 +01:00
return nil, errHTTPInsufficientStorageUnifiedPush.With(t)
} else if !util.ContainsIP(s.config.VisitorRequestExemptIPAddrs, v.ip) && !vrate.MessageAllowed() {
2023-02-26 02:23:22 +01:00
return nil, errHTTPTooManyRequestsLimitMessages.With(t)
2023-02-25 03:10:41 +01:00
} else if email != "" && !vrate.EmailAllowed() {
2023-02-26 02:23:22 +01:00
return nil, errHTTPTooManyRequestsLimitEmails.With(t)
2023-05-13 18:26:14 +02:00
} else if call != "" {
var httpErr *errHTTP
call, httpErr = s.convertPhoneNumber(v.User(), call)
if httpErr != nil {
return nil, httpErr.With(t)
} else if !vrate.CallAllowed() {
2023-05-13 18:26:14 +02:00
return nil, errHTTPTooManyRequestsLimitCalls.With(t)
}
2023-02-24 02:46:53 +01:00
}
2022-05-27 13:55:57 +02:00
if m.PollID != "" {
m = newPollRequestMessage(t.ID, m.PollID)
}
2023-02-08 04:10:51 +01:00
m.Sender = v.IP()
m.User = v.MaybeUserID()
if cache {
m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
}
2022-01-17 19:28:07 +01:00
if err := s.handlePublishBody(r, v, m, body, unifiedpush); err != nil {
2022-06-15 02:43:17 +02:00
return nil, err
2021-12-24 00:03:04 +01:00
}
2021-12-15 15:41:55 +01:00
if m.Message == "" {
2021-12-24 00:03:04 +01:00
m.Message = emptyMessageBody
2021-12-15 15:41:55 +01:00
}
delayed := m.Time > time.Now().Unix()
2023-02-24 02:46:53 +01:00
ev := logvrm(v, r, m).
2023-02-04 04:21:50 +01:00
Tag(tagPublish).
2023-02-26 02:23:22 +01:00
With(t).
2023-02-06 05:34:27 +01:00
Fields(log.Context{
"message_delayed": delayed,
"message_firebase": firebase,
"message_unifiedpush": unifiedpush,
"message_email": email,
"message_call": call,
2023-02-13 19:20:05 +01:00
})
if ev.IsTrace() {
ev.Field("message_body", util.MaybeMarshalJSON(m)).Trace("Received message")
} else if ev.IsDebug() {
ev.Debug("Received message")
2022-06-02 05:24:44 +02:00
}
if !delayed {
2022-06-01 02:38:56 +02:00
if err := t.Publish(v, m); err != nil {
2022-06-15 02:43:17 +02:00
return nil, err
}
2022-06-01 22:57:35 +02:00
if s.firebaseClient != nil && firebase {
2023-02-24 02:46:53 +01:00
go s.sendToFirebase(v, m)
2022-06-01 22:57:35 +02:00
}
2022-06-02 05:24:44 +02:00
if s.smtpSender != nil && email != "" {
2022-06-01 22:57:35 +02:00
go s.sendEmail(v, m, email)
}
2023-05-07 17:59:15 +02:00
if s.config.TwilioAccount != "" && call != "" {
2023-05-05 22:22:54 +02:00
go s.callPhone(v, r, m, call)
}
2023-05-31 21:36:02 +02:00
if s.config.UpstreamBaseURL != "" && !unifiedpush { // UP messages are not sent to upstream
2022-06-01 22:57:35 +02:00
go s.forwardPollRequest(v, m)
}
} else {
2023-02-04 04:21:50 +01:00
logvrm(v, r, m).Tag(tagPublish).Debug("Message delayed, will process later")
2022-05-27 13:55:57 +02:00
}
2021-12-09 16:23:17 +01:00
if cache {
2023-02-04 04:21:50 +01:00
logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache")
2022-11-16 16:28:20 +01:00
if err := s.messageCache.AddMessage(m); err != nil {
2022-06-15 02:43:17 +02:00
return nil, err
2022-11-16 16:28:20 +01:00
}
2021-11-02 19:08:21 +01:00
}
u := v.User()
if s.userManager != nil && u != nil && u.Tier != nil {
2023-02-09 21:24:12 +01:00
go s.userManager.EnqueueUserStats(u.ID, v.Stats())
}
2022-06-15 02:43:17 +02:00
s.mu.Lock()
s.messages++
s.mu.Unlock()
2023-03-14 15:19:15 +01:00
if unifiedpush {
minc(metricUnifiedPushPublishedSuccess)
2023-03-14 15:19:15 +01:00
}
2023-03-17 03:19:20 +01:00
mset(metricMessagePublishDurationMillis, time.Since(start).Milliseconds())
2022-06-15 02:43:17 +02:00
return m, nil
}
func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visitor) error {
2023-03-04 04:22:07 +01:00
m, err := s.handlePublishInternal(r, v)
2022-06-15 02:43:17 +02:00
if err != nil {
minc(metricMessagesPublishedFailure)
2022-06-15 02:43:17 +02:00
return err
}
minc(metricMessagesPublishedSuccess)
2023-01-18 21:50:06 +01:00
return s.writeJSON(w, m)
2022-06-15 02:43:17 +02:00
}
func (s *Server) handlePublishMatrix(w http.ResponseWriter, r *http.Request, v *visitor) error {
2023-03-04 04:22:07 +01:00
_, err := s.handlePublishInternal(r, v)
2022-06-15 02:43:17 +02:00
if err != nil {
minc(metricMessagesPublishedFailure)
minc(metricMatrixPublishedFailure)
2023-03-01 04:25:13 +01:00
if e, ok := err.(*errHTTP); ok && e.HTTPCode == errHTTPInsufficientStorageUnifiedPush.HTTPCode {
2023-03-14 15:19:15 +01:00
topic, err := fromContext[*topic](r, contextTopic)
if err != nil {
return err
}
pushKey, err := fromContext[string](r, contextMatrixPushKey)
if err != nil {
return err
}
2023-03-04 04:22:07 +01:00
if time.Since(topic.LastAccess()) > matrixRejectPushKeyForUnifiedPushTopicWithoutRateVisitorAfter {
return writeMatrixResponse(w, pushKey)
}
2023-03-01 04:25:13 +01:00
}
2023-02-25 06:56:57 +01:00
return err
2022-06-15 02:43:17 +02:00
}
minc(metricMessagesPublishedSuccess)
minc(metricMatrixPublishedSuccess)
2022-06-15 22:03:12 +02:00
return writeMatrixSuccess(w)
2021-10-23 03:26:01 +02:00
}
2022-05-28 02:30:20 +02:00
func (s *Server) sendToFirebase(v *visitor, m *message) {
2023-02-04 04:21:50 +01:00
logvm(v, m).Tag(tagFirebase).Debug("Publishing to Firebase")
2022-06-01 05:16:44 +02:00
if err := s.firebaseClient.Send(v, m); err != nil {
minc(metricFirebasePublishedFailure)
if err == errFirebaseTemporarilyBanned {
2023-02-04 04:21:50 +01:00
logvm(v, m).Tag(tagFirebase).Err(err).Debug("Unable to publish to Firebase: %v", err.Error())
} else {
2023-02-04 04:21:50 +01:00
logvm(v, m).Tag(tagFirebase).Err(err).Warn("Unable to publish to Firebase: %v", err.Error())
}
2023-03-07 04:16:10 +01:00
return
2022-05-28 02:30:20 +02:00
}
minc(metricFirebasePublishedSuccess)
2022-05-28 02:30:20 +02:00
}
func (s *Server) sendEmail(v *visitor, m *message, email string) {
2023-02-04 04:21:50 +01:00
logvm(v, m).Tag(tagEmail).Field("email", email).Debug("Sending email to %s", email)
2022-06-02 05:24:44 +02:00
if err := s.smtpSender.Send(v, m, email); err != nil {
2023-02-04 04:21:50 +01:00
logvm(v, m).Tag(tagEmail).Field("email", email).Err(err).Warn("Unable to send email to %s: %v", email, err.Error())
minc(metricEmailsPublishedFailure)
2023-03-07 04:16:10 +01:00
return
2022-05-28 02:30:20 +02:00
}
minc(metricEmailsPublishedSuccess)
2022-05-28 02:30:20 +02:00
}
func (s *Server) forwardPollRequest(v *visitor, m *message) {
topicURL := fmt.Sprintf("%s/%s", s.config.BaseURL, m.Topic)
topicHash := fmt.Sprintf("%x", sha256.Sum256([]byte(topicURL)))
forwardURL := fmt.Sprintf("%s/%s", s.config.UpstreamBaseURL, topicHash)
2023-02-04 04:21:50 +01:00
logvm(v, m).Debug("Publishing poll request to %s", forwardURL)
2022-05-28 02:30:20 +02:00
req, err := http.NewRequest("POST", forwardURL, strings.NewReader(""))
if err != nil {
2023-02-04 04:21:50 +01:00
logvm(v, m).Err(err).Warn("Unable to publish poll request")
2022-05-28 02:30:20 +02:00
return
}
2023-05-18 19:08:10 +02:00
req.Header.Set("User-Agent", "ntfy/"+s.config.Version)
2022-05-28 02:30:20 +02:00
req.Header.Set("X-Poll-ID", m.ID)
2023-05-18 19:08:10 +02:00
if s.config.UpstreamAccessToken != "" {
req.Header.Set("Authorization", util.BearerAuth(s.config.UpstreamAccessToken))
}
var httpClient = &http.Client{
Timeout: time.Second * 10,
}
response, err := httpClient.Do(req)
2022-05-28 02:30:20 +02:00
if err != nil {
2023-02-04 04:21:50 +01:00
logvm(v, m).Err(err).Warn("Unable to publish poll request")
2022-05-28 02:30:20 +02:00
return
} else if response.StatusCode != http.StatusOK {
if response.StatusCode == http.StatusTooManyRequests {
logvm(v, m).Err(err).Warn("Unable to publish poll request, the upstream server %s responded with HTTP %s; you may solve this by sending fewer daily messages, or by configuring upstream-access-token (assuming you have an account with higher rate limits) ", s.config.UpstreamBaseURL, response.Status)
} else {
2023-05-23 20:24:11 +02:00
logvm(v, m).Err(err).Warn("Unable to publish poll request, the upstream server %s responded with HTTP %s", s.config.UpstreamBaseURL, response.Status)
}
2022-05-28 02:30:20 +02:00
return
}
}
2023-05-13 02:01:12 +02:00
func (s *Server) parsePublishParams(r *http.Request, m *message) (cache bool, firebase bool, email, call string, unifiedpush bool, err *errHTTP) {
2022-01-16 05:17:46 +01:00
cache = readBoolParam(r, true, "x-cache", "cache")
firebase = readBoolParam(r, true, "x-firebase", "firebase")
m.Title = readParam(r, "x-title", "title", "t")
2022-01-04 23:40:41 +01:00
m.Click = readParam(r, "x-click", "click")
2022-07-17 23:47:21 +02:00
icon := readParam(r, "x-icon", "icon")
filename := readParam(r, "x-filename", "filename", "file", "f")
2022-01-14 18:13:14 +01:00
attach := readParam(r, "x-attach", "attach", "a")
if attach != "" || filename != "" {
m.Attachment = &attachment{}
}
2022-01-14 18:13:14 +01:00
if filename != "" {
m.Attachment.Name = filename
}
if attach != "" {
2022-07-17 23:47:21 +02:00
if !urlRegex.MatchString(attach) {
2023-05-13 02:01:12 +02:00
return false, false, "", "", false, errHTTPBadRequestAttachmentURLInvalid
}
m.Attachment.URL = attach
2022-01-14 18:13:14 +01:00
if m.Attachment.Name == "" {
u, err := url.Parse(m.Attachment.URL)
if err == nil {
m.Attachment.Name = path.Base(u.Path)
if m.Attachment.Name == "." || m.Attachment.Name == "/" {
m.Attachment.Name = ""
}
}
}
if m.Attachment.Name == "" {
m.Attachment.Name = "attachment"
}
}
2022-07-17 23:47:21 +02:00
if icon != "" {
if !urlRegex.MatchString(icon) {
2023-05-13 02:01:12 +02:00
return false, false, "", "", false, errHTTPBadRequestIconURLInvalid
2022-07-17 23:47:21 +02:00
}
m.Icon = icon
}
email = readParam(r, "x-email", "x-e-mail", "email", "e-mail", "mail", "e")
2022-06-02 05:24:44 +02:00
if s.smtpSender == nil && email != "" {
2023-05-13 02:01:12 +02:00
return false, false, "", "", false, errHTTPBadRequestEmailDisabled
2023-05-05 22:22:54 +02:00
}
call = readParam(r, "x-call", "call")
2023-05-17 16:58:28 +02:00
if call != "" && (s.config.TwilioAccount == "" || s.userManager == nil) {
return false, false, "", "", false, errHTTPBadRequestPhoneCallsDisabled
2023-05-13 18:26:14 +02:00
} else if call != "" && !isBoolValue(call) && !phoneNumberRegex.MatchString(call) {
2023-05-13 02:01:12 +02:00
return false, false, "", "", false, errHTTPBadRequestPhoneNumberInvalid
}
messageStr := strings.ReplaceAll(readParam(r, "x-message", "message", "m"), "\\n", "\n")
2021-12-15 15:41:55 +01:00
if messageStr != "" {
m.Message = messageStr
2021-12-15 15:41:55 +01:00
}
var e error
m.Priority, e = util.ParsePriority(readParam(r, "x-priority", "priority", "prio", "p"))
if e != nil {
2023-05-13 02:01:12 +02:00
return false, false, "", "", false, errHTTPBadRequestPriorityInvalid
2021-11-27 22:12:08 +01:00
}
2023-02-23 03:33:18 +01:00
m.Tags = readCommaSeparatedParam(r, "x-tags", "tags", "tag", "ta")
2021-12-15 15:41:55 +01:00
delayStr := readParam(r, "x-delay", "delay", "x-at", "at", "x-in", "in")
2021-12-11 06:06:25 +01:00
if delayStr != "" {
if !cache {
2023-05-13 02:01:12 +02:00
return false, false, "", "", false, errHTTPBadRequestDelayNoCache
}
2021-12-24 00:03:04 +01:00
if email != "" {
2023-05-13 02:01:12 +02:00
return false, false, "", "", false, errHTTPBadRequestDelayNoEmail // we cannot store the email address (yet)
2021-12-24 00:03:04 +01:00
}
2023-05-17 16:39:15 +02:00
if call != "" {
return false, false, "", "", false, errHTTPBadRequestDelayNoCall // we cannot store the phone number (yet)
}
2021-12-11 06:06:25 +01:00
delay, err := util.ParseFutureTime(delayStr, time.Now())
if err != nil {
2023-05-13 02:01:12 +02:00
return false, false, "", "", false, errHTTPBadRequestDelayCannotParse
2021-12-11 06:06:25 +01:00
} else if delay.Unix() < time.Now().Add(s.config.MinDelay).Unix() {
2023-05-13 02:01:12 +02:00
return false, false, "", "", false, errHTTPBadRequestDelayTooSmall
2021-12-11 06:06:25 +01:00
} else if delay.Unix() > time.Now().Add(s.config.MaxDelay).Unix() {
2023-05-13 02:01:12 +02:00
return false, false, "", "", false, errHTTPBadRequestDelayTooLarge
}
2021-12-11 06:06:25 +01:00
m.Time = delay.Unix()
}
2022-04-16 22:17:58 +02:00
actionsStr := readParam(r, "x-actions", "actions", "action")
if actionsStr != "" {
m.Actions, e = parseActions(actionsStr)
if e != nil {
2023-05-13 02:01:12 +02:00
return false, false, "", "", false, errHTTPBadRequestActionsInvalid.Wrap(e.Error())
2022-04-17 20:29:43 +02:00
}
2022-04-16 22:17:58 +02:00
}
2022-01-17 19:28:07 +01:00
unifiedpush = readBoolParam(r, false, "x-unifiedpush", "unifiedpush", "up") // see GET too!
2021-12-25 22:07:55 +01:00
if unifiedpush {
firebase = false
2022-01-17 19:28:07 +01:00
unifiedpush = true
2021-12-25 22:07:55 +01:00
}
2022-05-29 04:06:46 +02:00
m.PollID = readParam(r, "x-poll-id", "poll-id")
2022-05-27 13:55:57 +02:00
if m.PollID != "" {
unifiedpush = false
cache = false
email = ""
}
2023-05-13 02:01:12 +02:00
return cache, firebase, email, call, unifiedpush, nil
2021-11-27 22:12:08 +01:00
}
// handlePublishBody consumes the PUT/POST body and decides whether the body is an attachment or the message.
//
2022-09-27 18:37:02 +02:00
// 1. curl -X POST -H "Poll: 1234" ntfy.sh/...
// If a message is flagged as poll request, the body does not matter and is discarded
// 2. curl -T somebinarydata.bin "ntfy.sh/mytopic?up=1"
// If body is binary, encode as base64, if not do not encode
// 3. curl -H "Attach: http://example.com/file.jpg" ntfy.sh/mytopic
// Body must be a message, because we attached an external URL
// 4. curl -T short.txt -H "Filename: short.txt" ntfy.sh/mytopic
// Body must be attachment, because we passed a filename
// 5. curl -T file.txt ntfy.sh/mytopic
// If file.txt is <= 4096 (message limit) and valid UTF-8, treat it as a message
// 6. curl -T file.txt ntfy.sh/mytopic
// If file.txt is > message limit, treat it as an attachment
2022-04-03 18:39:52 +02:00
func (s *Server) handlePublishBody(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser, unifiedpush bool) error {
2022-05-28 02:30:20 +02:00
if m.Event == pollRequestEvent { // Case 1
2022-06-02 05:24:44 +02:00
return s.handleBodyDiscard(body)
2022-05-27 13:55:57 +02:00
} else if unifiedpush {
2022-05-28 02:30:20 +02:00
return s.handleBodyAsMessageAutoDetect(m, body) // Case 2
2022-01-17 19:28:07 +01:00
} else if m.Attachment != nil && m.Attachment.URL != "" {
2022-05-28 02:30:20 +02:00
return s.handleBodyAsTextMessage(m, body) // Case 3
} else if m.Attachment != nil && m.Attachment.Name != "" {
2022-05-28 02:30:20 +02:00
return s.handleBodyAsAttachment(r, v, m, body) // Case 4
2022-04-03 18:39:52 +02:00
} else if !body.LimitReached && utf8.Valid(body.PeekedBytes) {
2022-05-28 02:30:20 +02:00
return s.handleBodyAsTextMessage(m, body) // Case 5
}
2022-05-28 02:30:20 +02:00
return s.handleBodyAsAttachment(r, v, m, body) // Case 6
2022-01-17 19:28:07 +01:00
}
2022-06-02 05:24:44 +02:00
func (s *Server) handleBodyDiscard(body *util.PeekedReadCloser) error {
_, err := io.Copy(io.Discard, body)
_ = body.Close()
return err
}
2022-04-03 18:39:52 +02:00
func (s *Server) handleBodyAsMessageAutoDetect(m *message, body *util.PeekedReadCloser) error {
if utf8.Valid(body.PeekedBytes) {
m.Message = string(body.PeekedBytes) // Do not trim
2022-01-17 19:28:07 +01:00
} else {
2022-04-03 18:39:52 +02:00
m.Message = base64.StdEncoding.EncodeToString(body.PeekedBytes)
2022-01-17 19:28:07 +01:00
m.Encoding = encodingBase64
}
return nil
}
2022-04-03 18:39:52 +02:00
func (s *Server) handleBodyAsTextMessage(m *message, body *util.PeekedReadCloser) error {
if !utf8.Valid(body.PeekedBytes) {
2023-02-26 02:23:22 +01:00
return errHTTPBadRequestMessageNotUTF8.With(m)
}
2022-04-03 18:39:52 +02:00
if len(body.PeekedBytes) > 0 { // Empty body should not override message (publish via GET!)
m.Message = strings.TrimSpace(string(body.PeekedBytes)) // Truncates the message to the peek limit if required
}
2022-01-10 19:38:51 +01:00
if m.Attachment != nil && m.Attachment.Name != "" && m.Message == "" {
m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
}
return nil
}
2022-04-03 18:39:52 +02:00
func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, body *util.PeekedReadCloser) error {
2022-01-12 17:05:04 +01:00
if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" {
2023-02-26 02:23:22 +01:00
return errHTTPBadRequestAttachmentsDisallowed.With(m)
}
2023-01-09 21:40:46 +01:00
vinfo, err := v.Info()
if err != nil {
return err
}
2023-01-09 21:40:46 +01:00
attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix()
if m.Time > attachmentExpiry {
2023-02-28 03:13:15 +01:00
return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m)
}
2022-01-11 18:58:11 +01:00
contentLengthStr := r.Header.Get("Content-Length")
if contentLengthStr != "" { // Early "do-not-trust" check, hard limit see below
contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
2023-01-09 21:40:46 +01:00
if err == nil && (contentLength > vinfo.Stats.AttachmentTotalSizeRemaining || contentLength > vinfo.Limits.AttachmentFileSizeLimit) {
2023-02-28 03:13:15 +01:00
return errHTTPEntityTooLargeAttachment.With(m).Fields(log.Context{
"message_content_length": contentLength,
"attachment_total_size_remaining": vinfo.Stats.AttachmentTotalSizeRemaining,
"attachment_file_size_limit": vinfo.Limits.AttachmentFileSizeLimit,
})
2022-01-11 18:58:11 +01:00
}
}
if m.Attachment == nil {
m.Attachment = &attachment{}
}
2022-01-10 19:38:51 +01:00
var ext string
m.Attachment.Expires = attachmentExpiry
2022-04-03 18:39:52 +02:00
m.Attachment.Type, ext = util.DetectContentType(body.PeekedBytes, m.Attachment.Name)
m.Attachment.URL = fmt.Sprintf("%s/file/%s%s", s.config.BaseURL, m.ID, ext)
if m.Attachment.Name == "" {
m.Attachment.Name = fmt.Sprintf("attachment%s", ext)
}
if m.Message == "" {
2022-01-10 19:38:51 +01:00
m.Message = fmt.Sprintf(defaultAttachmentMessage, m.Attachment.Name)
2022-01-04 00:55:08 +01:00
}
2022-12-21 19:19:07 +01:00
limiters := []util.Limiter{
v.BandwidthLimiter(),
2023-01-09 21:40:46 +01:00
util.NewFixedLimiter(vinfo.Limits.AttachmentFileSizeLimit),
util.NewFixedLimiter(vinfo.Stats.AttachmentTotalSizeRemaining),
2022-12-21 19:19:07 +01:00
}
m.Attachment.Size, err = s.fileCache.Write(m.ID, body, limiters...)
if err == util.ErrLimitReached {
2023-02-26 02:23:22 +01:00
return errHTTPEntityTooLargeAttachment.With(m)
} else if err != nil {
2022-01-02 23:56:12 +01:00
return err
}
return nil
}
2021-11-01 20:21:38 +01:00
func (s *Server) handleSubscribeJSON(w http.ResponseWriter, r *http.Request, v *visitor) error {
encoder := func(msg *message) (string, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
return "", err
2021-10-23 03:26:01 +02:00
}
return buf.String(), nil
2021-10-23 03:26:01 +02:00
}
2022-01-16 05:17:46 +01:00
return s.handleSubscribeHTTP(w, r, v, "application/x-ndjson", encoder)
2021-10-23 03:26:01 +02:00
}
2021-11-01 20:21:38 +01:00
func (s *Server) handleSubscribeSSE(w http.ResponseWriter, r *http.Request, v *visitor) error {
encoder := func(msg *message) (string, error) {
2021-10-23 19:21:33 +02:00
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&msg); err != nil {
return "", err
2021-10-23 19:21:33 +02:00
}
2021-10-29 14:29:27 +02:00
if msg.Event != messageEvent {
return fmt.Sprintf("event: %s\ndata: %s\n", msg.Event, buf.String()), nil // Browser's .onmessage() does not fire on this!
2021-10-23 19:21:33 +02:00
}
return fmt.Sprintf("data: %s\n", buf.String()), nil
2021-10-23 21:22:17 +02:00
}
2022-01-16 05:17:46 +01:00
return s.handleSubscribeHTTP(w, r, v, "text/event-stream", encoder)
2021-10-23 19:21:33 +02:00
}
2021-11-01 20:21:38 +01:00
func (s *Server) handleSubscribeRaw(w http.ResponseWriter, r *http.Request, v *visitor) error {
encoder := func(msg *message) (string, error) {
2021-11-02 19:10:56 +01:00
if msg.Event == messageEvent { // only handle default events
return strings.ReplaceAll(msg.Message, "\n", " ") + "\n", nil
}
return "\n", nil // "keepalive" and "open" events just send an empty line
}
2022-01-16 05:17:46 +01:00
return s.handleSubscribeHTTP(w, r, v, "text/plain", encoder)
}
2022-01-16 05:17:46 +01:00
func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *visitor, contentType string, encoder messageEncoder) error {
2023-02-09 21:24:12 +01:00
logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection opened")
defer logvr(v, r).Tag(tagSubscribe).Debug("HTTP stream connection closed")
if !v.SubscriptionAllowed() {
2021-12-25 15:15:05 +01:00
return errHTTPTooManyRequestsLimitSubscriptions
2021-11-01 20:21:38 +01:00
}
defer v.RemoveSubscription()
2022-01-16 05:17:46 +01:00
topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
2021-11-01 21:39:40 +01:00
if err != nil {
return err
}
2023-02-23 04:26:43 +01:00
poll, since, scheduled, filters, rateTopics, err := parseSubscribeParams(r)
2021-12-21 21:22:27 +01:00
if err != nil {
return err
}
2021-12-22 09:44:16 +01:00
var wlock sync.Mutex
2022-06-22 19:47:54 +02:00
defer func() {
// Hack: This is the fix for a horrible data race that I have not been able to figure out in quite some time.
// It appears to be happening when the Go HTTP code reads from the socket when closing the request (i.e. AFTER
// this function returns), and causes a data race with the ResponseWriter. Locking wlock here silences the
// data race detector. See https://github.com/binwiederhier/ntfy/issues/338#issuecomment-1163425889.
wlock.TryLock()
}()
2022-06-01 02:38:56 +02:00
sub := func(v *visitor, msg *message) error {
2022-01-16 05:17:46 +01:00
if !filters.Pass(msg) {
2021-12-21 21:22:27 +01:00
return nil
}
m, err := encoder(msg)
if err != nil {
return err
}
2021-12-21 21:22:27 +01:00
wlock.Lock()
defer wlock.Unlock()
if _, err := w.Write([]byte(m)); err != nil {
return err
}
if fl, ok := w.(http.Flusher); ok {
fl.Flush()
}
return nil
}
2023-02-24 20:45:30 +01:00
if err := s.maybeSetRateVisitors(r, v, topics, rateTopics); err != nil {
return err
}
w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
w.Header().Set("Content-Type", contentType+"; charset=utf-8") // Android/Volley client needs charset!
2021-10-29 19:58:14 +02:00
if poll {
2023-03-04 04:22:07 +01:00
for _, t := range topics {
t.Keepalive()
}
2022-06-01 02:38:56 +02:00
return s.sendOldMessages(topics, since, scheduled, v, sub)
2021-10-29 19:58:14 +02:00
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
2021-11-15 13:56:58 +01:00
subscriberIDs := make([]int, 0)
for _, t := range topics {
2023-02-24 02:46:53 +01:00
subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
2021-11-15 13:56:58 +01:00
}
defer func() {
for i, subscriberID := range subscriberIDs {
topics[i].Unsubscribe(subscriberID) // Order!
}
}()
2022-06-01 02:38:56 +02:00
if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
2021-10-29 19:58:14 +02:00
return err
}
2022-06-01 02:38:56 +02:00
if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
case <-r.Context().Done():
return nil
case <-time.After(s.config.KeepaliveInterval):
2023-03-04 04:22:07 +01:00
ev := logvr(v, r).Tag(tagSubscribe)
if len(topics) == 1 {
ev.With(topics[0]).Trace("Sending keepalive message to %s", topics[0].ID)
} else {
ev.Trace("Sending keepalive message to %d topics", len(topics))
}
2021-11-01 20:21:38 +01:00
v.Keepalive()
2023-03-01 04:25:13 +01:00
for _, t := range topics {
t.Keepalive()
}
2022-06-01 02:38:56 +02:00
if err := sub(v, newKeepaliveMessage(topicsStr)); err != nil { // Send keepalive message
return err
}
}
}
}
2022-01-15 19:23:35 +01:00
func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *visitor) error {
if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
return errHTTPBadRequestWebSocketsUpgradeHeaderMissing
}
if !v.SubscriptionAllowed() {
2022-01-15 19:23:35 +01:00
return errHTTPTooManyRequestsLimitSubscriptions
}
defer v.RemoveSubscription()
2023-02-04 04:21:50 +01:00
logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection opened")
defer logvr(v, r).Tag(tagWebsocket).Debug("WebSocket connection closed")
2022-01-16 05:17:46 +01:00
topics, topicsStr, err := s.topicsFromPath(r.URL.Path)
2022-01-15 19:23:35 +01:00
if err != nil {
return err
}
2023-02-23 04:26:43 +01:00
poll, since, scheduled, filters, rateTopics, err := parseSubscribeParams(r)
2022-01-15 19:23:35 +01:00
if err != nil {
return err
}
upgrader := &websocket.Upgrader{
ReadBufferSize: wsBufferSize,
WriteBufferSize: wsBufferSize,
CheckOrigin: func(r *http.Request) bool {
return true // We're open for business!
},
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return err
}
defer conn.Close()
2023-05-13 20:39:31 +02:00
// Subscription connections can be canceled externally, see topic.CancelSubscribersExceptUser
2023-01-24 21:31:39 +01:00
cancelCtx, cancel := context.WithCancel(context.Background())
defer cancel()
// Use errgroup to run WebSocket reader and writer in Go routines
2022-01-16 06:07:32 +01:00
var wlock sync.Mutex
2023-01-24 21:31:39 +01:00
g, gctx := errgroup.WithContext(cancelCtx)
2022-01-15 19:23:35 +01:00
g.Go(func() error {
pongWait := s.config.KeepaliveInterval + wsPongWait
conn.SetReadLimit(wsReadLimit)
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
return err
}
conn.SetPongHandler(func(appData string) error {
2023-02-04 04:21:50 +01:00
logvr(v, r).Tag(tagWebsocket).Trace("Received WebSocket pong")
2022-01-15 19:23:35 +01:00
return conn.SetReadDeadline(time.Now().Add(pongWait))
})
for {
_, _, err := conn.NextReader()
if err != nil {
return err
}
select {
case <-gctx.Done():
return nil
default:
}
2022-01-15 19:23:35 +01:00
}
})
g.Go(func() error {
ping := func() error {
2022-01-16 06:07:32 +01:00
wlock.Lock()
defer wlock.Unlock()
2022-01-15 19:23:35 +01:00
if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
return err
}
2023-02-04 04:21:50 +01:00
logvr(v, r).Tag(tagWebsocket).Trace("Sending WebSocket ping")
2022-01-15 19:23:35 +01:00
return conn.WriteMessage(websocket.PingMessage, nil)
}
for {
select {
case <-gctx.Done():
2022-01-15 19:23:35 +01:00
return nil
2023-01-24 21:31:39 +01:00
case <-cancelCtx.Done():
2023-02-04 04:21:50 +01:00
logvr(v, r).Tag(tagWebsocket).Trace("Cancel received, closing subscriber connection")
2023-01-24 20:44:14 +01:00
conn.Close()
return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"}
2022-01-15 19:23:35 +01:00
case <-time.After(s.config.KeepaliveInterval):
v.Keepalive()
2023-03-01 04:25:13 +01:00
for _, t := range topics {
t.Keepalive()
}
2022-01-15 19:23:35 +01:00
if err := ping(); err != nil {
return err
}
}
}
})
2022-06-01 02:38:56 +02:00
sub := func(v *visitor, msg *message) error {
2022-01-16 05:17:46 +01:00
if !filters.Pass(msg) {
2022-01-15 19:23:35 +01:00
return nil
}
2022-01-16 06:07:32 +01:00
wlock.Lock()
defer wlock.Unlock()
2022-01-15 19:23:35 +01:00
if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
return err
}
return conn.WriteJSON(msg)
}
2023-02-24 20:45:30 +01:00
if err := s.maybeSetRateVisitors(r, v, topics, rateTopics); err != nil {
return err
}
w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
2022-01-15 19:23:35 +01:00
if poll {
2023-03-04 04:22:07 +01:00
for _, t := range topics {
t.Keepalive()
}
2022-06-01 02:38:56 +02:00
return s.sendOldMessages(topics, since, scheduled, v, sub)
2022-01-15 19:23:35 +01:00
}
subscriberIDs := make([]int, 0)
for _, t := range topics {
2023-02-24 02:46:53 +01:00
subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
2022-01-15 19:23:35 +01:00
}
defer func() {
for i, subscriberID := range subscriberIDs {
topics[i].Unsubscribe(subscriberID) // Order!
}
}()
2022-06-01 02:38:56 +02:00
if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
2022-01-15 19:23:35 +01:00
return err
}
2022-06-01 02:38:56 +02:00
if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
2022-01-15 19:23:35 +01:00
return err
}
2022-01-16 04:33:35 +01:00
err = g.Wait()
2023-02-16 20:21:19 +01:00
if err != nil && websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) {
2023-02-06 22:01:32 +01:00
logvr(v, r).Tag(tagWebsocket).Err(err).Fields(websocketErrorContext(err)).Trace("WebSocket connection closed")
2022-06-02 05:24:44 +02:00
return nil // Normal closures are not errors; note: "1006 (abnormal closure)" is treated as normal, because people disconnect a lot
2022-01-16 04:33:35 +01:00
}
return err
2022-01-15 19:23:35 +01:00
}
2023-02-23 04:26:43 +01:00
func parseSubscribeParams(r *http.Request) (poll bool, since sinceMarker, scheduled bool, filters *queryFilter, rateTopics []string, err error) {
2022-01-16 05:17:46 +01:00
poll = readBoolParam(r, false, "x-poll", "poll", "po")
scheduled = readBoolParam(r, false, "x-scheduled", "scheduled", "sched")
since, err = parseSince(r, poll)
if err != nil {
return
2021-12-21 21:22:27 +01:00
}
2022-01-16 05:17:46 +01:00
filters, err = parseQueryFilters(r)
if err != nil {
return
2021-12-21 21:22:27 +01:00
}
2023-02-23 04:26:43 +01:00
rateTopics = readCommaSeparatedParam(r, "x-rate-topics", "rate-topics")
2022-01-16 05:17:46 +01:00
return
2021-12-21 21:22:27 +01:00
}
2023-02-24 20:45:30 +01:00
// maybeSetRateVisitors sets the rate visitor on a topic (v.SetRateVisitor), indicating that all messages published
// to that topic will be rate limited against the rate visitor instead of the publishing visitor.
//
2023-03-04 02:23:18 +01:00
// Setting the rate visitor is ony allowed if the `visitor-subscriber-rate-limiting` setting is enabled, AND
2023-02-24 20:45:30 +01:00
// - auth-file is not set (everything is open by default)
2023-03-03 20:55:37 +01:00
// - or the topic is reserved, and v.user is the owner
// - or the topic is not reserved, and v.user has write access
2023-02-23 04:26:43 +01:00
//
// Note: This TEMPORARILY also registers all topics starting with "up" (= UnifiedPush). This is to ease the transition
// until the Android app will send the "Rate-Topics" header.
2023-02-24 20:45:30 +01:00
func (s *Server) maybeSetRateVisitors(r *http.Request, v *visitor, topics []*topic, rateTopics []string) error {
2023-03-03 20:55:37 +01:00
// Bail out if not enabled
2023-03-04 02:23:18 +01:00
if !s.config.VisitorSubscriberRateLimiting {
2023-03-03 20:55:37 +01:00
return nil
}
2023-02-24 20:45:30 +01:00
// Make a list of topics that we'll actually set the RateVisitor on
eligibleRateTopics := make([]*topic, 0)
for _, t := range topics {
2023-02-25 03:10:41 +01:00
if (strings.HasPrefix(t.ID, unifiedPushTopicPrefix) && len(t.ID) == unifiedPushTopicLength) || util.Contains(rateTopics, t.ID) {
2023-02-24 20:45:30 +01:00
eligibleRateTopics = append(eligibleRateTopics, t)
2023-02-23 04:26:43 +01:00
}
2023-02-24 20:45:30 +01:00
}
if len(eligibleRateTopics) == 0 {
return nil
}
// If access controls are turned off, v has access to everything, and we can set the rate visitor
if s.userManager == nil {
return s.setRateVisitors(r, v, eligibleRateTopics)
}
// If access controls are enabled, only set rate visitor if
// - topic is reserved, and v.user is the owner
// - topic is not reserved, and v.user has write access
writableRateTopics := make([]*topic, 0)
for _, t := range topics {
ownerUserID, err := s.userManager.ReservationOwner(t.ID)
if err != nil {
return err
}
if ownerUserID == "" {
if err := s.userManager.Authorize(v.User(), t.ID, user.PermissionWrite); err == nil {
writableRateTopics = append(writableRateTopics, t)
2023-02-23 04:26:43 +01:00
}
2023-02-24 20:45:30 +01:00
} else if ownerUserID == v.MaybeUserID() {
writableRateTopics = append(writableRateTopics, t)
2023-02-23 04:26:43 +01:00
}
}
2023-02-24 20:45:30 +01:00
return s.setRateVisitors(r, v, writableRateTopics)
}
func (s *Server) setRateVisitors(r *http.Request, v *visitor, rateTopics []*topic) error {
for _, t := range rateTopics {
logvr(v, r).
Tag(tagSubscribe).
2023-02-26 02:23:22 +01:00
With(t).
2023-02-24 20:45:30 +01:00
Debug("Setting visitor as rate visitor for topic %s", t.ID)
t.SetRateVisitor(v)
}
return nil
2023-02-23 04:26:43 +01:00
}
// sendOldMessages selects old messages from the messageCache and calls sub for each of them. It uses since as the
// marker, returning only messages that are newer than the marker.
2022-06-01 02:38:56 +02:00
func (s *Server) sendOldMessages(topics []*topic, since sinceMarker, scheduled bool, v *visitor, sub subscriber) error {
if since.IsNone() {
2021-10-29 19:58:14 +02:00
return nil
}
messages := make([]*message, 0)
2021-11-15 13:56:58 +01:00
for _, t := range topics {
topicMessages, err := s.messageCache.Messages(t.ID, since, scheduled)
2021-11-15 13:56:58 +01:00
if err != nil {
2021-10-29 19:58:14 +02:00
return err
}
messages = append(messages, topicMessages...)
}
sort.Slice(messages, func(i, j int) bool {
return messages[i].Time < messages[j].Time
})
for _, m := range messages {
if err := sub(v, m); err != nil {
return err
2021-11-15 13:56:58 +01:00
}
2021-10-29 19:58:14 +02:00
}
2021-10-24 19:34:15 +02:00
return nil
}
// parseSince returns a timestamp identifying the time span from which cached messages should be received.
//
// Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or
// "all" for all messages.
2022-02-26 21:57:10 +01:00
func parseSince(r *http.Request, poll bool) (sinceMarker, error) {
2021-12-22 09:44:16 +01:00
since := readParam(r, "x-since", "since", "si")
2022-02-26 21:57:10 +01:00
// Easy cases (empty, all, none)
2021-12-22 09:44:16 +01:00
if since == "" {
if poll {
return sinceAllMessages, nil
}
return sinceNoMessages, nil
2022-02-26 21:57:10 +01:00
} else if since == "all" {
return sinceAllMessages, nil
2022-02-26 21:57:10 +01:00
} else if since == "none" {
return sinceNoMessages, nil
}
// ID, timestamp, duration
if validMessageID(since) {
return newSinceID(since), nil
2021-12-22 09:44:16 +01:00
} else if s, err := strconv.ParseInt(since, 10, 64); err == nil {
2022-02-26 21:57:10 +01:00
return newSinceTime(s), nil
2021-12-22 09:44:16 +01:00
} else if d, err := time.ParseDuration(since); err == nil {
2022-02-26 21:57:10 +01:00
return newSinceTime(time.Now().Add(-1 * d).Unix()), nil
2021-10-29 19:58:14 +02:00
}
2021-12-25 15:15:05 +01:00
return sinceNoMessages, errHTTPBadRequestSinceInvalid
2021-10-29 19:58:14 +02:00
}
2022-05-13 20:42:25 +02:00
func (s *Server) handleOptions(w http.ResponseWriter, _ *http.Request, _ *visitor) error {
w.Header().Set("Access-Control-Allow-Methods", "GET, PUT, POST, PATCH, DELETE")
2023-01-18 21:50:06 +01:00
w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
w.Header().Set("Access-Control-Allow-Headers", "*") // CORS, allow auth via JS // FIXME is this terrible?
2021-10-24 20:22:53 +02:00
return nil
}
2023-05-13 20:39:31 +02:00
// topicFromPath returns the topic from a root path (e.g. /mytopic), creating it if it doesn't exist.
2021-12-15 15:41:55 +01:00
func (s *Server) topicFromPath(path string) (*topic, error) {
parts := strings.Split(path, "/")
if len(parts) < 2 {
2021-12-25 15:15:05 +01:00
return nil, errHTTPBadRequestTopicInvalid
2021-12-15 15:41:55 +01:00
}
return s.topicFromID(parts[1])
2021-11-15 13:56:58 +01:00
}
// topicsFromPath returns the topic from a root path (e.g. /mytopic,mytopic2), creating it if it doesn't exist.
2022-01-16 05:17:46 +01:00
func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
parts := strings.Split(path, "/")
if len(parts) < 2 {
return nil, "", errHTTPBadRequestTopicInvalid
}
topicIDs := util.SplitNoEmpty(parts[1], ",")
topics, err := s.topicsFromIDs(topicIDs...)
if err != nil {
return nil, "", errHTTPBadRequestTopicInvalid
}
return topics, parts[1], nil
}
2023-05-13 20:39:31 +02:00
// topicsFromIDs returns the topics with the given IDs, creating them if they don't exist.
2021-11-27 22:12:08 +01:00
func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
2021-10-23 03:26:01 +02:00
s.mu.Lock()
defer s.mu.Unlock()
2021-11-15 13:56:58 +01:00
topics := make([]*topic, 0)
2021-11-27 22:12:08 +01:00
for _, id := range ids {
2023-02-09 14:32:51 +01:00
if util.Contains(s.config.DisallowedTopics, id) {
2021-12-25 15:15:05 +01:00
return nil, errHTTPBadRequestTopicDisallowed
2021-12-09 04:13:59 +01:00
}
2021-11-15 13:56:58 +01:00
if _, ok := s.topics[id]; !ok {
2022-01-02 23:56:12 +01:00
if len(s.topics) >= s.config.TotalTopicLimit {
2022-01-12 17:05:04 +01:00
return nil, errHTTPTooManyRequestsLimitTotalTopics
2021-11-15 13:56:58 +01:00
}
2021-12-09 04:57:31 +01:00
s.topics[id] = newTopic(id)
2021-10-29 19:58:14 +02:00
}
2021-11-15 13:56:58 +01:00
topics = append(topics, s.topics[id])
2021-10-23 03:26:01 +02:00
}
2021-11-15 13:56:58 +01:00
return topics, nil
2021-10-23 03:26:01 +02:00
}
2023-05-13 20:39:31 +02:00
// topicFromID returns the topic with the given ID, creating it if it doesn't exist.
func (s *Server) topicFromID(id string) (*topic, error) {
topics, err := s.topicsFromIDs(id)
if err != nil {
return nil, err
}
return topics[0], nil
}
2023-05-13 20:39:31 +02:00
// topicsFromPattern returns a list of topics matching the given pattern, but it does not create them.
func (s *Server) topicsFromPattern(pattern string) ([]*topic, error) {
s.mu.RLock()
defer s.mu.RUnlock()
patternRegexp, err := regexp.Compile("^" + strings.ReplaceAll(pattern, "*", ".*") + "$")
if err != nil {
return nil, err
}
topics := make([]*topic, 0)
for _, t := range s.topics {
if patternRegexp.MatchString(t.ID) {
topics = append(topics, t)
}
}
return topics, nil
}
2021-12-27 22:06:40 +01:00
func (s *Server) runSMTPServer() error {
2022-06-02 05:24:44 +02:00
s.smtpServerBackend = newMailBackend(s.config, s.handle)
s.smtpServer = smtp.NewServer(s.smtpServerBackend)
2021-12-27 22:06:40 +01:00
s.smtpServer.Addr = s.config.SMTPServerListen
s.smtpServer.Domain = s.config.SMTPServerDomain
s.smtpServer.ReadTimeout = 10 * time.Second
s.smtpServer.WriteTimeout = 10 * time.Second
2021-12-28 01:26:20 +01:00
s.smtpServer.MaxMessageBytes = 1024 * 1024 // Must be much larger than message size (headers, multipart, etc.)
2021-12-27 22:06:40 +01:00
s.smtpServer.MaxRecipients = 1
s.smtpServer.AllowInsecureAuth = true
return s.smtpServer.ListenAndServe()
2021-12-27 15:48:09 +01:00
}
2021-12-15 15:13:16 +01:00
func (s *Server) runManager() {
2021-12-22 14:17:50 +01:00
for {
select {
case <-time.After(s.config.ManagerInterval):
2023-02-09 21:24:12 +01:00
log.
Tag(tagManager).
Timing(s.execManager).
Debug("Manager finished")
2021-12-22 14:17:50 +01:00
case <-s.closeChan:
return
2021-12-15 15:13:16 +01:00
}
2021-12-22 14:17:50 +01:00
}
2021-12-15 15:13:16 +01:00
}
// runStatsResetter runs once a day (usually midnight UTC) to reset all the visitor's message and
// email counters. The stats are used to display the counters in the web app, as well as for rate limiting.
2023-01-11 04:51:51 +01:00
func (s *Server) runStatsResetter() {
for {
runAt := util.NextOccurrenceUTC(s.config.VisitorStatsResetTime, time.Now())
timer := time.NewTimer(time.Until(runAt))
2023-02-04 04:21:50 +01:00
log.Tag(tagResetter).Debug("Waiting until %v to reset visitor stats", runAt)
2023-01-11 04:51:51 +01:00
select {
case <-timer.C:
2023-02-04 04:21:50 +01:00
log.Tag(tagResetter).Debug("Running stats resetter")
2023-01-11 04:51:51 +01:00
s.resetStats()
case <-s.closeChan:
2023-02-04 04:21:50 +01:00
log.Tag(tagResetter).Debug("Stopping stats resetter")
2023-01-11 04:51:51 +01:00
timer.Stop()
return
}
}
}
func (s *Server) resetStats() {
log.Info("Resetting all visitor stats (daily task)")
s.mu.Lock()
defer s.mu.Unlock() // Includes the database query to avoid races with other processes
for _, v := range s.visitors {
v.ResetStats()
}
if s.userManager != nil {
if err := s.userManager.ResetStats(); err != nil {
2023-02-04 04:21:50 +01:00
log.Tag(tagResetter).Warn("Failed to write to database: %s", err.Error())
2023-01-11 04:51:51 +01:00
}
}
}
2022-01-21 20:17:59 +01:00
func (s *Server) runFirebaseKeepaliver() {
2022-06-01 05:16:44 +02:00
if s.firebaseClient == nil {
2021-12-15 15:13:16 +01:00
return
}
2023-01-03 02:08:37 +01:00
v := newVisitor(s.config, s.messageCache, s.userManager, netip.IPv4Unspecified(), nil) // Background process, not a real visitor, uses IP 0.0.0.0
2021-12-15 15:13:16 +01:00
for {
2021-12-22 14:17:50 +01:00
select {
case <-time.After(s.config.FirebaseKeepaliveInterval):
2022-06-01 05:16:44 +02:00
s.sendToFirebase(v, newKeepaliveMessage(firebaseControlTopic))
2023-03-30 20:48:52 +02:00
/*
FIXME: Disable iOS polling entirely for now due to thundering herd problem (see #677)
To solve this, we'd have to shard the iOS poll topics to spread out the polling evenly.
Given that it's not really necessary to poll, turning it off for now should not have any impact.
case <-time.After(s.config.FirebasePollInterval):
s.sendToFirebase(v, newKeepaliveMessage(firebasePollTopic))
*/
2021-12-22 14:17:50 +01:00
case <-s.closeChan:
return
2021-12-15 15:13:16 +01:00
}
}
}
2021-12-22 14:17:50 +01:00
2022-06-02 05:24:44 +02:00
func (s *Server) runDelayedSender() {
for {
select {
case <-time.After(s.config.DelayedSenderInterval):
if err := s.sendDelayedMessages(); err != nil {
2023-02-04 04:21:50 +01:00
log.Tag(tagPublish).Err(err).Warn("Error sending delayed messages")
2022-06-02 05:24:44 +02:00
}
case <-s.closeChan:
return
}
}
}
func (s *Server) sendDelayedMessages() error {
2022-02-27 20:47:28 +01:00
messages, err := s.messageCache.MessagesDue()
if err != nil {
return err
}
for _, m := range messages {
2023-01-27 04:57:18 +01:00
var u *user.User
2022-12-29 17:09:45 +01:00
if s.userManager != nil && m.User != "" {
u, err = s.userManager.UserByID(m.User)
2022-12-22 03:55:39 +01:00
if err != nil {
2023-02-06 05:34:27 +01:00
log.With(m).Err(err).Warn("Error sending delayed message")
2022-12-22 03:55:39 +01:00
continue
}
}
2023-01-27 04:57:18 +01:00
v := s.visitor(m.Sender, u)
2022-06-01 02:38:56 +02:00
if err := s.sendDelayedMessage(v, m); err != nil {
2023-02-04 04:21:50 +01:00
logvm(v, m).Err(err).Warn("Error sending delayed message")
2022-01-10 21:36:12 +01:00
}
2022-06-01 02:38:56 +02:00
}
return nil
}
func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
2023-02-04 04:21:50 +01:00
logvm(v, m).Debug("Sending delayed message")
2023-04-21 04:04:11 +02:00
s.mu.RLock()
2022-06-01 02:38:56 +02:00
t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
2023-04-21 04:04:11 +02:00
s.mu.RUnlock()
2022-06-01 02:38:56 +02:00
if ok {
go func() {
// We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
if err := t.Publish(v, m); err != nil {
2023-02-04 04:21:50 +01:00
logvm(v, m).Err(err).Warn("Unable to publish message")
}
}()
2022-06-01 02:38:56 +02:00
}
2022-06-01 05:16:44 +02:00
if s.firebaseClient != nil { // Firebase subscribers may not show up in topics map
go s.sendToFirebase(v, m)
}
if s.config.UpstreamBaseURL != "" {
go s.forwardPollRequest(v, m)
}
2022-06-01 02:38:56 +02:00
if err := s.messageCache.MarkPublished(m); err != nil {
return err
}
return nil
}
2022-04-03 18:39:52 +02:00
// transformBodyJSON peeks the request body, reads the JSON, and converts it to headers
2022-03-16 19:16:54 +01:00
// before passing it on to the next handler. This is meant to be used in combination with handlePublish.
func (s *Server) transformBodyJSON(next handleFunc) handleFunc {
2022-03-15 21:00:59 +01:00
return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
2023-01-28 05:10:59 +01:00
m, err := readJSONWithLimit[publishMessage](r.Body, s.config.MessageLimit*2, false) // 2x to account for JSON format overhead
2022-03-15 21:00:59 +01:00
if err != nil {
return err
}
if !topicRegex.MatchString(m.Topic) {
2022-03-16 19:16:54 +01:00
return errHTTPBadRequestTopicInvalid
2022-03-15 21:00:59 +01:00
}
if m.Message == "" {
m.Message = emptyMessageBody
}
r.URL.Path = "/" + m.Topic
r.Body = io.NopCloser(strings.NewReader(m.Message))
if m.Title != "" {
r.Header.Set("X-Title", m.Title)
}
2022-03-16 19:16:54 +01:00
if m.Priority != 0 {
r.Header.Set("X-Priority", fmt.Sprintf("%d", m.Priority))
2022-03-15 21:00:59 +01:00
}
2022-03-16 19:16:54 +01:00
if m.Tags != nil && len(m.Tags) > 0 {
r.Header.Set("X-Tags", strings.Join(m.Tags, ","))
2022-03-15 21:00:59 +01:00
}
if m.Attach != "" {
r.Header.Set("X-Attach", m.Attach)
}
if m.Filename != "" {
r.Header.Set("X-Filename", m.Filename)
}
if m.Click != "" {
r.Header.Set("X-Click", m.Click)
}
2022-07-16 21:31:03 +02:00
if m.Icon != "" {
r.Header.Set("X-Icon", m.Icon)
}
2022-04-16 22:17:58 +02:00
if len(m.Actions) > 0 {
actionsStr, err := json.Marshal(m.Actions)
if err != nil {
2022-12-29 15:57:42 +01:00
return errHTTPBadRequestMessageJSONInvalid
2022-04-16 22:17:58 +02:00
}
r.Header.Set("X-Actions", string(actionsStr))
}
if m.Email != "" {
r.Header.Set("X-Email", m.Email)
}
if m.Delay != "" {
r.Header.Set("X-Delay", m.Delay)
}
if m.Call != "" {
r.Header.Set("X-Call", m.Call)
}
2022-03-15 21:00:59 +01:00
return next(w, r, v)
}
}
2022-06-14 04:07:30 +02:00
func (s *Server) transformMatrixJSON(next handleFunc) handleFunc {
return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
newRequest, err := newRequestFromMatrixJSON(r, s.config.BaseURL, s.config.MessageLimit)
2022-06-14 04:07:30 +02:00
if err != nil {
logvr(v, r).Tag(tagMatrix).Err(err).Debug("Invalid Matrix request")
if e, ok := err.(*errMatrixPushkeyRejected); ok {
return writeMatrixResponse(w, e.rejectedPushKey)
}
2022-06-14 04:07:30 +02:00
return err
}
if err := next(w, newRequest, v); err != nil {
logvr(v, r).Tag(tagMatrix).Err(err).Debug("Error handling Matrix request")
return err
2022-06-14 04:07:30 +02:00
}
return nil
}
}
2022-12-03 21:20:59 +01:00
func (s *Server) authorizeTopicWrite(next handleFunc) handleFunc {
return s.autorizeTopic(next, user.PermissionWrite)
2022-01-22 04:22:27 +01:00
}
2022-12-03 21:20:59 +01:00
func (s *Server) authorizeTopicRead(next handleFunc) handleFunc {
return s.autorizeTopic(next, user.PermissionRead)
2022-01-22 04:22:27 +01:00
}
func (s *Server) autorizeTopic(next handleFunc, perm user.Permission) handleFunc {
2022-01-22 04:22:27 +01:00
return func(w http.ResponseWriter, r *http.Request, v *visitor) error {
if s.userManager == nil {
2022-01-22 04:22:27 +01:00
return next(w, r, v)
}
2022-01-27 18:49:05 +01:00
topics, _, err := s.topicsFromPath(r.URL.Path)
2022-01-22 04:22:27 +01:00
if err != nil {
return err
}
2023-01-28 15:03:14 +01:00
u := v.User()
2022-01-27 18:49:05 +01:00
for _, t := range topics {
2023-01-28 15:03:14 +01:00
if err := s.userManager.Authorize(u, t.ID, perm); err != nil {
2023-02-26 02:23:22 +01:00
logvr(v, r).With(t).Err(err).Debug("Access to topic %s not authorized", t.ID)
return errHTTPForbidden.With(t)
2022-01-27 18:49:05 +01:00
}
2022-01-22 04:22:27 +01:00
}
return next(w, r, v)
}
}
2023-02-08 21:20:44 +01:00
// maybeAuthenticate reads the "Authorization" header and will try to authenticate the user
// if it is set.
//
2023-03-07 05:12:46 +01:00
// - If auth-file is not configured, immediately return an IP-based visitor
// - If the header is not set or not supported (anything non-Basic and non-Bearer),
// an IP-based visitor is returned
2023-02-08 21:20:44 +01:00
// - If the header is set, authenticate will be called to check the username/password (Basic auth),
// or the token (Bearer auth), and read the user from the database
//
// This function will ALWAYS return a visitor, even if an error occurs (e.g. unauthorized), so
// that subsequent logging calls still have a visitor context.
func (s *Server) maybeAuthenticate(r *http.Request) (*visitor, error) {
// Read "Authorization" header value, and exit out early if it's not set
2022-12-22 03:55:39 +01:00
ip := extractIPAddress(r, s.config.BehindProxy)
2023-02-08 21:20:44 +01:00
vip := s.visitor(ip, nil)
if s.userManager == nil {
return vip, nil
}
2023-02-08 21:20:44 +01:00
header, err := readAuthHeader(r)
if err != nil {
return vip, err
} else if !supportedAuthHeader(header) {
2023-02-08 21:20:44 +01:00
return vip, nil
2022-12-03 21:20:59 +01:00
}
2023-02-08 21:20:44 +01:00
// If we're trying to auth, check the rate limiter first
if !vip.AuthAllowed() {
return vip, errHTTPTooManyRequestsLimitAuthFailure // Always return visitor, even when error occurs!
}
u, err := s.authenticate(r, header)
if err != nil {
vip.AuthFailed()
logr(r).Err(err).Debug("Authentication failed")
return vip, errHTTPUnauthorized // Always return visitor, even when error occurs!
}
// Authentication with user was successful
return s.visitor(ip, u), nil
2022-12-02 21:37:48 +01:00
}
// authenticate a user based on basic auth username/password (Authorization: Basic ...), or token auth (Authorization: Bearer ...).
// The Authorization header can be passed as a header or the ?auth=... query param. The latter is required only to
// support the WebSocket JavaScript class, which does not support passing headers during the initial request. The auth
2023-02-08 21:20:44 +01:00
// query param is effectively doubly base64 encoded. Its format is base64(Basic base64(user:pass)).
func (s *Server) authenticate(r *http.Request, header string) (user *user.User, err error) {
if strings.HasPrefix(header, "Bearer") {
return s.authenticateBearerAuth(r, strings.TrimSpace(strings.TrimPrefix(header, "Bearer")))
}
return s.authenticateBasicAuth(r, header)
}
// readAuthHeader reads the raw value of the Authorization header, either from the actual HTTP header,
// or from the ?auth... query parameter
func readAuthHeader(r *http.Request) (string, error) {
2022-12-28 04:14:14 +01:00
value := strings.TrimSpace(r.Header.Get("Authorization"))
2022-12-03 21:20:59 +01:00
queryParam := readQueryParam(r, "authorization", "auth")
if queryParam != "" {
a, err := base64.RawURLEncoding.DecodeString(queryParam)
if err != nil {
2023-02-08 21:20:44 +01:00
return "", err
2022-12-03 21:20:59 +01:00
}
2022-12-28 04:14:14 +01:00
value = strings.TrimSpace(string(a))
2022-12-03 21:20:59 +01:00
}
2023-02-08 21:20:44 +01:00
return value, nil
2022-12-03 21:20:59 +01:00
}
// supportedAuthHeader returns true only if the Authorization header value starts
// with "Basic" or "Bearer". In particular, an empty value is not supported, and neither
// are things like "WebPush", or "vapid" (see #629).
func supportedAuthHeader(value string) bool {
value = strings.ToLower(value)
return strings.HasPrefix(value, "basic ") || strings.HasPrefix(value, "bearer ")
}
func (s *Server) authenticateBasicAuth(r *http.Request, value string) (user *user.User, err error) {
2022-12-03 21:20:59 +01:00
r.Header.Set("Authorization", value)
username, password, ok := r.BasicAuth()
if !ok {
return nil, errors.New("invalid basic auth")
2023-01-29 22:15:08 +01:00
} else if username == "" {
return s.authenticateBearerAuth(r, password) // Treat password as token
2022-12-03 21:20:59 +01:00
}
return s.userManager.Authenticate(username, password)
2022-12-03 21:20:59 +01:00
}
2023-01-29 22:15:08 +01:00
func (s *Server) authenticateBearerAuth(r *http.Request, token string) (*user.User, error) {
2023-01-29 02:29:06 +01:00
u, err := s.userManager.AuthenticateToken(token)
if err != nil {
return nil, err
}
ip := extractIPAddress(r, s.config.BehindProxy)
go s.userManager.EnqueueTokenUpdate(token, &user.TokenUpdate{
LastAccess: time.Now(),
LastOrigin: ip,
})
return u, nil
2022-12-03 21:20:59 +01:00
}
2023-01-27 04:57:18 +01:00
func (s *Server) visitor(ip netip.Addr, user *user.User) *visitor {
2022-12-02 21:37:48 +01:00
s.mu.Lock()
defer s.mu.Unlock()
2023-01-27 04:57:18 +01:00
id := visitorID(ip, user)
v, exists := s.visitors[id]
2022-12-02 21:37:48 +01:00
if !exists {
2023-01-27 04:57:18 +01:00
s.visitors[id] = newVisitor(s.config, s.messageCache, s.userManager, ip, user)
return s.visitors[id]
2022-12-02 21:37:48 +01:00
}
v.Keepalive()
2023-02-08 21:20:44 +01:00
v.SetUser(user) // Always update with the latest user, may be nil!
2022-12-02 21:37:48 +01:00
return v
}
2023-01-18 21:50:06 +01:00
func (s *Server) writeJSON(w http.ResponseWriter, v any) error {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests
if err := json.NewEncoder(w).Encode(v); err != nil {
return err
}
return nil
}
2023-04-21 17:09:13 +02:00
func (s *Server) updateAndWriteStats(messagesCount int64) {
s.mu.Lock()
s.messagesHistory = append(s.messagesHistory, messagesCount)
if len(s.messagesHistory) > messagesHistoryMax {
s.messagesHistory = s.messagesHistory[1:]
}
s.mu.Unlock()
go func() {
if err := s.messageCache.UpdateStats(messagesCount); err != nil {
log.Tag(tagManager).Err(err).Warn("Cannot write messages stats")
}
}()
}