repos / pico

pico services - prose.sh, pastes.sh, imgs.sh, feeds.sh, pgs.sh
git clone https://github.com/picosh/pico.git

pico / feeds
Eric Bower · 23 Sep 24

cron.go

  1package feeds
  2
  3import (
  4	"crypto/tls"
  5	"errors"
  6	"fmt"
  7	html "html/template"
  8	"io"
  9	"log/slog"
 10	"math"
 11	"net/http"
 12	"strings"
 13	"text/template"
 14	"time"
 15
 16	"github.com/mmcdole/gofeed"
 17	"github.com/picosh/pico/db"
 18	"github.com/picosh/pico/shared"
 19	"github.com/sendgrid/sendgrid-go"
 20	"github.com/sendgrid/sendgrid-go/helpers/mail"
 21)
 22
// ErrNoRecentArticles is the sentinel error wrapped (via %w) when fetching
// yields no new feed items. Callers detect it with errors.Is.
var ErrNoRecentArticles = errors.New("no recent articles")
 24
 25type UserAgentTransport struct {
 26	http.RoundTripper
 27}
 28
 29func (c *UserAgentTransport) RoundTrip(r *http.Request) (*http.Response, error) {
 30	userAgent := "linux:feeds:v2 (by /u/pico-sh)"
 31	r.Header.Set("User-Agent", userAgent)
 32	r.Header.Set("Accept", "*/*")
 33	return c.RoundTripper.RoundTrip(r)
 34}
 35
 36var httpClient = http.Client{
 37	Transport: &UserAgentTransport{
 38		&http.Transport{
 39			TLSClientConfig: &tls.Config{},
 40		},
 41	},
 42}
 43
// FeedItemTmpl is the per-article view model handed to the digest templates.
type FeedItemTmpl struct {
	// GUID is the feed-provided unique identifier of the article.
	GUID string
	// Title is the article title.
	Title string
	// Link is the article URL.
	Link string
	// PublishedAt is the parsed publication time; it may be nil.
	PublishedAt *time.Time
	// Content is typed as html.HTML, so html/template emits it without
	// escaping.
	Content html.HTML
	// Description is typed as html.HTML, so html/template emits it without
	// escaping.
	Description html.HTML
}
 52
// Feed is one fetched feed together with the items selected for the digest.
type Feed struct {
	// Title is the feed's title.
	Title string
	// Link is the feed's link.
	Link string
	// Description is the feed's description.
	Description string
	// Items holds the template view of each selected article.
	Items []*FeedItemTmpl
	// FeedItems holds the raw gofeed items that Items was built from.
	FeedItems []*gofeed.Item
}
 60
// DigestFeed is the root value executed against the digest templates.
type DigestFeed struct {
	// Feeds lists every feed that produced at least one item.
	Feeds []*Feed
	// Options carries rendering switches for the templates.
	Options DigestOptions
	// KeepAliveURL is the keep-alive link for the feed post.
	KeepAliveURL string
	// DaysLeft is the number of days until the post expires, pre-formatted
	// as a string.
	DaysLeft string
}
 67
