azure blobs: add support for multipart downloads

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
Author: Nicola Murino
Date: 2022-02-21 19:01:31 +01:00
Parent: d31cccf85f
Commit: 8bbf54d2b6
12 changed files with 303 additions and 102 deletions


@@ -33,7 +33,6 @@ import (
const (
azureDefaultEndpoint = "blob.core.windows.net"
maxResultsPerPage = 1000
)
// AzureBlobFs is a Fs implementation for Azure Blob storage.
@@ -193,37 +192,15 @@ func (fs *AzureBlobFs) Open(name string, offset int64) (File, *pipeat.PipeReader
return nil, nil, nil, err
}
ctx, cancelFn := context.WithCancel(context.Background())
blockBlob := fs.containerClient.NewBlockBlobClient(name)
blobDownloadResponse, err := blockBlob.Download(ctx, &azblob.DownloadBlobOptions{
Offset: &offset,
})
if err != nil {
r.Close()
w.Close()
cancelFn()
return nil, nil, nil, err
}
body := blobDownloadResponse.Body(&azblob.RetryReaderOptions{
MaxRetryRequests: 2,
})
go func() {
defer cancelFn()
defer body.Close()
/*err := blockBlob.DownloadBlobToWriterAt(ctx, offset, 0, w, azblob.HighLevelDownloadFromBlobOptions{
// add download part size and concurrency
BlockSize: fs.config.UploadPartSize,
Parallelism: uint16(fs.config.UploadConcurrency),
RetryReaderOptionsPerBlock: azblob.RetryReaderOptions{
MaxRetryRequests: 2,
},
})*/
n, err := io.Copy(w, body)
err := fs.handleMultipartDownload(ctx, blockBlob, offset, w)
w.CloseWithError(err) //nolint:errcheck
fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
metric.AZTransferCompleted(n, 1, err)
fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, w.GetWrittenBytes(), err)
metric.AZTransferCompleted(w.GetWrittenBytes(), 1, err)
}()
return nil, r, cancelFn, nil
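
The goroutine no longer streams a single response body through io.Copy; handleMultipartDownload writes each part at its own offset, which is why the pipe's write side must support io.WriterAt (and why the written total now comes from w.GetWrittenBytes() rather than io.Copy's return value). A minimal sketch of the idea, using a temp file in place of SFTPGo's pipe wrapper:

package main

import (
	"fmt"
	"os"
	"sync"
)

func main() {
	f, err := os.CreateTemp("", "parts")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	parts := []string{"first ", "second ", "third"}
	var wg sync.WaitGroup
	offset := int64(0)
	for _, p := range parts {
		wg.Add(1)
		// Each part writes independently at its precomputed offset,
		// so completion order does not matter.
		go func(data string, off int64) {
			defer wg.Done()
			f.WriteAt([]byte(data), off) //nolint:errcheck
		}(p, offset)
		offset += int64(len(p))
	}
	wg.Wait()

	buf := make([]byte, offset)
	f.ReadAt(buf, 0) //nolint:errcheck
	fmt.Printf("%s\n", buf) // first second third
}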
@@ -253,15 +230,6 @@ func (fs *AzureBlobFs) Create(name string, flag int) (File, *PipeWriter, func(),
go func() {
defer cancelFn()
/*uploadOptions := azblob.UploadStreamToBlockBlobOptions{
BufferSize: int(fs.config.UploadPartSize),
MaxBuffers: fs.config.UploadConcurrency,
HTTPHeaders: &headers,
}
if fs.config.AccessTier != "" {
uploadOptions.AccessTier = (*azblob.AccessTier)(&fs.config.AccessTier)
}
_, err := blockBlob.UploadStreamToBlockBlob(ctx, r, uploadOptions)*/
err := fs.handleMultipartUpload(ctx, r, blockBlob, &headers)
r.CloseWithError(err) //nolint:errcheck
p.Done(err)
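
The upload path keeps its producer/consumer shape: the returned PipeWriter feeds a reader that the goroutine drains part by part. A rough standard-library sketch of that split, with an io.Pipe standing in for SFTPGo's pipe and print statements standing in for block uploads:

package main

import (
	"fmt"
	"io"
	"strings"
)

const partSize = 8 // tiny for demonstration

// consume drains r in fixed-size parts, the way a multipart
// uploader would hand each chunk to the storage SDK.
func consume(r io.Reader) error {
	buf := make([]byte, partSize)
	for part := 0; ; part++ {
		n, err := io.ReadFull(r, buf)
		if n > 0 {
			fmt.Printf("part %d: %q\n", part, buf[:n])
		}
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return nil // source exhausted, last part may be short
		}
		if err != nil {
			return err
		}
	}
}

func main() {
	pr, pw := io.Pipe()
	go func() {
		_, err := io.Copy(pw, strings.NewReader("hello multipart uploads"))
		pw.CloseWithError(err) //nolint:errcheck
	}()
	if err := consume(pr); err != nil {
		fmt.Println("upload failed:", err)
	}
}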
@@ -449,11 +417,11 @@ func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
}
prefixes := make(map[string]bool)
maxResults := int32(maxResultsPerPage)
timeout := int32(fs.ctxTimeout / time.Second)
pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobHierarchySegmentOptions{
Include: []azblob.ListBlobsIncludeItem{},
Prefix: &prefix,
Maxresults: &maxResults,
Include: []azblob.ListBlobsIncludeItem{},
Prefix: &prefix,
Timeout: &timeout,
})
hasNext := true
@@ -574,10 +542,10 @@ func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) {
numFiles := 0
size := int64(0)
maxResults := int32(maxResultsPerPage)
timeout := int32(fs.ctxTimeout / time.Second)
pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobFlatSegmentOptions{
Prefix: &fs.config.KeyPrefix,
Maxresults: &maxResults,
Prefix: &fs.config.KeyPrefix,
Timeout: &timeout,
})
hasNext := true
@@ -615,11 +583,11 @@ func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, e
prefix = strings.TrimPrefix(fsPrefix, "/")
}
maxResults := int32(maxResultsPerPage)
timeout := int32(fs.ctxTimeout / time.Second)
pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobHierarchySegmentOptions{
Include: []azblob.ListBlobsIncludeItem{},
Prefix: &prefix,
Maxresults: &maxResults,
Include: []azblob.ListBlobsIncludeItem{},
Prefix: &prefix,
Timeout: &timeout,
})
hasNext := true
@@ -700,10 +668,10 @@ func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
}
}
maxResults := int32(maxResultsPerPage)
timeout := int32(fs.ctxTimeout / time.Second)
pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobFlatSegmentOptions{
Prefix: &prefix,
Maxresults: &maxResults,
Prefix: &prefix,
Timeout: &timeout,
})
hasNext := true
@@ -826,11 +794,22 @@ func (fs *AzureBlobFs) setConfigDefaults() {
fs.config.Endpoint = azureDefaultEndpoint
}
if fs.config.UploadPartSize == 0 {
fs.config.UploadPartSize = 4
fs.config.UploadPartSize = 5
}
if fs.config.UploadPartSize < 1024*1024 {
fs.config.UploadPartSize *= 1024 * 1024
}
fs.config.UploadPartSize *= 1024 * 1024
if fs.config.UploadConcurrency == 0 {
fs.config.UploadConcurrency = 2
fs.config.UploadConcurrency = 5
}
if fs.config.DownloadPartSize == 0 {
fs.config.DownloadPartSize = 5
}
if fs.config.DownloadPartSize < 1024*1024 {
fs.config.DownloadPartSize *= 1024 * 1024
}
if fs.config.DownloadConcurrency == 0 {
fs.config.DownloadConcurrency = 5
}
}
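
With these defaults, both sides use 5 MB parts and a concurrency of 5, and any configured value below 1 MiB is interpreted as megabytes rather than bytes. A standalone sketch of that normalization rule (normalizePartSize is an illustrative name, not a function from the commit):

package main

import "fmt"

// normalizePartSize mirrors the setConfigDefaults logic: a zero value
// falls back to 5, and anything below 1 MiB is treated as a size in MiB.
func normalizePartSize(v int64) int64 {
	if v == 0 {
		v = 5
	}
	if v < 1024*1024 {
		v *= 1024 * 1024
	}
	return v
}

func main() {
	fmt.Println(normalizePartSize(0))       // 5242880 (5 MiB default)
	fmt.Println(normalizePartSize(8))       // 8388608 (8 treated as MiB)
	fmt.Println(normalizePartSize(2 << 20)) // 2097152 (already bytes, kept)
}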
@@ -854,20 +833,19 @@ func (fs *AzureBlobFs) hasContents(name string) (bool, error) {
}
maxResults := int32(1)
timeout := int32(fs.ctxTimeout / time.Second)
pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobFlatSegmentOptions{
Maxresults: &maxResults,
Prefix: &prefix,
Timeout: &timeout,
})
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
for pager.NextPage(ctx) {
if pager.NextPage(ctx) {
resp := pager.PageResponse()
result = len(resp.ContainerListBlobFlatSegmentResult.Segment.BlobItems) > 0
if result {
break
}
}
err := pager.Err()
@@ -875,6 +853,111 @@ func (fs *AzureBlobFs) hasContents(name string) (bool, error) {
return result, err
}
func (fs *AzureBlobFs) downloadPart(ctx context.Context, blockBlob azblob.BlockBlobClient, buf []byte,
w io.WriterAt, offset, count, writeOffset int64,
) error {
if count == 0 {
return nil
}
resp, err := blockBlob.Download(ctx, &azblob.DownloadBlobOptions{
Offset: &offset,
Count: &count,
})
if err != nil {
return err
}
body := resp.Body(&azblob.RetryReaderOptions{MaxRetryRequests: 2})
defer body.Close()
_, err = io.ReadAtLeast(body, buf, int(count))
if err != nil {
return err
}
_, err = fs.writeAtFull(w, buf, writeOffset, int(count))
return err
}
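
downloadPart requests exactly count bytes and relies on io.ReadAtLeast to keep reading until the buffer holds them all, since a single Read on a network body may return fewer. A small standard-library demonstration of that contract:

package main

import (
	"fmt"
	"io"
	"strings"
)

// slowReader returns at most 3 bytes per Read call, imitating a
// network body that delivers data in small pieces.
type slowReader struct{ r io.Reader }

func (s slowReader) Read(p []byte) (int, error) {
	if len(p) > 3 {
		p = p[:3]
	}
	return s.r.Read(p)
}

func main() {
	body := slowReader{strings.NewReader("0123456789")}
	buf := make([]byte, 10)
	// ReadAtLeast loops over short reads until min bytes have arrived.
	n, err := io.ReadAtLeast(body, buf, len(buf))
	fmt.Println(n, err, string(buf[:n])) // 10 <nil> 0123456789
}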
func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob azblob.BlockBlobClient,
offset int64, writer io.WriterAt,
) error {
props, err := blockBlob.GetProperties(ctx, &azblob.GetBlobPropertiesOptions{
BlobAccessConditions: &azblob.BlobAccessConditions{},
})
if err != nil {
fsLog(fs, logger.LevelError, "unable to get blob properties, download aborted: %+v", err)
return err
}
contentLength := util.GetIntFromPointer(props.ContentLength)
sizeToDownload := contentLength - offset
if sizeToDownload < 0 {
fsLog(fs, logger.LevelError, "invalid multipart download size or offset, size: %v, offset: %v, size to download: %v",
contentLength, offset, sizeToDownload)
return errors.New("the requested offset exceeds the file size")
}
if sizeToDownload == 0 {
fsLog(fs, logger.LevelDebug, "nothing to download, offset %v, content length %v", offset, contentLength)
return nil
}
partSize := fs.config.DownloadPartSize
guard := make(chan struct{}, fs.config.DownloadConcurrency)
blockCtxTimeout := time.Duration(fs.config.DownloadPartSize/(1024*1024)) * time.Minute
pool := newBufferAllocator(int(partSize))
finished := false
var wg sync.WaitGroup
var errOnce sync.Once
var poolError error
poolCtx, poolCancel := context.WithCancel(ctx)
defer poolCancel()
for part := 0; !finished; part++ {
start := offset
end := offset + partSize
if end >= contentLength {
end = contentLength
finished = true
}
writeOffset := int64(part) * partSize
offset = end
guard <- struct{}{}
if poolError != nil {
fsLog(fs, logger.LevelDebug, "pool error, download for part %v not started", part)
break
}
buf := pool.getBuffer()
wg.Add(1)
go func(start, end, writeOffset int64, buf []byte) {
defer func() {
pool.releaseBuffer(buf)
<-guard
wg.Done()
}()
innerCtx, cancelFn := context.WithDeadline(poolCtx, time.Now().Add(blockCtxTimeout))
defer cancelFn()
count := end - start
err := fs.downloadPart(innerCtx, blockBlob, buf, writer, start, count, writeOffset)
if err != nil {
errOnce.Do(func() {
poolError = err
fsLog(fs, logger.LevelError, "multipart download error: %v", poolError)
poolCancel()
})
}
}(start, end, writeOffset, buf)
}
wg.Wait()
close(guard)
pool.free()
return poolError
}
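
The download pool mirrors the existing upload pool: a buffered channel caps the number of in-flight parts, sync.Once records only the first failure, and cancelling the shared context stops the remaining workers. A self-contained sketch of that skeleton; unlike the commit, it guards the shared error with a mutex so the scheduling loop's read is race-free, and a fake part download stands in for the ranged blob request:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

func main() {
	const concurrency = 3
	guard := make(chan struct{}, concurrency) // at most 3 parts in flight
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	var mu sync.Mutex
	var poolError error
	setErr := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		if poolError == nil {
			poolError = err // keep the first error only
			cancel()        // abort the parts still running
		}
	}
	getErr := func() error {
		mu.Lock()
		defer mu.Unlock()
		return poolError
	}

	for part := 0; part < 10; part++ {
		guard <- struct{}{} // blocks while the pool is full
		if getErr() != nil {
			<-guard
			break // a part already failed, stop scheduling new ones
		}
		wg.Add(1)
		go func(part int) {
			defer func() {
				<-guard // free a slot for the next part
				wg.Done()
			}()
			if err := downloadFakePart(ctx, part); err != nil {
				setErr(err)
			}
		}(part)
	}
	wg.Wait()
	fmt.Println("pool finished, err:", getErr())
}

// downloadFakePart fails on part 5 to exercise the error path.
func downloadFakePart(ctx context.Context, part int) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if part == 5 {
		return errors.New("simulated part failure")
	}
	return nil
}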
func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader,
blockBlob azblob.BlockBlobClient, httpHeaders *azblob.BlobHTTPHeaders,
) error {
@@ -918,14 +1001,18 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
guard <- struct{}{}
if poolError != nil {
fsLog(fs, logger.LevelDebug, "pool error, upload for part %v not started", part)
fsLog(fs, logger.LevelError, "pool error, upload for part %v not started", part)
pool.releaseBuffer(buf)
break
}
wg.Add(1)
go func(blockID string, buf []byte, bufSize int) {
defer wg.Done()
defer func() {
pool.releaseBuffer(buf)
<-guard
wg.Done()
}()
bufferReader := &bytesReaderWrapper{
Reader: bytes.NewReader(buf[:bufSize]),
@@ -941,8 +1028,6 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
poolCancel()
})
}
pool.releaseBuffer(buf)
<-guard
}(blockID, buf, n)
}
@@ -965,8 +1050,20 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
return err
}
func (*AzureBlobFs) writeAtFull(w io.WriterAt, buf []byte, offset int64, count int) (int, error) {
written := 0
for written < count {
n, err := w.WriteAt(buf[written:count], offset+int64(written))
written += n
if err != nil {
return written, err
}
}
return written, nil
}
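
writeAtFull is defensive about short writes: it keeps calling WriteAt, advancing the offset by whatever was accepted, until the requested count is written. A quick demonstration with a deliberately choppy io.WriterAt:

package main

import (
	"fmt"
	"io"
	"os"
)

// choppyWriter forwards at most 4 bytes per WriteAt call, standing in
// for a writer that does not take the whole slice at once.
type choppyWriter struct{ f *os.File }

func (c choppyWriter) WriteAt(p []byte, off int64) (int, error) {
	if len(p) > 4 {
		p = p[:4]
	}
	return c.f.WriteAt(p, off)
}

// writeAtFull mirrors the new helper: keep issuing WriteAt calls,
// advancing the offset by what was accepted, until count bytes land.
func writeAtFull(w io.WriterAt, buf []byte, offset int64, count int) (int, error) {
	written := 0
	for written < count {
		n, err := w.WriteAt(buf[written:count], offset+int64(written))
		written += n
		if err != nil {
			return written, err
		}
	}
	return written, nil
}

func main() {
	f, err := os.CreateTemp("", "chop")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	n, err := writeAtFull(choppyWriter{f}, []byte("hello world"), 0, 11)
	fmt.Println(n, err) // 11 <nil>
}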
// copied from rclone
func (fs *AzureBlobFs) readFill(r io.Reader, buf []byte) (n int, err error) {
func (*AzureBlobFs) readFill(r io.Reader, buf []byte) (n int, err error) {
var nn int
for n < len(buf) && err == nil {
nn, err = r.Read(buf[n:])
@@ -976,7 +1073,7 @@ func (fs *AzureBlobFs) readFill(r io.Reader, buf []byte) (n int, err error) {
}
// copied from rclone
func (fs *AzureBlobFs) incrementBlockID(blockID []byte) {
func (*AzureBlobFs) incrementBlockID(blockID []byte) {
for i, digit := range blockID {
newDigit := digit + 1
blockID[i] = newDigit
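
Put Block requires base64 block IDs of identical length, so the helper treats the raw ID as a little-endian counter and increments it byte by byte. A short demonstration of the carry behaviour, with the loop body completed along the lines of rclone's increment helper (the >= test detects overflow):

package main

import (
	"encoding/base64"
	"fmt"
)

// increment treats blockID as a little-endian counter: bump the low
// byte and carry into the next one when it wraps to zero.
func increment(blockID []byte) {
	for i, digit := range blockID {
		newDigit := digit + 1
		blockID[i] = newDigit
		if newDigit >= digit {
			break // no carry needed
		}
	}
}

func main() {
	id := []byte{255, 0} // low byte about to wrap
	increment(id)
	fmt.Println(id) // [0 1]: the carry moved into the next byte
	// Encoded IDs keep a constant length, as the Put Block API requires.
	fmt.Println(base64.StdEncoding.EncodeToString(id)) // AAE=
}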