add basic S3-Compatible Object Storage support

we have now an interface for filesystem backeds, this make easy to add
new filesystem backends
This commit is contained in:
Nicola Murino
2020-01-19 07:41:05 +01:00
parent 0b42dbc3c3
commit a4834f4a83
40 changed files with 2315 additions and 420 deletions

View File

@@ -11,6 +11,7 @@ import (
"github.com/drakkan/sftpgo/dataprovider"
"github.com/drakkan/sftpgo/logger"
"github.com/drakkan/sftpgo/metrics"
"github.com/eikenb/pipeat"
)
const (
@@ -26,6 +27,9 @@ var (
// It implements the io Reader and Writer interface to handle files downloads and uploads
type Transfer struct {
file *os.File
writerAt *pipeat.PipeWriterAt
readerAt *pipeat.PipeReaderAt
cancelFn func()
path string
start time.Time
bytesSent int64
@@ -52,6 +56,9 @@ func (t *Transfer) TransferError(err error) {
return
}
t.transferError = err
if t.cancelFn != nil {
t.cancelFn()
}
elapsed := time.Since(t.start).Nanoseconds() / 1000000
logger.Warn(logSender, t.connectionID, "Unexpected error for transfer, path: %#v, error: \"%v\" bytes sent: %v, "+
"bytes received: %v transfer running since %v ms", t.path, t.transferError, t.bytesSent, t.bytesReceived, elapsed)
@@ -61,7 +68,13 @@ func (t *Transfer) TransferError(err error) {
// It handles download bandwidth throttling too
func (t *Transfer) ReadAt(p []byte, off int64) (n int, err error) {
t.lastActivity = time.Now()
readed, e := t.file.ReadAt(p, off)
var readed int
var e error
if t.readerAt != nil {
readed, e = t.readerAt.ReadAt(p, off)
} else {
readed, e = t.file.ReadAt(p, off)
}
t.lock.Lock()
t.bytesSent += int64(readed)
t.lock.Unlock()
@@ -82,7 +95,13 @@ func (t *Transfer) WriteAt(p []byte, off int64) (n int, err error) {
t.TransferError(err)
return 0, err
}
written, e := t.file.WriteAt(p, off)
var written int
var e error
if t.writerAt != nil {
written, e = t.writerAt.WriteAt(p, off)
} else {
written, e = t.file.WriteAt(p, off)
}
t.lock.Lock()
t.bytesReceived += int64(written)
t.lock.Unlock()
@@ -105,14 +124,14 @@ func (t *Transfer) Close() error {
if t.isFinished {
return errTransferClosed
}
err := t.file.Close()
err := t.closeIO()
t.isFinished = true
numFiles := 0
if t.isNewFile {
numFiles = 1
}
t.checkDownloadSize()
if t.transferType == transferUpload && t.file.Name() != t.path {
if t.transferType == transferUpload && t.file != nil && t.file.Name() != t.path {
if t.transferError == nil || uploadMode == uploadModeAtomicWithResume {
err = os.Rename(t.file.Name(), t.path)
logger.Debug(logSender, t.connectionID, "atomic upload completed, rename: %#v -> %#v, error: %v",
@@ -150,6 +169,18 @@ func (t *Transfer) Close() error {
return err
}
func (t *Transfer) closeIO() error {
var err error
if t.writerAt != nil {
err = t.writerAt.Close()
} else if t.readerAt != nil {
err = t.readerAt.Close()
} else {
err = t.file.Close()
}
return err
}
func (t *Transfer) checkDownloadSize() {
if t.transferType == transferDownload && t.transferError == nil && t.bytesSent < t.expectedSize {
t.transferError = fmt.Errorf("incomplete download: %v/%v bytes transferred", t.bytesSent, t.expectedSize)