- commit
- d1ea229
- parent
- c5deb2f
- author
- Antonio Mika
- date
- 2024-10-02 20:41:38 +0000 UTC
Pubsub refactor
6 files changed,
+131,
-106
M
go.mod
+2,
-3
1@@ -1,6 +1,6 @@
2 module github.com/picosh/pico
3
4-go 1.22.6
5+go 1.23.1
6
7 // replace github.com/picosh/tunkit => ../tunkit
8
9@@ -37,12 +37,11 @@ require (
10 github.com/muesli/termenv v0.15.3-0.20240509142007-81b8f94111d5
11 github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577
12 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a
13- github.com/picosh/pubsub v0.0.0-20240919210825-ed408c5349aa
14+ github.com/picosh/pubsub v0.0.0-20241002203517-3ff0e5744b44
15 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc
16 github.com/picosh/tunkit v0.0.0-20240709033345-8315d4f3cd0e
17 github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06
18 github.com/sendgrid/sendgrid-go v3.14.0+incompatible
19- github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3
20 github.com/simplesurance/go-ip-anonymizer v0.0.0-20200429124537-35a880f8e87d
21 github.com/x-way/crawlerdetect v0.2.21
22 github.com/yuin/goldmark v1.7.1
M
go.sum
+2,
-14
1@@ -160,11 +160,8 @@ github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuV
2 github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
3 github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
4 github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
5-github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
6 github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
7 github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
8-github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
9-github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
10 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
11 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
12 github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
13@@ -226,12 +223,8 @@ github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc h1:bvcsoO
14 github.com/picosh/go-rsync-receiver v0.0.0-20240709135253-1daf4b12a9fc/go.mod h1:i0iR3W4GSm1PuvVxB9OH32E5jP+CYkVb2NQSe0JCtlo=
15 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a h1:Cr1xODiyd/SjjBRtYA9VX6Do3D+w+DansQzkb4NGeyA=
16 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a/go.mod h1:VIkR1MZBvxSK2OO47jikxikAO/sKb/vTmXX5ZuYTIvo=
17-github.com/picosh/pubsub v0.0.0-20240918141103-977bd6b4c9e2 h1:Em/eEiElW3OHKDLzchzJ7m8OOk+yJ8dgc7cH0d0c55Q=
18-github.com/picosh/pubsub v0.0.0-20240918141103-977bd6b4c9e2/go.mod h1:vyHLOwIkdaBW+Wmc+3/yRzdnmKwv/oVnKtITHe46w58=
19-github.com/picosh/pubsub v0.0.0-20240919205849-b9230385df48 h1:KTG+pG98stHQPZclVkHmogSIxcXZyuMXY0/llTo+sVc=
20-github.com/picosh/pubsub v0.0.0-20240919205849-b9230385df48/go.mod h1:vyHLOwIkdaBW+Wmc+3/yRzdnmKwv/oVnKtITHe46w58=
21-github.com/picosh/pubsub v0.0.0-20240919210825-ed408c5349aa h1:zJ/xYTo2qYp1gdCqy4QC8ydXCn44LQVb5auk+uc3j3c=
22-github.com/picosh/pubsub v0.0.0-20240919210825-ed408c5349aa/go.mod h1:vyHLOwIkdaBW+Wmc+3/yRzdnmKwv/oVnKtITHe46w58=
23+github.com/picosh/pubsub v0.0.0-20241002203517-3ff0e5744b44 h1:XNty1ovgt4r7WNydALjLIEKdlACit1X4qsYpktgzUlQ=
24+github.com/picosh/pubsub v0.0.0-20241002203517-3ff0e5744b44/go.mod h1:iuxAenTRpwThFkOJNw5Sumgv0FewjceHrl/4cHKkMe4=
25 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc h1:IIsJuAFG2ju3cygKVKTIsYYZf21q5S3Dr1H4fGbfgJg=
26 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc/go.mod h1:RAgLDK3LrDK6pNeXtU9tjo28obl5DxShcTUk2nm/KCM=
27 github.com/picosh/senpai v0.0.0-20240503200611-af89e73973b0 h1:pBRIbiCj7K6rGELijb//dYhyCo8A3fvxW5dijrJVtjs=
28@@ -276,8 +269,6 @@ github.com/sendgrid/rest v2.6.9+incompatible h1:1EyIcsNdn9KIisLW50MKwmSRSK+ekuei
29 github.com/sendgrid/rest v2.6.9+incompatible/go.mod h1:kXX7q3jZtJXK5c5qK83bSGMdV6tsOE70KbHoqJls4lE=
30 github.com/sendgrid/sendgrid-go v3.14.0+incompatible h1:KDSasSTktAqMJCYClHVE94Fcif2i7P7wzISv1sU6DUA=
31 github.com/sendgrid/sendgrid-go v3.14.0+incompatible/go.mod h1:QRQt+LX/NmgVEvmdRw0VT/QgUn499+iza2FnDca9fg8=
32-github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 h1:n661drycOFuPLCN3Uc8sB6B/s6Z4t2xvBgU1htSHuq8=
33-github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3/go.mod h1:A0bzQcvG0E7Rwjx0REVgAGH58e96+X0MeOfepqsbeW4=
34 github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI=
35 github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk=
36 github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
37@@ -289,7 +280,6 @@ github.com/simplesurance/go-ip-anonymizer v0.0.0-20200429124537-35a880f8e87d/go.
38 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
39 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
40 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
41-github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
42 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
43 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
44 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
45@@ -407,12 +397,10 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
46 google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
47 google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
48 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
49-gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
50 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
51 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
52 gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
53 gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
54-gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
55 gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
56 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
57 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+2,
-5
1@@ -259,11 +259,8 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
2 opts.bail(err)
3 return
4 } else if cmd == "sub" {
5- err = pubsub.PubSub.Sub(fmt.Sprintf("%s/%s", user.Name, repoName), &psub.Sub{
6- ID: uuid.NewString(),
7- Writer: sesh,
8- Done: make(chan struct{}),
9- Data: make(chan []byte),
10+ err = pubsub.PubSub.Sub(sesh.Context(), uuid.NewString(), sesh, []*psub.Channel{
11+ psub.NewChannel(fmt.Sprintf("%s/%s", user.Name, repoName)),
12 })
13
14 if err != nil {
+2,
-4
1@@ -230,10 +230,8 @@ func createServeMux(handler *CliHandler, pubsub *psub.Cfg) func(ctx ssh.Context)
2 )
3 handler.Logger.Info("sending event", "url", furl)
4
5- err := pubsub.PubSub.Pub(fmt.Sprintf("%s@%s:%s", user.Name, img, tag), &psub.Pub{
6- ID: uuid.NewString(),
7- Done: make(chan struct{}),
8- Reader: strings.NewReader(furl),
9+ err := pubsub.PubSub.Pub(ctx, uuid.NewString(), bytes.NewBufferString(furl), []*psub.Channel{
10+ psub.NewChannel(fmt.Sprintf("%s/%s:%s", user.Name, img, tag)),
11 })
12
13 if err != nil {
+119,
-77
1@@ -2,6 +2,7 @@ package pubsub
2
3 import (
4 "bytes"
5+ "context"
6 "flag"
7 "fmt"
8 "io"
9@@ -126,43 +127,55 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
10 channelFilter = ""
11 }
12
13- channels := pubsub.PubSub.GetChannels(channelFilter)
14- pipes := pubsub.PubSub.GetPipes(channelFilter)
15+ var channels []*psub.Channel
16
17- if len(channels) == 0 && len(pipes) == 0 {
18- wish.Println(sesh, "no pubsub channels or pipes found")
19+ for channelID, channel := range pubsub.PubSub.GetChannels() {
20+ if strings.HasPrefix(channelID, channelFilter) {
21+ channels = append(channels, channel)
22+ }
23+ }
24+
25+ if len(channels) == 0 {
26+ wish.Println(sesh, "no pubsub channels found")
27 } else {
28 var outputData string
29 if len(channels) > 0 {
30 outputData += "Channel Information\r\n"
31 for _, channel := range channels {
32- outputData += fmt.Sprintf("- %s:\r\n", channel.Name)
33+ outputData += fmt.Sprintf("- %s:\r\n", channel.ID)
34+ outputData += "\tClients:\r\n"
35+
36+ var pubs []*psub.Client
37+ var subs []*psub.Client
38+ var pipes []*psub.Client
39+
40+ for _, client := range channel.GetClients() {
41+ if client.Direction == psub.ChannelDirectionInput {
42+ pubs = append(pubs, client)
43+ } else if client.Direction == psub.ChannelDirectionOutput {
44+ subs = append(subs, client)
45+ } else if client.Direction == psub.ChannelDirectionInputOutput {
46+ pipes = append(pipes, client)
47+ }
48+ }
49+
50 outputData += "\tPubs:\r\n"
51
52- channel.Pubs.Range(func(I string, J *psub.Pub) bool {
53- outputData += fmt.Sprintf("\t- %s:\r\n", I)
54- return true
55- })
56+ for _, pub := range pubs {
57+ outputData += fmt.Sprintf("\t- %s:\r\n", pub.ID)
58+ }
59
60 outputData += "\tSubs:\r\n"
61
62- channel.Subs.Range(func(I string, J *psub.Sub) bool {
63- outputData += fmt.Sprintf("\t- %s:\r\n", I)
64- return true
65- })
66- }
67- }
68+ for _, sub := range subs {
69+ outputData += fmt.Sprintf("\t- %s:\r\n", sub.ID)
70+ }
71
72- if len(pipes) > 0 {
73- outputData += "Pipe Information\r\n"
74- for _, pipe := range pipes {
75- outputData += fmt.Sprintf("- %s:\r\n", pipe.Name)
76- outputData += "\tClients:\r\n"
77+ outputData += "\tPipes:\r\n"
78
79- pipe.Clients.Range(func(I string, J *psub.PipeClient) bool {
80- outputData += fmt.Sprintf("\t- %s:\r\n", I)
81- return true
82- })
83+ for _, pipe := range pipes {
84+ outputData += fmt.Sprintf("\t- %s:\r\n", pipe.ID)
85+ }
86 }
87 }
88
89@@ -185,21 +198,19 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
90 )
91
92 if cmd == "pub" {
93- defaultTimeout, _ := time.ParseDuration("720h")
94-
95 pubCmd := flagSet("pub", sesh)
96 empty := pubCmd.Bool("e", false, "Send an empty message to subs")
97 public := pubCmd.Bool("p", false, "Anyone can sub to this channel")
98- timeout := pubCmd.Duration("t", defaultTimeout, "Timeout as a Go duration before cancelling the pub event. Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Default is 30 days.")
99+ timeout := pubCmd.Duration("t", 30*24*time.Hour, "Timeout as a Go duration before cancelling the pub event. Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Default is 30 days.")
100 if !flagCheck(pubCmd, channelName, cmdArgs) {
101 return
102 }
103
104- var reader io.Reader
105+ var rw io.ReadWriter
106 if *empty {
107- reader = bytes.NewReader(make([]byte, 1))
108+ rw = bytes.NewBuffer(make([]byte, 1))
109 } else {
110- reader = sesh
111+ rw = sesh
112 }
113
114 if channelName == "" {
115@@ -217,42 +228,78 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
116 )
117
118 wish.Println(sesh, "sending msg ...")
119- pub := &psub.Pub{
120- ID: fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
121- Done: make(chan struct{}),
122- Reader: reader,
123- }
124
125 count := 0
126- channelInfo := pubsub.PubSub.GetChannel(name)
127-
128- if channelInfo != nil {
129- channelInfo.Subs.Range(func(I string, J *psub.Sub) bool {
130- count++
131- return true
132- })
133+ for channelID, channel := range pubsub.PubSub.GetChannels() {
134+ if channelID == name {
135+ for _, client := range channel.GetClients() {
136+ if client.Direction == psub.ChannelDirectionOutput || client.Direction == psub.ChannelDirectionInputOutput {
137+ count++
138+ }
139+ }
140+ break
141+ }
142 }
143
144+ var pubCtx context.Context = ctx
145+
146 tt := *timeout
147 if count == 0 {
148- str := "no subs found ... waiting"
149+ termMsg := "no subs found ... waiting"
150 if tt > 0 {
151- str += " " + tt.String()
152+ termMsg += " " + tt.String()
153 }
154- wish.Println(sesh, str)
155- }
156+ wish.Println(sesh, termMsg)
157+
158+ downCtx, cancelFunc := context.WithCancel(ctx)
159+ pubCtx = downCtx
160+
161+ ready := make(chan struct{})
162+
163+ go func() {
164+ for {
165+ select {
166+ case <-ctx.Done():
167+ cancelFunc()
168+ return
169+ default:
170+ count := 0
171+ for channelID, channel := range pubsub.PubSub.GetChannels() {
172+ if channelID == name {
173+ for _, client := range channel.GetClients() {
174+ if client.Direction == psub.ChannelDirectionOutput || client.Direction == psub.ChannelDirectionInputOutput {
175+ count++
176+ }
177+ }
178+ break
179+ }
180+ }
181+
182+ if count > 0 {
183+ close(ready)
184+ return
185+ }
186+ }
187+ }
188+ }()
189
190- go func() {
191 select {
192- case <-ctx.Done():
193- pub.Cleanup()
194+ case <-ready:
195 case <-time.After(tt):
196+ cancelFunc()
197 wish.Fatalln(sesh, "timeout reached, exiting ...")
198- pub.Cleanup()
199 }
200- }()
201+ }
202+
203+ err = pubsub.PubSub.Pub(
204+ pubCtx,
205+ fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
206+ rw,
207+ []*psub.Channel{
208+ psub.NewChannel(name),
209+ },
210+ )
211
212- err = pubsub.PubSub.Pub(name, pub)
213 wish.Println(sesh, "msg sent!")
214 if err != nil {
215 wish.Errorln(sesh, err)
216@@ -270,18 +317,15 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
217 name = toPublicChannel(channelName)
218 }
219
220- sub := &psub.Sub{
221- ID: fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
222- Writer: sesh,
223- Done: make(chan struct{}),
224- Data: make(chan []byte),
225- }
226+ err = pubsub.PubSub.Sub(
227+ ctx,
228+ fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
229+ sesh,
230+ []*psub.Channel{
231+ psub.NewChannel(name),
232+ },
233+ )
234
235- go func() {
236- <-ctx.Done()
237- sub.Cleanup()
238- }()
239- err = pubsub.PubSub.Sub(name, sub)
240 if err != nil {
241 wish.Errorln(sesh, err)
242 }
243@@ -309,24 +353,22 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
244 )
245 }
246
247- pipe := &psub.PipeClient{
248- ID: fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
249- Done: make(chan struct{}),
250- Data: make(chan psub.PipeMessage),
251- ReadWriter: sesh,
252- Replay: *replay,
253- }
254+ readErr, writeErr := pubsub.PubSub.Pipe(
255+ ctx,
256+ fmt.Sprintf("%s (%s@%s)", uuid.NewString(), user.Name, sesh.RemoteAddr().String()),
257+ sesh,
258+ []*psub.Channel{
259+ psub.NewChannel(name),
260+ },
261+ *replay,
262+ )
263
264- go func() {
265- <-ctx.Done()
266- pipe.Cleanup()
267- }()
268- readErr, writeErr := pubsub.PubSub.Pipe(name, pipe)
269 if readErr != nil {
270- wish.Errorln(sesh, readErr)
271+ wish.Errorln(sesh, "error reading from pipe", readErr)
272 }
273+
274 if writeErr != nil {
275- wish.Errorln(sesh, writeErr)
276+ wish.Errorln(sesh, "error writing to pipe", writeErr)
277 }
278 }
279
+4,
-3
1@@ -32,9 +32,10 @@ func StartSshServer() {
2 pubsub := &psub.Cfg{
3 Logger: logger,
4 PubSub: &psub.PubSubMulticast{
5- Logger: logger,
6- Channels: syncmap.New[string, *psub.Channel](),
7- Pipes: syncmap.New[string, *psub.Pipe](),
8+ Logger: logger,
9+ Connector: &psub.BaseConnector{
10+ Channels: syncmap.New[string, *psub.Channel](),
11+ },
12 },
13 }
14