diff --git a/server/config.go b/server/config.go index 3de4bd70..7f15aedc 100644 --- a/server/config.go +++ b/server/config.go @@ -67,7 +67,7 @@ type Config struct { KeepaliveInterval time.Duration ManagerInterval time.Duration WebRootIsApp bool - AtSenderInterval time.Duration + DelayedSenderInterval time.Duration FirebaseKeepaliveInterval time.Duration FirebasePollInterval time.Duration FirebaseQuotaLimitPenaltyDuration time.Duration @@ -120,7 +120,7 @@ func NewConfig() *Config { MessageLimit: DefaultMessageLengthLimit, MinDelay: DefaultMinDelay, MaxDelay: DefaultMaxDelay, - AtSenderInterval: DefaultAtSenderInterval, + DelayedSenderInterval: DefaultAtSenderInterval, FirebaseKeepaliveInterval: DefaultFirebaseKeepaliveInterval, FirebasePollInterval: DefaultFirebasePollInterval, FirebaseQuotaLimitPenaltyDuration: DefaultFirebaseQuotaLimitPenaltyDuration, diff --git a/server/message_cache.go b/server/message_cache.go index b55c34ba..4dc83bdf 100644 --- a/server/message_cache.go +++ b/server/message_cache.go @@ -36,7 +36,7 @@ const ( attachment_size INT NOT NULL, attachment_expires INT NOT NULL, attachment_url TEXT NOT NULL, - attachment_owner TEXT NOT NULL, + sender TEXT NOT NULL, encoding TEXT NOT NULL, published INT NOT NULL ); @@ -45,37 +45,37 @@ const ( COMMIT; ` insertMessageQuery = ` - INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published) + INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1` selectRowIDFromMessageID = `SELECT id FROM messages WHERE topic = ? AND mid = ?` selectMessagesSinceTimeQuery = ` - SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding + SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding FROM messages WHERE topic = ? AND time >= ? AND published = 1 ORDER BY time, id ` selectMessagesSinceTimeIncludeScheduledQuery = ` - SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding + SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding FROM messages WHERE topic = ? AND time >= ? ORDER BY time, id ` selectMessagesSinceIDQuery = ` - SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding + SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding FROM messages WHERE topic = ? AND id > ? AND published = 1 ORDER BY time, id ` selectMessagesSinceIDIncludeScheduledQuery = ` - SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding + SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding FROM messages WHERE topic = ? AND (id > ? OR published = 0) ORDER BY time, id ` selectMessagesDueQuery = ` - SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding + SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding FROM messages WHERE time <= ? AND published = 0 ORDER BY time, id @@ -84,13 +84,13 @@ const ( selectMessagesCountQuery = `SELECT COUNT(*) FROM messages` selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?` selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic` - selectAttachmentsSizeQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE attachment_owner = ? AND attachment_expires >= ?` + selectAttachmentsSizeQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE sender = ? AND attachment_expires >= ?` selectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?` ) // Schema management queries const ( - currentSchemaVersion = 6 + currentSchemaVersion = 7 createSchemaVersionTableQuery = ` CREATE TABLE IF NOT EXISTS schemaVersion ( id INT PRIMARY KEY, @@ -173,6 +173,11 @@ const ( migrate5To6AlterMessagesTableQuery = ` ALTER TABLE messages ADD COLUMN actions TEXT NOT NULL DEFAULT(''); ` + + // 6 -> 7 + migrate6To7AlterMessagesTableQuery = ` + ALTER TABLE messages RENAME COLUMN attachment_owner TO sender; + ` ) type messageCache struct { @@ -225,7 +230,7 @@ func (c *messageCache) AddMessage(m *message) error { } published := m.Time <= time.Now().Unix() tags := strings.Join(m.Tags, ",") - var attachmentName, attachmentType, attachmentURL, attachmentOwner string + var attachmentName, attachmentType, attachmentURL string var attachmentSize, attachmentExpires int64 if m.Attachment != nil { attachmentName = m.Attachment.Name @@ -233,7 +238,6 @@ func (c *messageCache) AddMessage(m *message) error { attachmentSize = m.Attachment.Size attachmentExpires = m.Attachment.Expires attachmentURL = m.Attachment.URL - attachmentOwner = m.Attachment.Owner } var actionsStr string if len(m.Actions) > 0 { @@ -259,7 +263,7 @@ func (c *messageCache) AddMessage(m *message) error { attachmentSize, attachmentExpires, attachmentURL, - attachmentOwner, + m.Sender, m.Encoding, published, ) @@ -371,8 +375,8 @@ func (c *messageCache) Prune(olderThan time.Time) error { return err } -func (c *messageCache) AttachmentBytesUsed(owner string) (int64, error) { - rows, err := c.db.Query(selectAttachmentsSizeQuery, owner, time.Now().Unix()) +func (c *messageCache) AttachmentBytesUsed(sender string) (int64, error) { + rows, err := c.db.Query(selectAttachmentsSizeQuery, sender, time.Now().Unix()) if err != nil { return 0, err } @@ -415,7 +419,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) { for rows.Next() { var timestamp, attachmentSize, attachmentExpires int64 var priority int - var id, topic, msg, title, tagsStr, click, actionsStr, attachmentName, attachmentType, attachmentURL, attachmentOwner, encoding string + var id, topic, msg, title, tagsStr, click, actionsStr, attachmentName, attachmentType, attachmentURL, sender, encoding string err := rows.Scan( &id, ×tamp, @@ -431,7 +435,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) { &attachmentSize, &attachmentExpires, &attachmentURL, - &attachmentOwner, + &sender, &encoding, ) if err != nil { @@ -455,7 +459,6 @@ func readMessages(rows *sql.Rows) ([]*message, error) { Size: attachmentSize, Expires: attachmentExpires, URL: attachmentURL, - Owner: attachmentOwner, } } messages = append(messages, &message{ @@ -470,6 +473,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) { Click: click, Actions: actions, Attachment: att, + Sender: sender, Encoding: encoding, }) } @@ -516,6 +520,8 @@ func setupCacheDB(db *sql.DB) error { return migrateFrom4(db) } else if schemaVersion == 5 { return migrateFrom5(db) + } else if schemaVersion == 6 { + return migrateFrom6(db) } return fmt.Errorf("unexpected schema version found: %d", schemaVersion) } @@ -599,5 +605,16 @@ func migrateFrom5(db *sql.DB) error { if _, err := db.Exec(updateSchemaVersion, 6); err != nil { return err } + return migrateFrom6(db) +} + +func migrateFrom6(db *sql.DB) error { + log.Print("Migrating cache database schema: from 6 to 7") + if _, err := db.Exec(migrate6To7AlterMessagesTableQuery); err != nil { + return err + } + if _, err := db.Exec(updateSchemaVersion, 7); err != nil { + return err + } return nil // Update this when a new version is added } diff --git a/server/message_cache_test.go b/server/message_cache_test.go index cb888b42..398f21e4 100644 --- a/server/message_cache_test.go +++ b/server/message_cache_test.go @@ -281,39 +281,39 @@ func testCacheAttachments(t *testing.T, c *messageCache) { expires1 := time.Now().Add(-4 * time.Hour).Unix() m := newDefaultMessage("mytopic", "flower for you") m.ID = "m1" + m.Sender = "1.2.3.4" m.Attachment = &attachment{ Name: "flower.jpg", Type: "image/jpeg", Size: 5000, Expires: expires1, URL: "https://ntfy.sh/file/AbDeFgJhal.jpg", - Owner: "1.2.3.4", } require.Nil(t, c.AddMessage(m)) expires2 := time.Now().Add(2 * time.Hour).Unix() // Future m = newDefaultMessage("mytopic", "sending you a car") m.ID = "m2" + m.Sender = "1.2.3.4" m.Attachment = &attachment{ Name: "car.jpg", Type: "image/jpeg", Size: 10000, Expires: expires2, URL: "https://ntfy.sh/file/aCaRURL.jpg", - Owner: "1.2.3.4", } require.Nil(t, c.AddMessage(m)) expires3 := time.Now().Add(1 * time.Hour).Unix() // Future m = newDefaultMessage("another-topic", "sending you another car") m.ID = "m3" + m.Sender = "1.2.3.4" m.Attachment = &attachment{ Name: "another-car.jpg", Type: "image/jpeg", Size: 20000, Expires: expires3, URL: "https://ntfy.sh/file/zakaDHFW.jpg", - Owner: "1.2.3.4", } require.Nil(t, c.AddMessage(m)) @@ -327,7 +327,7 @@ func testCacheAttachments(t *testing.T, c *messageCache) { require.Equal(t, int64(5000), messages[0].Attachment.Size) require.Equal(t, expires1, messages[0].Attachment.Expires) require.Equal(t, "https://ntfy.sh/file/AbDeFgJhal.jpg", messages[0].Attachment.URL) - require.Equal(t, "1.2.3.4", messages[0].Attachment.Owner) + require.Equal(t, "1.2.3.4", messages[0].Sender) require.Equal(t, "sending you a car", messages[1].Message) require.Equal(t, "car.jpg", messages[1].Attachment.Name) @@ -335,7 +335,7 @@ func testCacheAttachments(t *testing.T, c *messageCache) { require.Equal(t, int64(10000), messages[1].Attachment.Size) require.Equal(t, expires2, messages[1].Attachment.Expires) require.Equal(t, "https://ntfy.sh/file/aCaRURL.jpg", messages[1].Attachment.URL) - require.Equal(t, "1.2.3.4", messages[1].Attachment.Owner) + require.Equal(t, "1.2.3.4", messages[1].Sender) size, err := c.AttachmentBytesUsed("1.2.3.4") require.Nil(t, err) diff --git a/server/server.go b/server/server.go index 2baa3666..7384ab47 100644 --- a/server/server.go +++ b/server/server.go @@ -443,7 +443,7 @@ func (s *Server) handlePublish(w http.ResponseWriter, r *http.Request, v *visito if s.mailer != nil && email != "" && !delayed { go s.sendEmail(v, m, email) } - if s.config.UpstreamBaseURL != "" { + if s.config.UpstreamBaseURL != "" && !delayed { go s.forwardPollRequest(v, m) } if cache { @@ -484,7 +484,10 @@ func (s *Server) forwardPollRequest(v *visitor, m *message) { return } req.Header.Set("X-Poll-ID", m.ID) - response, err := http.DefaultClient.Do(req) + var httpClient = &http.Client{ + Timeout: time.Second * 10, + } + response, err := httpClient.Do(req) if err != nil { log.Printf("[%s] FWD - Unable to forward poll request: %v", v.ip, err.Error()) return @@ -566,6 +569,7 @@ func (s *Server) parsePublishParams(r *http.Request, v *visitor, m *message) (ca return false, false, "", false, errHTTPBadRequestDelayTooLarge } m.Time = delay.Unix() + m.Sender = v.ip // Important for rate limiting } actionsStr := readParam(r, "x-actions", "actions", "action") if actionsStr != "" { @@ -661,7 +665,7 @@ func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *message, m.Attachment = &attachment{} } var ext string - m.Attachment.Owner = v.ip // Important for attachment rate limiting + m.Sender = v.ip // Important for attachment rate limiting m.Attachment.Expires = time.Now().Add(s.config.AttachmentExpiryDuration).Unix() m.Attachment.Type, ext = util.DetectContentType(body.PeekedBytes, m.Attachment.Name) m.Attachment.URL = fmt.Sprintf("%s/file/%s%s", s.config.BaseURL, m.ID, ext) @@ -1081,7 +1085,7 @@ func (s *Server) runManager() { func (s *Server) runDelayedSender() { for { select { - case <-time.After(s.config.AtSenderInterval): + case <-time.After(s.config.DelayedSenderInterval): if err := s.sendDelayedMessages(); err != nil { log.Printf("error sending scheduled messages: %s", err.Error()) } @@ -1118,7 +1122,7 @@ func (s *Server) sendDelayedMessages() error { return err } for _, m := range messages { - v := s.visitorFromIP("0.0.0.0") // FIXME: get message owner!! + v := s.visitorFromIP(m.Sender) if err := s.sendDelayedMessage(v, m); err != nil { log.Printf("error sending delayed message: %s", err.Error()) } @@ -1131,14 +1135,18 @@ func (s *Server) sendDelayedMessage(v *visitor, m *message) error { defer s.mu.Unlock() t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published if ok { - if err := t.Publish(v, m); err != nil { - return fmt.Errorf("unable to publish message %s to topic %s: %v", m.ID, m.Topic, err.Error()) - } + go func() { + // We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler + if err := t.Publish(v, m); err != nil { + log.Printf("unable to publish message %s to topic %s: %v", m.ID, m.Topic, err.Error()) + } + }() } if s.firebase != nil { // Firebase subscribers may not show up in topics map - if err := s.firebase(v, m); err != nil { - return fmt.Errorf("unable to publish to Firebase: %v", err.Error()) - } + go s.sendToFirebase(v, m) + } + if s.config.UpstreamBaseURL != "" { + go s.forwardPollRequest(v, m) } if err := s.messageCache.MarkPublished(m); err != nil { return err diff --git a/server/server_firebase_test.go b/server/server_firebase_test.go index f3904fac..6ad6fde9 100644 --- a/server/server_firebase_test.go +++ b/server/server_firebase_test.go @@ -119,7 +119,6 @@ func TestToFirebaseMessage_Message_Normal_Allowed(t *testing.T) { Size: 12345, Expires: 98765543, URL: "https://example.com/file.jpg", - Owner: "some-owner", } fbm, err := toFirebaseMessage(m, &testAuther{Allow: true}) require.Nil(t, err) diff --git a/server/server_test.go b/server/server_test.go index 5e23e47e..1fec1f56 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -264,7 +264,7 @@ func TestServer_PublishNoCache(t *testing.T) { func TestServer_PublishAt(t *testing.T) { c := newTestConfig(t) c.MinDelay = time.Second - c.AtSenderInterval = 100 * time.Millisecond + c.DelayedSenderInterval = 100 * time.Millisecond s := newTestServer(t, c) response := request(t, s, "PUT", "/mytopic", "a message", map[string]string{ @@ -283,6 +283,13 @@ func TestServer_PublishAt(t *testing.T) { messages = toMessages(t, response.Body.String()) require.Equal(t, 1, len(messages)) require.Equal(t, "a message", messages[0].Message) + require.Equal(t, "", messages[0].Sender) // Never return the sender! + + messages, err := s.messageCache.Messages("mytopic", sinceAllMessages, true) + require.Nil(t, err) + require.Equal(t, 1, len(messages)) + require.Equal(t, "a message", messages[0].Message) + require.Equal(t, "9.9.9.9", messages[0].Sender) // It's stored in the DB though! } func TestServer_PublishAtWithCacheError(t *testing.T) { @@ -1019,7 +1026,7 @@ func TestServer_PublishAttachment(t *testing.T) { require.Equal(t, int64(5000), msg.Attachment.Size) require.GreaterOrEqual(t, msg.Attachment.Expires, time.Now().Add(179*time.Minute).Unix()) // Almost 3 hours require.Contains(t, msg.Attachment.URL, "http://127.0.0.1:12345/file/") - require.Equal(t, "", msg.Attachment.Owner) // Should never be returned + require.Equal(t, "", msg.Sender) // Should never be returned require.FileExists(t, filepath.Join(s.config.AttachmentCacheDir, msg.ID)) path := strings.TrimPrefix(msg.Attachment.URL, "http://127.0.0.1:12345") @@ -1048,7 +1055,7 @@ func TestServer_PublishAttachmentShortWithFilename(t *testing.T) { require.Equal(t, int64(21), msg.Attachment.Size) require.GreaterOrEqual(t, msg.Attachment.Expires, time.Now().Add(3*time.Hour).Unix()) require.Contains(t, msg.Attachment.URL, "http://127.0.0.1:12345/file/") - require.Equal(t, "", msg.Attachment.Owner) // Should never be returned + require.Equal(t, "", msg.Sender) // Should never be returned require.FileExists(t, filepath.Join(s.config.AttachmentCacheDir, msg.ID)) path := strings.TrimPrefix(msg.Attachment.URL, "http://127.0.0.1:12345") @@ -1075,7 +1082,7 @@ func TestServer_PublishAttachmentExternalWithoutFilename(t *testing.T) { require.Equal(t, "", msg.Attachment.Type) require.Equal(t, int64(0), msg.Attachment.Size) require.Equal(t, int64(0), msg.Attachment.Expires) - require.Equal(t, "", msg.Attachment.Owner) + require.Equal(t, "", msg.Sender) // Slightly unrelated cross-test: make sure we don't add an owner for external attachments size, err := s.messageCache.AttachmentBytesUsed("127.0.0.1") @@ -1096,7 +1103,7 @@ func TestServer_PublishAttachmentExternalWithFilename(t *testing.T) { require.Equal(t, "", msg.Attachment.Type) require.Equal(t, int64(0), msg.Attachment.Size) require.Equal(t, int64(0), msg.Attachment.Expires) - require.Equal(t, "", msg.Attachment.Owner) + require.Equal(t, "", msg.Sender) } func TestServer_PublishAttachmentBadURL(t *testing.T) { diff --git a/server/types.go b/server/types.go index 6a69338c..bb8e32a3 100644 --- a/server/types.go +++ b/server/types.go @@ -32,6 +32,7 @@ type message struct { Actions []*action `json:"actions,omitempty"` Attachment *attachment `json:"attachment,omitempty"` PollID string `json:"poll_id,omitempty"` + Sender string `json:"-"` // IP address of uploader, used for rate limiting Encoding string `json:"encoding,omitempty"` // empty for raw UTF-8, or "base64" for encoded bytes } @@ -41,7 +42,6 @@ type attachment struct { Size int64 `json:"size,omitempty"` Expires int64 `json:"expires,omitempty"` URL string `json:"url"` - Owner string `json:"-"` // IP address of uploader, used for rate limiting } type action struct {