- commit
- 60a7e0c
- parent
- 7e4ad3e
- author
- Antonio Mika
- date
- 2024-11-18 00:58:51 +0000 UTC
Use shared lib for metrics connection
5 files changed,
+23,
-33
+9,
-9
1@@ -21,7 +21,7 @@ import (
2 "github.com/picosh/pico/db/postgres"
3 "github.com/picosh/pico/shared"
4 "github.com/picosh/utils"
5- "github.com/picosh/utils/pipe"
6+ "github.com/picosh/utils/pipe/metrics"
7 )
8
9 type Client struct {
10@@ -643,15 +643,15 @@ func handler(routes []shared.Route, client *Client) http.HandlerFunc {
11 }
12
13 func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
14- conn := shared.NewPicoPipeClient()
15- stdoutPipe, err := pipe.Sub(ctx, logger, conn, "sub metric-drain -k")
16-
17- if err != nil {
18- logger.Error("could not sub to metric-drain", "err", err)
19- return
20- }
21+ drain := metrics.ReconnectReadMetrics(
22+ ctx,
23+ logger,
24+ shared.NewPicoPipeClient(),
25+ 100,
26+ 10*time.Millisecond,
27+ )
28
29- scanner := bufio.NewScanner(stdoutPipe)
30+ scanner := bufio.NewScanner(drain)
31 for scanner.Scan() {
32 line := scanner.Text()
33 visit := db.AnalyticsVisits{}
M
go.mod
+1,
-1
1@@ -41,7 +41,7 @@ require (
2 github.com/picosh/pubsub v0.0.0-20241114191831-ec8f16c0eb88
3 github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f
4 github.com/picosh/tunkit v0.0.0-20240905223921-532404cef9d9
5- github.com/picosh/utils v0.0.0-20241117232851-a32d4675d449
6+ github.com/picosh/utils v0.0.0-20241118005757-bec2696bc760
7 github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06
8 github.com/sendgrid/sendgrid-go v3.16.0+incompatible
9 github.com/simplesurance/go-ip-anonymizer v0.0.0-20200429124537-35a880f8e87d
M
go.sum
+2,
-2
1@@ -277,8 +277,8 @@ github.com/picosh/senpai v0.0.0-20240503200611-af89e73973b0 h1:pBRIbiCj7K6rGELij
2 github.com/picosh/senpai v0.0.0-20240503200611-af89e73973b0/go.mod h1:QaBDtybFC5gz7EG/9c3bgzuyW7W5W2rYLFZxWNuWk3Q=
3 github.com/picosh/tunkit v0.0.0-20240905223921-532404cef9d9 h1:g5oZmnDFr11HarA8IAXcc4o9PBlolSM59QIATCSoato=
4 github.com/picosh/tunkit v0.0.0-20240905223921-532404cef9d9/go.mod h1:UrDH/VCIc1wg/L6iY2zSYt4TiGw+25GsKSnkVkU40Dw=
5-github.com/picosh/utils v0.0.0-20241117232851-a32d4675d449 h1:hXJPjYl0y9YD0EqqST3j4oqCEKQJzZwNTg/iHgD40wo=
6-github.com/picosh/utils v0.0.0-20241117232851-a32d4675d449/go.mod h1:HogYEyJ43IGXrOa3D/kjM1pkzNAyh+pejRyv8Eo//pk=
7+github.com/picosh/utils v0.0.0-20241118005757-bec2696bc760 h1:NDSSzGV96Rs76/Be+xGdKPm9YGC8ayvCL3CgbFIA4iE=
8+github.com/picosh/utils v0.0.0-20241118005757-bec2696bc760/go.mod h1:HogYEyJ43IGXrOa3D/kjM1pkzNAyh+pejRyv8Eo//pk=
9 github.com/pkg/sftp v1.13.7 h1:uv+I3nNJvlKZIQGSr8JVQLNHFU9YhhNpvC14Y6KgmSM=
10 github.com/pkg/sftp v1.13.7/go.mod h1:KMKI0t3T6hfA+lTR/ssZdunHo+uwq7ghoN09/FSu3DY=
11 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
1@@ -15,7 +15,7 @@ import (
2 "time"
3
4 "github.com/picosh/pico/db"
5- "github.com/picosh/utils/pipe"
6+ "github.com/picosh/utils/pipe/metrics"
7 "github.com/simplesurance/go-ip-anonymizer/ipanonymizer"
8 "github.com/x-way/crawlerdetect"
9 )
10@@ -168,18 +168,13 @@ func AnalyticsVisitFromRequest(r *http.Request, dbpool db.DB, userID string) (*d
11 }
12
13 func AnalyticsCollect(ch chan *db.AnalyticsVisits, dbpool db.DB, logger *slog.Logger) {
14- info := NewPicoPipeClient()
15- metricDrain, err := pipe.NewClient(context.Background(), logger, info)
16- if err != nil {
17- logger.Error("could not create metric-drain client", "err", err)
18- return
19- }
20-
21- s, err := metricDrain.AddSession("metric-drain", "pub metric-drain -b=false", 100, 0, 10*time.Millisecond)
22- if err != nil {
23- logger.Error("could not add session for metric-drain", "err", err)
24- return
25- }
26+ drain := metrics.RegisterReconnectMetricRecorder(
27+ context.Background(),
28+ logger,
29+ NewPicoPipeClient(),
30+ 100,
31+ 10*time.Millisecond,
32+ )
33
34 for visit := range ch {
35 data, err := json.Marshal(visit)
36@@ -190,7 +185,7 @@ func AnalyticsCollect(ch chan *db.AnalyticsVisits, dbpool db.DB, logger *slog.Lo
37
38 data = append(data, '\n')
39
40- _, err = s.Write(data)
41+ _, err = drain.Write(data)
42 if err != nil {
43 logger.Error("could not write to metric-drain", "err", err)
44 }
1@@ -1,6 +1,7 @@
2 package shared
3
4 import (
5+ "context"
6 "fmt"
7 "html/template"
8 "log/slog"
9@@ -280,13 +281,7 @@ func CreateLogger(space string) *slog.Logger {
10
11 if strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true" {
12 conn := NewPicoPipeClient()
13- newLog, err := pipeLogger.RegisterReconnectLogger(log, conn, 100, 10*time.Millisecond)
14-
15- if err == nil {
16- newLogger = newLog
17- } else {
18- slog.Error("unable to start send logger", "error", err)
19- }
20+ newLogger = pipeLogger.RegisterReconnectLogger(context.Background(), log, conn, 100, 10*time.Millisecond)
21 }
22
23 return newLogger.With("service", space)