s3: use multipart uploads only when multiple parts are needed

Fixes #2016

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2025-07-11 18:45:30 +02:00
parent 66a20f34f8
commit bdd097b1c7

View File

@@ -306,7 +306,7 @@ func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(),
} else { } else {
contentType = mime.TypeByExtension(path.Ext(name)) contentType = mime.TypeByExtension(path.Ext(name))
} }
err := fs.handleMultipartUpload(ctx, r, name, contentType) err := fs.handleUpload(ctx, r, name, contentType)
r.CloseWithError(err) //nolint:errcheck r.CloseWithError(err) //nolint:errcheck
p.Done(err) p.Done(err)
fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, readed bytes: %d, err: %+v", fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, readed bytes: %d, err: %+v",
@@ -890,7 +890,36 @@ func (fs *S3Fs) abortMultipartUpload(name, uploadID string) error {
return err return err
} }
// singlePartUpload stores data under the key name in the configured bucket
// using exactly one PutObject request, and returns that request's error
// unchanged. The ACL, storage class and SSE customer key settings are taken
// from fs; contentType is passed through util.NilIfEmpty.
// NOTE(review): in this side-by-side rendering the next line also carries the
// old-column handleMultipartUpload header; this comment describes the
// right-hand (added) singlePartUpload function.
func (fs *S3Fs) handleMultipartUpload(ctx context.Context, reader io.Reader, name, contentType string) error { func (fs *S3Fs) singlePartUpload(ctx context.Context, name, contentType string, data []byte) error {
// ContentLength is passed by address below, so the length needs an
// addressable local.
contentLength := int64(len(data))
_, err := fs.svc.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(fs.config.Bucket),
Key: aws.String(name),
ACL: types.ObjectCannedACL(fs.config.ACL),
// data is already fully in memory, so it is wrapped in a bytes.Reader.
Body: bytes.NewReader(data),
// presumably util.NilIfEmpty returns nil for an empty string so the
// optional field is left unset — confirm against the util package.
ContentType: util.NilIfEmpty(contentType),
ContentLength: &contentLength,
SSECustomerKey: util.NilIfEmpty(fs.sseCustomerKey),
SSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo),
SSECustomerKeyMD5: util.NilIfEmpty(fs.sseCustomerKeyMD5),
StorageClass: types.StorageClass(fs.config.StorageClass),
})
return err
}
func (fs *S3Fs) handleUpload(ctx context.Context, reader io.Reader, name, contentType string) error {
pool := newBufferAllocator(int(fs.config.UploadPartSize))
defer pool.free()
firstBuf := pool.getBuffer()
firstReadSize, err := readFill(reader, firstBuf)
if err == io.EOF {
return fs.singlePartUpload(ctx, name, contentType, firstBuf[:firstReadSize])
}
if err != nil {
return err
}
uploadID, err := fs.initiateMultipartUpload(ctx, name, contentType) uploadID, err := fs.initiateMultipartUpload(ctx, name, contentType)
if err != nil { if err != nil {
return err return err
@@ -905,9 +934,6 @@ func (fs *S3Fs) handleMultipartUpload(ctx context.Context, reader io.Reader, nam
var errOnce sync.Once var errOnce sync.Once
var partNumber int32 var partNumber int32
pool := newBufferAllocator(int(fs.config.UploadPartSize))
defer pool.free()
poolCtx, poolCancel := context.WithCancel(ctx) poolCtx, poolCancel := context.WithCancel(ctx)
defer poolCancel() defer poolCancel()
@@ -921,12 +947,40 @@ func (fs *S3Fs) handleMultipartUpload(ctx context.Context, reader io.Reader, nam
} }
} }
// uploadPart uploads the first bytesRead bytes of buf as part partNum of the
// multipart upload identified by uploadID and, on success, appends the part
// number and returned ETag to completedParts. It is started with `go`, and the
// caller is expected to have sent on guard and called wg.Add(1) beforehand
// (as done for the first part), since both are undone here on exit.
// NOTE(review): in this side-by-side rendering the next line also carries the
// old-column `for` loop header; this comment describes the right-hand (added)
// uploadPart closure.
for partNumber = 1; !finished; partNumber++ { uploadPart := func(partNum int32, buf []byte, bytesRead int) {
// Cleanup runs on both success and failure: return the buffer to the pool,
// receive from guard to give back the slot taken before the goroutine was
// started, and mark this part done on the wait group.
defer func() {
pool.releaseBuffer(buf)
<-guard
wg.Done()
}()
// poolCtx is derived from ctx with context.WithCancel.
etag, err := fs.uploadPart(poolCtx, name, uploadID, partNum, buf[:bytesRead])
if err != nil {
// errOnce ensures finalizeFailedUpload runs at most once even if several
// parts fail.
errOnce.Do(func() {
finalizeFailedUpload(err)
})
return
}
// completedParts is appended to from multiple goroutines, hence the mutex.
partMutex.Lock()
completedParts = append(completedParts, types.CompletedPart{
// partNum is this call's own parameter, so its address is distinct per part.
PartNumber: &partNum,
ETag: etag,
})
partMutex.Unlock()
}
partNumber = 1
guard <- struct{}{}
wg.Add(1)
go uploadPart(partNumber, firstBuf, firstReadSize)
for partNumber = 2; !finished; partNumber++ {
buf := pool.getBuffer() buf := pool.getBuffer()
n, err := readFill(reader, buf) n, err := readFill(reader, buf)
if err == io.EOF { if err == io.EOF {
if n == 0 && partNumber > 1 { if n == 0 {
pool.releaseBuffer(buf) pool.releaseBuffer(buf)
break break
} }
@@ -946,27 +1000,7 @@ func (fs *S3Fs) handleMultipartUpload(ctx context.Context, reader io.Reader, nam
} }
wg.Add(1) wg.Add(1)
go func(partNum int32, buf []byte, bytesRead int) { go uploadPart(partNumber, buf, n)
defer func() {
pool.releaseBuffer(buf)
<-guard
wg.Done()
}()
etag, err := fs.uploadPart(poolCtx, name, uploadID, partNum, buf[:bytesRead])
if err != nil {
errOnce.Do(func() {
finalizeFailedUpload(err)
})
return
}
partMutex.Lock()
completedParts = append(completedParts, types.CompletedPart{
PartNumber: &partNum,
ETag: etag,
})
partMutex.Unlock()
}(partNumber, buf, n)
} }
wg.Wait() wg.Wait()