sftpd: improve truncate

quota usage and max allowed write size are now properly updated after a truncate
Nicola Murino
2020-08-22 10:12:00 +02:00
parent 7381a867ba
commit 5208e4a4ca
22 changed files with 433 additions and 171 deletions
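The fix ties two pieces of bookkeeping to the truncate: the remaining write allowance (MaxWriteSize) and the quota baseline (InitialSize). A minimal, self-contained sketch of the quota side, using illustrative names rather than SFTPGo's API: the quota charge at the end of an upload is the final on-disk size minus the size the transfer started from, so a truncate has to move that starting point along with the file size.

package main

import "fmt"

// quotaDelta mirrors the arithmetic in updateQuota below: the charge for an
// upload is the final file size minus the size recorded when the transfer
// started (InitialSize). Illustrative sketch, not SFTPGo's API.
func quotaDelta(finalSize, initialSize int64) int64 {
	return finalSize - initialSize
}

func main() {
	// A transfer starts against a 100-byte file, the client truncates it to 20
	// (the baseline moves to 20), then the file ends up 30 bytes long.
	fmt.Println(quotaDelta(30, 20)) // 10 bytes are added to the used quota
}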


@@ -11,6 +11,7 @@ import (
	"github.com/drakkan/sftpgo/dataprovider"
	"github.com/drakkan/sftpgo/logger"
	"github.com/drakkan/sftpgo/metrics"
+	"github.com/drakkan/sftpgo/vfs"
)

var (
@@ -21,6 +22,7 @@ var (
// BaseTransfer contains protocols common transfer details for an upload or a download.
type BaseTransfer struct { //nolint:maligned
	ID uint64
+	Fs vfs.Fs
	File *os.File
	Connection *BaseConnection
	cancelFn func()
@@ -33,6 +35,7 @@ type BaseTransfer struct { //nolint:maligned
	requestPath string
	BytesSent int64
	BytesReceived int64
+	MaxWriteSize int64
	AbortTransfer int32
	sync.Mutex
	ErrTransfer error
@@ -40,7 +43,7 @@ type BaseTransfer struct { //nolint:maligned
// NewBaseTransfer returns a new BaseTransfer and adds it to the given connection
func NewBaseTransfer(file *os.File, conn *BaseConnection, cancelFn func(), fsPath, requestPath string, transferType int,
-	minWriteOffset, initialSize int64, isNewFile bool) *BaseTransfer {
+	minWriteOffset, initialSize, maxWriteSize int64, isNewFile bool, fs vfs.Fs) *BaseTransfer {
	t := &BaseTransfer{
		ID: conn.GetTransferID(),
		File: file,
@@ -55,7 +58,9 @@ func NewBaseTransfer(file *os.File, conn *BaseConnection, cancelFn func(), fsPat
		requestPath: requestPath,
		BytesSent: 0,
		BytesReceived: 0,
+		MaxWriteSize: maxWriteSize,
		AbortTransfer: 0,
+		Fs: fs,
	}
	conn.AddTransfer(t)
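The constructor now receives the allowance (maxWriteSize) and the filesystem abstraction (fs) it will need later. How the caller computes maxWriteSize is outside this diff; the helper below is purely hypothetical and only illustrates the convention visible in the code, where a non-positive value is treated as no limit (Truncate only adjusts it when it is greater than zero).

package main

import "fmt"

// maxWriteSize is a hypothetical helper, not taken from this commit: one
// plausible way a caller could derive the allowance it passes to NewBaseTransfer.
// Zero means "no limit", matching the t.MaxWriteSize > 0 check in Truncate below;
// a handler would be expected to reject the upload earlier if the quota is
// already exhausted.
func maxWriteSize(quotaLimitBytes, usedBytes int64) int64 {
	if quotaLimitBytes <= 0 {
		return 0 // no size quota configured
	}
	return quotaLimitBytes - usedBytes
}

func main() {
	fmt.Println(maxWriteSize(0, 123456))     // 0: unlimited
	fmt.Println(maxWriteSize(1<<20, 900000)) // 148576 bytes still allowed
}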
@@ -110,18 +115,33 @@ func (t *BaseTransfer) SetCancelFn(cancelFn func()) {

// Truncate changes the size of the opened file.
// Supported for local fs only
-func (t *BaseTransfer) Truncate(fsPath string, size int64) error {
+func (t *BaseTransfer) Truncate(fsPath string, size int64) (int64, error) {
	if fsPath == t.GetFsPath() {
		if t.File != nil {
-			return t.File.Truncate(size)
+			initialSize := t.InitialSize
+			err := t.File.Truncate(size)
+			if err == nil {
+				t.Lock()
+				t.InitialSize = size
+				if t.MaxWriteSize > 0 {
+					sizeDiff := initialSize - size
+					t.MaxWriteSize += sizeDiff
+					metrics.TransferCompleted(atomic.LoadInt64(&t.BytesSent), atomic.LoadInt64(&t.BytesReceived), t.transferType, t.ErrTransfer)
+					atomic.StoreInt64(&t.BytesReceived, 0)
+				}
+				t.Unlock()
+			}
+			t.Connection.Log(logger.LevelDebug, "file %#v truncated to size %v max write size %v new initial size %v err: %v",
+				fsPath, size, t.MaxWriteSize, t.InitialSize, err)
+			return initialSize, err
		}
-		if size == 0 {
+		if size == 0 && atomic.LoadInt64(&t.BytesSent) == 0 {
			// for cloud providers the file is always truncated to zero, we don't support append/resume for uploads
-			return nil
+			return 0, nil
		}
-		return ErrOpUnsupported
+		return 0, ErrOpUnsupported
	}
-	return errTransferMismatch
+	return 0, errTransferMismatch
}

// TransferError is called if there is an unexpected error.
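Taken together, the reworked Truncate above does three things when the transfer has the file open: it gives the freed bytes back to the write allowance, moves the quota baseline to the new size, and restarts the received-bytes counter. A standalone model of that bookkeeping, with illustrative types rather than SFTPGo's:

package main

import "fmt"

// transferState models only the fields the new Truncate touches.
type transferState struct {
	InitialSize   int64 // quota baseline for this transfer
	MaxWriteSize  int64 // remaining write allowance, 0 means unlimited
	BytesReceived int64
}

// truncate mirrors the happy path of BaseTransfer.Truncate above.
func (t *transferState) truncate(size int64) {
	if t.MaxWriteSize > 0 {
		t.MaxWriteSize += t.InitialSize - size // freed bytes enlarge the allowance
		t.BytesReceived = 0                    // counting restarts from the new baseline
	}
	t.InitialSize = size
}

func main() {
	t := &transferState{InitialSize: 100, MaxWriteSize: 50, BytesReceived: 30}
	t.truncate(20)
	fmt.Printf("%+v\n", t) // &{InitialSize:20 MaxWriteSize:130 BytesReceived:0}
}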
@@ -190,10 +210,17 @@ func (t *BaseTransfer) Close() error {
			atomic.LoadInt64(&t.BytesSent), t.ErrTransfer)
		go action.execute() //nolint:errcheck
	} else {
+		fileSize := atomic.LoadInt64(&t.BytesReceived) + t.MinWriteOffset
+		info, err := t.Fs.Stat(t.fsPath)
+		if err == nil {
+			fileSize = info.Size()
+		}
+		t.Connection.Log(logger.LevelDebug, "upload file size %v stat error %v", fileSize, err)
+		t.updateQuota(numFiles, fileSize)
		logger.TransferLog(uploadLogSender, t.fsPath, elapsed, atomic.LoadInt64(&t.BytesReceived), t.Connection.User.Username,
			t.Connection.ID, t.Connection.protocol)
		action := newActionNotification(&t.Connection.User, operationUpload, t.fsPath, "", "", t.Connection.protocol,
-			atomic.LoadInt64(&t.BytesReceived)+t.MinWriteOffset, t.ErrTransfer)
+			fileSize, t.ErrTransfer)
		go action.execute() //nolint:errcheck
	}
	if t.ErrTransfer != nil {
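On close, the upload size reported to quota tracking and notifications now comes from the filesystem when possible: Stat on the uploaded path wins, and the BytesReceived + MinWriteOffset counter is only the fallback when Stat fails. A compact sketch of that decision, with an illustrative signature (the real code goes through the vfs.Fs abstraction):

package main

import (
	"fmt"
	"os"
)

// finalUploadSize mirrors the fallback in Close above: trust Stat when it
// works, otherwise keep the counter-based estimate. Illustrative helper only.
func finalUploadSize(path string, bytesReceived, minWriteOffset int64) int64 {
	size := bytesReceived + minWriteOffset
	if info, err := os.Stat(path); err == nil {
		size = info.Size()
	}
	return size
}

func main() {
	fmt.Println(finalUploadSize("/nonexistent", 40, 0)) // Stat fails, falls back to 40
}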
@@ -202,26 +229,25 @@ func (t *BaseTransfer) Close() error {
			err = t.ErrTransfer
		}
	}
-	t.updateQuota(numFiles)
	return err
}

-func (t *BaseTransfer) updateQuota(numFiles int) bool {
+func (t *BaseTransfer) updateQuota(numFiles int, fileSize int64) bool {
	// S3 uploads are atomic, if there is an error nothing is uploaded
	if t.File == nil && t.ErrTransfer != nil {
		return false
	}
-	bytesReceived := atomic.LoadInt64(&t.BytesReceived)
-	if t.transferType == TransferUpload && (numFiles != 0 || bytesReceived > 0) {
+	sizeDiff := fileSize - t.InitialSize
+	if t.transferType == TransferUpload && (numFiles != 0 || sizeDiff > 0) {
		vfolder, err := t.Connection.User.GetVirtualFolderForPath(path.Dir(t.requestPath))
		if err == nil {
			dataprovider.UpdateVirtualFolderQuota(vfolder.BaseVirtualFolder, numFiles, //nolint:errcheck
-				bytesReceived-t.InitialSize, false)
+				sizeDiff, false)
			if vfolder.IsIncludedInUserQuota() {
-				dataprovider.UpdateUserQuota(t.Connection.User, numFiles, bytesReceived-t.InitialSize, false) //nolint:errcheck
+				dataprovider.UpdateUserQuota(t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
			}
		} else {
-			dataprovider.UpdateUserQuota(t.Connection.User, numFiles, bytesReceived-t.InitialSize, false) //nolint:errcheck
+			dataprovider.UpdateUserQuota(t.Connection.User, numFiles, sizeDiff, false) //nolint:errcheck
		}
		return true
	}
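updateQuota routes the same size delta to different accounts depending on where the upload landed: a path inside a virtual folder updates the folder quota and, only when the folder is included in the user quota, the user quota as well; any other path updates the user quota directly. A small illustrative reduction of that branching (not SFTPGo code):

package main

import "fmt"

// quotaTargets reports which quotas receive the files/size delta, mirroring the
// branching in updateQuota above. Illustrative only.
func quotaTargets(inVirtualFolder, includedInUserQuota bool) (folder, user bool) {
	if inVirtualFolder {
		return true, includedInUserQuota
	}
	return false, true
}

func main() {
	fmt.Println(quotaTargets(true, false)) // true false: folder quota only
	fmt.Println(quotaTargets(false, true)) // false true: user quota only
}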