From dc42680e1c6e64e33986bd48cc2c7374d8e00aa9 Mon Sep 17 00:00:00 2001 From: Nicola Murino Date: Sun, 25 Aug 2024 15:17:17 +0200 Subject: [PATCH] add pipeReaderAt and pipeWriterAt interfaces Signed-off-by: Nicola Murino --- internal/vfs/azblobfs.go | 5 ++-- internal/vfs/cryptfs.go | 5 ++-- internal/vfs/gcsfs.go | 20 ++++++++------ internal/vfs/httpfs.go | 5 ++-- internal/vfs/osfs.go | 5 ++-- internal/vfs/s3fs.go | 5 ++-- internal/vfs/sftpfs.go | 5 ++-- internal/vfs/vfs.go | 44 +++++++++++++++++++++++-------- internal/webdavd/file.go | 3 +-- internal/webdavd/internal_test.go | 2 +- 10 files changed, 59 insertions(+), 40 deletions(-) diff --git a/internal/vfs/azblobfs.go b/internal/vfs/azblobfs.go index 1c69680a..9391b132 100644 --- a/internal/vfs/azblobfs.go +++ b/internal/vfs/azblobfs.go @@ -42,7 +42,6 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" - "github.com/eikenb/pipeat" "github.com/google/uuid" "github.com/pkg/sftp" @@ -213,7 +212,7 @@ func (fs *AzureBlobFs) Lstat(name string) (os.FileInfo, error) { // Open opens the named file for reading func (fs *AzureBlobFs) Open(name string, offset int64) (File, PipeReader, func(), error) { - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, fs.config.DownloadPartSize*int64(fs.config.DownloadConcurrency)+1) if err != nil { return nil, nil, nil, err } @@ -241,7 +240,7 @@ func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, PipeWriter, return nil, nil, nil, err } } - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, fs.config.UploadPartSize+1024*1024) if err != nil { return nil, nil, nil, err } diff --git a/internal/vfs/cryptfs.go b/internal/vfs/cryptfs.go index 3f20fb37..7b2082f8 100644 --- a/internal/vfs/cryptfs.go +++ b/internal/vfs/cryptfs.go @@ -24,7 +24,6 @@ import ( "net/http" "os" - "github.com/eikenb/pipeat" "github.com/minio/sio" "golang.org/x/crypto/hkdf" @@ -89,7 +88,7 @@ func (fs *CryptFs) Open(name string, offset int64) (File, PipeReader, func(), er f.Close() return nil, nil, nil, err } - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, 0) if err != nil { f.Close() return nil, nil, nil, err @@ -175,7 +174,7 @@ func (fs *CryptFs) Create(name string, _, _ int) (File, PipeWriter, func(), erro f.Close() return nil, nil, nil, err } - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, 0) if err != nil { f.Close() return nil, nil, nil, err diff --git a/internal/vfs/gcsfs.go b/internal/vfs/gcsfs.go index a169d750..c301915a 100644 --- a/internal/vfs/gcsfs.go +++ b/internal/vfs/gcsfs.go @@ -32,7 +32,6 @@ import ( "time" "cloud.google.com/go/storage" - "github.com/eikenb/pipeat" "github.com/pkg/sftp" "github.com/rs/xid" "google.golang.org/api/googleapi" @@ -89,13 +88,16 @@ func NewGCSFs(connectionID, localTempDir, mountPath string, config GCSFsConfig) } ctx := context.Background() if fs.config.AutomaticCredentials > 0 { - fs.svc, err = storage.NewClient(ctx) + fs.svc, err = storage.NewClient(ctx, storage.WithJSONReads()) } else { err = fs.config.Credentials.TryDecrypt() if err != nil { return fs, err } - fs.svc, err = storage.NewClient(ctx, option.WithCredentialsJSON([]byte(fs.config.Credentials.GetPayload()))) + fs.svc, err = storage.NewClient(ctx, + storage.WithJSONReads(), + option.WithCredentialsJSON([]byte(fs.config.Credentials.GetPayload())), + ) } return fs, err } @@ -128,7 +130,7 @@ func (fs *GCSFs) Lstat(name string) (os.FileInfo, error) { // Open opens the named file for reading func (fs *GCSFs) Open(name string, offset int64) (File, PipeReader, func(), error) { - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, 0) if err != nil { return nil, nil, nil, err } @@ -176,7 +178,11 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func() return nil, nil, nil, err } } - r, w, err := pipeat.PipeInDir(fs.localTempDir) + chunkSize := googleapi.DefaultUploadChunkSize + if fs.config.UploadPartSize > 0 { + chunkSize = int(fs.config.UploadPartSize) * 1024 * 1024 + } + r, w, err := createPipeFn(fs.localTempDir, int64(chunkSize+1024*1024)) if err != nil { return nil, nil, nil, err } @@ -220,9 +226,7 @@ func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func() objectWriter = obj.NewWriter(ctx) } - if fs.config.UploadPartSize > 0 { - objectWriter.ChunkSize = int(fs.config.UploadPartSize) * 1024 * 1024 - } + objectWriter.ChunkSize = chunkSize if fs.config.UploadPartMaxTime > 0 { objectWriter.ChunkRetryDeadline = time.Duration(fs.config.UploadPartMaxTime) * time.Second } diff --git a/internal/vfs/httpfs.go b/internal/vfs/httpfs.go index b6ddb6fa..f9550924 100644 --- a/internal/vfs/httpfs.go +++ b/internal/vfs/httpfs.go @@ -32,7 +32,6 @@ import ( "strings" "time" - "github.com/eikenb/pipeat" "github.com/pkg/sftp" "github.com/sftpgo/sdk" @@ -317,7 +316,7 @@ func (fs *HTTPFs) Lstat(name string) (os.FileInfo, error) { // Open opens the named file for reading func (fs *HTTPFs) Open(name string, offset int64) (File, PipeReader, func(), error) { - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, 0) if err != nil { return nil, nil, nil, err } @@ -351,7 +350,7 @@ func (fs *HTTPFs) Open(name string, offset int64) (File, PipeReader, func(), err // Create creates or opens the named file for writing func (fs *HTTPFs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) { - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, 0) if err != nil { return nil, nil, nil, err } diff --git a/internal/vfs/osfs.go b/internal/vfs/osfs.go index f885cb5b..3746a443 100644 --- a/internal/vfs/osfs.go +++ b/internal/vfs/osfs.go @@ -28,7 +28,6 @@ import ( "strings" "time" - "github.com/eikenb/pipeat" fscopy "github.com/otiai10/copy" "github.com/pkg/sftp" "github.com/rs/xid" @@ -116,7 +115,7 @@ func (fs *OsFs) Open(name string, offset int64) (File, PipeReader, func(), error if fs.readBufferSize <= 0 { return f, nil, nil, err } - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, 0) if err != nil { f.Close() return nil, nil, nil, err @@ -149,7 +148,7 @@ func (fs *OsFs) Create(name string, flag, _ int) (File, PipeWriter, func(), erro if err != nil { return nil, nil, nil, err } - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, 0) if err != nil { f.Close() return nil, nil, nil, err diff --git a/internal/vfs/s3fs.go b/internal/vfs/s3fs.go index 20a8f092..70884f83 100644 --- a/internal/vfs/s3fs.go +++ b/internal/vfs/s3fs.go @@ -49,7 +49,6 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go-v2/service/sts" - "github.com/eikenb/pipeat" "github.com/pkg/sftp" "github.com/drakkan/sftpgo/v2/internal/logger" @@ -229,7 +228,7 @@ func (fs *S3Fs) Lstat(name string) (os.FileInfo, error) { // Open opens the named file for reading func (fs *S3Fs) Open(name string, offset int64) (File, PipeReader, func(), error) { - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, fs.config.DownloadPartSize*int64(fs.config.DownloadConcurrency)+1) if err != nil { return nil, nil, nil, err } @@ -287,7 +286,7 @@ func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(), return nil, nil, nil, err } } - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, fs.config.UploadPartSize+1024*1024) if err != nil { return nil, nil, nil, err } diff --git a/internal/vfs/sftpfs.go b/internal/vfs/sftpfs.go index 670ea175..31554c6e 100644 --- a/internal/vfs/sftpfs.go +++ b/internal/vfs/sftpfs.go @@ -35,7 +35,6 @@ import ( "sync/atomic" "time" - "github.com/eikenb/pipeat" "github.com/pkg/sftp" "github.com/robfig/cron/v3" "github.com/rs/xid" @@ -405,7 +404,7 @@ func (fs *SFTPFs) Open(name string, offset int64) (File, PipeReader, func(), err if fs.config.BufferSize == 0 { return f, nil, nil, nil } - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, 0) if err != nil { f.Close() return nil, nil, nil, err @@ -445,7 +444,7 @@ func (fs *SFTPFs) Create(name string, flag, _ int) (File, PipeWriter, func(), er if err != nil { return nil, nil, nil, err } - r, w, err := pipeat.PipeInDir(fs.localTempDir) + r, w, err := createPipeFn(fs.localTempDir, 0) if err != nil { f.Close() return nil, nil, nil, err diff --git a/internal/vfs/vfs.go b/internal/vfs/vfs.go index 03bec8bd..31c9e9eb 100644 --- a/internal/vfs/vfs.go +++ b/internal/vfs/vfs.go @@ -73,6 +73,12 @@ var ( uploadMode int ) +var ( + createPipeFn = func(dirPath string, _ int64) (pipeReaderAt, pipeWriterAt, error) { + return pipeat.PipeInDir(dirPath) + } +) + // SetAllowSelfConnections sets the desired behaviour for self connections func SetAllowSelfConnections(value int) { allowSelfConnections = value @@ -196,6 +202,22 @@ type PipeReader interface { Metadata() map[string]string } +type pipeReaderAt interface { + Read(p []byte) (int, error) + ReadAt(p []byte, offset int64) (int, error) + GetReadedBytes() int64 + Close() error + CloseWithError(err error) error +} + +type pipeWriterAt interface { + Write(p []byte) (int, error) + WriteAt(p []byte, offset int64) (int, error) + GetWrittenBytes() int64 + Close() error + CloseWithError(err error) error +} + // DirLister defines an interface for a directory lister type DirLister interface { Next(limit int) ([]os.FileInfo, error) @@ -867,25 +889,25 @@ func (c *CryptFsConfig) validate() error { return nil } -// pipeWriter defines a wrapper for pipeat.PipeWriterAt. +// pipeWriter defines a wrapper for a pipeWriterAt. type pipeWriter struct { - *pipeat.PipeWriterAt + pipeWriterAt err error done chan bool } // NewPipeWriter initializes a new PipeWriter -func NewPipeWriter(w *pipeat.PipeWriterAt) PipeWriter { +func NewPipeWriter(w pipeWriterAt) PipeWriter { return &pipeWriter{ - PipeWriterAt: w, + pipeWriterAt: w, err: nil, done: make(chan bool), } } -// Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any. +// Close waits for the upload to end, closes the pipeWriterAt and returns an error if any. func (p *pipeWriter) Close() error { - p.PipeWriterAt.Close() //nolint:errcheck // the returned error is always null + p.pipeWriterAt.Close() //nolint:errcheck // the returned error is always null <-p.done return p.err } @@ -897,10 +919,10 @@ func (p *pipeWriter) Done(err error) { p.done <- true } -func newPipeWriterAtOffset(w *pipeat.PipeWriterAt, offset int64) PipeWriter { +func newPipeWriterAtOffset(w pipeWriterAt, offset int64) PipeWriter { return &pipeWriterAtOffset{ pipeWriter: &pipeWriter{ - PipeWriterAt: w, + pipeWriterAt: w, err: nil, done: make(chan bool), }, @@ -929,15 +951,15 @@ func (p *pipeWriterAtOffset) Write(buf []byte) (int, error) { } // NewPipeReader initializes a new PipeReader -func NewPipeReader(r *pipeat.PipeReaderAt) PipeReader { +func NewPipeReader(r pipeReaderAt) PipeReader { return &pipeReader{ - PipeReaderAt: r, + pipeReaderAt: r, } } // pipeReader defines a wrapper for pipeat.PipeReaderAt. type pipeReader struct { - *pipeat.PipeReaderAt + pipeReaderAt mu sync.RWMutex metadata map[string]string } diff --git a/internal/webdavd/file.go b/internal/webdavd/file.go index 1f68dd65..32820bf5 100644 --- a/internal/webdavd/file.go +++ b/internal/webdavd/file.go @@ -28,7 +28,6 @@ import ( "time" "github.com/drakkan/webdav" - "github.com/eikenb/pipeat" "github.com/drakkan/sftpgo/v2/internal/common" "github.com/drakkan/sftpgo/v2/internal/dataprovider" @@ -52,7 +51,7 @@ type webDavFile struct { readTried atomic.Bool } -func newWebDavFile(baseTransfer *common.BaseTransfer, pipeWriter vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt) *webDavFile { +func newWebDavFile(baseTransfer *common.BaseTransfer, pipeWriter vfs.PipeWriter, pipeReader vfs.PipeReader) *webDavFile { var writer io.WriteCloser var reader io.ReadCloser if baseTransfer.File != nil { diff --git a/internal/webdavd/internal_test.go b/internal/webdavd/internal_test.go index a3751e22..94e60313 100644 --- a/internal/webdavd/internal_test.go +++ b/internal/webdavd/internal_test.go @@ -806,7 +806,7 @@ func TestTransferReadWriteErrors(t *testing.T) { r, w, err := pipeat.Pipe() assert.NoError(t, err) - davFile = newWebDavFile(baseTransfer, nil, r) + davFile = newWebDavFile(baseTransfer, nil, vfs.NewPipeReader(r)) davFile.Connection.RemoveTransfer(davFile.BaseTransfer) davFile = newWebDavFile(baseTransfer, vfs.NewPipeWriter(w), nil) davFile.Connection.RemoveTransfer(davFile.BaseTransfer)