- commit
- 2653160
- parent
- 4bc6f5b
- author
- Eric Bower
- date
- 2024-11-11 02:41:27 +0000 UTC
feat(auth): subscribe to pico's metric-drain pipe We have a lot of services that need to record site usage analytics so we need a distributed way to receive these events. As a result, all of our site visits will be sent through our pipe service. Then inside of our auth service we process the visits and store them in our analytics table. This overloads the auth service since it's now serving as a destination for our metric-drain, but we think that's okay for now.
M
Makefile
+2,
-1
1@@ -130,10 +130,11 @@ migrate:
2 $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20240324_add_analytics_table.sql
3 $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20240819_add_projects_blocked.sql
4 $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241028_add_analytics_indexes.sql
5+ $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
6 .PHONY: migrate
7
8 latest:
9- $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241028_add_analytics_indexes.sql
10+ $(DOCKER_CMD) exec -i $(DB_CONTAINER) psql -U $(PGUSER) -d $(PGDATABASE) < ./sql/migrations/20241114_add_namespace_to_analytics.sql
11 .PHONY: latest
12
13 psql:
+48,
-1
1@@ -1,9 +1,11 @@
2 package auth
3
4 import (
5+ "bufio"
6 "context"
7 "crypto/hmac"
8 "encoding/json"
9+ "errors"
10 "fmt"
11 "html/template"
12 "io"
13@@ -18,6 +20,7 @@ import (
14 "github.com/picosh/pico/db"
15 "github.com/picosh/pico/db/postgres"
16 "github.com/picosh/pico/shared"
17+ "github.com/picosh/pubsub"
18 "github.com/picosh/utils"
19 )
20
21@@ -639,12 +642,47 @@ func handler(routes []shared.Route, client *Client) http.HandlerFunc {
22 }
23 }
24
25+func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secret string) {
26+ conn := shared.NewPicoPipeClient()
27+ stdoutPipe, err := pubsub.RemoteSub("sub metric-drain -k", ctx, conn)
28+
29+ if err != nil {
30+ logger.Error("could not sub to metric-drain", "err", err)
31+ return
32+ }
33+
34+ scanner := bufio.NewScanner(stdoutPipe)
35+ for scanner.Scan() {
36+ line := scanner.Text()
37+ visit := db.AnalyticsVisits{}
38+ err := json.Unmarshal([]byte(line), &visit)
39+ if err != nil {
40+ logger.Error("json unmarshal", "err", err)
41+ continue
42+ }
43+
44+ err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
45+ if err != nil {
46+ if !errors.Is(err, shared.ErrAnalyticsDisabled) {
47+ logger.Info("could not record analytics visit", "reason", err)
48+ }
49+ }
50+
51+ logger.Info("inserting visit", "visit", visit)
52+ err = dbpool.InsertVisit(&visit)
53+ if err != nil {
54+ logger.Error("could not insert visit record", "err", err)
55+ }
56+ }
57+}
58+
59 type AuthCfg struct {
60 Debug bool
61 Port string
62 DbURL string
63 Domain string
64 Issuer string
65+ Secret string
66 }
67
68 func StartApiServer() {
69@@ -655,6 +693,10 @@ func StartApiServer() {
70 Issuer: utils.GetEnv("AUTH_ISSUER", "pico.sh"),
71 Domain: utils.GetEnv("AUTH_DOMAIN", "http://0.0.0.0:3000"),
72 Port: utils.GetEnv("AUTH_WEB_PORT", "3000"),
73+ Secret: utils.GetEnv("PICO_SECRET", ""),
74+ }
75+ if cfg.Secret == "" {
76+ panic("must provide PICO_SECRET environment variable")
77 }
78
79 logger := shared.CreateLogger("auth")
80@@ -667,6 +709,11 @@ func StartApiServer() {
81 Logger: logger,
82 }
83
84+ ctx := context.Background()
85+ // gather metrics in the auth service
86+ go metricDrainSub(ctx, db, logger, cfg.Secret)
87+ defer ctx.Done()
88+
89 routes := createMainRoutes()
90
91 if cfg.Debug {
92@@ -679,6 +726,6 @@ func StartApiServer() {
93 client.Logger.Info("starting server on port", "port", cfg.Port)
94 err := http.ListenAndServe(portStr, router)
95 if err != nil {
96- client.Logger.Info(err.Error())
97+ client.Logger.Info("http-serve", "err", err.Error())
98 }
99 }
M
db/db.go
+11,
-10
1@@ -161,16 +161,17 @@ type PostAnalytics struct {
2 }
3
4 type AnalyticsVisits struct {
5- ID string
6- UserID string
7- ProjectID string
8- PostID string
9- Host string
10- Path string
11- IpAddress string
12- UserAgent string
13- Referer string
14- Status int
15+ ID string `json:"id"`
16+ UserID string `json:"user_id"`
17+ ProjectID string `json:"project_id"`
18+ PostID string `json:"post_id"`
19+ Namespace string `json:"namespace"`
20+ Host string `json:"host"`
21+ Path string `json:"path"`
22+ IpAddress string `json:"ip_adress"`
23+ UserAgent string `json:"user_agent"`
24+ Referer string `json:"referer"`
25+ Status int `json:"status"`
26 }
27
28 type VisitInterval struct {
+12,
-11
1@@ -984,18 +984,19 @@ func newNullString(s string) sql.NullString {
2 }
3 }
4
5-func (me *PsqlDB) InsertVisit(view *db.AnalyticsVisits) error {
6+func (me *PsqlDB) InsertVisit(visit *db.AnalyticsVisits) error {
7 _, err := me.Db.Exec(
8- `INSERT INTO analytics_visits (user_id, project_id, post_id, host, path, ip_address, user_agent, referer, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9);`,
9- view.UserID,
10- newNullString(view.ProjectID),
11- newNullString(view.PostID),
12- view.Host,
13- view.Path,
14- view.IpAddress,
15- view.UserAgent,
16- view.Referer,
17- view.Status,
18+ `INSERT INTO analytics_visits (user_id, project_id, post_id, namespace, host, path, ip_address, user_agent, referer, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`,
19+ visit.UserID,
20+ newNullString(visit.ProjectID),
21+ newNullString(visit.PostID),
22+ newNullString(visit.Namespace),
23+ visit.Host,
24+ visit.Path,
25+ visit.IpAddress,
26+ visit.UserAgent,
27+ visit.Referer,
28+ visit.Status,
29 )
30 return err
31 }
M
go.mod
+2,
-2
1@@ -31,13 +31,12 @@ require (
2 github.com/gorilla/feeds v1.2.0
3 github.com/lib/pq v1.10.9
4 github.com/microcosm-cc/bluemonday v1.0.27
5- github.com/minio/minio-go/v7 v7.0.80
6 github.com/mmcdole/gofeed v1.3.0
7 github.com/muesli/reflow v0.3.0
8 github.com/muesli/termenv v0.15.3-0.20240912151726-82936c5ea257
9 github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577
10 github.com/picosh/pobj v0.0.0-20241016194248-c39198b2ff23
11- github.com/picosh/pubsub v0.0.0-20241030185810-e24d08b67ab8
12+ github.com/picosh/pubsub v0.0.0-20241114162342-5138571dceda
13 github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f
14 github.com/picosh/tunkit v0.0.0-20240905223921-532404cef9d9
15 github.com/picosh/utils v0.0.0-20241018143404-b351d5d765f3
16@@ -133,6 +132,7 @@ require (
17 github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
18 github.com/minio/madmin-go/v3 v3.0.75 // indirect
19 github.com/minio/md5-simd v1.1.2 // indirect
20+ github.com/minio/minio-go/v7 v7.0.80 // indirect
21 github.com/mmcdole/goxpp v1.1.1 // indirect
22 github.com/mmcloughlin/md4 v0.1.2 // indirect
23 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
M
go.sum
+2,
-2
1@@ -269,8 +269,8 @@ github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc h1:bvcsoO
2 github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc/go.mod h1:i0iR3W4GSm1PuvVxB9OH32E5jP+CYkVb2NQSe0JCtlo=
3 github.com/picosh/pobj v0.0.0-20241016194248-c39198b2ff23 h1:NEJ5a4UXeF0/X7xmYNzXcwLQID9DwgazlqkMMC5zZ3M=
4 github.com/picosh/pobj v0.0.0-20241016194248-c39198b2ff23/go.mod h1:cF+eAl4G1vU+WOD8cYCKaxokHo6MWmbR8J4/SJnvESg=
5-github.com/picosh/pubsub v0.0.0-20241030185810-e24d08b67ab8 h1:E/eQsxdHBctPArAzjSHUAVZtDXjsD1AduGD94mbUJQg=
6-github.com/picosh/pubsub v0.0.0-20241030185810-e24d08b67ab8/go.mod h1:ajolgob5MxlHdp5HllF7u3rTlCgER4InqfP7M/xl6HQ=
7+github.com/picosh/pubsub v0.0.0-20241114162342-5138571dceda h1:0prs4E8ai30JhQm6ZH2b9NYD5NcKGElnmP21XiS2IH8=
8+github.com/picosh/pubsub v0.0.0-20241114162342-5138571dceda/go.mod h1:m6ZZpg+lZB3XTIKlbSqQgi4NrBPtARv23b8vGYDoCo4=
9 github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f h1:pdEh1Z7zH5Og9nS7jRuqwup3bcPsC6faDNQ6mgrV9ws=
10 github.com/picosh/send v0.0.0-20241107150437-0febb0049b4f/go.mod h1:RAgLDK3LrDK6pNeXtU9tjo28obl5DxShcTUk2nm/KCM=
11 github.com/picosh/senpai v0.0.0-20240503200611-af89e73973b0 h1:pBRIbiCj7K6rGELijb//dYhyCo8A3fvxW5dijrJVtjs=
+0,
-5
1@@ -20,13 +20,8 @@ func NewConfigSite() *shared.ConfigSite {
2 minioUser := utils.GetEnv("MINIO_ROOT_USER", "")
3 minioPass := utils.GetEnv("MINIO_ROOT_PASSWORD", "")
4 dbURL := utils.GetEnv("DATABASE_URL", "")
5- secret := utils.GetEnv("PICO_SECRET", "")
6- if secret == "" {
7- panic("must provide PICO_SECRET environment variable")
8- }
9
10 cfg := shared.ConfigSite{
11- Secret: secret,
12 Domain: domain,
13 Port: port,
14 Protocol: protocol,
+2,
-2
1@@ -157,7 +157,7 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
2 )
3 // track 404s
4 ch := h.AnalyticsQueue
5- view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID, h.Cfg.Secret)
6+ view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
7 if err == nil {
8 view.ProjectID = h.ProjectID
9 view.Status = http.StatusNotFound
10@@ -236,7 +236,7 @@ func (h *ApiAssetHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
11 if finContentType == "text/html" {
12 // track visit
13 ch := h.AnalyticsQueue
14- view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID, h.Cfg.Secret)
15+ view, err := shared.AnalyticsVisitFromRequest(r, h.Dbpool, h.UserID)
16 if err == nil {
17 view.ProjectID = h.ProjectID
18 ch <- view
+2,
-7
1@@ -70,13 +70,8 @@ func (c *Cmd) notifications() error {
2 }
3
4 func (c *Cmd) logs(ctx context.Context) error {
5- stdoutPipe, err := pipeLogger.ConnectToLogs(ctx, &pipeLogger.PubSubConnectionInfo{
6- RemoteHost: utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
7- KeyLocation: utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
8- KeyPassphrase: utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
9- RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
10- RemoteUser: utils.GetEnv("PICO_PIPE_USER", "pico"),
11- })
12+ conn := shared.NewPicoPipeClient()
13+ stdoutPipe, err := pipeLogger.ConnectToLogs(ctx, conn)
14
15 if err != nil {
16 return err
+2,
-2
1@@ -272,7 +272,7 @@ func blogHandler(w http.ResponseWriter, r *http.Request) {
2
3 // track visit
4 ch := shared.GetAnalyticsQueue(r)
5- view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID, cfg.Secret)
6+ view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
7 if err == nil {
8 ch <- view
9 } else {
10@@ -426,7 +426,7 @@ func postHandler(w http.ResponseWriter, r *http.Request) {
11 }
12
13 // track visit
14- view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID, cfg.Secret)
15+ view, err := shared.AnalyticsVisitFromRequest(r, dbpool, user.ID)
16 if err == nil {
17 view.PostID = post.ID
18 ch <- view
+0,
-5
1@@ -17,14 +17,9 @@ func NewConfigSite() *shared.ConfigSite {
2 dbURL := utils.GetEnv("DATABASE_URL", "")
3 maxSize := uint64(500 * utils.MB)
4 maxImgSize := int64(10 * utils.MB)
5- secret := utils.GetEnv("PICO_SECRET", "")
6- if secret == "" {
7- panic("must provide PICO_SECRET environment variable")
8- }
9
10 return &shared.ConfigSite{
11 Debug: debug == "1",
12- Secret: secret,
13 Domain: domain,
14 Port: port,
15 Protocol: protocol,
1@@ -4,6 +4,7 @@ import (
2 "crypto/hmac"
3 "crypto/sha256"
4 "encoding/hex"
5+ "encoding/json"
6 "errors"
7 "fmt"
8 "log/slog"
9@@ -12,6 +13,7 @@ import (
10 "net/url"
11
12 "github.com/picosh/pico/db"
13+ "github.com/picosh/pubsub"
14 "github.com/simplesurance/go-ip-anonymizer/ipanonymizer"
15 "github.com/x-way/crawlerdetect"
16 )
17@@ -23,8 +25,7 @@ func HmacString(secret, data string) string {
18 return hex.EncodeToString(dataHmac)
19 }
20
21-func trackableRequest(r *http.Request) error {
22- agent := r.UserAgent()
23+func trackableUserAgent(agent string) error {
24 // dont store requests from bots
25 if crawlerdetect.IsCrawler(agent) {
26 return fmt.Errorf(
27@@ -35,6 +36,11 @@ func trackableRequest(r *http.Request) error {
28 return nil
29 }
30
31+func trackableRequest(r *http.Request) error {
32+ agent := r.UserAgent()
33+ return trackableUserAgent(agent)
34+}
35+
36 func cleanIpAddress(ip string) (string, error) {
37 host, _, err := net.SplitHostPort(ip)
38 if err != nil {
39@@ -50,11 +56,22 @@ func cleanIpAddress(ip string) (string, error) {
40 return anonIp, err
41 }
42
43-func cleanUrl(r *http.Request) (string, string) {
44+func cleanUrl(orig string) (string, string) {
45+ u, err := url.Parse(orig)
46+ if err != nil {
47+ return "", ""
48+ }
49+ return u.Host, u.Path
50+}
51+
52+func cleanUrlFromRequest(r *http.Request) (string, string) {
53 host := r.Header.Get("x-forwarded-host")
54 if host == "" {
55 host = r.URL.Host
56 }
57+ if host == "" {
58+ host = r.Host
59+ }
60 // we don't want query params in the url for security reasons
61 return host, r.URL.Path
62 }
63@@ -79,16 +96,35 @@ func cleanReferer(ref string) (string, error) {
64
65 var ErrAnalyticsDisabled = errors.New("owner does not have site analytics enabled")
66
67-func AnalyticsVisitFromRequest(r *http.Request, dbpool db.DB, userID string, secret string) (*db.AnalyticsVisits, error) {
68- if !dbpool.HasFeatureForUser(userID, "analytics") {
69- return nil, ErrAnalyticsDisabled
70+func AnalyticsVisitFromVisit(visit *db.AnalyticsVisits, dbpool db.DB, secret string) error {
71+ if !dbpool.HasFeatureForUser(visit.UserID, "analytics") {
72+ return ErrAnalyticsDisabled
73 }
74
75- err := trackableRequest(r)
76+ err := trackableUserAgent(visit.UserAgent)
77 if err != nil {
78- return nil, err
79+ return err
80+ }
81+
82+ ipAddress, err := cleanIpAddress(visit.IpAddress)
83+ if err != nil {
84+ return err
85 }
86+ visit.IpAddress = HmacString(secret, ipAddress)
87+ _, path := cleanUrl(visit.Path)
88+ visit.Path = path
89
90+ referer, err := cleanReferer(visit.Referer)
91+ if err != nil {
92+ return err
93+ }
94+ visit.Referer = referer
95+ visit.UserAgent = cleanUserAgent(visit.UserAgent)
96+
97+ return nil
98+}
99+
100+func ipFromRequest(r *http.Request) string {
101 // https://caddyserver.com/docs/caddyfile/directives/reverse_proxy#defaults
102 ipOrig := r.Header.Get("x-forwarded-for")
103 if ipOrig == "" {
104@@ -101,33 +137,48 @@ func AnalyticsVisitFromRequest(r *http.Request, dbpool db.DB, userID string, sec
105 ipOrig = sshCtx.RemoteAddr().String()
106 }
107 }
108- ipAddress, err := cleanIpAddress(ipOrig)
109- if err != nil {
110- return nil, err
111+
112+ return ipOrig
113+}
114+
115+func AnalyticsVisitFromRequest(r *http.Request, dbpool db.DB, userID string) (*db.AnalyticsVisits, error) {
116+ if !dbpool.HasFeatureForUser(userID, "analytics") {
117+ return nil, ErrAnalyticsDisabled
118 }
119- host, path := cleanUrl(r)
120
121- referer, err := cleanReferer(r.Referer())
122+ err := trackableRequest(r)
123 if err != nil {
124 return nil, err
125 }
126
127+ ipAddress := ipFromRequest(r)
128+ host, path := cleanUrlFromRequest(r)
129+
130 return &db.AnalyticsVisits{
131 UserID: userID,
132 Host: host,
133 Path: path,
134- IpAddress: HmacString(secret, ipAddress),
135- UserAgent: cleanUserAgent(r.UserAgent()),
136- Referer: referer,
137+ IpAddress: ipAddress,
138+ UserAgent: r.UserAgent(),
139+ Referer: r.Referer(),
140 Status: http.StatusOK,
141 }, nil
142 }
143
144 func AnalyticsCollect(ch chan *db.AnalyticsVisits, dbpool db.DB, logger *slog.Logger) {
145- for view := range ch {
146- err := dbpool.InsertVisit(view)
147+ info := NewPicoPipeClient()
148+ metricDrain := pubsub.NewRemoteClientWriter(info, logger, 0)
149+ go metricDrain.KeepAlive("pub metric-drain -b=false")
150+
151+ for visit := range ch {
152+ data, err := json.Marshal(visit)
153+ if err != nil {
154+ logger.Error("could not json marshall visit record", "err", err)
155+ }
156+ data = append(data, '\n')
157+ _, err = metricDrain.Write(data)
158 if err != nil {
159- logger.Error("could not insert view record", "err", err)
160+ logger.Error("could not write to metric-drain", "err", err)
161 }
162 }
163 }
1@@ -30,7 +30,6 @@ type PageData struct {
2 type ConfigSite struct {
3 Debug bool
4 SendgridKey string
5- Secret string
6 Domain string
7 Port string
8 PortOverride string
9@@ -279,13 +278,8 @@ func CreateLogger(space string) *slog.Logger {
10 newLogger := log
11
12 if strings.ToLower(utils.GetEnv("PICO_PIPE_ENABLED", "true")) == "true" {
13- newLog, err := pipeLogger.SendLogRegister(log, &pipeLogger.PubSubConnectionInfo{
14- RemoteHost: utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
15- KeyLocation: utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
16- KeyPassphrase: utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
17- RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
18- RemoteUser: utils.GetEnv("PICO_PIPE_USER", "pico"),
19- }, 100)
20+ conn := NewPicoPipeClient()
21+ newLog, err := pipeLogger.SendLogRegister(log, conn, 100)
22
23 if err == nil {
24 newLogger = newLog
1@@ -0,0 +1,16 @@
2+package shared
3+
4+import (
5+ "github.com/picosh/pubsub"
6+ "github.com/picosh/utils"
7+)
8+
9+func NewPicoPipeClient() *pubsub.RemoteClientInfo {
10+ return &pubsub.RemoteClientInfo{
11+ RemoteHost: utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
12+ KeyLocation: utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
13+ KeyPassphrase: utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
14+ RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
15+ RemoteUser: utils.GetEnv("PICO_PIPE_USER", "pico"),
16+ }
17+}
1@@ -0,0 +1 @@
2+ALTER TABLE analytics_visits ADD COLUMN namespace varchar(256);
+3,
-8
1@@ -11,6 +11,7 @@ import (
2 "github.com/charmbracelet/bubbles/viewport"
3 tea "github.com/charmbracelet/bubbletea"
4 "github.com/charmbracelet/lipgloss"
5+ "github.com/picosh/pico/shared"
6 "github.com/picosh/pico/tui/common"
7 "github.com/picosh/pico/tui/pages"
8 "github.com/picosh/utils"
9@@ -170,14 +171,8 @@ func (m Model) waitForActivity(sub chan map[string]any) tea.Cmd {
10
11 func (m Model) connectLogs(sub chan map[string]any) tea.Cmd {
12 return func() tea.Msg {
13- stdoutPipe, err := pipeLogger.ConnectToLogs(m.ctx, &pipeLogger.PubSubConnectionInfo{
14- RemoteHost: utils.GetEnv("PICO_PIPE_ENDPOINT", "pipe.pico.sh:22"),
15- KeyLocation: utils.GetEnv("PICO_PIPE_KEY", "ssh_data/term_info_ed25519"),
16- KeyPassphrase: utils.GetEnv("PICO_PIPE_PASSPHRASE", ""),
17- RemoteHostname: utils.GetEnv("PICO_PIPE_REMOTE_HOST", "pipe.pico.sh"),
18- RemoteUser: utils.GetEnv("PICO_PIPE_USER", "pico"),
19- })
20-
21+ conn := shared.NewPicoPipeClient()
22+ stdoutPipe, err := pipeLogger.ConnectToLogs(m.ctx, conn)
23 if err != nil {
24 return errMsg(err)
25 }