vfs: implement GetDirSize for Cloud Storage providers

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2023-01-06 18:00:25 +01:00
parent a5d6441fb6
commit 7d19d3f10b
9 changed files with 222 additions and 208 deletions

View File

@@ -568,38 +568,7 @@ func (fs *S3Fs) CheckRootPath(username string, uid int, gid int) bool {
// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *S3Fs) ScanRootDirContents() (int, int64, error) {
numFiles := 0
size := int64(0)
paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
Bucket: aws.String(fs.config.Bucket),
Prefix: aws.String(fs.config.KeyPrefix),
})
for paginator.HasMorePages() {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
page, err := paginator.NextPage(ctx)
if err != nil {
metric.S3ListObjectsCompleted(err)
return numFiles, size, err
}
for _, fileObject := range page.Contents {
isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
if isDir && fileObject.Size == 0 {
continue
}
numFiles++
size += fileObject.Size
if numFiles%1000 == 0 {
fsLog(fs, logger.LevelDebug, "root dir scan in progress, files: %d, size: %d", numFiles, size)
}
}
}
metric.S3ListObjectsCompleted(nil)
return numFiles, size, nil
return fs.GetDirSize(fs.config.KeyPrefix)
}
func (fs *S3Fs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
@@ -647,8 +616,40 @@ func (fs *S3Fs) CheckMetadata() error {
// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (*S3Fs) GetDirSize(dirname string) (int, int64, error) {
return 0, 0, ErrVfsUnsupported
func (fs *S3Fs) GetDirSize(dirname string) (int, int64, error) {
prefix := fs.getPrefix(dirname)
numFiles := 0
size := int64(0)
paginator := s3.NewListObjectsV2Paginator(fs.svc, &s3.ListObjectsV2Input{
Bucket: aws.String(fs.config.Bucket),
Prefix: aws.String(prefix),
})
for paginator.HasMorePages() {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
page, err := paginator.NextPage(ctx)
if err != nil {
metric.S3ListObjectsCompleted(err)
return numFiles, size, err
}
for _, fileObject := range page.Contents {
isDir := strings.HasSuffix(util.GetStringFromPointer(fileObject.Key), "/")
if isDir && fileObject.Size == 0 {
continue
}
numFiles++
size += fileObject.Size
if numFiles%1000 == 0 {
fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
}
}
}
metric.S3ListObjectsCompleted(nil)
return numFiles, size, nil
}
// GetAtomicUploadPath returns the path to use for an atomic upload.