- commit
- 416b082
- parent
- 0492352
- author
- Antonio Mika
- date
- 2024-09-10 22:58:12 +0000 UTC
Alternate pubsub implementation
7 files changed,
+60,
-65
+1,
-1
1@@ -57,7 +57,7 @@ services:
2 pubsub-ssh:
3 build:
4 args:
5- APP:pubsub
6+ APP: pubsub
7 target: release-ssh
8 env_file:
9 - .env.example
M
go.mod
+2,
-1
1@@ -17,6 +17,7 @@ replace github.com/gdamore/tcell/v2 => github.com/delthas/tcell/v2 v2.4.1-0.2023
2 require (
3 git.sr.ht/~delthas/senpai v0.3.1-0.20240425235039-206be659439e
4 github.com/alecthomas/chroma/v2 v2.14.0
5+ github.com/antoniomika/syncmap v1.0.0
6 github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
7 github.com/charmbracelet/bubbles v0.18.0
8 github.com/charmbracelet/bubbletea v0.27.0
9@@ -36,7 +37,7 @@ require (
10 github.com/muesli/termenv v0.15.3-0.20240509142007-81b8f94111d5
11 github.com/neurosnap/go-exif-remove v0.0.0-20221010134343-50d1e3c35577
12 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a
13- github.com/picosh/pubsub v0.0.0-20240909042445-92777a8b167b
14+ github.com/picosh/pubsub v0.0.0-20240910225407-529d97896161
15 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc
16 github.com/picosh/tunkit v0.0.0-20240709033345-8315d4f3cd0e
17 github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06
M
go.sum
+4,
-0
1@@ -17,6 +17,8 @@ github.com/andybalholm/cascadia v1.3.2 h1:3Xi6Dw5lHF15JtdcmAHD3i1+T8plmv7BQ/nsVi
2 github.com/andybalholm/cascadia v1.3.2/go.mod h1:7gtRlve5FxPPgIgX36uWBX58OdBsSS6lUvCFb+h7KvU=
3 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
4 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
5+github.com/antoniomika/syncmap v1.0.0 h1:iFSfbQFQOvHZILFZF+hqWosO0no+W9+uF4y2VEyMKWU=
6+github.com/antoniomika/syncmap v1.0.0/go.mod h1:fK2829foEYnO4riNfyUn0SHQZt4ue3DStYjGU+sJj38=
7 github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhPwqqXc4/vE0f7GvRjuAsbW+HOIe8KnA=
8 github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw=
9 github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4=
10@@ -226,6 +228,8 @@ github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a h1:Cr1xODiyd/SjjBRtYA9
11 github.com/picosh/pobj v0.0.0-20240709135546-27097077b26a/go.mod h1:VIkR1MZBvxSK2OO47jikxikAO/sKb/vTmXX5ZuYTIvo=
12 github.com/picosh/pubsub v0.0.0-20240909042445-92777a8b167b h1:/gGhT8y9rnrv8K9ZJKZYzdWvZcnazl8NGE1DGNrD8HU=
13 github.com/picosh/pubsub v0.0.0-20240909042445-92777a8b167b/go.mod h1:FKC8uot+40iXmuDzTfbxYDG5PIc3ghwkmP2iItBKH0I=
14+github.com/picosh/pubsub v0.0.0-20240910225407-529d97896161 h1:XKp88wHvv7YQXl4/BfnRCOLmcONTrJK2rE7XMr3gpdw=
15+github.com/picosh/pubsub v0.0.0-20240910225407-529d97896161/go.mod h1:vyHLOwIkdaBW+Wmc+3/yRzdnmKwv/oVnKtITHe46w58=
16 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc h1:IIsJuAFG2ju3cygKVKTIsYYZf21q5S3Dr1H4fGbfgJg=
17 github.com/picosh/send v0.0.0-20240820031602-5d3b1a4494cc/go.mod h1:RAgLDK3LrDK6pNeXtU9tjo28obl5DxShcTUk2nm/KCM=
18 github.com/picosh/senpai v0.0.0-20240503200611-af89e73973b0 h1:pBRIbiCj7K6rGELijb//dYhyCo8A3fvxW5dijrJVtjs=
+4,
-3
1@@ -259,12 +259,13 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
2 opts.bail(err)
3 return
4 } else if cmd == "sub" {
5- err = pubsub.PubSub.Sub(&psub.Subscriber{
6+ err = pubsub.PubSub.Sub(fmt.Sprintf("%s@%s", user.Name, repoName), &psub.Sub{
7 ID: uuid.NewString(),
8- Name: fmt.Sprintf("%s@%s", user.Name, repoName),
9 Writer: sesh,
10- Chan: make(chan error),
11+ Done: make(chan struct{}),
12+ Data: make(chan []byte),
13 })
14+
15 if err != nil {
16 wish.Errorln(sesh, err)
17 }
+4,
-2
1@@ -20,6 +20,7 @@ import (
2
3 "github.com/charmbracelet/ssh"
4 "github.com/charmbracelet/wish"
5+ "github.com/google/uuid"
6 "github.com/picosh/pico/db"
7 "github.com/picosh/pico/db/postgres"
8 "github.com/picosh/pico/shared"
9@@ -229,8 +230,9 @@ func createServeMux(handler *CliHandler, pubsub *psub.Cfg) func(ctx ssh.Context)
10 )
11 handler.Logger.Info("sending event", "url", furl)
12
13- err := pubsub.PubSub.Pub(&psub.Msg{
14- Name: fmt.Sprintf("%s@%s:%s", user.Name, img, tag),
15+ err := pubsub.PubSub.Pub(fmt.Sprintf("%s@%s:%s", user.Name, img, tag), &psub.Pub{
16+ ID: uuid.NewString(),
17+ Done: make(chan struct{}),
18 Reader: strings.NewReader(furl),
19 })
20
+42,
-56
1@@ -1,6 +1,7 @@
2 package pubsub
3
4 import (
5+ "bytes"
6 "flag"
7 "fmt"
8 "io"
9@@ -66,19 +67,6 @@ func toPublicChannel(name string) string {
10 return fmt.Sprintf("public@%s", name)
11 }
12
13-// extract user and scoped channel from channel.
14-func fromChannel(channel string) (string, string) {
15- sp := strings.SplitN(channel, "@", 2)
16- ln := len(sp)
17- if ln == 0 {
18- return "", ""
19- }
20- if ln == 1 {
21- return "", ""
22- }
23- return sp[0], sp[1]
24-}
25-
26 var helpStr = `Commands: [pub, sub, ls]
27
28 The simplest authenticated pubsub system. Send messages through
29@@ -123,26 +111,29 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
30 if cmd == "help" {
31 wish.Println(sesh, helpStr)
32 } else if cmd == "ls" {
33- subs := pubsub.PubSub.GetSubs()
34+ channels := pubsub.PubSub.GetChannels(fmt.Sprintf("%s@", user.Name))
35
36- if len(subs) == 0 {
37- wish.Println(sesh, "no subs found")
38+ if len(channels) == 0 {
39+ wish.Println(sesh, "no pubsub channels found")
40 } else {
41- writer := NewTabWriter(sesh)
42- fmt.Fprintln(writer, "Channel\tID")
43- for _, sub := range subs {
44- userName, _ := fromChannel(sub.Name)
45- if userName != "public" && userName != user.Name {
46- continue
47- }
48-
49- fmt.Fprintf(
50- writer,
51- "%s\t%s\n",
52- sub.Name, sub.ID,
53- )
54+ outputData := "Channel Information\n"
55+ for _, channel := range channels {
56+ outputData += fmt.Sprintf("- %s:\n", channel.Name)
57+ outputData += "\tPubs:\n"
58+
59+ channel.Pubs.Range(func(I string, J *psub.Pub) bool {
60+ outputData += fmt.Sprintf("\t- %s:\n", I)
61+ return true
62+ })
63+
64+ outputData += "\tSubs:\n"
65+
66+ channel.Subs.Range(func(I string, J *psub.Sub) bool {
67+ outputData += fmt.Sprintf("\t- %s:\n", I)
68+ return true
69+ })
70 }
71- writer.Flush()
72+ sesh.Write([]byte(outputData))
73 }
74 }
75 next(sesh)
76@@ -170,7 +161,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
77
78 var reader io.Reader
79 if *empty {
80- reader = strings.NewReader("")
81+ reader = bytes.NewReader(make([]byte, 1))
82 } else {
83 reader = sesh
84 }
85@@ -181,34 +172,32 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
86 }
87
88 wish.Println(sesh, "sending msg ...")
89- msg := &psub.Msg{
90- Name: name,
91+ pub := &psub.Pub{
92+ ID: uuid.NewString(),
93+ Done: make(chan struct{}),
94 Reader: reader,
95 }
96
97- // hacky: we want to notify when no subs are found so
98- // we duplicate some logic for now
99- subs := pubsub.PubSub.GetSubs()
100- found := false
101- for _, sub := range subs {
102- if pubsub.PubSub.PubMatcher(msg, sub) {
103- found = true
104- break
105- }
106+ count := 0
107+ channelInfo := pubsub.PubSub.GetChannel(name)
108+
109+ if channelInfo != nil {
110+ channelInfo.Subs.Range(func(I string, J *psub.Sub) bool {
111+ count++
112+ return true
113+ })
114 }
115- if !found {
116+
117+ if count == 0 {
118 wish.Println(sesh, "no subs found ... waiting")
119 }
120
121 go func() {
122 <-ctx.Done()
123- err := pubsub.PubSub.UnPub(msg)
124- if err != nil {
125- wish.Errorln(sesh, err)
126- }
127+ pub.Cleanup()
128 }()
129
130- err = pubsub.PubSub.Pub(msg)
131+ err = pubsub.PubSub.Pub(name, pub)
132 wish.Println(sesh, "msg sent!")
133 if err != nil {
134 wish.Errorln(sesh, err)
135@@ -226,21 +215,18 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
136 name = toPublicChannel(channelName)
137 }
138
139- sub := &psub.Subscriber{
140+ sub := &psub.Sub{
141 ID: uuid.NewString(),
142- Name: name,
143 Writer: sesh,
144- Chan: make(chan error),
145+ Done: make(chan struct{}),
146+ Data: make(chan []byte),
147 }
148
149 go func() {
150 <-ctx.Done()
151- err := pubsub.PubSub.UnSub(sub)
152- if err != nil {
153- wish.Errorln(sesh, err)
154- }
155+ sub.Cleanup()
156 }()
157- err = pubsub.PubSub.Sub(sub)
158+ err = pubsub.PubSub.Sub(name, sub)
159 if err != nil {
160 wish.Errorln(sesh, err)
161 }
+3,
-2
1@@ -8,6 +8,7 @@ import (
2 "syscall"
3 "time"
4
5+ "github.com/antoniomika/syncmap"
6 "github.com/charmbracelet/promwish"
7 "github.com/charmbracelet/wish"
8 "github.com/picosh/pico/db/postgres"
9@@ -29,8 +30,8 @@ func StartSshServer() {
10 pubsub := &psub.Cfg{
11 Logger: logger,
12 PubSub: &psub.PubSubMulticast{
13- Logger: logger,
14- Chan: make(chan *psub.Subscriber),
15+ Logger: logger,
16+ Channels: syncmap.New[string, *psub.Channel](),
17 },
18 }
19