- commit
- 9b9e1db
- parent
- 931fca5
- author
- Eric Bower
- date
- 2024-12-17 20:41:56 +0000 UTC
refactor(auth): send caddy access logs directly to metric-drain. This change removes the need to hop from `container-drain` to `metric-drain`. Instead, we now process access logs directly inside our `metricDrainSub` function, which subscribes to `metric-drain`.
2 files changed,
+29,
-86
+28,
-86
1@@ -21,7 +21,6 @@ import (
2 "github.com/picosh/pico/db/postgres"
3 "github.com/picosh/pico/shared"
4 "github.com/picosh/utils"
5- "github.com/picosh/utils/pipe"
6 "github.com/picosh/utils/pipe/metrics"
7 )
8
9@@ -583,35 +582,32 @@ func checkoutHandler() http.HandlerFunc {
10 }
11 }
12
13-type AccessLogReq struct {
14- RemoteIP string `json:"remote_ip"`
15- RemotePort string `json:"remote_port"`
16- ClientIP string `json:"client_ip"`
17- Method string `json:"method"`
18- Host string `json:"host"`
19- Uri string `json:"uri"`
20- Headers struct {
21- UserAgent []string `json:"User-Agent"`
22- Referer []string `json:"Referer"`
23- } `json:"headers"`
24- Tls struct {
25- ServerName string `json:"server_name"`
26- } `json:"tls"`
27+type AccessLog struct {
28+ Status int `json:"status"`
29+ ServerID string `json:"server_id"`
30+ Request AccessLogReq `json:"request"`
31+ RespHeaders AccessRespHeaders `json:"resp_headers"`
32 }
33
34-type RespHeaders struct {
35- ContentType []string `json:"Content-Type"`
36+type AccessLogReqHeaders struct {
37+ UserAgent []string `json:"User-Agent"`
38+ Referer []string `json:"Referer"`
39+}
40+
41+type AccessLogReq struct {
42+ ClientIP string `json:"client_ip"`
43+ Method string `json:"method"`
44+ Host string `json:"host"`
45+ Uri string `json:"uri"`
46+ Headers AccessLogReqHeaders `json:"headers"`
47 }
48
49-type CaddyAccessLog struct {
50- Request AccessLogReq `json:"request"`
51- Status int `json:"status"`
52- RespHeaders RespHeaders `json:"resp_headers"`
53- ServiceID string `json:"server_id"`
54+type AccessRespHeaders struct {
55+ ContentType []string `json:"Content-Type"`
56 }
57
58-func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.AnalyticsVisits, error) {
59- spaceRaw := strings.SplitN(access.ServiceID, ".", 2)
60+func deserializeCaddyAccessLog(dbpool db.DB, access *AccessLog) (*db.AnalyticsVisits, error) {
61+ spaceRaw := strings.SplitN(access.ServerID, ".", 2)
62 space := spaceRaw[0]
63 host := access.Request.Host
64 path := access.Request.Uri
65@@ -676,62 +672,8 @@ func deserializeCaddyAccessLog(dbpool db.DB, access *CaddyAccessLog) (*db.Analyt
66 }, nil
67 }
68
69-// this feels really stupid because i'm taking containter-drain,
70-// filtering it, and then sending it to metric-drain. The
71-// metricDrainSub function listens on the metric-drain and saves it.
72-// So why not just call the necessary functions to save the visit?
73-// We want to be able to use pipe as a debugging tool which means we
74-// can manually sub to `metric-drain` and have a nice clean output to view.
75-func containerDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger) {
76- info := shared.NewPicoPipeClient()
77- drain := pipe.NewReconnectReadWriteCloser(
78- ctx,
79- logger,
80- info,
81- "container drain",
82- "sub container-drain -k",
83- 100,
84- -1,
85- )
86-
87- send := pipe.NewReconnectReadWriteCloser(
88- ctx,
89- logger,
90- info,
91- "from container drain to metric drain",
92- "pub metric-drain -b=false",
93- 100,
94- -1,
95- )
96-
97- for {
98- scanner := bufio.NewScanner(drain)
99- for scanner.Scan() {
100- line := scanner.Text()
101- if strings.Contains(line, "http.log.access") {
102- clean := strings.TrimSpace(line)
103- visit, err := accessLogToVisit(dbpool, clean)
104- if err != nil {
105- logger.Debug("could not convert access log to a visit", "err", err)
106- continue
107- }
108- jso, err := json.Marshal(visit)
109- if err != nil {
110- logger.Error("could not marshal json of a visit", "err", err)
111- continue
112- }
113- jso = append(jso, []byte("\n")...)
114- _, err = send.Write(jso)
115- if err != nil {
116- logger.Error("could not write to metric-drain", "err", err)
117- }
118- }
119- }
120- }
121-}
122-
123 func accessLogToVisit(dbpool db.DB, line string) (*db.AnalyticsVisits, error) {
124- accessLog := CaddyAccessLog{}
125+ accessLog := AccessLog{}
126 err := json.Unmarshal([]byte(line), &accessLog)
127 if err != nil {
128 return nil, err
129@@ -753,14 +695,16 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
130 scanner := bufio.NewScanner(drain)
131 for scanner.Scan() {
132 line := scanner.Text()
133- visit := db.AnalyticsVisits{}
134- err := json.Unmarshal([]byte(line), &visit)
135+ clean := strings.TrimSpace(line)
136+
137+ visit, err := accessLogToVisit(dbpool, clean)
138 if err != nil {
139- logger.Info("could not unmarshal json", "err", err, "line", line)
140+ logger.Error("could not convert access log to a visit", "err", err)
141 continue
142 }
143+
144 logger.Info("received visit", "visit", visit)
145- err = shared.AnalyticsVisitFromVisit(&visit, dbpool, secret)
146+ err = shared.AnalyticsVisitFromVisit(visit, dbpool, secret)
147 if err != nil {
148 logger.Info("could not record analytics visit", "err", err)
149 continue
150@@ -772,7 +716,7 @@ func metricDrainSub(ctx context.Context, dbpool db.DB, logger *slog.Logger, secr
151 }
152
153 logger.Info("inserting visit", "visit", visit)
154- err = dbpool.InsertVisit(&visit)
155+ err = dbpool.InsertVisit(visit)
156 if err != nil {
157 logger.Error("could not insert visit record", "err", err)
158 }
159@@ -846,8 +790,6 @@ func StartApiServer() {
160
161 ctx := context.Background()
162
163- // convert container logs to access logs
164- go containerDrainSub(ctx, db, logger)
165 // gather metrics in the auth service
166 go metricDrainSub(ctx, db, logger, cfg.Secret)
167
+1,
-0
1@@ -30,6 +30,7 @@ services:
2 - all
3 pipemgr:
4 image: ghcr.io/picosh/pipemgr:latest
5+ command: /pipemgr -command "pub metric-drain -b=false"
6 restart: always
7 volumes:
8 - /var/run/docker.sock:/var/run/docker.sock:ro