vfs: make PipeWriter an interface

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2023-10-22 16:30:04 +02:00
parent e3c4ee0833
commit 320e404e4d
12 changed files with 30 additions and 21 deletions

View File

@@ -32,7 +32,7 @@ type transfer struct {
expectedOffset int64 expectedOffset int64
} }
func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *vfs.PipeReader, func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter vfs.PipeWriter, pipeReader *vfs.PipeReader,
expectedOffset int64) *transfer { expectedOffset int64) *transfer {
var writer io.WriteCloser var writer io.WriteCloser
var reader io.ReadCloser var reader io.ReadCloser

View File

@@ -28,7 +28,7 @@ type httpdFile struct {
isFinished bool isFinished bool
} }
func newHTTPDFile(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *vfs.PipeReader) *httpdFile { func newHTTPDFile(baseTransfer *common.BaseTransfer, pipeWriter vfs.PipeWriter, pipeReader *vfs.PipeReader) *httpdFile {
var writer io.WriteCloser var writer io.WriteCloser
var reader io.ReadCloser var reader io.ReadCloser
if baseTransfer.File != nil { if baseTransfer.File != nil {

View File

@@ -58,7 +58,7 @@ type transfer struct {
isFinished bool isFinished bool
} }
func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *vfs.PipeReader, func newTransfer(baseTransfer *common.BaseTransfer, pipeWriter vfs.PipeWriter, pipeReader *vfs.PipeReader,
errForRead error) *transfer { errForRead error) *transfer {
var writer writerAtCloser var writer writerAtCloser
var reader readerAtCloser var reader readerAtCloser

View File

@@ -224,7 +224,7 @@ func (fs *AzureBlobFs) Open(name string, offset int64) (File, *PipeReader, func(
} }
// Create creates or opens the named file for writing // Create creates or opens the named file for writing
func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, *PipeWriter, func(), error) { func (fs *AzureBlobFs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
if checks&CheckParentDir != 0 { if checks&CheckParentDir != 0 {
_, err := fs.Stat(path.Dir(name)) _, err := fs.Stat(path.Dir(name))
if err != nil { if err != nil {
@@ -1195,7 +1195,7 @@ func (fs *AzureBlobFs) getCopyOptions() *blob.StartCopyFromURLOptions {
return copyOptions return copyOptions
} }
func (fs *AzureBlobFs) downloadToWriter(name string, w *PipeWriter) error { func (fs *AzureBlobFs) downloadToWriter(name string, w PipeWriter) error {
fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name) fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout) ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
defer cancelFn() defer cancelFn()

View File

@@ -154,7 +154,7 @@ func (fs *CryptFs) Open(name string, offset int64) (File, *PipeReader, func(), e
} }
// Create creates or opens the named file for writing // Create creates or opens the named file for writing
func (fs *CryptFs) Create(name string, _, _ int) (File, *PipeWriter, func(), error) { func (fs *CryptFs) Create(name string, _, _ int) (File, PipeWriter, func(), error) {
f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err

View File

@@ -167,7 +167,7 @@ func (fs *GCSFs) Open(name string, offset int64) (File, *PipeReader, func(), err
} }
// Create creates or opens the named file for writing // Create creates or opens the named file for writing
func (fs *GCSFs) Create(name string, flag, checks int) (File, *PipeWriter, func(), error) { func (fs *GCSFs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
if checks&CheckParentDir != 0 { if checks&CheckParentDir != 0 {
_, err := fs.Stat(path.Dir(name)) _, err := fs.Stat(path.Dir(name))
if err != nil { if err != nil {
@@ -777,7 +777,7 @@ func (fs *GCSFs) setWriterAttrs(objectWriter *storage.Writer, contentType string
} }
} }
func (fs *GCSFs) downloadToWriter(name string, w *PipeWriter) error { func (fs *GCSFs) downloadToWriter(name string, w PipeWriter) error {
fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name) fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout) ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
defer cancelFn() defer cancelFn()

View File

@@ -332,7 +332,7 @@ func (fs *HTTPFs) Open(name string, offset int64) (File, *PipeReader, func(), er
} }
// Create creates or opens the named file for writing // Create creates or opens the named file for writing
func (fs *HTTPFs) Create(name string, flag, checks int) (File, *PipeWriter, func(), error) { func (fs *HTTPFs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir) r, w, err := pipeat.PipeInDir(fs.localTempDir)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err

View File

@@ -134,7 +134,7 @@ func (fs *OsFs) Open(name string, offset int64) (File, *PipeReader, func(), erro
} }
// Create creates or opens the named file for writing // Create creates or opens the named file for writing
func (fs *OsFs) Create(name string, flag, _ int) (File, *PipeWriter, func(), error) { func (fs *OsFs) Create(name string, flag, _ int) (File, PipeWriter, func(), error) {
if !fs.useWriteBuffering(flag) { if !fs.useWriteBuffering(flag) {
var err error var err error
var f *os.File var f *os.File

View File

@@ -241,7 +241,7 @@ func (fs *S3Fs) Open(name string, offset int64) (File, *PipeReader, func(), erro
} }
// Create creates or opens the named file for writing // Create creates or opens the named file for writing
func (fs *S3Fs) Create(name string, flag, checks int) (File, *PipeWriter, func(), error) { func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(), error) {
if checks&CheckParentDir != 0 { if checks&CheckParentDir != 0 {
_, err := fs.Stat(path.Dir(name)) _, err := fs.Stat(path.Dir(name))
if err != nil { if err != nil {
@@ -1050,7 +1050,7 @@ func (*S3Fs) GetAvailableDiskSize(_ string) (*sftp.StatVFS, error) {
return nil, ErrStorageSizeUnavailable return nil, ErrStorageSizeUnavailable
} }
func (fs *S3Fs) downloadToWriter(name string, w *PipeWriter) error { func (fs *S3Fs) downloadToWriter(name string, w PipeWriter) error {
fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name) fsLog(fs, logger.LevelDebug, "starting download before resuming upload, path %q", name)
ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout) ctx, cancelFn := context.WithTimeout(context.Background(), preResumeTimeout)
defer cancelFn() defer cancelFn()

View File

@@ -372,7 +372,7 @@ func (fs *SFTPFs) Open(name string, offset int64) (File, *PipeReader, func(), er
} }
// Create creates or opens the named file for writing // Create creates or opens the named file for writing
func (fs *SFTPFs) Create(name string, flag, _ int) (File, *PipeWriter, func(), error) { func (fs *SFTPFs) Create(name string, flag, _ int) (File, PipeWriter, func(), error) {
client, err := fs.conn.getClient() client, err := fs.conn.getClient()
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err

View File

@@ -110,7 +110,7 @@ type Fs interface {
Stat(name string) (os.FileInfo, error) Stat(name string) (os.FileInfo, error)
Lstat(name string) (os.FileInfo, error) Lstat(name string) (os.FileInfo, error)
Open(name string, offset int64) (File, *PipeReader, func(), error) Open(name string, offset int64) (File, *PipeReader, func(), error)
Create(name string, flag, checks int) (File, *PipeWriter, func(), error) Create(name string, flag, checks int) (File, PipeWriter, func(), error)
Rename(source, target string) (int, int64, error) Rename(source, target string) (int, int64, error)
Remove(name string, isDir bool) error Remove(name string, isDir bool) error
Mkdir(name string) error Mkdir(name string) error
@@ -174,6 +174,15 @@ type File interface {
Truncate(size int64) error Truncate(size int64) error
} }
// PipeWriter defines an interface representing a SFTPGo pipe writer
type PipeWriter interface {
io.Writer
io.WriterAt
io.Closer
Done(err error)
GetWrittenBytes() int64
}
// Metadater defines an interface to implement to return metadata for a file // Metadater defines an interface to implement to return metadata for a file
type Metadater interface { type Metadater interface {
Metadata() map[string]string Metadata() map[string]string
@@ -707,16 +716,16 @@ func (c *CryptFsConfig) validate() error {
return nil return nil
} }
// PipeWriter defines a wrapper for pipeat.PipeWriterAt. // pipeWriter defines a wrapper for pipeat.PipeWriterAt.
type PipeWriter struct { type pipeWriter struct {
*pipeat.PipeWriterAt *pipeat.PipeWriterAt
err error err error
done chan bool done chan bool
} }
// NewPipeWriter initializes a new PipeWriter // NewPipeWriter initializes a new PipeWriter
func NewPipeWriter(w *pipeat.PipeWriterAt) *PipeWriter { func NewPipeWriter(w *pipeat.PipeWriterAt) PipeWriter {
return &PipeWriter{ return &pipeWriter{
PipeWriterAt: w, PipeWriterAt: w,
err: nil, err: nil,
done: make(chan bool), done: make(chan bool),
@@ -724,7 +733,7 @@ func NewPipeWriter(w *pipeat.PipeWriterAt) *PipeWriter {
} }
// Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any. // Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any.
func (p *PipeWriter) Close() error { func (p *pipeWriter) Close() error {
p.PipeWriterAt.Close() //nolint:errcheck // the returned error is always null p.PipeWriterAt.Close() //nolint:errcheck // the returned error is always null
<-p.done <-p.done
return p.err return p.err
@@ -732,7 +741,7 @@ func (p *PipeWriter) Close() error {
// Done unlocks other goroutines waiting on Close(). // Done unlocks other goroutines waiting on Close().
// It must be called when the upload ends // It must be called when the upload ends
func (p *PipeWriter) Done(err error) { func (p *pipeWriter) Done(err error) {
p.err = err p.err = err
p.done <- true p.done <- true
} }

View File

@@ -51,7 +51,7 @@ type webDavFile struct {
readTried atomic.Bool readTried atomic.Bool
} }
func newWebDavFile(baseTransfer *common.BaseTransfer, pipeWriter *vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt) *webDavFile { func newWebDavFile(baseTransfer *common.BaseTransfer, pipeWriter vfs.PipeWriter, pipeReader *pipeat.PipeReaderAt) *webDavFile {
var writer io.WriteCloser var writer io.WriteCloser
var reader io.ReadCloser var reader io.ReadCloser
if baseTransfer.File != nil { if baseTransfer.File != nil {