repos / pico

pico services - prose.sh, pastes.sh, imgs.sh, feeds.sh, pgs.sh
git clone https://github.com/picosh/pico.git

commit
5751e64
parent
e05304b
author
Antonio Mika
date
2024-10-03 21:45:33 +0000 UTC
Enable robust log handler
1 files changed,  +139, -78
M shared/sendlog.go
+139, -78
  1@@ -1,6 +1,7 @@
  2 package shared
  3 
  4 import (
  5+	"errors"
  6 	"fmt"
  7 	"io"
  8 	"log/slog"
  9@@ -16,88 +17,182 @@ import (
 10 )
 11 
 12 type SendLogWriter struct {
 13-	SSHClient *ssh.Client
 14-	Session   *ssh.Session
 15-	StdinPipe io.Writer
 16-	Done      chan struct{}
 17-	Messages  chan []byte
 18-	Timeout   time.Duration
 19-	closeOnce sync.Once
 20-	startOnce sync.Once
 21+	SSHClient  *ssh.Client
 22+	Session    *ssh.Session
 23+	StdinPipe  io.WriteCloser
 24+	Done       chan struct{}
 25+	Messages   chan []byte
 26+	Timeout    time.Duration
 27+	BufferSize int
 28+	closeOnce  sync.Once
 29+	startOnce  sync.Once
 30+	connecMu   sync.Mutex
 31+}
 32+
 33+func (c *SendLogWriter) Close() error {
 34+	c.connecMu.Lock()
 35+	defer c.connecMu.Unlock()
 36+
 37+	if c.Done != nil {
 38+		close(c.Done)
 39+	}
 40+
 41+	if c.Messages != nil {
 42+		close(c.Messages)
 43+	}
 44+
 45+	var errs []error
 46+
 47+	if c.StdinPipe != nil {
 48+		errs = append(errs, c.StdinPipe.Close())
 49+	}
 50+
 51+	if c.Session != nil {
 52+		errs = append(errs, c.Session.Close())
 53+	}
 54+
 55+	if c.SSHClient != nil {
 56+		errs = append(errs, c.SSHClient.Close())
 57+	}
 58+
 59+	return errors.Join(errs...)
 60+}
 61+
 62+func (c *SendLogWriter) Open() error {
 63+	c.Close()
 64+
 65+	c.connecMu.Lock()
 66+
 67+	c.Done = make(chan struct{})
 68+	c.Messages = make(chan []byte, c.BufferSize)
 69+
 70+	sshClient, err := createSSHClient(
 71+		"send.pico.sh:22",
 72+		"ssh_data/term_info_ed25519",
 73+		"",
 74+		"send.pico.sh",
 75+		"pico",
 76+	)
 77+	if err != nil {
 78+		c.connecMu.Unlock()
 79+		return err
 80+	}
 81+
 82+	session, err := sshClient.NewSession()
 83+	if err != nil {
 84+		c.connecMu.Unlock()
 85+		return err
 86+	}
 87+
 88+	stdinPipe, err := session.StdinPipe()
 89+	if err != nil {
 90+		c.connecMu.Unlock()
 91+		return err
 92+	}
 93+
 94+	err = session.Start("pub log-sink -b=false")
 95+	if err != nil {
 96+		c.connecMu.Unlock()
 97+		return err
 98+	}
 99+
100+	c.SSHClient = sshClient
101+	c.Session = session
102+	c.StdinPipe = stdinPipe
103+
104+	c.closeOnce = sync.Once{}
105+	c.startOnce = sync.Once{}
106+
107+	c.connecMu.Unlock()
108+
109+	c.Start()
110+
111+	return nil
112+}
113+
114+func (c *SendLogWriter) Start() {
115+	go func() {
116+		defer c.Reconnect()
117+
118+		for {
119+			select {
120+			case data, ok := <-c.Messages:
121+				_, err := c.StdinPipe.Write(data)
122+				if !ok || err != nil {
123+					slog.Error("received error on write, reopening logger", "error", err)
124+					return
125+				}
126+			case <-c.Done:
127+				return
128+			}
129+		}
130+	}()
131 }
132 
133 func (c *SendLogWriter) Write(data []byte) (int, error) {
134+	c.connecMu.Lock()
135+	defer c.connecMu.Unlock()
136+
137 	var (
138 		n   int
139 		err error
140 	)
141 
142+	if c.Messages == nil || c.Done == nil {
143+		return n, fmt.Errorf("logger not viable")
144+	}
145+
146 	select {
147 	case c.Messages <- data:
148 		n = len(data)
149 	case <-time.After(c.Timeout):
150 		err = fmt.Errorf("unable to send data within timeout")
151+	case <-c.Done:
152+		break
153 	}
154 
155 	return n, err
156 }
157 
158-func (c *SendLogWriter) Open() {
159+func (c *SendLogWriter) Reconnect() {
160 	go func() {
161 		for {
162-			select {
163-			case data := <-c.Messages:
164-				_, err := c.StdinPipe.Write(data)
165-				if err != nil {
166-					slog.Info("received error on write", "error", err)
167-				}
168-			case <-c.Done:
169+			err := c.Open()
170+			if err != nil {
171+				slog.Error("unable to open send logger. retrying in 10 seconds", "error", err)
172+			} else {
173 				return
174 			}
175+
176+			<-time.After(10 * time.Second)
177 		}
178 	}()
179 }
180 
181-func createSSHClient(remoteHost string, keyLocation string, keyPassphrase string, remoteHostname string, remoteUser string) *ssh.Client {
182+func createSSHClient(remoteHost string, keyLocation string, keyPassphrase string, remoteHostname string, remoteUser string) (*ssh.Client, error) {
183 	if !strings.Contains(remoteHost, ":") {
184 		remoteHost += ":22"
185 	}
186 
187 	rawConn, err := net.Dial("tcp", remoteHost)
188 	if err != nil {
189-		slog.Error(
190-			"Unable to create ssh client, tcp connection not established",
191-			slog.Any("error", err),
192-		)
193-		panic(err)
194+		return nil, err
195 	}
196 
197 	keyPath, err := filepath.Abs(keyLocation)
198 	if err != nil {
199-		slog.Error(
200-			"Unable to create ssh client, cannot find key file",
201-			slog.Any("error", err),
202-		)
203-		panic(err)
204+		return nil, err
205 	}
206 
207 	f, err := os.Open(keyPath)
208 	if err != nil {
209-		slog.Error(
210-			"Unable to create ssh client, unable to open key",
211-			slog.Any("error", err),
212-		)
213-		panic(err)
214+		return nil, err
215 	}
216 	defer f.Close()
217 
218 	data, err := io.ReadAll(f)
219 	if err != nil {
220-		slog.Error(
221-			"Unable to create ssh client, unable to read key",
222-			slog.Any("error", err),
223-		)
224-		panic(err)
225+		return nil, err
226 	}
227 
228 	var signer ssh.Signer
229@@ -109,11 +204,7 @@ func createSSHClient(remoteHost string, keyLocation string, keyPassphrase string
230 	}
231 
232 	if err != nil {
233-		slog.Error(
234-			"Unable to create ssh client, unable to parse key",
235-			slog.Any("error", err),
236-		)
237-		panic(err)
238+		return nil, err
239 	}
240 
241 	sshConn, chans, reqs, err := ssh.NewClientConn(rawConn, remoteHostname, &ssh.ClientConfig{
242@@ -121,17 +212,14 @@ func createSSHClient(remoteHost string, keyLocation string, keyPassphrase string
243 		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
244 		User:            remoteUser,
245 	})
246+
247 	if err != nil {
248-		slog.Error(
249-			"Unable to create ssh client, unable to create client conn",
250-			slog.Any("error", err),
251-		)
252-		panic(err)
253+		return nil, err
254 	}
255 
256 	sshClient := ssh.NewClient(sshConn, chans, reqs)
257 
258-	return sshClient
259+	return sshClient, nil
260 }
261 
262 func SendLogRegister(logger *slog.Logger, buffer int) (*slog.Logger, error) {
263@@ -141,39 +229,12 @@ func SendLogRegister(logger *slog.Logger, buffer int) (*slog.Logger, error) {
264 
265 	currentHandler := logger.Handler()
266 
267-	sshClient := createSSHClient(
268-		"send.pico.sh:22",
269-		os.Getenv("SSH_KEY"),
270-		os.Getenv("SSH_PASSPHRASE"),
271-		"send.pico.sh",
272-		os.Getenv("SSH_USER"),
273-	)
274-
275-	sesh, err := sshClient.NewSession()
276-	if err != nil {
277-		return logger, nil
278-	}
279-
280-	stdinPipe, err := sesh.StdinPipe()
281-	if err != nil {
282-		return logger, nil
283-	}
284-
285-	err = sesh.Start("pub log-sink -b=false")
286-	if err != nil {
287-		return logger, nil
288-	}
289-
290 	logWriter := &SendLogWriter{
291-		SSHClient: sshClient,
292-		Session:   sesh,
293-		StdinPipe: stdinPipe,
294-		Done:      make(chan struct{}),
295-		Messages:  make(chan []byte, buffer),
296-		Timeout:   10 * time.Millisecond,
297+		Timeout:    10 * time.Millisecond,
298+		BufferSize: buffer,
299 	}
300 
301-	logWriter.Open()
302+	logWriter.Reconnect()
303 
304 	return slog.New(
305 		slogmulti.Fanout(