- commit
- e516cfd
- parent
- 61caf14
- author
- Antonio Mika
- date
- 2023-11-10 14:55:45 +0000 UTC
More work to properly support rsync
16 files changed,
+133,
-51
+17,
-3
1@@ -5,12 +5,26 @@
2 "version": "0.2.0",
3 "configurations": [
4 {
5- "name": "Launch Package",
6+ "name": "Debug App",
7 "type": "go",
8 "request": "launch",
9 "mode": "auto",
10- "program": "${workspaceFolder}/cmd/pgs/ssh/main.go",
11+ "program": "${workspaceFolder}/cmd/${input:service}/${input:type}/main.go",
12 "envFile": "${workspaceFolder}/.env"
13 }
14+ ],
15+ "inputs": [
16+ {
17+ "id": "service",
18+ "type": "promptString",
19+ "description": "The service to debug",
20+ "default": "pgs"
21+ },
22+ {
23+ "id": "type",
24+ "type": "promptString",
25+ "description": "The service type to debug",
26+ "default": "ssh"
27+ }
28 ]
29-}
30+}
+2,
-1
1@@ -43,7 +43,7 @@ func main() {
2 continue
3 }
4
5- reader, _, err := st.GetFile(bucket, post.Filename)
6+ reader, _, _, err := st.GetFile(bucket, post.Filename)
7 if err != nil {
8 cfg.Logger.Infof("file not found %s/%s", post.UserID, post.Filename)
9 continue
10@@ -69,6 +69,7 @@ func main() {
11 bucket,
12 fmt.Sprintf("%s.webp", shared.SanitizeFileExt(post.Filename)),
13 utils.NopReaderAtCloser(webpReader),
14+ &utils.FileEntry{},
15 )
16 if err != nil {
17 cfg.Logger.Error(err)
+3,
-0
1@@ -72,16 +72,19 @@ func (h *UploadAssetHandler) writeAsset(data *FileData) error {
2 }
3 } else {
4 reader := bytes.NewReader(data.Text)
5+
6 h.Cfg.Logger.Infof(
7 "(%s) uploading to (bucket: %s) (%s)",
8 data.User.Name,
9 data.Bucket.Name,
10 assetFilename,
11 )
12+
13 _, err := h.Storage.PutFile(
14 data.Bucket,
15 assetFilename,
16 utils.NopReaderAtCloser(reader),
17+ data.FileEntry,
18 )
19 if err != nil {
20 return err
+7,
-1
1@@ -15,6 +15,7 @@ import (
2 "github.com/picosh/pico/shared/storage"
3 "github.com/picosh/pico/wish/cms/util"
4 "github.com/picosh/pico/wish/send/utils"
5+ "go.uber.org/zap"
6 )
7
8 type ctxUserKey struct{}
9@@ -73,6 +74,10 @@ func NewUploadAssetHandler(dbpool db.DB, cfg *shared.ConfigSite, storage storage
10 }
11 }
12
13+func (h *UploadAssetHandler) GetLogger() *zap.SugaredLogger {
14+ return h.Cfg.Logger
15+}
16+
17 func (h *UploadAssetHandler) Read(s ssh.Session, entry *utils.FileEntry) (os.FileInfo, utils.ReaderAtCloser, error) {
18 user, err := getUser(s)
19 if err != nil {
20@@ -92,12 +97,13 @@ func (h *UploadAssetHandler) Read(s ssh.Session, entry *utils.FileEntry) (os.Fil
21 }
22
23 fname := shared.GetAssetFileName(entry)
24- contents, size, err := h.Storage.GetFile(bucket, fname)
25+ contents, size, modTime, err := h.Storage.GetFile(bucket, fname)
26 if err != nil {
27 return nil, nil, err
28 }
29
30 fileInfo.FSize = size
31+ fileInfo.FModTime = modTime
32
33 reader := utils.NewAllReaderAt(contents)
34
+6,
-1
1@@ -18,6 +18,7 @@ import (
2 "github.com/picosh/pico/shared/storage"
3 "github.com/picosh/pico/wish/cms/util"
4 "github.com/picosh/pico/wish/send/utils"
5+ "go.uber.org/zap"
6 )
7
8 var maxSize = 1 * shared.GB
9@@ -73,6 +74,10 @@ func (h *UploadImgHandler) removePost(data *PostMetaData) error {
10 return nil
11 }
12
13+func (h *UploadImgHandler) GetLogger() *zap.SugaredLogger {
14+ return h.Cfg.Logger
15+}
16+
17 func (h *UploadImgHandler) Read(s ssh.Session, entry *utils.FileEntry) (os.FileInfo, utils.ReaderAtCloser, error) {
18 user, err := getUser(s)
19 if err != nil {
20@@ -102,7 +107,7 @@ func (h *UploadImgHandler) Read(s ssh.Session, entry *utils.FileEntry) (os.FileI
21 return nil, nil, err
22 }
23
24- contents, _, err := h.Storage.GetFile(bucket, post.Filename)
25+ contents, _, _, err := h.Storage.GetFile(bucket, post.Filename)
26 if err != nil {
27 return nil, nil, err
28 }
+2,
-0
1@@ -60,6 +60,7 @@ func (h *UploadImgHandler) metaImg(data *PostMetaData) error {
2 bucket,
3 data.Filename,
4 utils.NopReaderAtCloser(reader),
5+ &utils.FileEntry{},
6 )
7 if err != nil {
8 return err
9@@ -106,6 +107,7 @@ func (h *UploadImgHandler) metaImg(data *PostMetaData) error {
10 bucket,
11 finalName,
12 utils.NopReaderAtCloser(webpReader),
13+ &utils.FileEntry{},
14 )
15 if err != nil {
16 return err
+5,
-0
1@@ -17,6 +17,7 @@ import (
2 "github.com/picosh/pico/shared/storage"
3 "github.com/picosh/pico/wish/cms/util"
4 "github.com/picosh/pico/wish/send/utils"
5+ "go.uber.org/zap"
6 )
7
8 type ctxUserKey struct{}
9@@ -61,6 +62,10 @@ func NewScpPostHandler(dbpool db.DB, cfg *shared.ConfigSite, hooks ScpFileHooks,
10 }
11 }
12
13+func (h *ScpUploadHandler) GetLogger() *zap.SugaredLogger {
14+ return h.Cfg.Logger
15+}
16+
17 func (h *ScpUploadHandler) Read(s ssh.Session, entry *utils.FileEntry) (os.FileInfo, utils.ReaderAtCloser, error) {
18 user, err := getUser(s)
19 if err != nil {
+1,
-1
1@@ -284,7 +284,7 @@ func imgHandler(w http.ResponseWriter, h *ImgHandler) {
2 fname = fmt.Sprintf("%s.webp", shared.SanitizeFileExt(post.Filename))
3 }
4
5- contents, _, err := h.Storage.GetFile(bucket, fname)
6+ contents, _, _, err := h.Storage.GetFile(bucket, fname)
7 if err != nil {
8 h.Logger.Infof(
9 "file not found %s/%s in storage (bucket: %s, name: %s)",
+2,
-2
1@@ -210,7 +210,7 @@ func assetHandler(w http.ResponseWriter, h *AssetHandler) {
2 }
3
4 var redirects []*RedirectRule
5- redirectFp, _, err := h.Storage.GetFile(bucket, filepath.Join(h.ProjectDir, "_redirects"))
6+ redirectFp, _, _, err := h.Storage.GetFile(bucket, filepath.Join(h.ProjectDir, "_redirects"))
7 if err == nil {
8 defer redirectFp.Close()
9 buf := new(strings.Builder)
10@@ -232,7 +232,7 @@ func assetHandler(w http.ResponseWriter, h *AssetHandler) {
11 assetFilepath := ""
12 status := 200
13 for _, fp := range routes {
14- c, _, err := h.Storage.GetFile(bucket, fp.Filepath)
15+ c, _, _, err := h.Storage.GetFile(bucket, fp.Filepath)
16 if err == nil {
17 contents = c
18 assetFilepath = fp.Filepath
1@@ -8,6 +8,7 @@ import (
2 "path"
3 "path/filepath"
4 "strings"
5+ "time"
6
7 "github.com/picosh/pico/wish/send/utils"
8 )
9@@ -83,21 +84,21 @@ func (s *StorageFS) DeleteBucket(bucket Bucket) error {
10 return os.RemoveAll(bucket.Path)
11 }
12
13-func (s *StorageFS) GetFile(bucket Bucket, fpath string) (utils.ReaderAtCloser, int64, error) {
14+func (s *StorageFS) GetFile(bucket Bucket, fpath string) (utils.ReaderAtCloser, int64, time.Time, error) {
15 dat, err := os.Open(filepath.Join(bucket.Path, fpath))
16 if err != nil {
17- return nil, 0, err
18+ return nil, 0, time.Time{}, err
19 }
20
21 info, err := dat.Stat()
22 if err != nil {
23- return nil, 0, err
24+ return nil, 0, time.Time{}, err
25 }
26
27- return dat, info.Size(), nil
28+ return dat, info.Size(), info.ModTime(), nil
29 }
30
31-func (s *StorageFS) PutFile(bucket Bucket, fpath string, contents utils.ReaderAtCloser) (string, error) {
32+func (s *StorageFS) PutFile(bucket Bucket, fpath string, contents utils.ReaderAtCloser, entry *utils.FileEntry) (string, error) {
33 loc := filepath.Join(bucket.Path, fpath)
34 err := os.MkdirAll(filepath.Dir(loc), os.ModePerm)
35 if err != nil {
36@@ -107,13 +108,19 @@ func (s *StorageFS) PutFile(bucket Bucket, fpath string, contents utils.ReaderAt
37 if err != nil {
38 return "", err
39 }
40- defer f.Close()
41
42 _, err = io.Copy(f, contents)
43 if err != nil {
44 return "", err
45 }
46
47+ f.Close()
48+
49+ if entry.Mtime > 0 {
50+ uTime := time.Unix(entry.Mtime, 0)
51+ os.Chtimes(loc, uTime, uTime)
52+ }
53+
54 return loc, nil
55 }
56
1@@ -6,7 +6,9 @@ import (
2 "fmt"
3 "net/url"
4 "os"
5+ "strconv"
6 "strings"
7+ "time"
8
9 "github.com/minio/madmin-go/v3"
10 "github.com/minio/minio-go/v7"
11@@ -110,11 +112,20 @@ func (s *StorageMinio) ListFiles(bucket Bucket, dir string, recursive bool) ([]o
12 isDir = true
13 }
14
15+ modTime := time.Time{}
16+
17+ if mtime, ok := obj.UserMetadata["Mtime"]; ok {
18+ mtimeUnix, err := strconv.Atoi(mtime)
19+ if err == nil {
20+ modTime = time.Unix(int64(mtimeUnix), 0)
21+ }
22+ }
23+
24 info := &utils.VirtualFile{
25 FName: strings.TrimSuffix(strings.TrimPrefix(obj.Key, resolved), "/"),
26 FIsDir: isDir,
27 FSize: obj.Size,
28- FModTime: obj.LastModified,
29+ FModTime: modTime,
30 }
31 fileList = append(fileList, info)
32 }
33@@ -126,24 +137,40 @@ func (s *StorageMinio) DeleteBucket(bucket Bucket) error {
34 return s.Client.RemoveBucket(context.TODO(), bucket.Name)
35 }
36
37-func (s *StorageMinio) GetFile(bucket Bucket, fpath string) (utils.ReaderAtCloser, int64, error) {
38- // we have to stat the object first to see if it exists
39- // https://github.com/minio/minio-go/issues/654
40+func (s *StorageMinio) GetFile(bucket Bucket, fpath string) (utils.ReaderAtCloser, int64, time.Time, error) {
41+ modTime := time.Time{}
42+
43 info, err := s.Client.StatObject(context.Background(), bucket.Name, fpath, minio.StatObjectOptions{})
44 if err != nil {
45- return nil, 0, err
46+ return nil, 0, modTime, err
47 }
48
49 obj, err := s.Client.GetObject(context.Background(), bucket.Name, fpath, minio.GetObjectOptions{})
50 if err != nil {
51- return nil, 0, err
52+ return nil, 0, modTime, err
53 }
54
55- return obj, info.Size, nil
56+ if mtime, ok := info.UserMetadata["Mtime"]; ok {
57+ mtimeUnix, err := strconv.Atoi(mtime)
58+ if err == nil {
59+ modTime = time.Unix(int64(mtimeUnix), 0)
60+ }
61+ }
62+
63+ return obj, info.Size, modTime, nil
64 }
65
66-func (s *StorageMinio) PutFile(bucket Bucket, fpath string, contents utils.ReaderAtCloser) (string, error) {
67- info, err := s.Client.PutObject(context.TODO(), bucket.Name, fpath, contents, -1, minio.PutObjectOptions{})
68+func (s *StorageMinio) PutFile(bucket Bucket, fpath string, contents utils.ReaderAtCloser, entry *utils.FileEntry) (string, error) {
69+ opts := minio.PutObjectOptions{}
70+
71+ if entry.Mtime > 0 {
72+ opts.UserMetadata = map[string]string{
73+ "Mtime": fmt.Sprint(entry.Mtime),
74+ }
75+ }
76+
77+ info, err := s.Client.PutObject(context.TODO(), bucket.Name, fpath, contents, -1, opts)
78+
79 if err != nil {
80 return "", err
81 }
1@@ -2,6 +2,7 @@ package storage
2
3 import (
4 "os"
5+ "time"
6
7 "github.com/picosh/pico/wish/send/utils"
8 )
9@@ -17,8 +18,8 @@ type ObjectStorage interface {
10
11 DeleteBucket(bucket Bucket) error
12 GetBucketQuota(bucket Bucket) (uint64, error)
13- GetFile(bucket Bucket, fpath string) (utils.ReaderAtCloser, int64, error)
14- PutFile(bucket Bucket, fpath string, contents utils.ReaderAtCloser) (string, error)
15+ GetFile(bucket Bucket, fpath string) (utils.ReaderAtCloser, int64, time.Time, error)
16+ PutFile(bucket Bucket, fpath string, contents utils.ReaderAtCloser, entry *utils.FileEntry) (string, error)
17 DeleteFile(bucket Bucket, fpath string) error
18 ListFiles(bucket Bucket, dir string, recursive bool) ([]os.FileInfo, error)
19 }
+5,
-0
1@@ -11,11 +11,16 @@ import (
2 "github.com/charmbracelet/wish"
3 "github.com/picosh/pico/wish/send"
4 "github.com/picosh/pico/wish/send/utils"
5+ "go.uber.org/zap"
6 )
7
8 type handler struct {
9 }
10
11+func (h *handler) GetLogger() *zap.SugaredLogger {
12+ return zap.NewNop().Sugar()
13+}
14+
15 func (h *handler) Write(session ssh.Session, file *utils.FileEntry) (string, error) {
16 str := fmt.Sprintf("Received file: %+v from session: %+v", file, session)
17 log.Print(str)
+28,
-23
1@@ -5,10 +5,9 @@ import (
2 "fmt"
3 "io"
4 "io/fs"
5- "log"
6 "os"
7 "path"
8- "path/filepath"
9+ "slices"
10 "strings"
11
12 "github.com/antoniomika/go-rsync-receiver/rsyncreceiver"
13@@ -23,30 +22,34 @@ type handler struct {
14 session ssh.Session
15 writeHandler utils.CopyFromClientHandler
16 root string
17+ recursive bool
18 }
19
20 func (h *handler) Skip(file *rsyncutils.ReceiverFile) bool {
21- log.Printf("SKIP %+v", file)
22- return file.FileMode().IsDir()
23+ if file.FileMode().IsDir() {
24+ return true
25+ }
26+
27+ fI, _, err := h.writeHandler.Read(h.session, &utils.FileEntry{Filepath: path.Join("/", h.root, file.Name)})
28+ if err == nil && fI.ModTime().Equal(file.ModTime) && file.Length == fI.Size() {
29+ return true
30+ }
31+
32+ return false
33 }
34
35 func (h *handler) List(rPath string) ([]fs.FileInfo, error) {
36- log.Println("LIST", rPath)
37 isDir := false
38 if rPath == "." {
39 rPath = "/"
40 isDir = true
41 }
42
43- list, err := h.writeHandler.List(h.session, rPath, isDir, true)
44+ list, err := h.writeHandler.List(h.session, rPath, isDir, h.recursive)
45 if err != nil {
46 return nil, err
47 }
48
49- for _, f := range list {
50- log.Printf("first %+v", f)
51- }
52-
53 var dirs []string
54
55 var newList []fs.FileInfo
56@@ -91,20 +94,23 @@ func (h *handler) List(rPath string) ([]fs.FileInfo, error) {
57 })
58 }
59
60+ slices.Reverse(newList)
61+
62+ onlyEmpty := true
63 for _, f := range newList {
64- log.Printf("%+v", f)
65+ if f.Name() != "" {
66+ onlyEmpty = false
67+ }
68 }
69
70- if len(newList) == 0 {
71- return nil, errors.New("no files to process")
72+ if len(newList) == 0 || onlyEmpty {
73+ return nil, errors.New("no files to send, the directory may not exist or could be empty")
74 }
75
76 return newList, nil
77 }
78
79 func (h *handler) Read(file *rsyncutils.SenderFile) (os.FileInfo, io.ReaderAt, error) {
80- log.Printf("READ %+v %s", file, h.root)
81-
82 filePath := file.WPath
83
84 if strings.HasSuffix(h.root, file.WPath) {
85@@ -113,22 +119,17 @@ func (h *handler) Read(file *rsyncutils.SenderFile) (os.FileInfo, io.ReaderAt, e
86 filePath = path.Join(h.root, file.Path, file.WPath)
87 }
88
89- log.Printf("READ %+v %s", file, filePath)
90-
91 return h.writeHandler.Read(h.session, &utils.FileEntry{Filepath: filePath})
92 }
93
94 func (h *handler) Put(file *rsyncutils.ReceiverFile) (int64, error) {
95- log.Printf("PUT %+v", file)
96- fpath := path.Join("/", h.root)
97 fileEntry := &utils.FileEntry{
98- Filepath: filepath.Join(fpath, file.Name),
99+ Filepath: path.Join("/", h.root, file.Name),
100 Mode: fs.FileMode(0600),
101 Size: file.Length,
102 Mtime: file.ModTime.Unix(),
103 Atime: file.ModTime.Unix(),
104 }
105- log.Printf("%+v", fileEntry)
106 fileEntry.Reader = file.Buf
107
108 msg, err := h.writeHandler.Write(h.session, fileEntry)
109@@ -165,8 +166,10 @@ func Middleware(writeHandler utils.CopyFromClientHandler) wish.Middleware {
110 opts, parser := rsyncsender.NewGetOpt()
111 _, _ = parser.Parse(cmdFlags)
112
113+ fileHandler.recursive = opts.Recurse
114+
115 if err := rsyncsender.ClientRun(opts, session, fileHandler, fileHandler.root, true); err != nil {
116- log.Println("error running rsync sender:", err)
117+ writeHandler.GetLogger().Error("error running rsync sender:", err)
118 }
119 return
120 }
121@@ -175,8 +178,10 @@ func Middleware(writeHandler utils.CopyFromClientHandler) wish.Middleware {
122 opts, parser := rsyncreceiver.NewGetOpt()
123 _, _ = parser.Parse(cmdFlags)
124
125+ fileHandler.recursive = opts.Recurse
126+
127 if _, err := rsyncreceiver.ClientRun(opts, session, fileHandler, true); err != nil {
128- log.Println("error running rsync receiver:", err)
129+ writeHandler.GetLogger().Error("error running rsync receiver:", err)
130 }
131 }
132 }
+1,
-2
1@@ -3,7 +3,6 @@ package sftp
2 import (
3 "errors"
4 "io"
5- "log"
6
7 "github.com/charmbracelet/ssh"
8 "github.com/picosh/pico/wish/send/utils"
9@@ -45,7 +44,7 @@ func SubsystemHandler(writeHandler utils.CopyFromClientHandler) ssh.SubsystemHan
10
11 err = requestServer.Serve()
12 if err != nil && !errors.Is(err, io.EOF) {
13- log.Println("Error serving sftp subsystem:", err)
14+ writeHandler.GetLogger().Error("Error serving sftp subsystem:", err)
15 }
16 }
17 }
+2,
-0
1@@ -10,6 +10,7 @@ import (
2 "strconv"
3
4 "github.com/charmbracelet/ssh"
5+ "go.uber.org/zap"
6 )
7
8 // NULL is an array with a single NULL byte.
9@@ -59,6 +60,7 @@ type CopyFromClientHandler interface {
10 Write(ssh.Session, *FileEntry) (string, error)
11 Read(ssh.Session, *FileEntry) (os.FileInfo, ReaderAtCloser, error)
12 List(ssh ssh.Session, path string, isDir bool, recursive bool) ([]os.FileInfo, error)
13+ GetLogger() *zap.SugaredLogger
14 Validate(ssh.Session) error
15 }
16