- commit
- dbac3df
- parent
- 8e0e7f4
- author
- Antonio Mika
- date
- 2024-11-17 16:37:02 +0000 UTC
Add buffering to analytics collection and make it non-blocking
6 files changed,
+32,
-15
+1,
-1
1@@ -81,7 +81,7 @@ func StartSshServer() {
2 st,
3 )
4
5- ch := make(chan *db.AnalyticsVisits)
6+ ch := make(chan *db.AnalyticsVisits, 100)
7 go shared.AnalyticsCollect(ch, dbpool, logger)
8 apiConfig := &shared.ApiConfig{
9 Cfg: cfg,
+1,
-1
1@@ -40,7 +40,7 @@ func StartApiServer() {
2 return
3 }
4
5- ch := make(chan *db.AnalyticsVisits)
6+ ch := make(chan *db.AnalyticsVisits, 100)
7 go shared.AnalyticsCollect(ch, dbpool, logger)
8
9 routes := NewWebRouter(cfg, logger, dbpool, st, ch)
+12,
-4
1@@ -161,10 +161,14 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
2 if err == nil {
3 view.ProjectID = h.ProjectID
4 view.Status = http.StatusNotFound
5- ch <- view
6+ select {
7+ case ch <- view:
8+ default:
9+ logger.Error("could not send analytics view to channel", "view", view)
10+ }
11 } else {
12 if !errors.Is(err, shared.ErrAnalyticsDisabled) {
13- logger.Error("could not record analytics view", "err", err)
14+ logger.Error("could not record analytics view", "err", err, "view", view)
15 }
16 }
17 http.Error(w, "404 not found", http.StatusNotFound)
18@@ -239,10 +243,14 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
19 view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
20 if err == nil {
21 view.ProjectID = h.ProjectID
22- ch <- view
23+ select {
24+ case ch <- view:
25+ default:
26+ logger.Error("could not send analytics view to channel", "view", view)
27+ }
28 } else {
29 if !errors.Is(err, shared.ErrAnalyticsDisabled) {
30- logger.Error("could not record analytics view", "err", err)
31+ logger.Error("could not record analytics view", "err", err, "view", view)
32 }
33 }
34 }
+3,
-3
1@@ -219,7 +219,7 @@ func TestApiBasic(t *testing.T) {
2 responseRecorder := httptest.NewRecorder()
3
4 st, _ := storage.NewStorageMemory(tc.storage)
5- ch := make(chan *db.AnalyticsVisits)
6+ ch := make(chan *db.AnalyticsVisits, 100)
7 router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch)
8 router.ServeHTTP(responseRecorder, request)
9
10@@ -254,7 +254,7 @@ func TestAnalytics(t *testing.T) {
11 },
12 }
13 st, _ := storage.NewStorageMemory(sto)
14- ch := make(chan *db.AnalyticsVisits)
15+ ch := make(chan *db.AnalyticsVisits, 100)
16 dbpool := NewPgsAnalticsDb(cfg.Logger)
17 router := NewWebRouter(cfg, cfg.Logger, dbpool, st, ch)
18
19@@ -337,7 +337,7 @@ func TestImageManipulation(t *testing.T) {
20 Ratio: &storage.Ratio{},
21 },
22 }
23- ch := make(chan *db.AnalyticsVisits)
24+ ch := make(chan *db.AnalyticsVisits, 100)
25 router := NewWebRouter(cfg, cfg.Logger, tc.dbpool, st, ch)
26 router.ServeHTTP(responseRecorder, request)
27
+13,
-5
1@@ -274,10 +274,14 @@ func blogHandler(w http.ResponseWriter, r *http.Request) {
2 ch := shared.GetAnalyticsQueue(r)
3 view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
4 if err == nil {
5- ch <- view
6+ select {
7+ case ch <- view:
8+ default:
9+ logger.Error("could not send analytics view to channel", "view", view)
10+ }
11 } else {
12 if !errors.Is(err, shared.ErrAnalyticsDisabled) {
13- logger.Error("could not record analytics view", "err", err)
14+ logger.Error("could not record analytics view", "err", err, "view", view)
15 }
16 }
17
18@@ -429,10 +433,14 @@ func postHandler(w http.ResponseWriter, r *http.Request) {
19 view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
20 if err == nil {
21 view.PostID = post.ID
22- ch <- view
23+ select {
24+ case ch <- view:
25+ default:
26+ logger.Error("could not send analytics view to channel", "view", view)
27+ }
28 } else {
29 if !errors.Is(err, shared.ErrAnalyticsDisabled) {
30- logger.Error("could not record analytics view", "err", err)
31+ logger.Error("could not record analytics view", "err", err, "view", view)
32 }
33 }
34
35@@ -945,7 +953,7 @@ func StartApiServer() {
36 mainRoutes := createMainRoutes(staticRoutes)
37 subdomainRoutes := createSubdomainRoutes(staticRoutes)
38
39- ch := make(chan *db.AnalyticsVisits)
40+ ch := make(chan *db.AnalyticsVisits, 100)
41 go shared.AnalyticsCollect(ch, dbpool, logger)
42 apiConfig := &shared.ApiConfig{
43 Cfg: cfg,
1@@ -12,6 +12,7 @@ import (
2 "net"
3 "net/http"
4 "net/url"
5+ "time"
6
7 "github.com/picosh/pico/db"
8 "github.com/picosh/utils/pipe"
9@@ -174,7 +175,7 @@ func AnalyticsCollect(ch chan *db.AnalyticsVisits, dbpool db.DB, logger *slog.Lo
10 return
11 }
12
13- s, err := metricDrain.AddSession("metric-drain", "pub metric-drain -b=false", -1, -1)
14+ s, err := metricDrain.AddSession("metric-drain", "pub metric-drain -b=false", 100, 10*time.Millisecond)
15 if err != nil {
16 logger.Error("could not add session for metric-drain", "err", err)
17 return