- commit
- 1ec8c47
- parent
- ddbd590
- author
- Antonio Mika
- date
- 2024-09-19 21:24:26 +0000 UTC
Added pipe support to send
4 files changed,
+79,
-21
M
go.mod
+1,
-1
1@@ -37,7 +37,7 @@ require (
2 github.com/muesli/termenv v0.15.3-0.20240509142007-81b8f94111d5
3 github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577
4 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a
5- github.com/picosh/pubsub v0.0.0-20240918141103-977bd6b4c9e2
6+ github.com/picosh/pubsub v0.0.0-20240919210825-ed408c5349aa
7 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc
8 github.com/picosh/tunkit v0.0.0-20240709033345-8315d4f3cd0e
9 github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06
M
go.sum
+4,
-0
1@@ -228,6 +228,10 @@ github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a h1:Cr1xODiyd/SjjBRtYA9
2 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a/go.mod h1:VIkR1MZBvxSK2OO47jikxikAO/sKb/vTmXX5ZuYTIvo=
3 github.com/picosh/pubsub v0.0.0-20240918141103-977bd6b4c9e2 h1:Em/eEiElW3OHKDLzchzJ7m8OOk+yJ8dgc7cH0d0c55Q=
4 github.com/picosh/pubsub v0.0.0-20240918141103-977bd6b4c9e2/go.mod h1:vyHLOwIkdaBW+Wmc+3/yRzdnmKwv/oVnKtITHe46w58=
5+github.com/picosh/pubsub v0.0.0-20240919205849-b9230385df48 h1:KTG+pG98stHQPZclVkHmogSIxcXZyuMXY0/llTo+sVc=
6+github.com/picosh/pubsub v0.0.0-20240919205849-b9230385df48/go.mod h1:vyHLOwIkdaBW+Wmc+3/yRzdnmKwv/oVnKtITHe46w58=
7+github.com/picosh/pubsub v0.0.0-20240919210825-ed408c5349aa h1:zJ/xYTo2qYp1gdCqy4QC8ydXCn44LQVb5auk+uc3j3c=
8+github.com/picosh/pubsub v0.0.0-20240919210825-ed408c5349aa/go.mod h1:vyHLOwIkdaBW+Wmc+3/yRzdnmKwv/oVnKtITHe46w58=
9 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc h1:IIsJuAFG2ju3cygKVKTIsYYZf21q5S3Dr1H4fGbfgJg=
10 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc/go.mod h1:RAgLDK3LrDK6pNeXtU9tjo28obl5DxShcTUk2nm/KCM=
11 github.com/picosh/senpai v0.0.0-20240503200611-af89e73973b0 h1:pBRIbiCj7K6rGELijb//dYhyCo8A3fvxW5dijrJVtjs=
+73,
-20
1@@ -67,14 +67,16 @@ func toPublicChannel(name string) string {
2 return fmt.Sprintf("public/%s", name)
3 }
4
5-var helpStr = `Commands: [pub, sub, ls]
6+var helpStr = `Commands: [pub, sub, ls, pipe]
7
8 The simplest authenticated pubsub system. Send messages through
9 user-defined channels. Channels are private to the authenticated
10 ssh user. The default pubsub model is multicast with bidirectional
11 blocking, meaning a publisher ("pub") will send its message to all
12 subscribers ("sub"). Further, both "pub" and "sub" will wait for
13-at least one event to be sent or received.`
14+at least one event to be sent or received. Pipe ("pipe") allows
15+for bidirectional messages to be sent between any clients connected
16+to a pipe.`
17
18 type CliHandler struct {
19 DBPool db.DB
20@@ -117,27 +119,45 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
21 }
22
23 channels := pubsub.PubSub.GetChannels(channelFilter)
24+ pipes := pubsub.PubSub.GetPipes(channelFilter)
25
26- if len(channels) == 0 {
27- wish.Println(sesh, "no pubsub channels found")
28+ if len(channels) == 0 && len(pipes) == 0 {
29+ wish.Println(sesh, "no pubsub channels or pipes found")
30 } else {
31- outputData := "Channel Information\r\n"
32- for _, channel := range channels {
33- outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
34- outputData += "\tPubs:\r\n"
35-
36- channel.Pubs.Range(func(I string, J *psub.Pub) bool {
37- outputData += fmt.Sprintf("\t- %s:\r\n", I)
38- return true
39- })
40-
41- outputData += "\tSubs:\r\n"
42-
43- channel.Subs.Range(func(I string, J *psub.Sub) bool {
44- outputData += fmt.Sprintf("\t- %s:\r\n", I)
45- return true
46- })
47+ var outputData string
48+ if len(channels) > 0 {
49+ outputData += "Channel Information\r\n"
50+ for _, channel := range channels {
51+ outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
52+ outputData += "\tPubs:\r\n"
53+
54+ channel.Pubs.Range(func(I string, J *psub.Pub) bool {
55+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
56+ return true
57+ })
58+
59+ outputData += "\tSubs:\r\n"
60+
61+ channel.Subs.Range(func(I string, J *psub.Sub) bool {
62+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
63+ return true
64+ })
65+ }
66 }
67+
68+ if len(pipes) > 0 {
69+ outputData += "Pipe Information\r\n"
70+ for _, pipe := range pipes {
71+ outputData += fmt.Sprintf("- %s:\r\n", pipe.Name)
72+ outputData += "\tClients:\r\n"
73+
74+ pipe.Clients.Range(func(I string, J *psub.PipeClient) bool {
75+ outputData += fmt.Sprintf("\t- %s:\r\n", I)
76+ return true
77+ })
78+ }
79+ }
80+
81 _, _ = sesh.Write([]byte(outputData))
82 }
83 }
84@@ -235,6 +255,39 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
85 if err != nil {
86 wish.Errorln(sesh, err)
87 }
88+ } else if cmd == "pipe" {
89+ pipeCmd := flagSet("pipe", sesh)
90+ public := pipeCmd.Bool("p", false, "Pipe to a public channel")
91+ replay := pipeCmd.Bool("r", false, "Replay messages to the client that sent it")
92+ if !flagCheck(pipeCmd, repoName, cmdArgs) {
93+ return
94+ }
95+ channelName := repoName
96+
97+ name := toChannel(user.Name, channelName)
98+ if *public {
99+ name = toPublicChannel(channelName)
100+ }
101+
102+ pipe := &psub.PipeClient{
103+ ID: fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
104+ Done: make(chan struct{}),
105+ Data: make(chan psub.PipeMessage),
106+ ReadWriter: sesh,
107+ Replay: *replay,
108+ }
109+
110+ go func() {
111+ <-ctx.Done()
112+ pipe.Cleanup()
113+ }()
114+ readErr, writeErr := pubsub.PubSub.Pipe(name, pipe)
115+ if readErr != nil {
116+ wish.Errorln(sesh, readErr)
117+ }
118+ if writeErr != nil {
119+ wish.Errorln(sesh, writeErr)
120+ }
121 }
122
123 next(sesh)
+1,
-0
1@@ -32,6 +32,7 @@ func StartSshServer() {
2 PubSub: &psub.PubSubMulticast{
3 Logger: logger,
4 Channels: syncmap.New[string, *psub.Channel](),
5+ Pipes: syncmap.New[string, *psub.Pipe](),
6 },
7 }
8