s3: improve rename performance

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2022-06-30 18:25:40 +02:00
parent 756b122ab8
commit d3d788c8d0
4 changed files with 194 additions and 132 deletions

View File

@@ -17,6 +17,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
@@ -904,6 +905,7 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *a
finished := false
var wg sync.WaitGroup
var errOnce sync.Once
var hasError int32
var poolError error
poolCtx, poolCancel := context.WithCancel(ctx)
@@ -920,7 +922,7 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *a
offset = end
guard <- struct{}{}
if poolError != nil {
if atomic.LoadInt32(&hasError) == 1 {
fsLog(fs, logger.LevelDebug, "pool error, download for part %v not started", part)
break
}
@@ -941,8 +943,9 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *a
err := fs.downloadPart(innerCtx, blockBlob, buf, writer, start, count, writeOffset)
if err != nil {
errOnce.Do(func() {
poolError = err
fsLog(fs, logger.LevelError, "multipart download error: %+v", poolError)
fsLog(fs, logger.LevelError, "multipart download error: %+v", err)
atomic.StoreInt32(&hasError, 1)
poolError = fmt.Errorf("multipart download error: %w", err)
poolCancel()
})
}
@@ -971,6 +974,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
var blocks []string
var wg sync.WaitGroup
var errOnce sync.Once
var hasError int32
var poolError error
poolCtx, poolCancel := context.WithCancel(ctx)
@@ -998,7 +1002,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
blocks = append(blocks, blockID)
guard <- struct{}{}
if poolError != nil {
if atomic.LoadInt32(&hasError) == 1 {
fsLog(fs, logger.LevelError, "pool error, upload for part %v not started", part)
pool.releaseBuffer(buf)
break
@@ -1021,8 +1025,9 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
_, err := blockBlob.StageBlock(innerCtx, blockID, bufferReader, &azblob.BlockBlobStageBlockOptions{})
if err != nil {
errOnce.Do(func() {
poolError = err
fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", poolError)
fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", err)
atomic.StoreInt32(&hasError, 1)
poolError = fmt.Errorf("multipart upload error: %w", err)
poolCancel()
})
}

View File

@@ -14,7 +14,10 @@ import (
"os"
"path"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
@@ -300,7 +303,7 @@ func (fs *S3Fs) Rename(source, target string) error {
copySource = pathEscape(copySource)
if fi.Size() > 500*1024*1024 {
fsLog(fs, logger.LevelDebug, "renaming file %#v with size %v, a multipart copy is required, this may take a while",
fsLog(fs, logger.LevelDebug, "renaming file %q with size %d using multipart copy",
source, fi.Size())
err = fs.doMultipartCopy(copySource, target, contentType, fi.Size())
} else {
@@ -822,43 +825,99 @@ func (fs *S3Fs) doMultipartCopy(source, target, contentType string, fileSize int
if uploadID == "" {
return errors.New("unable to get multipart copy upload ID")
}
maxPartSize := int64(500 * 1024 * 1024)
completedParts := make([]types.CompletedPart, 0)
partNumber := int32(1)
for copied := int64(0); copied < fileSize; copied += maxPartSize {
innerCtx, innerCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer innerCancelFn()
partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
Bucket: aws.String(fs.config.Bucket),
CopySource: aws.String(source),
Key: aws.String(target),
PartNumber: partNumber,
UploadId: aws.String(uploadID),
CopySourceRange: aws.String(getMultipartCopyRange(copied, maxPartSize, fileSize)),
})
if err != nil {
fsLog(fs, logger.LevelError, "unable to copy part number %v: %+v", partNumber, err)
abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer abortCancelFn()
_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(fs.config.Bucket),
Key: aws.String(target),
UploadId: aws.String(uploadID),
})
if errAbort != nil {
fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
}
return fmt.Errorf("error copying part number %v: %w", partNumber, err)
}
completedParts = append(completedParts, types.CompletedPart{
ETag: partResp.CopyPartResult.ETag,
PartNumber: partNumber,
})
partNumber++
// We use 32 MB part size and copy 10 parts in parallel.
// These values are arbitrary. We don't want to start too many goroutines
maxPartSize := int64(32 * 1024 * 1024)
if fileSize > int64(100*1024*1024*1024) {
maxPartSize = int64(500 * 1024 * 1024)
}
guard := make(chan struct{}, 10)
finished := false
var completedParts []types.CompletedPart
var partMutex sync.Mutex
var wg sync.WaitGroup
var hasError int32
var errOnce sync.Once
var copyError error
var partNumber int32
var offset int64
opCtx, opCancel := context.WithCancel(context.Background())
defer opCancel()
for partNumber = 1; !finished; partNumber++ {
start := offset
end := offset + maxPartSize
if end >= fileSize {
end = fileSize
finished = true
}
offset = end
guard <- struct{}{}
if atomic.LoadInt32(&hasError) == 1 {
fsLog(fs, logger.LevelDebug, "previous multipart copy error, copy for part %d not started", partNumber)
break
}
wg.Add(1)
go func(partNum int32, partStart, partEnd int64) {
defer func() {
<-guard
wg.Done()
}()
innerCtx, innerCancelFn := context.WithDeadline(opCtx, time.Now().Add(fs.ctxTimeout))
defer innerCancelFn()
partResp, err := fs.svc.UploadPartCopy(innerCtx, &s3.UploadPartCopyInput{
Bucket: aws.String(fs.config.Bucket),
CopySource: aws.String(source),
Key: aws.String(target),
PartNumber: partNum,
UploadId: aws.String(uploadID),
CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", partStart, partEnd-1)),
})
if err != nil {
errOnce.Do(func() {
fsLog(fs, logger.LevelError, "unable to copy part number %d: %+v", partNum, err)
atomic.StoreInt32(&hasError, 1)
copyError = fmt.Errorf("error copying part number %d: %w", partNum, err)
opCancel()
abortCtx, abortCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer abortCancelFn()
_, errAbort := fs.svc.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(fs.config.Bucket),
Key: aws.String(target),
UploadId: aws.String(uploadID),
})
if errAbort != nil {
fsLog(fs, logger.LevelError, "unable to abort multipart copy: %+v", errAbort)
}
})
return
}
partMutex.Lock()
completedParts = append(completedParts, types.CompletedPart{
ETag: partResp.CopyPartResult.ETag,
PartNumber: partNum,
})
partMutex.Unlock()
}(partNumber, start, end)
}
wg.Wait()
close(guard)
if copyError != nil {
return copyError
}
sort.Slice(completedParts, func(i, j int) bool {
return completedParts[i].PartNumber < completedParts[j].PartNumber
})
completeCtx, completeCancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer completeCancelFn()
@@ -929,15 +988,6 @@ func (fs *S3Fs) getStorageID() string {
return fmt.Sprintf("s3://%v", fs.config.Bucket)
}
// getMultipartCopyRange returns the HTTP byte-range header value
// ("bytes=first-last", both bounds inclusive) for the part that starts at
// offset start and spans at most maxPartSize bytes of an object of
// fileSize bytes. The last part is clamped so that the range never
// references a byte past fileSize-1, the final valid index.
func getMultipartCopyRange(start, maxPartSize, fileSize int64) string {
	end := start + maxPartSize - 1
	// Clamp with >=: end == fileSize is already one past the last valid
	// byte (fileSize-1), so it must be clamped too. Using ">" alone lets
	// an out-of-range last-byte-pos through when
	// fileSize-start == maxPartSize-1.
	if end >= fileSize {
		end = fileSize - 1
	}
	return fmt.Sprintf("bytes=%d-%d", start, end)
}
func getAWSHTTPClient(timeout int, idleConnectionTimeout time.Duration) *awshttp.BuildableClient {
c := awshttp.NewBuildableClient().
WithDialerOptions(func(d *net.Dialer) {