s3: improve rename performance

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2022-06-30 18:22:58 +02:00
parent bf2dcfe307
commit e46051299f
4 changed files with 194 additions and 132 deletions

View File

@@ -17,6 +17,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
@@ -904,6 +905,7 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *a
finished := false
var wg sync.WaitGroup
var errOnce sync.Once
var hasError int32
var poolError error
poolCtx, poolCancel := context.WithCancel(ctx)
@@ -920,7 +922,7 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *a
offset = end
guard <- struct{}{}
if poolError != nil {
if atomic.LoadInt32(&hasError) == 1 {
fsLog(fs, logger.LevelDebug, "pool error, download for part %v not started", part)
break
}
@@ -941,8 +943,9 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *a
err := fs.downloadPart(innerCtx, blockBlob, buf, writer, start, count, writeOffset)
if err != nil {
errOnce.Do(func() {
poolError = err
fsLog(fs, logger.LevelError, "multipart download error: %+v", poolError)
fsLog(fs, logger.LevelError, "multipart download error: %+v", err)
atomic.StoreInt32(&hasError, 1)
poolError = fmt.Errorf("multipart download error: %w", err)
poolCancel()
})
}
@@ -971,6 +974,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
var blocks []string
var wg sync.WaitGroup
var errOnce sync.Once
var hasError int32
var poolError error
poolCtx, poolCancel := context.WithCancel(ctx)
@@ -998,7 +1002,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
blocks = append(blocks, blockID)
guard <- struct{}{}
if poolError != nil {
if atomic.LoadInt32(&hasError) == 1 {
fsLog(fs, logger.LevelError, "pool error, upload for part %v not started", part)
pool.releaseBuffer(buf)
break
@@ -1021,8 +1025,9 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
_, err := blockBlob.StageBlock(innerCtx, blockID, bufferReader, &azblob.BlockBlobStageBlockOptions{})
if err != nil {
errOnce.Do(func() {
poolError = err
fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", poolError)
fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", err)
atomic.StoreInt32(&hasError, 1)
poolError = fmt.Errorf("multipart upload error: %w", err)
poolCancel()
})
}