- commit
- ddbd590
- parent
- 11beaab
- author
- Antonio Mika
- date
- 2024-09-18 14:41:16 +0000 UTC
Merge pull request #140 from picosh/am/pubsub-clean Alternate pubsub implementation
9 files changed,
+83,
-79
+14,
-8
1@@ -1,4 +1,4 @@
2-FROM --platform=$BUILDPLATFORM golang:1.22 as builder-deps
3+FROM --platform=$BUILDPLATFORM golang:1.22 AS builder-deps
4 LABEL maintainer="Pico Maintainers <hello@pico.sh>"
5
6 WORKDIR /app
7@@ -8,9 +8,11 @@ RUN apt-get install -y git ca-certificates
8
9 COPY go.* ./
10
11-RUN go mod download
12+RUN --mount=type=cache,target=/go/pkg/,rw \
13+ --mount=type=cache,target=/root/.cache/,rw \
14+ go mod download
15
16-FROM builder-deps as builder-web
17+FROM builder-deps AS builder-web
18
19 COPY . .
20
21@@ -23,9 +25,11 @@ ENV LDFLAGS="-s -w"
22
23 ENV GOOS=${TARGETOS} GOARCH=${TARGETARCH}
24
25-RUN go build -ldflags "$LDFLAGS" -o /go/bin/${APP}-web ./cmd/${APP}/web
26+RUN --mount=type=cache,target=/go/pkg/,rw \
27+ --mount=type=cache,target=/root/.cache/,rw \
28+ go build -ldflags "$LDFLAGS" -o /go/bin/${APP}-web ./cmd/${APP}/web
29
30-FROM builder-deps as builder-ssh
31+FROM builder-deps AS builder-ssh
32
33 COPY . .
34
35@@ -38,9 +42,11 @@ ENV LDFLAGS="-s -w"
36
37 ENV GOOS=${TARGETOS} GOARCH=${TARGETARCH}
38
39-RUN go build -ldflags "$LDFLAGS" -o /go/bin/${APP}-ssh ./cmd/${APP}/ssh
40+RUN --mount=type=cache,target=/go/pkg/,rw \
41+ --mount=type=cache,target=/root/.cache/,rw \
42+ go build -ldflags "$LDFLAGS" -o /go/bin/${APP}-ssh ./cmd/${APP}/ssh
43
44-FROM scratch as release-web
45+FROM scratch AS release-web
46
47 WORKDIR /app
48
49@@ -53,7 +59,7 @@ COPY --from=builder-web /app/${APP}/public ./${APP}/public
50
51 ENTRYPOINT ["/app/web"]
52
53-FROM scratch as release-ssh
54+FROM scratch AS release-ssh
55
56 WORKDIR /app
57 ENV TERM="xterm-256color"
+1,
-1
1@@ -1,4 +1,4 @@
2-FROM golang:1.22-alpine as builder
3+FROM golang:1.22-alpine AS builder
4
5 WORKDIR /app
6
+1,
-1
1@@ -57,7 +57,7 @@ services:
2 pubsub-ssh:
3 build:
4 args:
5- APP:pubsub
6+ APP: pubsub
7 target: release-ssh
8 env_file:
9 - .env.example
M
go.mod
+2,
-1
1@@ -17,6 +17,7 @@ replace github.com/gdamore/tcell/v2 => github.com/delthas/tcell/v2 v2.4.1-0.2023
2 require (
3 git.sr.ht/~delthas/senpai v0.3.1-0.20240425235039-206be659439e
4 github.com/alecthomas/chroma/v2 v2.14.0
5+ github.com/antoniomika/syncmap v1.0.0
6 github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
7 github.com/charmbracelet/bubbles v0.18.0
8 github.com/charmbracelet/bubbletea v0.27.0
9@@ -36,7 +37,7 @@ require (
10 github.com/muesli/termenv v0.15.3-0.20240509142007-81b8f94111d5
11 github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577
12 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a
13- github.com/picosh/pubsub v0.0.0-20240909042445-92777a8b167b
14+ github.com/picosh/pubsub v0.0.0-20240918141103-977bd6b4c9e2
15 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc
16 github.com/picosh/tunkit v0.0.0-20240709033345-8315d4f3cd0e
17 github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06
M
go.sum
+4,
-2
1@@ -17,6 +17,8 @@ github.com/andybalholm/cascadia v1.3.2 h1:3Xi6Dw5lHF15JtdcmAHD3i1+T8plmv7BQ/nsVi
2 github.com/andybalholm/cascadia v1.3.2/go.mod h1:7gtRlve5FxPPgIgX36uWBX58OdBsSS6lUvCFb+h7KvU=
3 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
4 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
5+github.com/antoniomika/syncmap v1.0.0 h1:iFSfbQFQOvHZILFZF+hqWosO0no+W9+uF4y2VEyMKWU=
6+github.com/antoniomika/syncmap v1.0.0/go.mod h1:fK2829foEYnO4riNfyUn0SHQZt4ue3DStYjGU+sJj38=
7 github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA=
8 github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw=
9 github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4=
10@@ -224,8 +226,8 @@ github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc h1:bvcsoO
11 github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc/go.mod h1:i0iR3W4GSm1PuvVxB9OH32E5jP+CYkVb2NQSe0JCtlo=
12 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a h1:Cr1xODiyd/SjjBRtYA9VX6Do3D+w+DansQzkb4NGeyA=
13 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a/go.mod h1:VIkR1MZBvxSK2OO47jikxikAO/sKb/vTmXX5ZuYTIvo=
14-github.com/picosh/pubsub v0.0.0-20240909042445-92777a8b167b h1:/gGhT8y9rnrv8K9ZJKZYzdWvZcnazl8NGE1DGNrD8HU=
15-github.com/picosh/pubsub v0.0.0-20240909042445-92777a8b167b/go.mod h1:FKC8uot+40iXmuDzTfbxYDG5PIc3ghwkmP2iItBKH0I=
16+github.com/picosh/pubsub v0.0.0-20240918141103-977bd6b4c9e2 h1:Em/eEiElW3OHKDLzchzJ7m8OOk+yJ8dgc7cH0d0c55Q=
17+github.com/picosh/pubsub v0.0.0-20240918141103-977bd6b4c9e2/go.mod h1:vyHLOwIkdaBW+Wmc+3/yRzdnmKwv/oVnKtITHe46w58=
18 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc h1:IIsJuAFG2ju3cygKVKTIsYYZf21q5S3Dr1H4fGbfgJg=
19 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc/go.mod h1:RAgLDK3LrDK6pNeXtU9tjo28obl5DxShcTUk2nm/KCM=
20 github.com/picosh/senpai v0.0.0-20240503200611-af89e73973b0 h1:pBRIbiCj7K6rGELijb//dYhyCo8A3fvxW5dijrJVtjs=
+4,
-3
1@@ -259,12 +259,13 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
2 opts.bail(err)
3 return
4 } else if cmd == "sub" {
5- err = pubsub.PubSub.Sub(&psub.Subscriber{
6+ err = pubsub.PubSub.Sub(fmt.Sprintf("%s/%s", user.Name, repoName), &psub.Sub{
7 ID: uuid.NewString(),
8- Name: fmt.Sprintf("%s@%s", user.Name, repoName),
9 Writer: sesh,
10- Chan: make(chan error),
11+ Done: make(chan struct{}),
12+ Data: make(chan []byte),
13 })
14+
15 if err != nil {
16 wish.Errorln(sesh, err)
17 }
+4,
-2
1@@ -20,6 +20,7 @@ import (
2
3 "github.com/charmbracelet/ssh"
4 "github.com/charmbracelet/wish"
5+ "github.com/google/uuid"
6 "github.com/picosh/pico/db"
7 "github.com/picosh/pico/db/postgres"
8 "github.com/picosh/pico/shared"
9@@ -229,8 +230,9 @@ func createServeMux(handler *CliHandler, pubsub *psub.Cfg) func(ctx ssh.Context)
10 )
11 handler.Logger.Info("sending event", "url", furl)
12
13- err := pubsub.PubSub.Pub(&psub.Msg{
14- Name: fmt.Sprintf("%s@%s:%s", user.Name, img, tag),
15+ err := pubsub.PubSub.Pub(fmt.Sprintf("%s@%s:%s", user.Name, img, tag), &psub.Pub{
16+ ID: uuid.NewString(),
17+ Done: make(chan struct{}),
18 Reader: strings.NewReader(furl),
19 })
20
+50,
-59
1@@ -1,6 +1,7 @@
2 package pubsub
3
4 import (
5+ "bytes"
6 "flag"
7 "fmt"
8 "io"
9@@ -59,24 +60,11 @@ func getUser(s ssh.Session, dbpool db.DB) (*db.User, error) {
10
11 // scope channel to user by prefixing name.
12 func toChannel(userName, name string) string {
13- return fmt.Sprintf("%s@%s", userName, name)
14+ return fmt.Sprintf("%s/%s", userName, name)
15 }
16
17 func toPublicChannel(name string) string {
18- return fmt.Sprintf("public@%s", name)
19-}
20-
21-// extract user and scoped channel from channel.
22-func fromChannel(channel string) (string, string) {
23- sp := strings.SplitN(channel, "@", 2)
24- ln := len(sp)
25- if ln == 0 {
26- return "", ""
27- }
28- if ln == 1 {
29- return "", ""
30- }
31- return sp[0], sp[1]
32+ return fmt.Sprintf("public/%s", name)
33 }
34
35 var helpStr = `Commands: [pub, sub, ls]
36@@ -123,26 +111,34 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
37 if cmd == "help" {
38 wish.Println(sesh, helpStr)
39 } else if cmd == "ls" {
40- subs := pubsub.PubSub.GetSubs()
41+ channelFilter := fmt.Sprintf("%s/", user.Name)
42+ if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
43+ channelFilter = ""
44+ }
45+
46+ channels := pubsub.PubSub.GetChannels(channelFilter)
47
48- if len(subs) == 0 {
49- wish.Println(sesh, "no subs found")
50+ if len(channels) == 0 {
51+ wish.Println(sesh, "no pubsub channels found")
52 } else {
53- writer := NewTabWriter(sesh)
54- fmt.Fprintln(writer, "Channel\tID")
55- for _, sub := range subs {
56- userName, _ := fromChannel(sub.Name)
57- if userName != "public" && userName != user.Name {
58- continue
59- }
60-
61- fmt.Fprintf(
62- writer,
63- "%s\t%s\n",
64- sub.Name, sub.ID,
65- )
66+ outputData := "Channel Information\r\n"
67+ for _, channel := range channels {
68+ outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
69+ outputData += "\tPubs:\r\n"
70+
71+ channel.Pubs.Range(func(I string, J *psub.Pub) bool {
72+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
73+ return true
74+ })
75+
76+ outputData += "\tSubs:\r\n"
77+
78+ channel.Subs.Range(func(I string, J *psub.Sub) bool {
79+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
80+ return true
81+ })
82 }
83- writer.Flush()
84+ _, _ = sesh.Write([]byte(outputData))
85 }
86 }
87 next(sesh)
88@@ -170,7 +166,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
89
90 var reader io.Reader
91 if *empty {
92- reader = strings.NewReader("")
93+ reader = bytes.NewReader(make([]byte, 1))
94 } else {
95 reader = sesh
96 }
97@@ -181,34 +177,32 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
98 }
99
100 wish.Println(sesh, "sending msg ...")
101- msg := &psub.Msg{
102- Name: name,
103+ pub := &psub.Pub{
104+ ID: fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
105+ Done: make(chan struct{}),
106 Reader: reader,
107 }
108
109- // hacky: we want to notify when no subs are found so
110- // we duplicate some logic for now
111- subs := pubsub.PubSub.GetSubs()
112- found := false
113- for _, sub := range subs {
114- if pubsub.PubSub.PubMatcher(msg, sub) {
115- found = true
116- break
117- }
118+ count := 0
119+ channelInfo := pubsub.PubSub.GetChannel(name)
120+
121+ if channelInfo != nil {
122+ channelInfo.Subs.Range(func(I string, J *psub.Sub) bool {
123+ count++
124+ return true
125+ })
126 }
127- if !found {
128+
129+ if count == 0 {
130 wish.Println(sesh, "no subs found ... waiting")
131 }
132
133 go func() {
134 <-ctx.Done()
135- err := pubsub.PubSub.UnPub(msg)
136- if err != nil {
137- wish.Errorln(sesh, err)
138- }
139+ pub.Cleanup()
140 }()
141
142- err = pubsub.PubSub.Pub(msg)
143+ err = pubsub.PubSub.Pub(name, pub)
144 wish.Println(sesh, "msg sent!")
145 if err != nil {
146 wish.Errorln(sesh, err)
147@@ -226,21 +220,18 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
148 name = toPublicChannel(channelName)
149 }
150
151- sub := &psub.Subscriber{
152- ID: uuid.NewString(),
153- Name: name,
154+ sub := &psub.Sub{
155+ ID: fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
156 Writer: sesh,
157- Chan: make(chan error),
158+ Done: make(chan struct{}),
159+ Data: make(chan []byte),
160 }
161
162 go func() {
163 <-ctx.Done()
164- err := pubsub.PubSub.UnSub(sub)
165- if err != nil {
166- wish.Errorln(sesh, err)
167- }
168+ sub.Cleanup()
169 }()
170- err = pubsub.PubSub.Sub(sub)
171+ err = pubsub.PubSub.Sub(name, sub)
172 if err != nil {
173 wish.Errorln(sesh, err)
174 }
+3,
-2
1@@ -8,6 +8,7 @@ import (
2 "syscall"
3 "time"
4
5+ "github.com/antoniomika/syncmap"
6 "github.com/charmbracelet/promwish"
7 "github.com/charmbracelet/wish"
8 "github.com/picosh/pico/db/postgres"
9@@ -29,8 +30,8 @@ func StartSshServer() {
10 pubsub := &psub.Cfg{
11 Logger: logger,
12 PubSub: &psub.PubSubMulticast{
13- Logger: logger,
14- Chan: make(chan *psub.Subscriber),
15+ Logger: logger,
16+ Channels: syncmap.New[string, *psub.Channel](),
17 },
18 }
19