repos / pico

pico services - prose.sh, pastes.sh, imgs.sh, feeds.sh, pgs.sh
git clone https://github.com/picosh/pico.git

commit d520b3d
parent 60df5b5
author Eric Bower
date   2023-03-27 20:23:44 +0000 UTC
chore(feeds): feed_items table to store feed items (#16)

Some RSS feeds are technically invalid but we still want to fetch them.
In these cases we want to make sure we can still fetch the feed and
properly handle feed items that do not contain valid date tags.
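
For context, gofeed parses an item's date into `Item.PublishedParsed`, a
`*time.Time` that stays nil when the feed omits or mangles its date
tags.  A minimal sketch of that failure mode:

    package main

    import (
    	"fmt"

    	"github.com/mmcdole/gofeed"
    )

    func main() {
    	fp := gofeed.NewParser()
    	// This item has no <pubDate>, so gofeed cannot populate
    	// PublishedParsed and leaves it nil.
    	feed, err := fp.ParseString(`<rss version="2.0"><channel><title>t</title>
    <item><title>no date</title><guid>abc-123</guid></item>
    </channel></rss>`)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(feed.Items[0].PublishedParsed == nil) // true
    }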

Previously we used the `published` tag and, internally, `last_digest` to
determine whether a feed item had already been sent to the user.
However, some feed items do not contain `published`, so we would send
those items to the user in every email digest.
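
The removed check (see the feeds/cron.go diff below) skipped an item
only when its parsed date predated the last digest, so a dateless item
was never skipped.  Paraphrased as a standalone function (`shouldSkip`
is a hypothetical name; the real code inlined this condition):

    package feeds

    import "time"

    // A nil published date can never satisfy this condition, so items
    // without a date were re-sent in every digest.
    func shouldSkip(published, lastDigest *time.Time) bool {
    	return published != nil && lastDigest != nil && published.Before(*lastDigest)
    }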

Now we store the feed items we have fetched in a separate table, along
with some of their metadata, in order to properly mark a feed item as
"already sent."  It no longer matters whether a feed item has the
correct tags: we fetch the items, send them to the user, and then mark
them as already sent.
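
The dedupe key is the item GUID: everything that makes it into a digest
is written to the new feed_items table, and the next run drops any item
whose GUID is already stored.  A condensed sketch of that flow
(`unsentItems` is a hypothetical helper; the real code below does a
linear scan in isValidItem, and the import path is assumed from the
clone URL):

    package feeds

    import (
    	"github.com/mmcdole/gofeed"
    	"github.com/picosh/pico/db" // import path assumed from the clone URL
    )

    // unsentItems filters out items whose GUIDs are already recorded in
    // feed_items for this post; those were sent in an earlier digest.
    func unsentItems(fetched []*gofeed.Item, stored []*db.FeedItem) []*gofeed.Item {
    	seen := make(map[string]bool, len(stored))
    	for _, fi := range stored {
    		seen[fi.GUID] = true
    	}
    	unsent := []*gofeed.Item{}
    	for _, item := range fetched {
    		if item != nil && !seen[item.GUID] {
    			unsent = append(unsent, item)
    		}
    	}
    	return unsent
    }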
5 files changed,  +190, -29
M Makefile
+2, -1
 1@@ -76,10 +76,11 @@ migrate:
 2 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20221108_add_expires_at_to_posts.sql
 3 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20221112_add_feeds_space.sql
 4 	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20230310_add_aliases_table.sql
 5+	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20230326_add_feed_items.sql
 6 .PHONY: migrate
 7 
 8 latest:
 9-	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20230310_add_aliases_table.sql
10+	$(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20230326_add_feed_items.sql
11 .PHONY: latest
12 
13 psql:
M db/db.go
+36, -0
 1@@ -48,6 +48,31 @@ func (p *PostData) Scan(value interface{}) error {
 2 	return json.Unmarshal(b, &p)
 3 }
 4 
 5+type FeedItemData struct {
 6+	Title       string     `json:"title"`
 7+	Description string     `json:"description"`
 8+	Content     string     `json:"content"`
 9+	Link        string     `json:"link"`
10+	PublishedAt *time.Time `json:"published_at"`
11+}
12+
 13+// Make the FeedItemData struct implement the driver.Valuer interface. This method
14+// simply returns the JSON-encoded representation of the struct.
15+func (p FeedItemData) Value() (driver.Value, error) {
16+	return json.Marshal(p)
17+}
18+
 19+// Make the FeedItemData struct implement the sql.Scanner interface. This method
20+// simply decodes a JSON-encoded value into the struct fields.
21+func (p *FeedItemData) Scan(value interface{}) error {
22+	b, ok := value.([]byte)
23+	if !ok {
24+		return errors.New("type assertion to []byte failed")
25+	}
26+
27+	return json.Unmarshal(b, &p)
28+}
29+
30 type Post struct {
31 	ID          string     `json:"id"`
32 	UserID      string     `json:"user_id"`
33@@ -97,6 +122,14 @@ type Pager struct {
34 	Page int
35 }
36 
37+type FeedItem struct {
38+	ID        string
39+	PostID    string
40+	GUID      string
41+	Data      FeedItemData
42+	CreatedAt *time.Time
43+}
44+
45 type ErrMultiplePublicKeys struct{}
46 
47 func (m *ErrMultiplePublicKeys) Error() string {
48@@ -163,5 +196,8 @@ type DB interface {
49 	HasFeatureForUser(userID string, feature string) bool
50 	FindTotalSizeForUser(userID string) (int, error)
51 
52+	InsertFeedItems(postID string, items []*FeedItem) error
53+	FindFeedItemsByPostID(postID string) ([]*FeedItem, error)
54+
55 	Close() error
56 }
M db/postgres/storage.go
+63, -4
 1@@ -214,7 +214,8 @@ const (
 2 	GROUP BY name
 3 	ORDER BY tally DESC
 4 	LIMIT 5`
 5-	sqlSelectTagsForPost = `SELECT name FROM post_tags WHERE post_id=$1`
 6+	sqlSelectTagsForPost     = `SELECT name FROM post_tags WHERE post_id=$1`
 7+	sqlSelectFeedItemsByPost = `SELECT id, post_id, guid, data, created_at FROM feed_items WHERE post_id=$1`
 8 
 9 	sqlInsertPublicKey = `INSERT INTO public_keys (user_id, public_key) VALUES ($1, $2)`
10 	sqlInsertPost      = `
11@@ -223,9 +224,10 @@ const (
12 		file_size, mime_type, shasum, data, expires_at)
13 	VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
14 	RETURNING id`
15-	sqlInsertUser    = `INSERT INTO app_users DEFAULT VALUES returning id`
16-	sqlInsertTag     = `INSERT INTO post_tags (post_id, name) VALUES($1, $2) RETURNING id;`
17-	sqlInsertAliases = `INSERT INTO post_aliases (post_id, slug) VALUES($1, $2) RETURNING id;`
18+	sqlInsertUser      = `INSERT INTO app_users DEFAULT VALUES returning id`
19+	sqlInsertTag       = `INSERT INTO post_tags (post_id, name) VALUES($1, $2) RETURNING id;`
20+	sqlInsertAliases   = `INSERT INTO post_aliases (post_id, slug) VALUES($1, $2) RETURNING id;`
21+	sqlInsertFeedItems = `INSERT INTO feed_items (post_id, guid, data) VALUES ($1, $2, $3) RETURNING id;`
22 
23 	sqlUpdatePost = `
24 	UPDATE posts
25@@ -1113,3 +1115,60 @@ func (me *PsqlDB) FindTotalSizeForUser(userID string) (int, error) {
26 	}
27 	return fileSize, nil
28 }
29+
30+func (me *PsqlDB) InsertFeedItems(postID string, items []*db.FeedItem) error {
31+	ctx := context.Background()
32+	tx, err := me.Db.BeginTx(ctx, nil)
33+	if err != nil {
34+		return err
35+	}
 36+	// Rollback is a no-op after a successful Commit; the deferred call
 37+	// only matters if we return early on an Exec error.
 38+	defer func() { _ = tx.Rollback() }()
39+
40+	for _, item := range items {
41+		_, err := tx.Exec(
42+			sqlInsertFeedItems,
43+			item.PostID,
44+			item.GUID,
45+			item.Data,
46+		)
47+		if err != nil {
48+			return err
49+		}
50+	}
51+
52+	err = tx.Commit()
53+	return err
54+}
55+
56+func (me *PsqlDB) FindFeedItemsByPostID(postID string) ([]*db.FeedItem, error) {
 57+	// Load the feed items we have already stored for this post.
58+	items := make([]*db.FeedItem, 0)
59+	rs, err := me.Db.Query(sqlSelectFeedItemsByPost, postID)
60+	if err != nil {
61+		return items, err
62+	}
 63+	defer rs.Close()
64+	for rs.Next() {
65+		item := &db.FeedItem{}
66+		err := rs.Scan(
67+			&item.ID,
68+			&item.PostID,
69+			&item.GUID,
70+			&item.Data,
71+			&item.CreatedAt,
72+		)
73+		if err != nil {
74+			return items, err
75+		}
76+
77+		items = append(items, item)
78+	}
79+
80+	if rs.Err() != nil {
81+		return items, rs.Err()
82+	}
83+
84+	return items, nil
85+}
M feeds/cron.go
+76, -24
  1@@ -39,9 +39,11 @@ var httpClient = http.Client{
  2 	},
  3 }
  4 
  5-type FeedItem struct {
  6+type FeedItemTmpl struct {
  7+	GUID        string
  8 	Title       string
  9 	Link        string
 10+	PublishedAt *time.Time
 11 	Content     html.HTML
 12 	Description html.HTML
 13 }
 14@@ -50,7 +52,8 @@ type Feed struct {
 15 	Title       string
 16 	Link        string
 17 	Description string
 18-	Items       []*FeedItem
 19+	Items       []*FeedItemTmpl
 20+	FeedItems   []*gofeed.Item
 21 }
 22 
 23 type DigestFeed struct {
 24@@ -62,15 +65,13 @@ type DigestOptions struct {
 25 	InlineContent bool
 26 }
 27 
 28-type Fetcher struct {
 29-	cfg *shared.ConfigSite
 30-	db  db.DB
 31-}
 32-
 33-func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
 34-	return &Fetcher{
 35-		db:  dbpool,
 36-		cfg: cfg,
 37+func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
 38+	return &FeedItemTmpl{
 39+		Title:       item.Title,
 40+		Link:        item.Link,
 41+		PublishedAt: item.PublishedParsed,
 42+		Description: html.HTML(item.Description),
 43+		Content:     html.HTML(item.Content),
 44 	}
 45 }
 46 
 47@@ -95,6 +96,29 @@ func digestOptionToTime(date time.Time, interval string) time.Time {
 48 	}
 49 }
 50 
  52+// Check whether this feed item should be emailed to the user.
 52+func isValidItem(item *gofeed.Item, feedItems []*db.FeedItem) bool {
 53+	for _, feedItem := range feedItems {
 54+		if item.GUID == feedItem.GUID {
 55+			return false
 56+		}
 57+	}
 58+
 59+	return true
 60+}
 61+
 62+type Fetcher struct {
 63+	cfg *shared.ConfigSite
 64+	db  db.DB
 65+}
 66+
 67+func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
 68+	return &Fetcher{
 69+		db:  dbpool,
 70+		cfg: cfg,
 71+	}
 72+}
 73+
 74 func (f *Fetcher) Validate(lastDigest *time.Time, parsed *shared.ListParsedText) error {
 75 	if lastDigest == nil {
 76 		return nil
 77@@ -136,7 +160,7 @@ func (f *Fetcher) RunPost(user *db.User, post *db.Post) error {
 78 		urls = append(urls, url)
 79 	}
 80 
 81-	msgBody, err := f.FetchAll(urls, parsed.InlineContent, post.Data.LastDigest)
 82+	msgBody, err := f.FetchAll(urls, parsed.InlineContent, post.ID)
 83 	if err != nil {
 84 		return err
 85 	}
 86@@ -207,7 +231,7 @@ func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error)
 87 	return feed, nil
 88 }
 89 
 90-func (f *Fetcher) Fetch(fp *gofeed.Parser, url string, lastDigest *time.Time) (*Feed, error) {
 91+func (f *Fetcher) Fetch(fp *gofeed.Parser, url string, feedItems []*db.FeedItem) (*Feed, error) {
 92 	f.cfg.Logger.Infof("(%s) fetching feed", url)
 93 
 94 	feed, err := f.ParseURL(fp, url)
 95@@ -220,25 +244,28 @@ func (f *Fetcher) Fetch(fp *gofeed.Parser, url string, lastDigest *time.Time) (*
 96 		Description: feed.Description,
 97 		Link:        feed.Link,
 98 	}
 99-	items := []*FeedItem{}
100+
101+	items := []*FeedItemTmpl{}
102+	gofeedItems := []*gofeed.Item{}
103 	// we only want to return feed items published since the last digest time we fetched
104 	for _, item := range feed.Items {
105-		if item == nil || (item.PublishedParsed != nil && lastDigest != nil && item.PublishedParsed.Before(*lastDigest)) {
106+		if item == nil {
107+			continue
108+		}
109+
110+		if !isValidItem(item, feedItems) {
111 			continue
112 		}
113 
114-		items = append(items, &FeedItem{
115-			Title:       item.Title,
116-			Link:        item.Link,
117-			Content:     html.HTML(item.Content),
118-			Description: html.HTML(item.Description),
119-		})
120+		gofeedItems = append(gofeedItems, item)
121+		items = append(items, itemToTemplate(item))
122 	}
123 
124 	if len(items) == 0 {
125-		return nil, fmt.Errorf("(%s) %w, skipping", feed.FeedLink, ErrNoRecentArticles)
126+		return nil, fmt.Errorf("(%s) %w, skipping", url, ErrNoRecentArticles)
127 	}
128 
129+	feedTmpl.FeedItems = gofeedItems
130 	feedTmpl.Items = items
131 	return feedTmpl, nil
132 }
133@@ -284,12 +311,16 @@ type MsgBody struct {
134 	Text string
135 }
136 
137-func (f *Fetcher) FetchAll(urls []string, inlineContent bool, lastDigest *time.Time) (*MsgBody, error) {
138+func (f *Fetcher) FetchAll(urls []string, inlineContent bool, postID string) (*MsgBody, error) {
139 	fp := gofeed.NewParser()
140 	feeds := &DigestFeed{Options: DigestOptions{InlineContent: inlineContent}}
141+	feedItems, err := f.db.FindFeedItemsByPostID(postID)
142+	if err != nil {
143+		return nil, err
144+	}
145 
146 	for _, url := range urls {
147-		feedTmpl, err := f.Fetch(fp, url, lastDigest)
148+		feedTmpl, err := f.Fetch(fp, url, feedItems)
149 		if err != nil {
150 			if errors.Is(err, ErrNoRecentArticles) {
151 				f.cfg.Logger.Info(err)
152@@ -305,6 +336,27 @@ func (f *Fetcher) FetchAll(urls []string, inlineContent bool, lastDigest *time.T
153 		return nil, fmt.Errorf("%w, skipping", ErrNoRecentArticles)
154 	}
155 
156+	fdi := []*db.FeedItem{}
157+	for _, feed := range feeds.Feeds {
158+		for _, item := range feed.FeedItems {
159+			fdi = append(fdi, &db.FeedItem{
160+				PostID: postID,
161+				GUID:   item.GUID,
162+				Data: db.FeedItemData{
163+					Title:       item.Title,
164+					Description: item.Description,
165+					Content:     item.Content,
166+					Link:        item.Link,
167+					PublishedAt: item.PublishedParsed,
168+				},
169+			})
170+		}
171+	}
172+	err = f.db.InsertFeedItems(postID, fdi)
173+	if err != nil {
174+		return nil, err
175+	}
176+
177 	text, err := f.PrintText(feeds)
178 	if err != nil {
179 		return nil, err
A sql/migrations/20230326_add_feed_items.sql
+13, -0
 1@@ -0,0 +1,13 @@
 2+CREATE TABLE IF NOT EXISTS feed_items (
 3+  id uuid NOT NULL DEFAULT uuid_generate_v4(),
 4+  post_id uuid NOT NULL,
 5+  guid character varying (1000) NOT NULL,
 6+  data jsonb NOT NULL DEFAULT '{}'::jsonb,
 7+  created_at timestamp without time zone NOT NULL DEFAULT NOW(),
 8+  CONSTRAINT feed_items_pkey PRIMARY KEY (id),
 9+  CONSTRAINT fk_feed_items_posts
10+    FOREIGN KEY(post_id)
11+  REFERENCES posts(id)
12+  ON DELETE CASCADE
13+  ON UPDATE CASCADE
14+);