osfs: add optional buffering

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2023-05-16 18:08:14 +02:00
parent e10487ad57
commit adad8e658b
32 changed files with 895 additions and 170 deletions

View File

@@ -15,6 +15,7 @@
package vfs
import (
"bufio"
"errors"
"fmt"
"io"
@@ -30,6 +31,7 @@ import (
fscopy "github.com/otiai10/copy"
"github.com/pkg/sftp"
"github.com/rs/xid"
"github.com/sftpgo/sdk"
"github.com/drakkan/sftpgo/v2/internal/logger"
"github.com/drakkan/sftpgo/v2/internal/util"
@@ -54,16 +56,33 @@ type OsFs struct {
connectionID string
rootDir string
// if not empty this fs is mouted as virtual folder in the specified path
mountPath string
mountPath string
localTempDir string
readBufferSize int
writeBufferSize int
}
// NewOsFs returns an OsFs object that allows to interact with local Os filesystem
func NewOsFs(connectionID, rootDir, mountPath string) Fs {
func NewOsFs(connectionID, rootDir, mountPath string, config *sdk.OSFsConfig) Fs {
var tempDir string
if tempPath != "" {
tempDir = tempPath
} else {
tempDir = filepath.Clean(os.TempDir())
}
var readBufferSize, writeBufferSize int
if config != nil {
readBufferSize = config.ReadBufferSize * 1024 * 1024
writeBufferSize = config.WriteBufferSize * 1024 * 1024
}
return &OsFs{
name: osFsName,
connectionID: connectionID,
rootDir: rootDir,
mountPath: getMountPath(mountPath),
name: osFsName,
connectionID: connectionID,
rootDir: rootDir,
mountPath: getMountPath(mountPath),
localTempDir: tempDir,
readBufferSize: readBufferSize,
writeBufferSize: writeBufferSize,
}
}
@@ -88,7 +107,7 @@ func (fs *OsFs) Lstat(name string) (os.FileInfo, error) {
}
// Open opens the named file for reading
func (*OsFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
func (fs *OsFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func(), error) {
f, err := os.Open(name)
if err != nil {
return nil, nil, nil, err
@@ -100,19 +119,65 @@ func (*OsFs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, func()
return nil, nil, nil, err
}
}
return f, nil, nil, err
if fs.readBufferSize <= 0 {
return f, nil, nil, err
}
r, w, err := pipeat.PipeInDir(fs.localTempDir)
if err != nil {
f.Close()
return nil, nil, nil, err
}
go func() {
br := bufio.NewReaderSize(f, fs.readBufferSize)
n, err := doCopy(w, br, nil)
w.CloseWithError(err) //nolint:errcheck
f.Close()
fsLog(fs, logger.LevelDebug, "download completed, path: %q size: %v, err: %v", name, n, err)
}()
return nil, r, nil, nil
}
// Create creates or opens the named file for writing
func (*OsFs) Create(name string, flag, _ int) (File, *PipeWriter, func(), error) {
var err error
var f *os.File
if flag == 0 {
f, err = os.Create(name)
} else {
f, err = os.OpenFile(name, flag, 0666)
func (fs *OsFs) Create(name string, flag, _ int) (File, *PipeWriter, func(), error) {
if !fs.useWriteBuffering(flag) {
var err error
var f *os.File
if flag == 0 {
f, err = os.Create(name)
} else {
f, err = os.OpenFile(name, flag, 0666)
}
return f, nil, nil, err
}
return f, nil, nil, err
f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
return nil, nil, nil, err
}
r, w, err := pipeat.PipeInDir(fs.localTempDir)
if err != nil {
f.Close()
return nil, nil, nil, err
}
p := NewPipeWriter(w)
go func() {
bw := bufio.NewWriterSize(f, fs.writeBufferSize)
n, err := doCopy(bw, r, nil)
errFlush := bw.Flush()
if err == nil && errFlush != nil {
err = errFlush
}
errClose := f.Close()
if err == nil && errClose != nil {
err = errClose
}
r.CloseWithError(err) //nolint:errcheck
p.Done(err)
fsLog(fs, logger.LevelDebug, "upload completed, path: %q, readed bytes: %v, err: %v", name, n, err)
}()
return nil, p, nil, nil
}
// Rename renames (moves) source to target
@@ -124,10 +189,16 @@ func (fs *OsFs) Rename(source, target string) (int, int64, error) {
if err != nil && isCrossDeviceError(err) {
fsLog(fs, logger.LevelError, "cross device error detected while renaming %q -> %q. Trying a copy and remove, this could take a long time",
source, target)
var readBufferSize uint
if fs.readBufferSize > 0 {
readBufferSize = uint(fs.readBufferSize)
}
err = fscopy.Copy(source, target, fscopy.Options{
OnSymlink: func(src string) fscopy.SymlinkAction {
return fscopy.Skip
},
CopyBufferSize: readBufferSize,
})
if err != nil {
fsLog(fs, logger.LevelError, "cross device copy error: %v", err)
@@ -509,3 +580,21 @@ func (*OsFs) Close() error {
func (*OsFs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error) {
return getStatFS(dirName)
}
func (fs *OsFs) useWriteBuffering(flag int) bool {
if fs.writeBufferSize <= 0 {
return false
}
if flag == 0 {
return true
}
if flag&os.O_TRUNC == 0 {
fsLog(fs, logger.LevelDebug, "truncate flag missing, buffering write not possible")
return false
}
if flag&os.O_RDWR != 0 {
fsLog(fs, logger.LevelDebug, "read and write flag found, buffering write not possible")
return false
}
return true
}