// DigestOptions holds rendering options passed to the digest templates.
type DigestOptions struct {
	// InlineContent is exposed to the templates; presumably it toggles
	// embedding article content in the email (the templates are not visible
	// here, so confirm against them).
	InlineContent bool
}
 71
 72func itemToTemplate(item *gofeed.Item) *FeedItemTmpl {
 73	return &FeedItemTmpl{
 74		Title:       item.Title,
 75		Link:        item.Link,
 76		PublishedAt: item.PublishedParsed,
 77		Description: html.HTML(item.Description),
 78		Content:     html.HTML(item.Content),
 79	}
 80}
 81
 82func digestOptionToTime(lastDigest time.Time, interval string) time.Time {
 83	day := 24 * time.Hour
 84	if interval == "10min" {
 85		return lastDigest.Add(10 * time.Minute)
 86	} else if interval == "1hour" {
 87		return lastDigest.Add(1 * time.Hour)
 88	} else if interval == "6hour" {
 89		return lastDigest.Add(6 * time.Hour)
 90	} else if interval == "12hour" {
 91		return lastDigest.Add(12 * time.Hour)
 92	} else if interval == "1day" || interval == "" {
 93		return lastDigest.Add(1 * day)
 94	} else if interval == "7day" {
 95		return lastDigest.Add(7 * day)
 96	} else if interval == "30day" {
 97		return lastDigest.Add(30 * day)
 98	} else {
 99		return lastDigest
100	}
101}
102
103// see if this feed item should be emailed to user.
104func isValidItem(item *gofeed.Item, feedItems []*db.FeedItem) bool {
105	for _, feedItem := range feedItems {
106		if item.GUID == feedItem.GUID {
107			return false
108		}
109	}
110
111	return true
112}
113
// Fetcher fetches the feeds listed in users' feed posts and emails the
// resulting digests.
type Fetcher struct {
	// cfg supplies the logger, static file paths and the SendGrid key.
	cfg *shared.ConfigSite
	// db is used to look up users, posts and stored feed items.
	db db.DB
}
118
119func NewFetcher(dbpool db.DB, cfg *shared.ConfigSite) *Fetcher {
120	return &Fetcher{
121		db:  dbpool,
122		cfg: cfg,
123	}
124}
125
126func (f *Fetcher) Validate(post *db.Post, parsed *shared.ListParsedText) error {
127	lastDigest := post.Data.LastDigest
128	if lastDigest == nil {
129		return nil
130	}
131
132	now := time.Now().UTC()
133
134	expiresAt := post.ExpiresAt
135	if expiresAt != nil {
136		if post.ExpiresAt.Before(now) {
137			return fmt.Errorf("(%s) post has expired, skipping", post.ExpiresAt.Format(time.RFC3339))
138		}
139	}
140
141	digestAt := digestOptionToTime(*lastDigest, parsed.DigestInterval)
142	if digestAt.After(now) {
143		return fmt.Errorf("(%s) not time to digest, skipping", digestAt.Format(time.RFC3339))
144	}
145	return nil
146}
147
148func (f *Fetcher) RunPost(logger *slog.Logger, user *db.User, post *db.Post) error {
149	logger = logger.With("filename", post.Filename)
150	logger.Info("running feed post")
151
152	parsed := shared.ListParseText(post.Text)
153
154	logger.Info("last digest at", "lastDigest", post.Data.LastDigest)
155	err := f.Validate(post, parsed)
156	if err != nil {
157		logger.Info("validation failed", "err", err.Error())
158		return nil
159	}
160
161	urls := []string{}
162	for _, item := range parsed.Items {
163		url := ""
164		if item.IsText {
165			url = item.Value
166		} else if item.IsURL {
167			url = string(item.URL)
168		}
169
170		if url == "" {
171			continue
172		}
173
174		urls = append(urls, url)
175	}
176
177	msgBody, err := f.FetchAll(logger, urls, parsed.InlineContent, user.Name, post)
178	if err != nil {
179		return err
180	}
181
182	subject := fmt.Sprintf("%s feed digest", post.Title)
183	err = f.SendEmail(logger, user.Name, parsed.Email, subject, msgBody)
184	if err != nil {
185		return err
186	}
187
188	now := time.Now().UTC()
189	if post.ExpiresAt == nil {
190		expiresAt := time.Now().AddDate(0, 3, 0)
191		post.ExpiresAt = &expiresAt
192	}
193	post.Data.LastDigest = &now
194	_, err = f.db.UpdatePost(post)
195	return err
196}
197
198func (f *Fetcher) RunUser(user *db.User) error {
199	logger := shared.LoggerWithUser(f.cfg.Logger, user)
200	posts, err := f.db.FindPostsForUser(&db.Pager{Num: 1000}, user.ID, "feeds")
201	if err != nil {
202		return err
203	}
204
205	if len(posts.Data) > 0 {
206		logger.Info("found feed posts", "len", len(posts.Data))
207	}
208
209	for _, post := range posts.Data {
210		err = f.RunPost(logger, user, post)
211		if err != nil {
212			logger.Info("RunPost failed", "err", err.Error())
213		}
214	}
215
216	return nil
217}
218
219func (f *Fetcher) ParseURL(fp *gofeed.Parser, url string) (*gofeed.Feed, error) {
220	req, err := http.NewRequest("GET", url, nil)
221	if err != nil {
222		return nil, err
223	}
224
225	resp, err := httpClient.Do(req)
226	if err != nil {
227		return nil, err
228	}
229
230	defer resp.Body.Close()
231	body, err := io.ReadAll(resp.Body)
232	if err != nil {
233		return nil, err
234	}
235
236	if resp.StatusCode < 200 || resp.StatusCode > 300 {
237		return nil, fmt.Errorf("fetching feed resulted in an error: %s %s", resp.Status, body)
238	}
239
240	feed, err := fp.ParseString(string(body))
241
242	if err != nil {
243		return nil, err
244	}
245
246	return feed, nil
247}
248
249func (f *Fetcher) Fetch(logger *slog.Logger, fp *gofeed.Parser, url string, username string, feedItems []*db.FeedItem) (*Feed, error) {
250	logger.Info("fetching feed", "url", url)
251
252	feed, err := f.ParseURL(fp, url)
253	if err != nil {
254		return nil, err
255	}
256
257	feedTmpl := &Feed{
258		Title:       feed.Title,
259		Description: feed.Description,
260		Link:        feed.Link,
261	}
262
263	items := []*FeedItemTmpl{}
264	gofeedItems := []*gofeed.Item{}
265	// we only want to return feed items published since the last digest time we fetched
266	for _, item := range feed.Items {
267		if item == nil {
268			continue
269		}
270
271		if !isValidItem(item, feedItems) {
272			continue
273		}
274
275		gofeedItems = append(gofeedItems, item)
276		items = append(items, itemToTemplate(item))
277	}
278
279	if len(items) == 0 {
280		return nil, fmt.Errorf(
281			"%s %w, skipping",
282			url,
283			ErrNoRecentArticles,
284		)
285	}
286
287	feedTmpl.FeedItems = gofeedItems
288	feedTmpl.Items = items
289	return feedTmpl, nil
290}
291
292func (f *Fetcher) PrintText(feedTmpl *DigestFeed) (string, error) {
293	ts, err := template.ParseFiles(
294		f.cfg.StaticPath("html/digest_text.page.tmpl"),
295	)
296
297	if err != nil {
298		return "", err
299	}
300
301	w := new(strings.Builder)
302	err = ts.Execute(w, feedTmpl)
303	if err != nil {
304		return "", err
305	}
306
307	return w.String(), nil
308}
309
310func (f *Fetcher) PrintHtml(feedTmpl *DigestFeed) (string, error) {
311	ts, err := html.ParseFiles(
312		f.cfg.StaticPath("html/digest.page.tmpl"),
313	)
314
315	if err != nil {
316		return "", err
317	}
318
319	w := new(strings.Builder)
320	err = ts.Execute(w, feedTmpl)
321	if err != nil {
322		return "", err
323	}
324
325	return w.String(), nil
326}
327
// MsgBody holds the two renderings of a digest email.
type MsgBody struct {
	// Html is the HTML rendering of the digest.
	Html string
	// Text is the plain-text rendering of the digest.
	Text string
}
332
333func (f *Fetcher) FetchAll(logger *slog.Logger, urls []string, inlineContent bool, username string, post *db.Post) (*MsgBody, error) {
334	fp := gofeed.NewParser()
335	daysLeft := "90"
336	if post.ExpiresAt != nil {
337		diff := time.Until(*post.ExpiresAt)
338		daysLeft = fmt.Sprintf("%f", math.Ceil(diff.Hours()/24))
339	}
340	feeds := &DigestFeed{
341		KeepAliveURL: fmt.Sprintf("https://feeds.pico.sh/keep-alive/%s", post.ID),
342		DaysLeft:     daysLeft,
343		Options:      DigestOptions{InlineContent: inlineContent},
344	}
345	feedItems, err := f.db.FindFeedItemsByPostID(post.ID)
346	if err != nil {
347		return nil, err
348	}
349
350	for _, url := range urls {
351		feedTmpl, err := f.Fetch(logger, fp, url, username, feedItems)
352		if err != nil {
353			if errors.Is(err, ErrNoRecentArticles) {
354				logger.Info("no recent articles", "err", err.Error())
355			} else {
356				logger.Error("fetch error", "err", err.Error())
357			}
358			continue
359		}
360		feeds.Feeds = append(feeds.Feeds, feedTmpl)
361	}
362
363	if len(feeds.Feeds) == 0 {
364		return nil, fmt.Errorf("(%s) %w, skipping email", username, ErrNoRecentArticles)
365	}
366
367	fdi := []*db.FeedItem{}
368	for _, feed := range feeds.Feeds {
369		for _, item := range feed.FeedItems {
370			fdi = append(fdi, &db.FeedItem{
371				PostID: post.ID,
372				GUID:   item.GUID,
373				Data: db.FeedItemData{
374					Title:       item.Title,
375					Description: item.Description,
376					Content:     item.Content,
377					Link:        item.Link,
378					PublishedAt: item.PublishedParsed,
379				},
380			})
381		}
382	}
383	err = f.db.InsertFeedItems(post.ID, fdi)
384	if err != nil {
385		return nil, err
386	}
387
388	text, err := f.PrintText(feeds)
389	if err != nil {
390		return nil, err
391	}
392
393	html, err := f.PrintHtml(feeds)
394	if err != nil {
395		return nil, err
396	}
397
398	return &MsgBody{
399		Text: text,
400		Html: html,
401	}, nil
402}
403
404func (f *Fetcher) SendEmail(logger *slog.Logger, username, email string, subject string, msg *MsgBody) error {
405	if email == "" {
406		return fmt.Errorf("(%s) does not have an email associated with their feed post", username)
407	}
408
409	from := mail.NewEmail("team pico", shared.DefaultEmail)
410	to := mail.NewEmail(username, email)
411
412	// f.logger.Infof("message body (%s)", plainTextContent)
413
414	message := mail.NewSingleEmail(from, subject, to, msg.Text, msg.Html)
415	client := sendgrid.NewSendClient(f.cfg.SendgridKey)
416
417	logger.Info("sending email digest")
418	response, err := client.Send(message)
419	if err != nil {
420		return err
421	}
422
423	// f.logger.Infof("(%s) email digest response: %v", username, response)
424
425	if len(response.Headers["X-Message-Id"]) > 0 {
426		logger.Info(
427			"successfully sent email digest",
428			"email", email,
429			"x-message-id", response.Headers["X-Message-Id"][0],
430		)
431	} else {
432		logger.Error(
433			"could not find x-message-id, which means sending an email failed",
434			"email", email,
435		)
436	}
437
438	return nil
439}
440
441func (f *Fetcher) Run(logger *slog.Logger) error {
442	users, err := f.db.FindUsers()
443	if err != nil {
444		return err
445	}
446
447	for _, user := range users {
448		err := f.RunUser(user)
449		if err != nil {
450			logger.Error("RunUser failed", "err", err.Error())
451			continue
452		}
453	}
454
455	return nil
456}
457
458func (f *Fetcher) Loop() {
459	logger := f.cfg.Logger
460	for {
461		logger.Info("running digest emailer")
462
463		err := f.Run(logger)
464		if err != nil {
465			logger.Error(err.Error())
466		}
467
468		logger.Info("digest emailer finished, waiting 10 mins")
469		time.Sleep(10 * time.Minute)
470	}
471}