repos / pico

pico services - prose.sh, pastes.sh, imgs.sh, feeds.sh, pgs.sh
git clone https://github.com/picosh/pico.git

commit
79f17fd
parent
215a842
author
Antonio Mika
date
2024-10-09 19:24:58 +0000 UTC
Add public key comment to pubsub ls
3 files changed,  +72, -21
M go.mod
M go.mod
+1, -1
 1@@ -17,6 +17,7 @@ replace github.com/gdamore/tcell/v2 => github.com/delthas/tcell/v2 v2.4.1-0.2023
 2 require (
 3 	git.sr.ht/~delthas/senpai v0.3.1-0.20240425235039-206be659439e
 4 	github.com/alecthomas/chroma/v2 v2.14.0
 5+	github.com/antoniomika/syncmap v1.0.0
 6 	github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
 7 	github.com/charmbracelet/bubbles v0.18.0
 8 	github.com/charmbracelet/bubbletea v1.1.1
 9@@ -60,7 +61,6 @@ require (
10 	github.com/PuerkitoBio/goquery v1.9.2 // indirect
11 	github.com/andybalholm/cascadia v1.3.2 // indirect
12 	github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
13-	github.com/antoniomika/syncmap v1.0.0 // indirect
14 	github.com/atotto/clipboard v0.1.4 // indirect
15 	github.com/aws/aws-sdk-go-v2 v1.32.1 // indirect
16 	github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect
M pubsub/cli.go
+65, -16
  1@@ -7,10 +7,12 @@ import (
  2 	"fmt"
  3 	"io"
  4 	"log/slog"
  5+	"slices"
  6 	"strings"
  7 	"text/tabwriter"
  8 	"time"
  9 
 10+	"github.com/antoniomika/syncmap"
 11 	"github.com/charmbracelet/ssh"
 12 	"github.com/charmbracelet/wish"
 13 	"github.com/google/uuid"
 14@@ -79,10 +81,10 @@ func clientInfo(clients []*psub.Client, clientType string) string {
 15 		return ""
 16 	}
 17 
 18-	outputData := fmt.Sprintf("\t%s:\r\n", clientType)
 19+	outputData := fmt.Sprintf("    %s:\r\n", clientType)
 20 
 21 	for _, client := range clients {
 22-		outputData += fmt.Sprintf("\t- %s\r\n", client.ID)
 23+		outputData += fmt.Sprintf("    - %s\r\n", client.ID)
 24 	}
 25 
 26 	return outputData
 27@@ -110,10 +112,11 @@ data is being sent:
 28 }
 29 
 30 type CliHandler struct {
 31-	DBPool db.DB
 32-	Logger *slog.Logger
 33-	PubSub psub.PubSub
 34-	Cfg    *shared.ConfigSite
 35+	DBPool  db.DB
 36+	Logger  *slog.Logger
 37+	PubSub  psub.PubSub
 38+	Cfg     *shared.ConfigSite
 39+	Waiters *syncmap.Map[string, []string]
 40 }
 41 
 42 func toSshCmd(cfg *shared.ConfigSite) string {
 43@@ -162,6 +165,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
 44 				}
 45 
 46 				var channels []*psub.Channel
 47+				waitingChannels := map[string][]string{}
 48 
 49 				for topic, channel := range pubsub.GetChannels() {
 50 					if strings.HasPrefix(topic, topicFilter) {
 51@@ -169,15 +173,21 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
 52 					}
 53 				}
 54 
 55-				if len(channels) == 0 {
 56+				for channel, clients := range handler.Waiters.Range {
 57+					if strings.HasPrefix(channel, topicFilter) {
 58+						waitingChannels[channel] = clients
 59+					}
 60+				}
 61+
 62+				if len(channels) == 0 && len(waitingChannels) == 0 {
 63 					wish.Println(sesh, "no pubsub channels found")
 64 				} else {
 65 					var outputData string
 66-					if len(channels) > 0 {
 67+					if len(channels) > 0 || len(waitingChannels) > 0 {
 68 						outputData += "Channel Information\r\n"
 69 						for _, channel := range channels {
 70 							outputData += fmt.Sprintf("- %s:\r\n", channel.Topic)
 71-							outputData += "\tClients:\r\n"
 72+							outputData += "  Clients:\r\n"
 73 
 74 							var pubs []*psub.Client
 75 							var subs []*psub.Client
 76@@ -196,6 +206,15 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
 77 							outputData += clientInfo(subs, "Subs")
 78 							outputData += clientInfo(pipes, "Pipes")
 79 						}
 80+
 81+						for waitingChannel, channelPubs := range waitingChannels {
 82+							outputData += fmt.Sprintf("- %s:\r\n", waitingChannel)
 83+							outputData += "  Clients:\r\n"
 84+							outputData += fmt.Sprintf("    %s:\r\n", "Waiting Pubs")
 85+							for _, client := range channelPubs {
 86+								outputData += fmt.Sprintf("    - %s\r\n", client)
 87+							}
 88+						}
 89 					}
 90 
 91 					_, _ = sesh.Write([]byte(outputData))
 92@@ -220,6 +239,14 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
 93 				"cmdArgs", cmdArgs,
 94 			)
 95 
 96+			userName := user.Name
 97+
 98+			if user.PublicKey.Name != "" {
 99+				userName += fmt.Sprintf("-%s", user.PublicKey.Name)
100+			}
101+
102+			clientID := fmt.Sprintf("%s (%s@%s)", uuid.NewString(), userName, sesh.RemoteAddr().String())
103+
104 			if cmd == "pub" {
105 				pubCmd := flagSet("pub", sesh)
106 				empty := pubCmd.Bool("e", false, "Send an empty message to subs")
107@@ -270,14 +297,12 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
108 
109 				wish.Printf(
110 					sesh,
111-					"subscribe to this channel:\n\tssh %s sub %s%s\n",
112+					"subscribe to this channel:\n  ssh %s sub %s%s\n",
113 					toSshCmd(handler.Cfg),
114 					msgFlag,
115 					topic,
116 				)
117 
118-				wish.Println(sesh, "sending msg ...")
119-
120 				var pubCtx context.Context = ctx
121 
122 				if *block {
123@@ -295,6 +320,9 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
124 
125 					tt := *timeout
126 					if count == 0 {
127+						currentWaiters, _ := handler.Waiters.LoadOrStore(name, nil)
128+						handler.Waiters.Store(name, append(currentWaiters, clientID))
129+
130 						termMsg := "no subs found ... waiting"
131 						if tt > 0 {
132 							termMsg += " " + tt.String()
133@@ -335,16 +363,37 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
134 
135 						select {
136 						case <-ready:
137+						case <-ctx.Done():
138 						case <-time.After(tt):
139 							cancelFunc()
140 							wish.Fatalln(sesh, "timeout reached, exiting ...")
141 						}
142+
143+						newWaiters, _ := handler.Waiters.LoadOrStore(name, nil)
144+						newWaiters = slices.DeleteFunc(newWaiters, func(cl string) bool {
145+							return cl == clientID
146+						})
147+						handler.Waiters.Store(name, newWaiters)
148+
149+						var toDelete []string
150+
151+						for channel, clients := range handler.Waiters.Range {
152+							if len(clients) == 0 {
153+								toDelete = append(toDelete, channel)
154+							}
155+						}
156+
157+						for _, channel := range toDelete {
158+							handler.Waiters.Delete(channel)
159+						}
160 					}
161 				}
162 
163+				wish.Println(sesh, "sending msg ...")
164+
165 				err = pubsub.Pub(
166 					pubCtx,
167-					fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
168+					clientID,
169 					rw,
170 					[]*psub.Channel{
171 						psub.NewChannel(name),
172@@ -389,7 +438,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
173 
174 				err = pubsub.Sub(
175 					ctx,
176-					fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
177+					clientID,
178 					sesh,
179 					[]*psub.Channel{
180 						psub.NewChannel(name),
181@@ -441,7 +490,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
182 				if isCreator {
183 					wish.Printf(
184 						sesh,
185-						"subscribe to this topic:\n\tssh %s sub %s%s\n",
186+						"subscribe to this topic:\n  ssh %s sub %s%s\n",
187 						toSshCmd(handler.Cfg),
188 						flagMsg,
189 						topic,
190@@ -450,7 +499,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
191 
192 				readErr, writeErr := pubsub.Pipe(
193 					ctx,
194-					fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
195+					clientID,
196 					sesh,
197 					[]*psub.Channel{
198 						psub.NewChannel(name),
M pubsub/ssh.go
+6, -4
 1@@ -8,6 +8,7 @@ import (
 2 	"syscall"
 3 	"time"
 4 
 5+	"github.com/antoniomika/syncmap"
 6 	"github.com/charmbracelet/promwish"
 7 	"github.com/charmbracelet/wish"
 8 	"github.com/picosh/pico/db/postgres"
 9@@ -32,10 +33,11 @@ func StartSshServer() {
10 
11 	pubsub := psub.NewMulticast(logger)
12 	handler := &CliHandler{
13-		Logger: logger,
14-		DBPool: dbh,
15-		PubSub: pubsub,
16-		Cfg:    cfg,
17+		Logger:  logger,
18+		DBPool:  dbh,
19+		PubSub:  pubsub,
20+		Cfg:     cfg,
21+		Waiters: syncmap.New[string, []string](),
22 	}
23 
24 	sshAuth := util.NewSshAuthHandler(dbh, logger, cfg)