- commit
- dcfaddb
- parent
- 56ae22e
- author
- Eric Bower
- date
- 2024-10-03 04:14:44 +0000 UTC
chore(pubsub): update pubsub dep
6 files changed,
+61,
-69
M
go.mod
+2,
-2
1@@ -17,7 +17,6 @@ replace github.com/gdamore/tcell/v2 => github.com/delthas/tcell/v2 v2.4.1-0.2023
2 require (
3 git.sr.ht/~delthas/senpai v0.3.1-0.20240425235039-206be659439e
4 github.com/alecthomas/chroma/v2 v2.14.0
5- github.com/antoniomika/syncmap v1.0.0
6 github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
7 github.com/charmbracelet/bubbles v0.18.0
8 github.com/charmbracelet/bubbletea v0.27.0
9@@ -37,7 +36,7 @@ require (
10 github.com/muesli/termenv v0.15.3-0.20240509142007-81b8f94111d5
11 github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577
12 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a
13- github.com/picosh/pubsub v0.0.0-20241002213216-242caa6020e9
14+ github.com/picosh/pubsub v0.0.0-20241003035535-c319ff16a4ce
15 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc
16 github.com/picosh/tunkit v0.0.0-20240709033345-8315d4f3cd0e
17 github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06
18@@ -60,6 +59,7 @@ require (
19 github.com/PuerkitoBio/goquery v1.9.2 // indirect
20 github.com/andybalholm/cascadia v1.3.2 // indirect
21 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
22+ github.com/antoniomika/syncmap v1.0.0 // indirect
23 github.com/atotto/clipboard v0.1.4 // indirect
24 github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
25 github.com/aymerick/douceur v0.2.0 // indirect
M
go.sum
+2,
-2
1@@ -223,8 +223,8 @@ github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc h1:bvcsoO
2 github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc/go.mod h1:i0iR3W4GSm1PuvVxB9OH32E5jP+CYkVb2NQSe0JCtlo=
3 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a h1:Cr1xODiyd/SjjBRtYA9VX6Do3D+w+DansQzkb4NGeyA=
4 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a/go.mod h1:VIkR1MZBvxSK2OO47jikxikAO/sKb/vTmXX5ZuYTIvo=
5-github.com/picosh/pubsub v0.0.0-20241002213216-242caa6020e9 h1:rm5eAdIC4nY8u9B6/C+lB5PYqWpTK0xGjId8Fl9Cg30=
6-github.com/picosh/pubsub v0.0.0-20241002213216-242caa6020e9/go.mod h1:iuxAenTRpwThFkOJNw5Sumgv0FewjceHrl/4cHKkMe4=
7+github.com/picosh/pubsub v0.0.0-20241003035535-c319ff16a4ce h1:mVcqbixMv4F33Aht4chMfIjyN/YZ80Hv+BHhA2sEb6Q=
8+github.com/picosh/pubsub v0.0.0-20241003035535-c319ff16a4ce/go.mod h1:iuxAenTRpwThFkOJNw5Sumgv0FewjceHrl/4cHKkMe4=
9 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc h1:IIsJuAFG2ju3cygKVKTIsYYZf21q5S3Dr1H4fGbfgJg=
10 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc/go.mod h1:RAgLDK3LrDK6pNeXtU9tjo28obl5DxShcTUk2nm/KCM=
11 github.com/picosh/senpai v0.0.0-20240503200611-af89e73973b0 h1:pBRIbiCj7K6rGELijb//dYhyCo8A3fvxW5dijrJVtjs=
+2,
-2
1@@ -188,7 +188,7 @@ type CliHandler struct {
2 Logger *slog.Logger
3 Storage storage.StorageServe
4 RegistryUrl string
5- PubSub *psub.Cfg
6+ PubSub psub.PubSub
7 }
8
9 func WishMiddleware(handler *CliHandler) wish.Middleware {
10@@ -259,7 +259,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
11 opts.bail(err)
12 return
13 } else if cmd == "sub" {
14- err = pubsub.PubSub.Sub(sesh.Context(), uuid.NewString(), sesh, []*psub.Channel{
15+ err = pubsub.Sub(sesh.Context(), uuid.NewString(), sesh, []*psub.Channel{
16 psub.NewChannel(fmt.Sprintf("%s/%s", user.Name, repoName)),
17 }, false)
18
+3,
-9
1@@ -81,7 +81,7 @@ func (e *ErrorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
2 http.Error(w, e.Err.Error(), http.StatusInternalServerError)
3 }
4
5-func createServeMux(handler *CliHandler, pubsub *psub.Cfg) func(ctx ssh.Context) http.Handler {
6+func createServeMux(handler *CliHandler, pubsub psub.PubSub) func(ctx ssh.Context) http.Handler {
7 return func(ctx ssh.Context) http.Handler {
8 router := http.NewServeMux()
9
10@@ -230,7 +230,7 @@ func createServeMux(handler *CliHandler, pubsub *psub.Cfg) func(ctx ssh.Context)
11 )
12 handler.Logger.Info("sending event", "url", furl)
13
14- err := pubsub.PubSub.Pub(ctx, uuid.NewString(), bytes.NewBufferString(furl), []*psub.Channel{
15+ err := pubsub.Pub(ctx, uuid.NewString(), bytes.NewBufferString(furl), []*psub.Channel{
16 psub.NewChannel(fmt.Sprintf("%s/%s:%s", user.Name, img, tag)),
17 })
18
19@@ -276,13 +276,7 @@ func StartSshServer() {
20 panic(err)
21 }
22
23- pubsub := &psub.Cfg{
24- Logger: logger,
25- PubSub: &psub.PubSubMulticast{
26- Logger: logger,
27- },
28- }
29-
30+ pubsub := psub.NewMulticast(logger)
31 handler := &CliHandler{
32 Logger: logger,
33 DBPool: dbh,
+51,
-43
1@@ -59,13 +59,13 @@ func getUser(s ssh.Session, dbpool db.DB) (*db.User, error) {
2 return user, nil
3 }
4
5-// scope channel to user by prefixing name.
6-func toChannel(userName, name string) string {
7- return fmt.Sprintf("%s/%s", userName, name)
8+// scope topic to user by prefixing name.
9+func toTopic(userName, topic string) string {
10+ return fmt.Sprintf("%s/%s", userName, topic)
11 }
12
13-func toPublicChannel(name string) string {
14- return fmt.Sprintf("public/%s", name)
15+func toPublicTopic(topic string) string {
16+ return fmt.Sprintf("public/%s", topic)
17 }
18
19 func clientInfo(clients []*psub.Client, clientType string) string {
20@@ -85,18 +85,26 @@ func clientInfo(clients []*psub.Client, clientType string) string {
21 var helpStr = `Commands: [pub, sub, ls, pipe]
22
23 The simplest authenticated pubsub system. Send messages through
24-user-defined channels. Channels are private to the authenticated
25+user-defined topics. Topics are private to the authenticated
26 ssh user. The default pubsub model is multicast with bidirectional
27 blocking, meaning a publisher ("pub") will send its message to all
28 subscribers ("sub"). Further, both "pub" and "sub" will wait for
29 at least one event to be sent or received. Pipe ("pipe") allows
30 for bidirectional messages to be sent between any clients connected
31-to a pipe.`
32+to a pipe.
33+
34+Think of these different commands in terms of the direction the
35+data is being sent:
36+
37+- pub => writes to client
38+- sub => reads from client
39+- pipe => read and write between clients
40+`
41
42 type CliHandler struct {
43 DBPool db.DB
44 Logger *slog.Logger
45- PubSub *psub.Cfg
46+ PubSub psub.PubSub
47 Cfg *shared.ConfigSite
48 }
49
50@@ -136,15 +144,15 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
51 if cmd == "help" {
52 wish.Println(sesh, helpStr)
53 } else if cmd == "ls" {
54- channelFilter := fmt.Sprintf("%s/", user.Name)
55+ topicFilter := fmt.Sprintf("%s/", user.Name)
56 if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
57- channelFilter = ""
58+ topicFilter = ""
59 }
60
61 var channels []*psub.Channel
62
63- for channelID, channel := range pubsub.PubSub.GetChannels() {
64- if strings.HasPrefix(channelID, channelFilter) {
65+ for topic, channel := range pubsub.GetChannels() {
66+ if strings.HasPrefix(topic, topicFilter) {
67 channels = append(channels, channel)
68 }
69 }
70@@ -156,7 +164,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
71 if len(channels) > 0 {
72 outputData += "Channel Information\r\n"
73 for _, channel := range channels {
74- outputData += fmt.Sprintf("- %s:\r\n", channel.ID)
75+ outputData += fmt.Sprintf("- %s:\r\n", channel.Topic)
76 outputData += "\tClients:\r\n"
77
78 var pubs []*psub.Client
79@@ -182,26 +190,26 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
80 }
81 }
82
83- channelName := ""
84+ topic := ""
85 cmdArgs := args[1:]
86 if len(args) > 1 {
87- channelName = strings.TrimSpace(args[1])
88+ topic = strings.TrimSpace(args[1])
89 cmdArgs = args[2:]
90 }
91 logger.Info(
92 "pubsub middleware detected command",
93 "args", args,
94 "cmd", cmd,
95- "channelName", channelName,
96+ "topic", topic,
97 "cmdArgs", cmdArgs,
98 )
99
100 if cmd == "pub" {
101 pubCmd := flagSet("pub", sesh)
102 empty := pubCmd.Bool("e", false, "Send an empty message to subs")
103- public := pubCmd.Bool("p", false, "Anyone can sub to this channel")
104+ public := pubCmd.Bool("p", false, "Anyone can sub to this topic")
105 timeout := pubCmd.Duration("t", 30*24*time.Hour, "Timeout as a Go duration before cancelling the pub event. Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Default is 30 days.")
106- if !flagCheck(pubCmd, channelName, cmdArgs) {
107+ if !flagCheck(pubCmd, topic, cmdArgs) {
108 return
109 }
110
111@@ -212,25 +220,25 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
112 rw = sesh
113 }
114
115- if channelName == "" {
116- channelName = uuid.NewString()
117+ if topic == "" {
118+ topic = uuid.NewString()
119 }
120- name := toChannel(user.Name, channelName)
121+ name := toTopic(user.Name, topic)
122 if *public {
123- name = toPublicChannel(channelName)
124+ name = toPublicTopic(topic)
125 }
126 wish.Printf(
127 sesh,
128 "subscribe to this channel:\n\tssh %s sub %s\n",
129 toSshCmd(handler.Cfg),
130- channelName,
131+ topic,
132 )
133
134 wish.Println(sesh, "sending msg ...")
135
136 count := 0
137- for channelID, channel := range pubsub.PubSub.GetChannels() {
138- if channelID == name {
139+ for topic, channel := range pubsub.GetChannels() {
140+ if topic == name {
141 for _, client := range channel.GetClients() {
142 if client.Direction == psub.ChannelDirectionOutput || client.Direction == psub.ChannelDirectionInputOutput {
143 count++
144@@ -263,8 +271,8 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
145 return
146 default:
147 count := 0
148- for channelID, channel := range pubsub.PubSub.GetChannels() {
149- if channelID == name {
150+ for topic, channel := range pubsub.GetChannels() {
151+ if topic == name {
152 for _, client := range channel.GetClients() {
153 if client.Direction == psub.ChannelDirectionOutput || client.Direction == psub.ChannelDirectionInputOutput {
154 count++
155@@ -290,7 +298,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
156 }
157 }
158
159- err = pubsub.PubSub.Pub(
160+ err = pubsub.Pub(
161 pubCtx,
162 fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
163 rw,
164@@ -305,19 +313,19 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
165 }
166 } else if cmd == "sub" {
167 pubCmd := flagSet("pub", sesh)
168- public := pubCmd.Bool("p", false, "Subscribe to a public channel")
169+ public := pubCmd.Bool("p", false, "Subscribe to a public topic")
170 keepAlive := pubCmd.Bool("k", false, "Keep the sub alive even after the pub as died")
171- if !flagCheck(pubCmd, channelName, cmdArgs) {
172+ if !flagCheck(pubCmd, topic, cmdArgs) {
173 return
174 }
175- channelName := channelName
176+ topic := topic
177
178- name := toChannel(user.Name, channelName)
179+ name := toTopic(user.Name, topic)
180 if *public {
181- name = toPublicChannel(channelName)
182+ name = toPublicTopic(topic)
183 }
184
185- err = pubsub.PubSub.Sub(
186+ err = pubsub.Sub(
187 ctx,
188 fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
189 sesh,
190@@ -332,29 +340,29 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
191 }
192 } else if cmd == "pipe" {
193 pipeCmd := flagSet("pipe", sesh)
194- public := pipeCmd.Bool("p", false, "Pipe to a public channel")
195+ public := pipeCmd.Bool("p", false, "Pipe to a public topic")
196 replay := pipeCmd.Bool("r", false, "Replay messages to the client that sent it")
197- if !flagCheck(pipeCmd, channelName, cmdArgs) {
198+ if !flagCheck(pipeCmd, topic, cmdArgs) {
199 return
200 }
201- isCreator := channelName == ""
202+ isCreator := topic == ""
203 if isCreator {
204- channelName = uuid.NewString()
205+ topic = uuid.NewString()
206 }
207- name := toChannel(user.Name, channelName)
208+ name := toTopic(user.Name, topic)
209 if *public {
210- name = toPublicChannel(channelName)
211+ name = toPublicTopic(topic)
212 }
213 if isCreator {
214 wish.Printf(
215 sesh,
216- "subscribe to this channel:\n\tssh %s sub %s\n",
217+ "subscribe to this topic:\n\tssh %s sub %s\n",
218 toSshCmd(handler.Cfg),
219- channelName,
220+ topic,
221 )
222 }
223
224- readErr, writeErr := pubsub.PubSub.Pipe(
225+ readErr, writeErr := pubsub.Pipe(
226 ctx,
227 fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
228 sesh,
+1,
-11
1@@ -8,7 +8,6 @@ import (
2 "syscall"
3 "time"
4
5- "github.com/antoniomika/syncmap"
6 "github.com/charmbracelet/promwish"
7 "github.com/charmbracelet/wish"
8 "github.com/picosh/pico/db/postgres"
9@@ -29,16 +28,7 @@ func StartSshServer() {
10
11 cfg.Port = port
12
13- pubsub := &psub.Cfg{
14- Logger: logger,
15- PubSub: &psub.PubSubMulticast{
16- Logger: logger,
17- Connector: &psub.BaseConnector{
18- Channels: syncmap.New[string, *psub.Channel](),
19- },
20- },
21- }
22-
23+ pubsub := psub.NewMulticast(logger)
24 handler := &CliHandler{
25 Logger: logger,
26 DBPool: dbh,