repos / pico

pico services - prose.sh, pastes.sh, imgs.sh, feeds.sh, pgs.sh
git clone https://github.com/picosh/pico.git

pico / feeds
Eric Bower · 13 Dec 24

cron.go

  1package feeds
  2
  3import (
  4	"crypto/tls"
  5	"errors"
  6	"fmt"
  7	html "html/template"
  8	"io"
  9	"log/slog"
 10	"math"
 11	"net/http"
 12	"strings"
 13	"text/template"
 14	"time"
 15
 16	"github.com/mmcdole/gofeed"
 17	"github.com/picosh/pico/db"
 18	"github.com/picosh/pico/shared"
 19	"github.com/sendgrid/sendgrid-go"
 20	"github.com/sendgrid/sendgrid-go/helpers/mail"
 21)
 22
// ErrNoRecentArticles is the sentinel error wrapped by Fetch and FetchAll when
// no new feed items were found. RunPost checks for it with errors.Is and
// returns without counting a failed attempt.
var ErrNoRecentArticles = errors.New("no recent articles")
 24
 25type UserAgentTransport struct {
 26	http.RoundTripper
 27}
 28
 29func (c *UserAgentTransport) RoundTrip(r *http.Request) (*http.Response, error) {
 30	userAgent := "linux:feeds:v2 (by /u/pico-sh)"
 31	r.Header.Set("User-Agent", userAgent)
 32	r.Header.Set("Accept", "*/*")
 33	return c.RoundTripper.RoundTrip(r)
 34}
 35
 36var httpClient = http.Client{
 37	Transport: &UserAgentTransport{
 38		&http.Transport{
 39			TLSClientConfig: &tls.Config{},
 40		},
 41	},
 42}
 43
// FeedItemTmpl is the template-facing view of a single feed item. Content and
// Description are typed as html/template HTML values, so the HTML template
// emits them verbatim rather than escaping them.
type FeedItemTmpl struct {
	GUID        string
	Title       string
	Link        string
	PublishedAt *time.Time
	Content     html.HTML
	Description html.HTML
}
 52
// Feed is one fetched feed prepared for the digest templates. Items holds the
// template views of the new entries and FeedItems the corresponding parsed
// gofeed entries; Fetch appends to both in lockstep, and FetchAll reads
// FeedItems to record which entries have been handled.
type Feed struct {
	Title       string
	Link        string
	Description string
	Items       []*FeedItemTmpl
	FeedItems   []*gofeed.Item
}
 60
// DigestFeed is the root value handed to the digest templates. FetchAll fills
// KeepAliveURL with the post's keep-alive link, DaysLeft with the number of
// days until the post expires (empty when there is no expiry), and sets
// ShowBanner when 30 or fewer days remain.
type DigestFeed struct {
	Feeds        []*Feed
	Options      DigestOptions
	KeepAliveURL string
	DaysLeft     string
	ShowBanner   bool
}
 68
