conditional support for recursive renaming for cloud providers

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2023-01-06 12:33:50 +01:00
parent f0dedbfabf
commit 8cad436421
26 changed files with 650 additions and 473 deletions

View File

@@ -120,8 +120,7 @@ func (fs *GCSFs) Stat(name string) (os.FileInfo, error) {
if fs.config.KeyPrefix == name+"/" {
return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
}
_, info, err := fs.getObjectStat(name)
return info, err
return fs.getObjectStat(name)
}
// Lstat returns a FileInfo describing the named file
@@ -224,71 +223,15 @@ func (fs *GCSFs) Create(name string, flag int) (File, *PipeWriter, func(), error
}
// Rename renames (moves) source to target.
// We don't support renaming non empty directories since we should
// rename all the contents too and this could take long time: think
// about directories with thousands of files, for each file we should
// execute a CopyObject call.
func (fs *GCSFs) Rename(source, target string) error {
func (fs *GCSFs) Rename(source, target string) (int, int64, error) {
if source == target {
return nil
return -1, -1, nil
}
realSourceName, fi, err := fs.getObjectStat(source)
fi, err := fs.getObjectStat(source)
if err != nil {
return err
return -1, -1, err
}
if fi.IsDir() {
hasContents, err := fs.hasContents(source)
if err != nil {
return err
}
if hasContents {
return fmt.Errorf("cannot rename non empty directory: %#v", source)
}
if err := fs.mkdirInternal(target); err != nil {
return err
}
} else {
src := fs.svc.Bucket(fs.config.Bucket).Object(realSourceName)
dst := fs.svc.Bucket(fs.config.Bucket).Object(target)
attrs, statErr := fs.headObject(target)
if statErr == nil {
dst = dst.If(storage.Conditions{GenerationMatch: attrs.Generation})
} else if fs.IsNotExist(statErr) {
dst = dst.If(storage.Conditions{DoesNotExist: true})
} else {
fsLog(fs, logger.LevelWarn, "unable to set precondition for rename, target %q, stat err: %v",
target, statErr)
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
defer cancelFn()
copier := dst.CopierFrom(src)
if fs.config.StorageClass != "" {
copier.StorageClass = fs.config.StorageClass
}
if fs.config.ACL != "" {
copier.PredefinedACL = fs.config.ACL
}
contentType := mime.TypeByExtension(path.Ext(source))
if contentType != "" {
copier.ContentType = contentType
}
_, err = copier.Run(ctx)
metric.GCSCopyObjectCompleted(err)
if err != nil {
return err
}
if plugin.Handler.HasMetadater() {
err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
util.GetTimeAsMsSinceEpoch(fi.ModTime()))
if err != nil {
fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %#v -> %#v: %+v",
source, target, err)
}
}
}
return fs.Remove(source, fi.IsDir())
return fs.renameInternal(source, target, fi)
}
// Remove removes the named file or (empty) directory.
@@ -526,51 +469,7 @@ func (fs *GCSFs) CheckRootPath(username string, uid int, gid int) bool {
// ScanRootDirContents returns the number of files contained in the bucket,
// and their size
func (fs *GCSFs) ScanRootDirContents() (int, int64, error) {
numFiles := 0
size := int64(0)
query := &storage.Query{Prefix: fs.config.KeyPrefix}
err := query.SetAttrSelection(gcsDefaultFieldsSelection)
if err != nil {
return numFiles, size, err
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
defer cancelFn()
bkt := fs.svc.Bucket(fs.config.Bucket)
it := bkt.Objects(ctx, query)
pager := iterator.NewPager(it, defaultGCSPageSize, "")
for {
var objects []*storage.ObjectAttrs
pageToken, err := pager.NextPage(&objects)
if err != nil {
metric.GCSListObjectsCompleted(err)
return numFiles, size, err
}
for _, attrs := range objects {
if !attrs.Deleted.IsZero() {
continue
}
isDir := strings.HasSuffix(attrs.Name, "/") || attrs.ContentType == dirMimeType
if isDir && attrs.Size == 0 {
continue
}
numFiles++
size += attrs.Size
if numFiles%1000 == 0 {
fsLog(fs, logger.LevelDebug, "root dir scan in progress, files: %d, size: %d", numFiles, size)
}
}
objects = nil
if pageToken == "" {
break
}
}
metric.GCSListObjectsCompleted(nil)
return numFiles, size, err
return fs.GetDirSize(fs.config.KeyPrefix)
}
func (fs *GCSFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
@@ -636,8 +535,54 @@ func (fs *GCSFs) CheckMetadata() error {
// GetDirSize returns the number of files and the size for a folder
// including any subfolders
func (*GCSFs) GetDirSize(dirname string) (int, int64, error) {
return 0, 0, ErrVfsUnsupported
func (fs *GCSFs) GetDirSize(dirname string) (int, int64, error) {
prefix := fs.getPrefix(dirname)
numFiles := 0
size := int64(0)
query := &storage.Query{Prefix: prefix}
err := query.SetAttrSelection(gcsDefaultFieldsSelection)
if err != nil {
return numFiles, size, err
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
defer cancelFn()
bkt := fs.svc.Bucket(fs.config.Bucket)
it := bkt.Objects(ctx, query)
pager := iterator.NewPager(it, defaultGCSPageSize, "")
for {
var objects []*storage.ObjectAttrs
pageToken, err := pager.NextPage(&objects)
if err != nil {
metric.GCSListObjectsCompleted(err)
return numFiles, size, err
}
for _, attrs := range objects {
if !attrs.Deleted.IsZero() {
continue
}
isDir := strings.HasSuffix(attrs.Name, "/") || attrs.ContentType == dirMimeType
if isDir && attrs.Size == 0 {
continue
}
numFiles++
size += attrs.Size
if numFiles%1000 == 0 {
fsLog(fs, logger.LevelDebug, "dirname %q scan in progress, files: %d, size: %d", dirname, numFiles, size)
}
}
objects = nil
if pageToken == "" {
break
}
}
metric.GCSListObjectsCompleted(nil)
return numFiles, size, err
}
// GetAtomicUploadPath returns the path to use for an atomic upload.
@@ -755,35 +700,118 @@ func (fs *GCSFs) resolve(name, prefix, contentType string) (string, bool) {
}
// getObjectStat returns the stat result and the real object name as first value
func (fs *GCSFs) getObjectStat(name string) (string, os.FileInfo, error) {
func (fs *GCSFs) getObjectStat(name string) (os.FileInfo, error) {
attrs, err := fs.headObject(name)
var info os.FileInfo
if err == nil {
objSize := attrs.Size
objectModTime := attrs.Updated
isDir := attrs.ContentType == dirMimeType || strings.HasSuffix(attrs.Name, "/")
info, err = updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir, objSize, objectModTime, false))
return name, info, err
return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir, objSize, objectModTime, false))
}
if !fs.IsNotExist(err) {
return "", nil, err
return nil, err
}
// now check if this is a prefix (virtual directory)
hasContents, err := fs.hasContents(name)
if err != nil {
return "", nil, err
return nil, err
}
if hasContents {
info, err = updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
return name, info, err
return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
}
// finally check if this is an object with a trailing /
attrs, err = fs.headObject(name + "/")
if err != nil {
return "", nil, err
return nil, err
}
info, err = updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, attrs.Size, attrs.Updated, false))
return name + "/", info, err
return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, attrs.Size, attrs.Updated, false))
}
func (fs *GCSFs) copyFileInternal(source, target string) error {
src := fs.svc.Bucket(fs.config.Bucket).Object(source)
dst := fs.svc.Bucket(fs.config.Bucket).Object(target)
attrs, statErr := fs.headObject(target)
if statErr == nil {
dst = dst.If(storage.Conditions{GenerationMatch: attrs.Generation})
} else if fs.IsNotExist(statErr) {
dst = dst.If(storage.Conditions{DoesNotExist: true})
} else {
fsLog(fs, logger.LevelWarn, "unable to set precondition for rename, target %q, stat err: %v",
target, statErr)
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
defer cancelFn()
copier := dst.CopierFrom(src)
if fs.config.StorageClass != "" {
copier.StorageClass = fs.config.StorageClass
}
if fs.config.ACL != "" {
copier.PredefinedACL = fs.config.ACL
}
contentType := mime.TypeByExtension(path.Ext(source))
if contentType != "" {
copier.ContentType = contentType
}
_, err := copier.Run(ctx)
metric.GCSCopyObjectCompleted(err)
return err
}
func (fs *GCSFs) renameInternal(source, target string, fi os.FileInfo) (int, int64, error) {
var numFiles int
var filesSize int64
if fi.IsDir() {
if renameMode == 0 {
hasContents, err := fs.hasContents(source)
if err != nil {
return numFiles, filesSize, err
}
if hasContents {
return numFiles, filesSize, fmt.Errorf("cannot rename non empty directory: %q", source)
}
}
if err := fs.mkdirInternal(target); err != nil {
return numFiles, filesSize, err
}
if renameMode == 1 {
entries, err := fs.ReadDir(source)
if err != nil {
return numFiles, filesSize, err
}
for _, info := range entries {
sourceEntry := fs.Join(source, info.Name())
targetEntry := fs.Join(target, info.Name())
files, size, err := fs.renameInternal(sourceEntry, targetEntry, info)
if err != nil {
return numFiles, filesSize, err
}
numFiles += files
filesSize += size
}
}
} else {
if err := fs.copyFileInternal(source, target); err != nil {
return numFiles, filesSize, err
}
numFiles++
filesSize += fi.Size()
if plugin.Handler.HasMetadater() {
err := plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
util.GetTimeAsMsSinceEpoch(fi.ModTime()))
if err != nil {
fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %q -> %q: %+v",
source, target, err)
}
}
}
err := fs.Remove(source, fi.IsDir())
if fs.IsNotExist(err) {
err = nil
}
return numFiles, filesSize, err
}
func (fs *GCSFs) mkdirInternal(name string) error {