- commit
- d218bbd
- parent
- 308b689
- author
- Eric Bower
- date
- 2024-05-29 03:27:01 +0000 UTC
feat(imgs): pubsub for docker image tag updates
1 files changed,
+60,
-14
+60,
-14
1@@ -3,7 +3,6 @@ package imgs
2 import (
3 "bytes"
4 "context"
5- "encoding/base64"
6 "encoding/json"
7 "fmt"
8 "io"
9@@ -43,11 +42,11 @@ func setUserCtx(ctx ssh.Context, user *db.User) {
10
11 func AuthHandler(dbh db.DB, log *slog.Logger) func(ssh.Context, ssh.PublicKey) bool {
12 return func(ctx ssh.Context, key ssh.PublicKey) bool {
13- kb := base64.StdEncoding.EncodeToString(key.Marshal())
14- if kb == "" {
15+ kk, err := shared.KeyForKeyText(key)
16+ if err != nil {
17+ log.Error("cannot get pubkey", "err", err)
18 return false
19 }
20- kk := fmt.Sprintf("%s %s", key.Type(), kb)
21
22 user, err := dbh.FindUserForKey("", kk)
23 if err != nil {
24@@ -67,7 +66,7 @@ func AuthHandler(dbh db.DB, log *slog.Logger) func(ssh.Context, ssh.PublicKey) b
25 return false
26 }
27
28- return false
29+ return true
30 }
31 }
32
33@@ -80,7 +79,7 @@ func (e *ErrorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
34 http.Error(w, e.Err.Error(), http.StatusInternalServerError)
35 }
36
37-func createServeMux(handler *CliHandler) func(ctx ssh.Context) http.Handler {
38+func createServeMux(handler *CliHandler, pubsub *ptun.PubSubHandler) func(ctx ssh.Context) http.Handler {
39 return func(ctx ssh.Context) http.Handler {
40 router := http.NewServeMux()
41
42@@ -90,6 +89,12 @@ func createServeMux(handler *CliHandler) func(ctx ssh.Context) http.Handler {
43 slug = user.Name
44 }
45
46+ pubkeys, err := handler.DBPool.FindKeysForUser(user)
47+ if err != nil {
48+ handler.Logger.Error("cant get pubkeys for user", "err", err)
49+ return router
50+ }
51+
52 proxy := httputil.NewSingleHostReverseProxy(&url.URL{
53 Scheme: "http",
54 Host: handler.RegistryUrl,
55@@ -98,7 +103,7 @@ func createServeMux(handler *CliHandler) func(ctx ssh.Context) http.Handler {
56 oldDirector := proxy.Director
57
58 proxy.Director = func(r *http.Request) {
59- log.Printf("%+v", r)
60+ handler.Logger.Info("director", "request", r)
61 oldDirector(r)
62
63 if strings.HasSuffix(r.URL.Path, "_catalog") || r.URL.Path == "/v2" || r.URL.Path == "/v2/" {
64@@ -125,12 +130,10 @@ func createServeMux(handler *CliHandler) func(ctx ssh.Context) http.Handler {
65
66 r.URL.RawQuery = query.Encode()
67 }
68-
69- log.Printf("%+v", r)
70 }
71
72 proxy.ModifyResponse = func(r *http.Response) error {
73- log.Printf("%+v", r)
74+ handler.Logger.Info("modify", "request", r)
75 shared.CorsHeaders(r.Header)
76
77 if r.Request.Method == http.MethodGet && strings.HasSuffix(r.Request.URL.Path, "_catalog") {
78@@ -216,6 +219,46 @@ func createServeMux(handler *CliHandler) func(ctx ssh.Context) http.Handler {
79 }
80 }
81
82+ if r.Request.Method == http.MethodPut && strings.Contains(r.Request.URL.Path, "/manifests/") {
83+ digest := r.Header.Get("Docker-Content-Digest")
84+ forwards := pubsub.GetForwards()
85+ for _, rf := range forwards {
86+ ck, err := shared.KeyForKeyText(rf.Pubkey)
87+ if err != nil {
88+ continue
89+ }
90+ found := false
91+ for _, pk := range pubkeys {
92+ if pk.Key == ck {
93+ found = true
94+ }
95+ }
96+ // event corresponds to a different user, skip
97+ if !found {
98+ continue
99+ }
100+
101+ // [ ]/v2/erock/alpine/manifests/latest
102+ splitPath := strings.Split(r.Request.URL.Path, "/")
103+ img := splitPath[3]
104+ tag := splitPath[5]
105+
106+ addr := rf.Listener.Addr()
107+ furl := fmt.Sprintf(
108+ "http://%s?digest=%s&image=%s&tag=%s",
109+ addr.String(),
110+ url.QueryEscape(digest),
111+ img,
112+ tag,
113+ )
114+ handler.Logger.Info("sending event", "url", furl)
115+ _, err = http.Get(furl)
116+ if err != nil {
117+ handler.Logger.Error("could not make request to pubsub", "err", err)
118+ }
119+ }
120+ }
121+
122 locationHeader := r.Header.Get("location")
123 if strings.Contains(locationHeader, fmt.Sprintf("/v2/%s", slug)) {
124 r.Header.Set("location", strings.ReplaceAll(locationHeader, fmt.Sprintf("/v2/%s", slug), "/v2"))
125@@ -240,12 +283,13 @@ func StartSshServer() {
126 port = "2222"
127 }
128 dbUrl := os.Getenv("DATABASE_URL")
129- registryUrl := shared.GetEnv("REGISTRY_URL", "registry:5000")
130- minioUrl := shared.GetEnv("MINIO_URL", "")
131+ registryUrl := shared.GetEnv("REGISTRY_URL", "0.0.0.0:5000")
132+ minioUrl := shared.GetEnv("MINIO_URL", "http://0.0.0.0:9000")
133 minioUser := shared.GetEnv("MINIO_ROOT_USER", "")
134 minioPass := shared.GetEnv("MINIO_ROOT_PASSWORD", "")
135
136- logger := slog.Default()
137+ logger := shared.CreateLogger(false)
138+ logger.Info("bootup", "registry", registryUrl, "minio", minioUrl)
139 dbh := postgres.NewDB(dbUrl, logger)
140 st, err := storage.NewStorageMinio(minioUrl, minioUser, minioPass)
141 if err != nil {
142@@ -259,14 +303,16 @@ func StartSshServer() {
143 RegistryUrl: registryUrl,
144 }
145
146+ pubsub := ptun.NewPubSubHandler(logger)
147 s, err := wish.NewServer(
148 wish.WithAddress(fmt.Sprintf("%s:%s", host, port)),
149 wish.WithHostKeyPath("ssh_data/term_info_ed25519"),
150 wish.WithPublicKeyAuth(AuthHandler(dbh, logger)),
151 wish.WithMiddleware(WishMiddleware(handler)),
152 ptun.WithWebTunnel(
153- ptun.NewWebTunnelHandler(createServeMux(handler), logger),
154+ ptun.NewWebTunnelHandler(createServeMux(handler, pubsub), logger),
155 ),
156+ ptun.WithPubSub(pubsub),
157 )
158
159 if err != nil {