- commit
- 63a91e3
- parent
- af3ceae
- author
- Antonio Mika
- date
- 2024-10-04 01:52:17 +0000 UTC
Merge pull request #143 from picosh/am/send-log-sink Setup a send log sink
5 files changed,
+408,
-11
+19,
-0
1@@ -76,6 +76,25 @@ jobs:
2 platforms: ${{ env.PLATFORMS }}
3 registry: ${{ env.REGISTRY }}
4 ssh: false
5+ build-pico:
6+ runs-on: ubuntu-22.04
7+ needs: test
8+ steps:
9+ - name: Checkout repo
10+ uses: actions/checkout@v3
11+ - name: Setup docker
12+ uses: ./.github/actions/setup
13+ with:
14+ registry: ${{ env.REGISTRY }}
15+ username: ${{ github.actor }}
16+ password: ${{ secrets.GITHUB_TOKEN }}
17+ - name: Run docker build for pico
18+ uses: ./.github/actions/build
19+ with:
20+ app: pico
21+ platforms: ${{ env.PLATFORMS }}
22+ registry: ${{ env.REGISTRY }}
23+ web: false
24 build-bouncer:
25 runs-on: ubuntu-22.04
26 needs: test
+10,
-0
1@@ -74,6 +74,8 @@ services:
2 - web
3 env_file:
4 - .env.prod
5+ volumes:
6+ - ./data/pastes-ssh/data:/app/ssh_data
7 pastes-ssh:
8 networks:
9 pastes:
10@@ -116,6 +118,8 @@ services:
11 - web
12 env_file:
13 - .env.prod
14+ volumes:
15+ - ./data/pubsub-ssh/data:/app/ssh_data
16 pubsub-ssh:
17 networks:
18 pubsub:
19@@ -158,6 +162,8 @@ services:
20 - web
21 env_file:
22 - .env.prod
23+ volumes:
24+ - ./data/prose-ssh/data:/app/ssh_data
25 prose-ssh:
26 networks:
27 prose:
28@@ -202,6 +208,7 @@ services:
29 - .env.prod
30 volumes:
31 - ./data/storage/data:/app/.storage
32+ - ./data/imgs-ssh/data:/app/ssh_data
33 imgs-ssh:
34 networks:
35 imgs:
36@@ -248,6 +255,7 @@ services:
37 - .env.prod
38 volumes:
39 - ./data/storage/data:/app/.storage
40+ - ./data/pgs-ssh/data:/app/ssh_data
41 pgs-ssh:
42 networks:
43 pgs:
44@@ -296,6 +304,8 @@ services:
45 - web
46 env_file:
47 - .env.prod
48+ volumes:
49+ - ./data/feeds-ssh/data:/app/ssh_data
50 feeds-ssh:
51 networks:
52 feeds:
+34,
-10
1@@ -148,6 +148,8 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
2 return
3 }
4
5+ isAdmin := handler.DBPool.HasFeatureForUser(user.ID, "admin")
6+
7 cmd := strings.TrimSpace(args[0])
8 if cmd == "help" {
9 wish.Println(sesh, helpStr(toSshCmd(handler.Cfg)))
10@@ -155,7 +157,7 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
11 return
12 } else if cmd == "ls" {
13 topicFilter := fmt.Sprintf("%s/", user.Name)
14- if handler.DBPool.HasFeatureForUser(user.ID, "admin") {
15+ if isAdmin {
16 topicFilter = ""
17 }
18
19@@ -252,10 +254,18 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
20 if topic == "" {
21 topic = uuid.NewString()
22 }
23- name := toTopic(user.Name, topic)
24- if *public {
25- name = toPublicTopic(topic)
26+
27+ var name string
28+
29+ if isAdmin && strings.HasPrefix(topic, "/") {
30+ name = strings.TrimPrefix(topic, "/")
31+ } else {
32+ name = toTopic(user.Name, topic)
33+ if *public {
34+ name = toPublicTopic(topic)
35+ }
36 }
37+
38 wish.Printf(
39 sesh,
40 "subscribe to this channel:\n\tssh %s sub %s\n",
41@@ -363,9 +373,15 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
42 "topic", topic,
43 )
44
45- name := toTopic(user.Name, topic)
46- if *public {
47- name = toPublicTopic(topic)
48+ var name string
49+
50+ if isAdmin && strings.HasPrefix(topic, "/") {
51+ name = strings.TrimPrefix(topic, "/")
52+ } else {
53+ name = toTopic(user.Name, topic)
54+ if *public {
55+ name = toPublicTopic(topic)
56+ }
57 }
58
59 err = pubsub.Sub(
60@@ -405,10 +421,18 @@ func WishMiddleware(handler *CliHandler) wish.Middleware {
61 if isCreator {
62 topic = uuid.NewString()
63 }
64- name := toTopic(user.Name, topic)
65- if *public {
66- name = toPublicTopic(topic)
67+
68+ var name string
69+
70+ if isAdmin && strings.HasPrefix(topic, "/") {
71+ name = strings.TrimPrefix(topic, "/")
72+ } else {
73+ name = toTopic(user.Name, topic)
74+ if *public {
75+ name = toPublicTopic(topic)
76+ }
77 }
78+
79 if isCreator {
80 wish.Printf(
81 sesh,
1@@ -271,7 +271,19 @@ func CreateLogger(space string) *slog.Logger {
2 log := slog.New(
3 slog.NewTextHandler(os.Stdout, opts),
4 )
5- return log.With("service", space)
6+
7+ newLogger := log
8+
9+ if strings.ToLower(GetEnv("PICO_SENDLOG_ENABLED", "true")) == "true" {
10+ newLog, err := SendLogRegister(log, 100)
11+ if err == nil {
12+ newLogger = newLog
13+ } else {
14+ slog.Error("unable to start send logger", "error", err)
15+ }
16+ }
17+
18+ return newLogger.With("service", space)
19 }
20
21 func LoggerWithUser(logger *slog.Logger, user *db.User) *slog.Logger {
1@@ -0,0 +1,332 @@
2+package shared
3+
4+import (
5+ "context"
6+ "errors"
7+ "fmt"
8+ "io"
9+ "log/slog"
10+ "net"
11+ "os"
12+ "path/filepath"
13+ "slices"
14+ "strings"
15+ "sync"
16+ "time"
17+
18+ "golang.org/x/crypto/ssh"
19+)
20+
21+type MultiHandler struct {
22+ Handlers []slog.Handler
23+ mu sync.Mutex
24+}
25+
26+func (m *MultiHandler) Enabled(ctx context.Context, l slog.Level) bool {
27+ m.mu.Lock()
28+ defer m.mu.Unlock()
29+
30+ for _, h := range m.Handlers {
31+ if h.Enabled(ctx, l) {
32+ return true
33+ }
34+ }
35+
36+ return false
37+}
38+
39+func (m *MultiHandler) Handle(ctx context.Context, r slog.Record) error {
40+ m.mu.Lock()
41+ defer m.mu.Unlock()
42+
43+ var errs []error
44+ for _, h := range m.Handlers {
45+ if h.Enabled(ctx, r.Level) {
46+ errs = append(errs, h.Handle(ctx, r.Clone()))
47+ }
48+ }
49+
50+ return errors.Join(errs...)
51+}
52+
53+func (m *MultiHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
54+ m.mu.Lock()
55+ defer m.mu.Unlock()
56+
57+ var handlers []slog.Handler
58+
59+ for _, h := range m.Handlers {
60+ handlers = append(handlers, h.WithAttrs(slices.Clone(attrs)))
61+ }
62+
63+ return &MultiHandler{
64+ Handlers: handlers,
65+ }
66+}
67+
68+func (m *MultiHandler) WithGroup(name string) slog.Handler {
69+ if name == "" {
70+ return m
71+ }
72+
73+ m.mu.Lock()
74+ defer m.mu.Unlock()
75+
76+ var handlers []slog.Handler
77+
78+ for _, h := range m.Handlers {
79+ handlers = append(handlers, h.WithGroup(name))
80+ }
81+
82+ return &MultiHandler{
83+ Handlers: handlers,
84+ }
85+}
86+
87+type SendLogWriter struct {
88+ SSHClient *ssh.Client
89+ Session *ssh.Session
90+ StdinPipe io.WriteCloser
91+ Done chan struct{}
92+ Messages chan []byte
93+ Timeout time.Duration
94+ BufferSize int
95+ closeOnce sync.Once
96+ closeMessageOnce sync.Once
97+ startOnce sync.Once
98+ connecMu sync.Mutex
99+}
100+
101+func (c *SendLogWriter) Close() error {
102+ c.connecMu.Lock()
103+ defer c.connecMu.Unlock()
104+
105+ if c.Done != nil {
106+ c.closeOnce.Do(func() {
107+ close(c.Done)
108+ })
109+ }
110+
111+ if c.Messages != nil {
112+ c.closeMessageOnce.Do(func() {
113+ close(c.Messages)
114+ })
115+ }
116+
117+ var errs []error
118+
119+ if c.StdinPipe != nil {
120+ errs = append(errs, c.StdinPipe.Close())
121+ }
122+
123+ if c.Session != nil {
124+ errs = append(errs, c.Session.Close())
125+ }
126+
127+ if c.SSHClient != nil {
128+ errs = append(errs, c.SSHClient.Close())
129+ }
130+
131+ return errors.Join(errs...)
132+}
133+
134+func (c *SendLogWriter) Open() error {
135+ c.Close()
136+
137+ c.connecMu.Lock()
138+
139+ c.Done = make(chan struct{})
140+ c.Messages = make(chan []byte, c.BufferSize)
141+
142+ sshClient, err := createSSHClient(
143+ GetEnv("PICO_SENDLOG_ENDPOINT", "send.pico.sh:22"),
144+ GetEnv("PICO_SENDLOG_KEY", "ssh_data/term_info_ed25519"),
145+ GetEnv("PICO_SENDLOG_PASSPHRASE", ""),
146+ GetEnv("PICO_SENDLOG_REMOTE_HOST", "send.pico.sh"),
147+ GetEnv("PICO_SENDLOG_USER", "pico"),
148+ )
149+ if err != nil {
150+ c.connecMu.Unlock()
151+ return err
152+ }
153+
154+ session, err := sshClient.NewSession()
155+ if err != nil {
156+ c.connecMu.Unlock()
157+ return err
158+ }
159+
160+ stdinPipe, err := session.StdinPipe()
161+ if err != nil {
162+ c.connecMu.Unlock()
163+ return err
164+ }
165+
166+ err = session.Start("pub log-sink -b=false")
167+ if err != nil {
168+ c.connecMu.Unlock()
169+ return err
170+ }
171+
172+ c.SSHClient = sshClient
173+ c.Session = session
174+ c.StdinPipe = stdinPipe
175+
176+ c.closeOnce = sync.Once{}
177+ c.startOnce = sync.Once{}
178+
179+ c.connecMu.Unlock()
180+
181+ c.Start()
182+
183+ return nil
184+}
185+
186+func (c *SendLogWriter) Start() {
187+ c.startOnce.Do(func() {
188+ go func() {
189+ defer c.Reconnect()
190+
191+ for {
192+ select {
193+ case data, ok := <-c.Messages:
194+ _, err := c.StdinPipe.Write(data)
195+ if !ok || err != nil {
196+ slog.Error("received error on write, reopening logger", "error", err)
197+ return
198+ }
199+ case <-c.Done:
200+ return
201+ }
202+ }
203+ }()
204+ })
205+}
206+
207+func (c *SendLogWriter) Write(data []byte) (int, error) {
208+ var (
209+ n int
210+ err error
211+ )
212+
213+ ok := c.connecMu.TryLock()
214+
215+ if !ok {
216+ return n, fmt.Errorf("unable to acquire lock to write")
217+ }
218+
219+ defer c.connecMu.Unlock()
220+
221+ if c.Messages == nil || c.Done == nil {
222+ return n, fmt.Errorf("logger not viable")
223+ }
224+
225+ select {
226+ case c.Messages <- data:
227+ n = len(data)
228+ case <-time.After(c.Timeout):
229+ err = fmt.Errorf("unable to send data within timeout")
230+ case <-c.Done:
231+ break
232+ }
233+
234+ return n, err
235+}
236+
237+func (c *SendLogWriter) Reconnect() {
238+ go func() {
239+ for {
240+ err := c.Open()
241+ if err != nil {
242+ slog.Error("unable to open send logger. retrying in 10 seconds", "error", err)
243+ } else {
244+ return
245+ }
246+
247+ <-time.After(10 * time.Second)
248+ }
249+ }()
250+}
251+
252+func createSSHClient(remoteHost string, keyLocation string, keyPassphrase string, remoteHostname string, remoteUser string) (*ssh.Client, error) {
253+ if !strings.Contains(remoteHost, ":") {
254+ remoteHost += ":22"
255+ }
256+
257+ rawConn, err := net.Dial("tcp", remoteHost)
258+ if err != nil {
259+ return nil, err
260+ }
261+
262+ keyPath, err := filepath.Abs(keyLocation)
263+ if err != nil {
264+ return nil, err
265+ }
266+
267+ f, err := os.Open(keyPath)
268+ if err != nil {
269+ return nil, err
270+ }
271+ defer f.Close()
272+
273+ data, err := io.ReadAll(f)
274+ if err != nil {
275+ return nil, err
276+ }
277+
278+ var signer ssh.Signer
279+
280+ if keyPassphrase != "" {
281+ signer, err = ssh.ParsePrivateKeyWithPassphrase(data, []byte(keyPassphrase))
282+ } else {
283+ signer, err = ssh.ParsePrivateKey(data)
284+ }
285+
286+ if err != nil {
287+ return nil, err
288+ }
289+
290+ sshConn, chans, reqs, err := ssh.NewClientConn(rawConn, remoteHostname, &ssh.ClientConfig{
291+ Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
292+ HostKeyCallback: ssh.InsecureIgnoreHostKey(),
293+ User: remoteUser,
294+ })
295+
296+ if err != nil {
297+ return nil, err
298+ }
299+
300+ sshClient := ssh.NewClient(sshConn, chans, reqs)
301+
302+ return sshClient, nil
303+}
304+
305+func SendLogRegister(logger *slog.Logger, buffer int) (*slog.Logger, error) {
306+ if buffer < 0 {
307+ buffer = 0
308+ }
309+
310+ currentHandler := logger.Handler()
311+
312+ logWriter := &SendLogWriter{
313+ Timeout: 10 * time.Millisecond,
314+ BufferSize: buffer,
315+ }
316+
317+ logWriter.Reconnect()
318+
319+ return slog.New(
320+ &MultiHandler{
321+ Handlers: []slog.Handler{
322+ currentHandler,
323+ slog.NewJSONHandler(logWriter, &slog.HandlerOptions{
324+ AddSource: true,
325+ Level: slog.LevelDebug,
326+ }),
327+ },
328+ },
329+ ), nil
330+}
331+
332+var _ io.Writer = (*SendLogWriter)(nil)
333+var _ slog.Handler = (*MultiHandler)(nil)