diff --git a/internal/vfs/s3fs.go b/internal/vfs/s3fs.go index dc7118e3..d330133a 100644 --- a/internal/vfs/s3fs.go +++ b/internal/vfs/s3fs.go @@ -306,7 +306,7 @@ func (fs *S3Fs) Create(name string, flag, checks int) (File, PipeWriter, func(), } else { contentType = mime.TypeByExtension(path.Ext(name)) } - err := fs.handleMultipartUpload(ctx, r, name, contentType) + err := fs.handleUpload(ctx, r, name, contentType) r.CloseWithError(err) //nolint:errcheck p.Done(err) fsLog(fs, logger.LevelDebug, "upload completed, path: %q, acl: %q, readed bytes: %d, err: %+v", @@ -890,7 +890,36 @@ func (fs *S3Fs) abortMultipartUpload(name, uploadID string) error { return err } -func (fs *S3Fs) handleMultipartUpload(ctx context.Context, reader io.Reader, name, contentType string) error { +func (fs *S3Fs) singlePartUpload(ctx context.Context, name, contentType string, data []byte) error { + contentLength := int64(len(data)) + _, err := fs.svc.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(fs.config.Bucket), + Key: aws.String(name), + ACL: types.ObjectCannedACL(fs.config.ACL), + Body: bytes.NewReader(data), + ContentType: util.NilIfEmpty(contentType), + ContentLength: &contentLength, + SSECustomerKey: util.NilIfEmpty(fs.sseCustomerKey), + SSECustomerAlgorithm: util.NilIfEmpty(fs.sseCustomerAlgo), + SSECustomerKeyMD5: util.NilIfEmpty(fs.sseCustomerKeyMD5), + StorageClass: types.StorageClass(fs.config.StorageClass), + }) + return err +} + +func (fs *S3Fs) handleUpload(ctx context.Context, reader io.Reader, name, contentType string) error { + pool := newBufferAllocator(int(fs.config.UploadPartSize)) + defer pool.free() + + firstBuf := pool.getBuffer() + firstReadSize, err := readFill(reader, firstBuf) + if err == io.EOF { + return fs.singlePartUpload(ctx, name, contentType, firstBuf[:firstReadSize]) + } + if err != nil { + return err + } + uploadID, err := fs.initiateMultipartUpload(ctx, name, contentType) if err != nil { return err @@ -905,9 +934,6 @@ func (fs *S3Fs) handleMultipartUpload(ctx context.Context, reader io.Reader, nam var errOnce sync.Once var partNumber int32 - pool := newBufferAllocator(int(fs.config.UploadPartSize)) - defer pool.free() - poolCtx, poolCancel := context.WithCancel(ctx) defer poolCancel() @@ -921,12 +947,40 @@ func (fs *S3Fs) handleMultipartUpload(ctx context.Context, reader io.Reader, nam } } - for partNumber = 1; !finished; partNumber++ { + uploadPart := func(partNum int32, buf []byte, bytesRead int) { + defer func() { + pool.releaseBuffer(buf) + <-guard + wg.Done() + }() + + etag, err := fs.uploadPart(poolCtx, name, uploadID, partNum, buf[:bytesRead]) + if err != nil { + errOnce.Do(func() { + finalizeFailedUpload(err) + }) + return + } + partMutex.Lock() + completedParts = append(completedParts, types.CompletedPart{ + PartNumber: &partNum, + ETag: etag, + }) + partMutex.Unlock() + } + + partNumber = 1 + guard <- struct{}{} + + wg.Add(1) + go uploadPart(partNumber, firstBuf, firstReadSize) + + for partNumber = 2; !finished; partNumber++ { buf := pool.getBuffer() n, err := readFill(reader, buf) if err == io.EOF { - if n == 0 && partNumber > 1 { + if n == 0 { pool.releaseBuffer(buf) break } @@ -946,27 +1000,7 @@ func (fs *S3Fs) handleMultipartUpload(ctx context.Context, reader io.Reader, nam } wg.Add(1) - go func(partNum int32, buf []byte, bytesRead int) { - defer func() { - pool.releaseBuffer(buf) - <-guard - wg.Done() - }() - - etag, err := fs.uploadPart(poolCtx, name, uploadID, partNum, buf[:bytesRead]) - if err != nil { - errOnce.Do(func() { - finalizeFailedUpload(err) - }) - return - } - partMutex.Lock() - completedParts = append(completedParts, types.CompletedPart{ - PartNumber: &partNum, - ETag: etag, - }) - partMutex.Unlock() - }(partNumber, buf, n) + go uploadPart(partNumber, buf, n) } wg.Wait()