// DigestOptions carries per-post rendering options into the digest templates.
// InlineContent is copied from the parsed post's InlineContent setting.
type DigestOptions struct {
	InlineContent bool
}
 72
 73func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
 74	return &FeedItemTmpl{
 75		Title:       item.Title,
 76		Link:        item.Link,
 77		PublishedAt: item.PublishedParsed,
 78		Description: html.HTML(item.Description),
 79		Content:     html.HTML(item.Content),
 80	}
 81}
 82
 83func digestOptionToTime(lastDigest time.Time, interval string) time.Time {
 84	day := 24 * time.Hour
 85	if interval == "10min" {
 86		return lastDigest.Add(10 * time.Minute)
 87	} else if interval == "1hour" {
 88		return lastDigest.Add(1 * time.Hour)
 89	} else if interval == "6hour" {
 90		return lastDigest.Add(6 * time.Hour)
 91	} else if interval == "12hour" {
 92		return lastDigest.Add(12 * time.Hour)
 93	} else if interval == "1day" || interval == "" {
 94		return lastDigest.Add(1 * day)
 95	} else if interval == "7day" {
 96		return lastDigest.Add(7 * day)
 97	} else if interval == "30day" {
 98		return lastDigest.Add(30 * day)
 99	} else {
100		return lastDigest
101	}
102}
103
104// see if this feed item should be emailed to user.
105func isValidItem(item *gofeed.Item, feedItems []*db.FeedItem) bool {
106	for _, feedItem := range feedItems {
107		if item.GUID == feedItem.GUID {
108			return false
109		}
110	}
111
112	return true
113}
114
// Fetcher runs the feed digest job: it loads users' feed posts from db,
// downloads their feeds and emails the digests. cfg supplies the logger,
// the template paths and the SendGrid key used by its methods.
type Fetcher struct {
	cfg *shared.ConfigSite
	db  db.DB
}
119
120func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
121	return &Fetcher{
122		db:  dbpool,
123		cfg: cfg,
124	}
125}
126
127func (f *Fetcher) Validate(post *db.Post, parsed *shared.ListParsedText) error {
128	lastDigest := post.Data.LastDigest
129	if lastDigest == nil {
130		return nil
131	}
132
133	now := time.Now().UTC()
134
135	expiresAt := post.ExpiresAt
136	if expiresAt != nil {
137		if post.ExpiresAt.Before(now) {
138			return fmt.Errorf("(%s) post has expired, skipping", post.ExpiresAt.Format(time.RFC3339))
139		}
140	}
141
142	digestAt := digestOptionToTime(*lastDigest, parsed.DigestInterval)
143	if digestAt.After(now) {
144		return fmt.Errorf("(%s) not time to digest, skipping", digestAt.Format(time.RFC3339))
145	}
146	return nil
147}
148
149func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, post *db.Post) error {
150	logger = logger.With("filename", post.Filename)
151	logger.Info("running feed post")
152
153	parsed := shared.ListParseText(post.Text)
154
155	logger.Info("last digest at", "lastDigest", post.Data.LastDigest.Format(time.RFC3339))
156	err := f.Validate(post, parsed)
157	if err != nil {
158		logger.Info("validation failed", "err", err)
159		return nil
160	}
161
162	urls := []string{}
163	for _, item := range parsed.Items {
164		url := ""
165		if item.IsText {
166			url = item.Value
167		} else if item.IsURL {
168			url = string(item.URL)
169		}
170
171		if url == "" {
172			continue
173		}
174
175		urls = append(urls, url)
176	}
177
178	now := time.Now().UTC()
179	if post.ExpiresAt == nil {
180		expiresAt := time.Now().AddDate(0, 6, 0)
181		post.ExpiresAt = &expiresAt
182	}
183	_, err = f.db.UpdatePost(post)
184	if err != nil {
185		return err
186	}
187
188	subject := fmt.Sprintf("%s feed digest", post.Title)
189
190	msgBody, err := f.FetchAll(logger, urls, parsed.InlineContent, user.Name, post)
191	if err != nil {
192		errForUser := err
193
194		// we don't want to increment in this case
195		if errors.Is(errForUser, ErrNoRecentArticles) {
196			return nil
197		}
198
199		post.Data.Attempts += 1
200		logger.Error("could not fetch urls", "err", err, "attempts", post.Data.Attempts)
201
202		errBody := fmt.Sprintf(`There was an error attempting to fetch your feeds (%d) times.  After (3) attempts we remove the file from our system.  Please check all the URLs and re-upload.
203Also, we have centralized logs in our pico.sh TUI that will display realtime feed errors so you can debug.
204
205
206%s
207
208
209%s`, post.Data.Attempts, errForUser.Error(), post.Text)
210		err = f.SendEmail(
211			logger, user.Name,
212			parsed.Email,
213			subject,
214			&MsgBody{Html: strings.ReplaceAll(errBody, "\n", "<br />"), Text: errBody},
215		)
216		if err != nil {
217			return err
218		}
219
220		if post.Data.Attempts >= 3 {
221			err = f.db.RemovePosts([]string{post.ID})
222			if err != nil {
223				return err
224			}
225		} else {
226			_, err = f.db.UpdatePost(post)
227			if err != nil {
228				return err
229			}
230		}
231		return errForUser
232	} else {
233		post.Data.Attempts = 0
234		_, err := f.db.UpdatePost(post)
235		if err != nil {
236			return err
237		}
238	}
239
240	if msgBody != nil {
241		err = f.SendEmail(logger, user.Name, parsed.Email, subject, msgBody)
242		if err != nil {
243			return err
244		}
245	}
246
247	post.Data.LastDigest = &now
248	_, err = f.db.UpdatePost(post)
249	if err != nil {
250		return err
251	}
252
253	return nil
254}
255
256func (f *Fetcher) RunUser(user *db.User) error {
257	logger := shared.LoggerWithUser(f.cfg.Logger, user)
258	posts, err := f.db.FindPostsForUser(&db.Pager{Num: 100}, user.ID, "feeds")
259	if err != nil {
260		return err
261	}
262
263	if len(posts.Data) > 0 {
264		logger.Info("found feed posts", "len", len(posts.Data))
265	}
266
267	for _, post := range posts.Data {
268		err = f.RunPost(logger, user, post)
269		if err != nil {
270			logger.Error("run post failed", "err", err)
271		}
272	}
273
274	return nil
275}
276
277func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error) {
278	req, err := http.NewRequest("GET", url, nil)
279	if err != nil {
280		return nil, err
281	}
282
283	resp, err := httpClient.Do(req)
284	if err != nil {
285		return nil, err
286	}
287
288	defer resp.Body.Close()
289	body, err := io.ReadAll(resp.Body)
290	if err != nil {
291		return nil, err
292	}
293
294	if resp.StatusCode < 200 || resp.StatusCode > 300 {
295		return nil, fmt.Errorf("fetching feed resulted in an error: %s %s", resp.Status, body)
296	}
297
298	feed, err := fp.ParseString(string(body))
299	if err != nil {
300		return nil, err
301	}
302
303	return feed, nil
304}
305
306func (f *Fetcher) Fetch(logger *slog.Logger, fp *gofeed.Parser, url string, username string, feedItems []*db.FeedItem) (*Feed, error) {
307	logger.Info("fetching feed", "url", url)
308
309	feed, err := f.ParseURL(fp, url)
310	if err != nil {
311		return nil, err
312	}
313
314	feedTmpl := &Feed{
315		Title:       feed.Title,
316		Description: feed.Description,
317		Link:        feed.Link,
318	}
319
320	items := []*FeedItemTmpl{}
321	gofeedItems := []*gofeed.Item{}
322	// we only want to return feed items published since the last digest time we fetched
323	for _, item := range feed.Items {
324		if item == nil {
325			continue
326		}
327
328		if !isValidItem(item, feedItems) {
329			continue
330		}
331
332		gofeedItems = append(gofeedItems, item)
333		items = append(items, itemToTemplate(item))
334	}
335
336	if len(items) == 0 {
337		return nil, fmt.Errorf(
338			"%s %w, skipping",
339			url,
340			ErrNoRecentArticles,
341		)
342	}
343
344	feedTmpl.FeedItems = gofeedItems
345	feedTmpl.Items = items
346	return feedTmpl, nil
347}
348
349func (f *Fetcher) PrintText(feedTmpl *DigestFeed) (string, error) {
350	ts, err := template.ParseFiles(
351		f.cfg.StaticPath("html/digest_text.page.tmpl"),
352	)
353
354	if err != nil {
355		return "", err
356	}
357
358	w := new(strings.Builder)
359	err = ts.Execute(w, feedTmpl)
360	if err != nil {
361		return "", err
362	}
363
364	return w.String(), nil
365}
366
367func (f *Fetcher) PrintHtml(feedTmpl *DigestFeed) (string, error) {
368	ts, err := html.ParseFiles(
369		f.cfg.StaticPath("html/digest.page.tmpl"),
370	)
371
372	if err != nil {
373		return "", err
374	}
375
376	w := new(strings.Builder)
377	err = ts.Execute(w, feedTmpl)
378	if err != nil {
379		return "", err
380	}
381
382	return w.String(), nil
383}
384
// MsgBody holds the two renderings of one email: Html is the HTML part and
// Text the plain-text part. SendEmail passes both to the mail helper.
type MsgBody struct {
	Html string
	Text string
}
389
390func (f *Fetcher) FetchAll(logger *slog.Logger, urls []string, inlineContent bool, username string, post *db.Post) (*MsgBody, error) {
391	fp := gofeed.NewParser()
392	daysLeft := ""
393	showBanner := false
394	if post.ExpiresAt != nil {
395		diff := time.Until(*post.ExpiresAt)
396		daysLeftInt := int(math.Ceil(diff.Hours() / 24))
397		daysLeft = fmt.Sprintf("%d", daysLeftInt)
398		if daysLeftInt <= 30 {
399			showBanner = true
400		}
401	}
402	feeds := &DigestFeed{
403		KeepAliveURL: fmt.Sprintf("https://feeds.pico.sh/keep-alive/%s", post.ID),
404		DaysLeft:     daysLeft,
405		ShowBanner:   showBanner,
406		Options:      DigestOptions{InlineContent: inlineContent},
407	}
408	feedItems, err := f.db.FindFeedItemsByPostID(post.ID)
409	if err != nil {
410		return nil, err
411	}
412
413	var allErrors error
414	for _, url := range urls {
415		feedTmpl, err := f.Fetch(logger, fp, url, username, feedItems)
416		if err != nil {
417			if errors.Is(err, ErrNoRecentArticles) {
418				logger.Info("no recent articles", "err", err)
419			} else {
420				allErrors = errors.Join(allErrors, fmt.Errorf("%s: %w", url, err))
421				logger.Error("fetch error", "err", err)
422			}
423			continue
424		}
425		feeds.Feeds = append(feeds.Feeds, feedTmpl)
426	}
427
428	if len(feeds.Feeds) == 0 {
429		if allErrors != nil {
430			return nil, allErrors
431		}
432		return nil, fmt.Errorf("%w, skipping email", ErrNoRecentArticles)
433	}
434
435	fdi := []*db.FeedItem{}
436	for _, feed := range feeds.Feeds {
437		for _, item := range feed.FeedItems {
438			fdi = append(fdi, &db.FeedItem{
439				PostID: post.ID,
440				GUID:   item.GUID,
441				Data: db.FeedItemData{
442					Title:       item.Title,
443					Description: item.Description,
444					Content:     item.Content,
445					Link:        item.Link,
446					PublishedAt: item.PublishedParsed,
447				},
448			})
449		}
450	}
451	err = f.db.InsertFeedItems(post.ID, fdi)
452	if err != nil {
453		return nil, err
454	}
455
456	text, err := f.PrintText(feeds)
457	if err != nil {
458		return nil, err
459	}
460
461	html, err := f.PrintHtml(feeds)
462	if err != nil {
463		return nil, err
464	}
465
466	if allErrors != nil {
467		text = fmt.Sprintf("> %s\n\n%s", allErrors, text)
468		html = fmt.Sprintf("<blockquote>%s</blockquote><br /><br/>%s", allErrors, html)
469	}
470
471	return &MsgBody{
472		Text: text,
473		Html: html,
474	}, nil
475}
476
477func (f *Fetcher) SendEmail(logger *slog.Logger, username, email string, subject string, msg *MsgBody) error {
478	if email == "" {
479		return fmt.Errorf("(%s) does not have an email associated with their feed post", username)
480	}
481
482	from := mail.NewEmail("team pico", shared.DefaultEmail)
483	to := mail.NewEmail(username, email)
484
485	// f.logger.Infof("message body (%s)", plainTextContent)
486
487	message := mail.NewSingleEmail(from, subject, to, msg.Text, msg.Html)
488	client := sendgrid.NewSendClient(f.cfg.SendgridKey)
489
490	logger.Info("sending email digest")
491	response, err := client.Send(message)
492	if err != nil {
493		return err
494	}
495
496	// f.logger.Infof("(%s) email digest response: %v", username, response)
497
498	if len(response.Headers["X-Message-Id"]) > 0 {
499		logger.Info(
500			"successfully sent email digest",
501			"email", email,
502			"x-message-id", response.Headers["X-Message-Id"][0],
503		)
504	} else {
505		logger.Error(
506			"could not find x-message-id, which means sending an email failed",
507			"email", email,
508		)
509	}
510
511	return nil
512}
513
514func (f *Fetcher) Run(logger *slog.Logger) error {
515	users, err := f.db.FindUsers()
516	if err != nil {
517		return err
518	}
519
520	for _, user := range users {
521		err := f.RunUser(user)
522		if err != nil {
523			logger.Error("run user failed", "err", err)
524			continue
525		}
526	}
527
528	return nil
529}
530
531func (f *Fetcher) Loop() {
532	logger := f.cfg.Logger
533	for {
534		logger.Info("running digest emailer")
535
536		err := f.Run(logger)
537		if err != nil {
538			logger.Error("run failed", "err", err)
539		}
540
541		logger.Info("digest emailer finished, waiting 10 mins")
542		time.Sleep(10 * time.Minute)
543	}
544}