mirror of
https://github.com/drakkan/sftpgo.git
synced 2025-12-07 14:50:55 +03:00
add experimental plugin system
This commit is contained in:
36
vfs/s3fs.go
36
vfs/s3fs.go
@@ -23,8 +23,8 @@ import (
|
||||
"github.com/pkg/sftp"
|
||||
|
||||
"github.com/drakkan/sftpgo/v2/logger"
|
||||
"github.com/drakkan/sftpgo/v2/metrics"
|
||||
"github.com/drakkan/sftpgo/v2/utils"
|
||||
"github.com/drakkan/sftpgo/v2/metric"
|
||||
"github.com/drakkan/sftpgo/v2/util"
|
||||
"github.com/drakkan/sftpgo/v2/version"
|
||||
)
|
||||
|
||||
@@ -178,6 +178,10 @@ func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fun
|
||||
}
|
||||
ctx, cancelFn := context.WithCancel(context.Background())
|
||||
downloader := s3manager.NewDownloaderWithClient(fs.svc)
|
||||
/*downloader.RequestOptions = append(downloader.RequestOptions, func(r *request.Request) {
|
||||
newCtx, _ := context.WithTimeout(r.Context(), time.Minute)
|
||||
r.SetContext(newCtx)
|
||||
})*/
|
||||
var streamRange *string
|
||||
if offset > 0 {
|
||||
streamRange = aws.String(fmt.Sprintf("bytes=%v-", offset))
|
||||
@@ -192,7 +196,7 @@ func (fs *S3Fs) Open(name string, offset int64) (File, *pipeat.PipeReaderAt, fun
|
||||
})
|
||||
w.CloseWithError(err) //nolint:errcheck
|
||||
fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
|
||||
metrics.S3TransferCompleted(n, 1, err)
|
||||
metric.S3TransferCompleted(n, 1, err)
|
||||
}()
|
||||
return nil, r, cancelFn, nil
|
||||
}
|
||||
@@ -219,8 +223,8 @@ func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error)
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
Key: aws.String(key),
|
||||
Body: r,
|
||||
StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
|
||||
ContentType: utils.NilIfEmpty(contentType),
|
||||
StorageClass: util.NilIfEmpty(fs.config.StorageClass),
|
||||
ContentType: util.NilIfEmpty(contentType),
|
||||
}, func(u *s3manager.Uploader) {
|
||||
u.Concurrency = fs.config.UploadConcurrency
|
||||
u.PartSize = fs.config.UploadPartSize
|
||||
@@ -229,7 +233,7 @@ func (fs *S3Fs) Create(name string, flag int) (File, *PipeWriter, func(), error)
|
||||
p.Done(err)
|
||||
fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, response: %v, readed bytes: %v, err: %+v",
|
||||
name, response, r.GetReadedBytes(), err)
|
||||
metrics.S3TransferCompleted(r.GetReadedBytes(), 0, err)
|
||||
metric.S3TransferCompleted(r.GetReadedBytes(), 0, err)
|
||||
}()
|
||||
return nil, p, cancelFn, nil
|
||||
}
|
||||
@@ -280,10 +284,10 @@ func (fs *S3Fs) Rename(source, target string) error {
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
CopySource: aws.String(url.PathEscape(copySource)),
|
||||
Key: aws.String(target),
|
||||
StorageClass: utils.NilIfEmpty(fs.config.StorageClass),
|
||||
ContentType: utils.NilIfEmpty(contentType),
|
||||
StorageClass: util.NilIfEmpty(fs.config.StorageClass),
|
||||
ContentType: util.NilIfEmpty(contentType),
|
||||
})
|
||||
metrics.S3CopyObjectCompleted(err)
|
||||
metric.S3CopyObjectCompleted(err)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -310,7 +314,7 @@ func (fs *S3Fs) Remove(name string, isDir bool) error {
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
Key: aws.String(name),
|
||||
})
|
||||
metrics.S3DeleteObjectCompleted(err)
|
||||
metric.S3DeleteObjectCompleted(err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -418,7 +422,7 @@ func (fs *S3Fs) ReadDir(dirname string) ([]os.FileInfo, error) {
|
||||
}
|
||||
return true
|
||||
})
|
||||
metrics.S3ListObjectsCompleted(err)
|
||||
metric.S3ListObjectsCompleted(err)
|
||||
return result, err
|
||||
}
|
||||
|
||||
@@ -505,7 +509,7 @@ func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
|
||||
}
|
||||
return true
|
||||
})
|
||||
metrics.S3ListObjectsCompleted(err)
|
||||
metric.S3ListObjectsCompleted(err)
|
||||
return numFiles, size, err
|
||||
}
|
||||
|
||||
@@ -574,7 +578,7 @@ func (fs *S3Fs) Walk(root string, walkFn filepath.WalkFunc) error {
|
||||
}
|
||||
return true
|
||||
})
|
||||
metrics.S3ListObjectsCompleted(err)
|
||||
metric.S3ListObjectsCompleted(err)
|
||||
walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), err) //nolint:errcheck
|
||||
|
||||
return err
|
||||
@@ -621,7 +625,7 @@ func (fs *S3Fs) checkIfBucketExists() error {
|
||||
_, err := fs.svc.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
})
|
||||
metrics.S3HeadBucketCompleted(err)
|
||||
metric.S3HeadBucketCompleted(err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -641,7 +645,7 @@ func (fs *S3Fs) hasContents(name string) (bool, error) {
|
||||
Prefix: aws.String(prefix),
|
||||
MaxKeys: &maxResults,
|
||||
})
|
||||
metrics.S3ListObjectsCompleted(err)
|
||||
metric.S3ListObjectsCompleted(err)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@@ -664,7 +668,7 @@ func (fs *S3Fs) headObject(name string) (*s3.HeadObjectOutput, error) {
|
||||
Bucket: aws.String(fs.config.Bucket),
|
||||
Key: aws.String(name),
|
||||
})
|
||||
metrics.S3HeadObjectCompleted(err)
|
||||
metric.S3HeadObjectCompleted(err)
|
||||
return obj, err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user