- commit
- 9e5b256
- parent
- bfcbe7d
- author
- Eric Bower
- date
- 2024-09-24 03:34:48 +0000 UTC
feat(pubsub): if no channel is provided, randomly gen one
2 files changed,
+95,
-69
+92,
-69
1@@ -15,7 +15,6 @@ import (
2 "github.com/google/uuid"
3 "github.com/picosh/pico/db"
4 "github.com/picosh/pico/shared"
5- "github.com/picosh/pico/shared/storage"
6 psub "github.com/picosh/pubsub"
7 "github.com/picosh/send/send/utils"
8 )
9@@ -80,11 +79,18 @@ for bidirectional messages to be sent between any clients connected
10 to a pipe.`
11
12 type CliHandler struct {
13- DBPool db.DB
14- Logger *slog.Logger
15- Storage storage.StorageServe
16- RegistryUrl string
17- PubSub *psub.Cfg
18+ DBPool db.DB
19+ Logger *slog.Logger
20+ PubSub *psub.Cfg
21+ Cfg *shared.ConfigSite
22+}
23+
24+func toSshCmd(cfg *shared.ConfigSite) string {
25+ port := "22"
26+ if cfg.Port != "" {
27+ port = fmt.Sprintf("-p %s", cfg.Port)
28+ }
29+ return fmt.Sprintf("%s %s", port, cfg.Domain)
30 }
31
32 func WishMiddleware(handler *CliHandler) wish.Middleware {
33@@ -112,69 +118,69 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
34 }
35
36 cmd := strings.TrimSpace(args[0])
37- if len(args) == 1 {
38- if cmd == "help" {
39- wish.Println(sesh, helpStr)
40- } else if cmd == "ls" {
41- channelFilter := fmt.Sprintf("%s/", user.Name)
42- if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
43- channelFilter = ""
44- }
45+ if cmd == "help" {
46+ wish.Println(sesh, helpStr)
47+ } else if cmd == "ls" {
48+ channelFilter := fmt.Sprintf("%s/", user.Name)
49+ if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
50+ channelFilter = ""
51+ }
52
53- channels := pubsub.PubSub.GetChannels(channelFilter)
54- pipes := pubsub.PubSub.GetPipes(channelFilter)
55-
56- if len(channels) == 0 && len(pipes) == 0 {
57- wish.Println(sesh, "no pubsub channels or pipes found")
58- } else {
59- var outputData string
60- if len(channels) > 0 {
61- outputData += "Channel Information\r\n"
62- for _, channel := range channels {
63- outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
64- outputData += "\tPubs:\r\n"
65-
66- channel.Pubs.Range(func(I string, J *psub.Pub) bool {
67- outputData += fmt.Sprintf("\t- %s:\r\n", I)
68- return true
69- })
70-
71- outputData += "\tSubs:\r\n"
72-
73- channel.Subs.Range(func(I string, J *psub.Sub) bool {
74- outputData += fmt.Sprintf("\t- %s:\r\n", I)
75- return true
76- })
77- }
78- }
79+ channels := pubsub.PubSub.GetChannels(channelFilter)
80+ pipes := pubsub.PubSub.GetPipes(channelFilter)
81
82- if len(pipes) > 0 {
83- outputData += "Pipe Information\r\n"
84- for _, pipe := range pipes {
85- outputData += fmt.Sprintf("- %s:\r\n", pipe.Name)
86- outputData += "\tClients:\r\n"
87-
88- pipe.Clients.Range(func(I string, J *psub.PipeClient) bool {
89- outputData += fmt.Sprintf("\t- %s:\r\n", I)
90- return true
91- })
92- }
93+ if len(channels) == 0 && len(pipes) == 0 {
94+ wish.Println(sesh, "no pubsub channels or pipes found")
95+ } else {
96+ var outputData string
97+ if len(channels) > 0 {
98+ outputData += "Channel Information\r\n"
99+ for _, channel := range channels {
100+ outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
101+ outputData += "\tPubs:\r\n"
102+
103+ channel.Pubs.Range(func(I string, J *psub.Pub) bool {
104+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
105+ return true
106+ })
107+
108+ outputData += "\tSubs:\r\n"
109+
110+ channel.Subs.Range(func(I string, J *psub.Sub) bool {
111+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
112+ return true
113+ })
114 }
115+ }
116
117- _, _ = sesh.Write([]byte(outputData))
118+ if len(pipes) > 0 {
119+ outputData += "Pipe Information\r\n"
120+ for _, pipe := range pipes {
121+ outputData += fmt.Sprintf("- %s:\r\n", pipe.Name)
122+ outputData += "\tClients:\r\n"
123+
124+ pipe.Clients.Range(func(I string, J *psub.PipeClient) bool {
125+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
126+ return true
127+ })
128+ }
129 }
130+
131+ _, _ = sesh.Write([]byte(outputData))
132 }
133- next(sesh)
134- return
135 }
136
137- repoName := strings.TrimSpace(args[1])
138- cmdArgs := args[2:]
139+ channelName := ""
140+ cmdArgs := args[1:]
141+ if len(args) > 1 {
142+ channelName = strings.TrimSpace(args[1])
143+ cmdArgs = args[2:]
144+ }
145 logger.Info(
146 "imgs middleware detected command",
147 "args", args,
148 "cmd", cmd,
149- "repoName", repoName,
150+ "channelName", channelName,
151 "cmdArgs", cmdArgs,
152 )
153
154@@ -183,10 +189,9 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
155 empty := pubCmd.Bool("e", false, "Send an empty message to subs")
156 public := pubCmd.Bool("p", false, "Anyone can sub to this channel")
157 timeout := pubCmd.Duration("t", -1, "Timeout as a Go duration before cancelling the pub event. Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Default is no timeout.")
158- if !flagCheck(pubCmd, repoName, cmdArgs) {
159+ if !flagCheck(pubCmd, channelName, cmdArgs) {
160 return
161 }
162- channelName := repoName
163
164 var reader io.Reader
165 if *empty {
166@@ -195,10 +200,19 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
167 reader = sesh
168 }
169
170+ if channelName == "" {
171+ channelName = uuid.NewString()
172+ }
173 name := toChannel(user.Name, channelName)
174 if *public {
175 name = toPublicChannel(channelName)
176 }
177+ wish.Printf(
178+ sesh,
179+ "subscribe to this channel:\n\tssh %s sub %s\n",
180+ toSshCmd(handler.Cfg),
181+ channelName,
182+ )
183
184 wish.Println(sesh, "sending msg ...")
185 pub := &psub.Pub{
186@@ -218,12 +232,11 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
187 }
188
189 tt := *timeout
190- str := "no subs found ... waiting"
191- if tt > 0 {
192- str += " " + tt.String()
193- }
194-
195 if count == 0 {
196+ str := "no subs found ... waiting"
197+ if tt > 0 {
198+ str += " " + tt.String()
199+ }
200 wish.Println(sesh, str)
201 }
202
203@@ -251,10 +264,10 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
204 } else if cmd == "sub" {
205 pubCmd := flagSet("pub", sesh)
206 public := pubCmd.Bool("p", false, "Subscribe to a public channel")
207- if !flagCheck(pubCmd, repoName, cmdArgs) {
208+ if !flagCheck(pubCmd, channelName, cmdArgs) {
209 return
210 }
211- channelName := repoName
212+ channelName := channelName
213
214 name := toChannel(user.Name, channelName)
215 if *public {
216@@ -280,15 +293,25 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
217 pipeCmd := flagSet("pipe", sesh)
218 public := pipeCmd.Bool("p", false, "Pipe to a public channel")
219 replay := pipeCmd.Bool("r", false, "Replay messages to the client that sent it")
220- if !flagCheck(pipeCmd, repoName, cmdArgs) {
221+ if !flagCheck(pipeCmd, channelName, cmdArgs) {
222 return
223 }
224- channelName := repoName
225-
226+ isCreator := channelName == ""
227+ if isCreator {
228+ channelName = uuid.NewString()
229+ }
230 name := toChannel(user.Name, channelName)
231 if *public {
232 name = toPublicChannel(channelName)
233 }
234+ if isCreator {
235+ wish.Printf(
236+ sesh,
237+ "subscribe to this channel:\n\tssh %s sub %s\n",
238+ toSshCmd(handler.Cfg),
239+ channelName,
240+ )
241+ }
242
243 pipe := &psub.PipeClient{
244 ID: fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
+3,
-0
1@@ -27,6 +27,8 @@ func StartSshServer() {
2 dbh := postgres.NewDB(cfg.DbURL, cfg.Logger)
3 defer dbh.Close()
4
5+ cfg.Port = port
6+
7 pubsub := &psub.Cfg{
8 Logger: logger,
9 PubSub: &psub.PubSubMulticast{
10@@ -40,6 +42,7 @@ func StartSshServer() {
11 Logger: logger,
12 DBPool: dbh,
13 PubSub: pubsub,
14+ Cfg: cfg,
15 }
16
17 sshAuth := util.NewSshAuthHandler(dbh, logger, cfg)