copy: fix quota for FsFileCopier

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2024-03-12 08:43:23 +01:00
parent f38966c6ac
commit ca2757d41e
7 changed files with 89 additions and 32 deletions

View File

@@ -636,8 +636,25 @@ func (fs *GCSFs) ResolvePath(virtualPath string) (string, error) {
}
// CopyFile implements the FsFileCopier interface
func (fs *GCSFs) CopyFile(source, target string, _ int64) error {
return fs.copyFileInternal(source, target)
func (fs *GCSFs) CopyFile(source, target string, srcSize int64) (int, int64, error) {
numFiles := 1
sizeDiff := srcSize
var conditions *storage.Conditions
attrs, err := fs.headObject(target)
if err == nil {
sizeDiff -= attrs.Size
numFiles = 0
conditions = &storage.Conditions{GenerationMatch: attrs.Generation}
} else {
if !fs.IsNotExist(err) {
return 0, 0, err
}
conditions = &storage.Conditions{DoesNotExist: true}
}
if err := fs.copyFileInternal(source, target, conditions); err != nil {
return 0, 0, err
}
return numFiles, sizeDiff, nil
}
func (fs *GCSFs) resolve(name, prefix, contentType string) (string, bool) {
@@ -732,17 +749,21 @@ func (fs *GCSFs) composeObjects(ctx context.Context, dst, partialObject *storage
return err
}
func (fs *GCSFs) copyFileInternal(source, target string) error {
func (fs *GCSFs) copyFileInternal(source, target string, conditions *storage.Conditions) error {
src := fs.svc.Bucket(fs.config.Bucket).Object(source)
dst := fs.svc.Bucket(fs.config.Bucket).Object(target)
attrs, statErr := fs.headObject(target)
if statErr == nil {
dst = dst.If(storage.Conditions{GenerationMatch: attrs.Generation})
} else if fs.IsNotExist(statErr) {
dst = dst.If(storage.Conditions{DoesNotExist: true})
if conditions != nil {
dst = dst.If(*conditions)
} else {
fsLog(fs, logger.LevelWarn, "unable to set precondition for rename, target %q, stat err: %v",
target, statErr)
attrs, err := fs.headObject(target)
if err == nil {
dst = dst.If(storage.Conditions{GenerationMatch: attrs.Generation})
} else if fs.IsNotExist(err) {
dst = dst.If(storage.Conditions{DoesNotExist: true})
} else {
fsLog(fs, logger.LevelWarn, "unable to set precondition for copy, target %q, stat err: %v",
target, err)
}
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
@@ -790,7 +811,7 @@ func (fs *GCSFs) renameInternal(source, target string, fi os.FileInfo, recursion
}
}
} else {
if err := fs.copyFileInternal(source, target); err != nil {
if err := fs.copyFileInternal(source, target, nil); err != nil {
return numFiles, filesSize, err
}
numFiles++