azblobfs: update to the latest sdk and fix compatibility

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
Nicola Murino
2022-04-25 17:34:52 +02:00
parent 504cd3efda
commit 97f8142b1e
4 changed files with 146 additions and 107 deletions
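The compatibility break this commit deals with is mostly mechanical: in the newer azblob SDK the container and block blob client constructors return a pointer client together with an error, and the option/response structs were renamed (for example GetBlobPropertiesOptions became BlobGetPropertiesOptions, ContainerListBlobFlatSegmentOptions became ContainerListBlobsFlatOptions, and the per-call Timeout field was dropped). A minimal sketch of the new client construction flow, for orientation only; the service-client constructor name and the account values are assumptions for this sketch, not taken from the commit:

package main

import (
	"fmt"
	"log"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

func main() {
	// NewSharedKeyCredential is the same call used in the diff below.
	credential, err := azblob.NewSharedKeyCredential("myaccount", "bAse64AccountKey==")
	if err != nil {
		log.Fatal(err)
	}
	// The service-client constructor name here is an assumption for this sketch.
	svc, err := azblob.NewServiceClientWithSharedKey(
		fmt.Sprintf("https://%s.blob.core.windows.net/", "myaccount"), credential, nil)
	if err != nil {
		log.Fatal(err)
	}
	// With the updated SDK, NewContainerClient returns (*azblob.ContainerClient, error)
	// instead of a value with no error; most hunks below adapt to this.
	containerClient, err := svc.NewContainerClient("mycontainer")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(containerClient.URL())
}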

@@ -12,7 +12,6 @@ import (
"io"
"mime"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
@@ -45,7 +44,7 @@ type AzureBlobFs struct {
 	mountPath          string
 	config             *AzBlobFsConfig
 	hasContainerAccess bool
-	containerClient    azblob.ContainerClient
+	containerClient    *azblob.ContainerClient
 	ctxTimeout         time.Duration
 	ctxLongTimeout     time.Duration
 }
@@ -89,10 +88,10 @@ func NewAzBlobFs(connectionID, localTempDir, mountPath string, config AzBlobFsCo
 	}
 	if fs.config.SASURL.GetPayload() != "" {
-		if _, err := url.Parse(fs.config.SASURL.GetPayload()); err != nil {
+		parts, err := azblob.NewBlobURLParts(fs.config.SASURL.GetPayload())
+		if err != nil {
 			return fs, fmt.Errorf("invalid SAS URL: %w", err)
 		}
-		parts := azblob.NewBlobURLParts(fs.config.SASURL.GetPayload())
 		if parts.ContainerName != "" {
 			if fs.config.Container != "" && fs.config.Container != parts.ContainerName {
 				return fs, fmt.Errorf("container name in SAS URL %#v and container provided %#v do not match",
@@ -109,8 +108,8 @@ func NewAzBlobFs(connectionID, localTempDir, mountPath string, config AzBlobFsCo
return fs, fmt.Errorf("invalid credentials: %v", err)
}
fs.hasContainerAccess = false
fs.containerClient = svc.NewContainerClient(fs.config.Container)
return fs, nil
fs.containerClient, err = svc.NewContainerClient(fs.config.Container)
return fs, err
}
credential, err := azblob.NewSharedKeyCredential(fs.config.AccountName, fs.config.AccountKey.GetPayload())
@@ -128,8 +127,8 @@ func NewAzBlobFs(connectionID, localTempDir, mountPath string, config AzBlobFsCo
return fs, fmt.Errorf("invalid credentials: %v", err)
}
fs.hasContainerAccess = true
fs.containerClient = svc.NewContainerClient(fs.config.Container)
return fs, nil
fs.containerClient, err = svc.NewContainerClient(fs.config.Container)
return fs, err
}
// Name returns the name for the Fs implementation
@@ -194,8 +193,13 @@ func (fs *AzureBlobFs) Open(name string, offset int64) (File, *pipeat.PipeReader
 	if err != nil {
 		return nil, nil, nil, err
 	}
+	blockBlob, err := fs.containerClient.NewBlockBlobClient(name)
+	if err != nil {
+		r.Close()
+		w.Close()
+		return nil, nil, nil, err
+	}
 	ctx, cancelFn := context.WithCancel(context.Background())
-	blockBlob := fs.containerClient.NewBlockBlobClient(name)
 
 	go func() {
 		defer cancelFn()
@@ -215,10 +219,15 @@ func (fs *AzureBlobFs) Create(name string, flag int) (File, *PipeWriter, func(),
 	if err != nil {
 		return nil, nil, nil, err
 	}
+	blockBlob, err := fs.containerClient.NewBlockBlobClient(name)
+	if err != nil {
+		r.Close()
+		w.Close()
+		return nil, nil, nil, err
+	}
 	ctx, cancelFn := context.WithCancel(context.Background())
 	p := NewPipeWriter(w)
-	blockBlob := fs.containerClient.NewBlockBlobClient(name)
 	headers := azblob.BlobHTTPHeaders{}
 	var contentType string
 	if flag == -1 {
@@ -265,13 +274,19 @@ func (fs *AzureBlobFs) Rename(source, target string) error {
return fmt.Errorf("cannot rename non empty directory: %#v", source)
}
}
dstBlob := fs.containerClient.NewBlockBlobClient(target)
srcURL := fs.containerClient.NewBlockBlobClient(source).URL()
dstBlob, err := fs.containerClient.NewBlockBlobClient(target)
if err != nil {
return err
}
srcBlob, err := fs.containerClient.NewBlockBlobClient(source)
if err != nil {
return err
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
defer cancelFn()
resp, err := dstBlob.StartCopyFromURL(ctx, srcURL, fs.getCopyOptions())
resp, err := dstBlob.StartCopyFromURL(ctx, srcBlob.URL(), fs.getCopyOptions())
if err != nil {
metric.AZCopyObjectCompleted(err)
return err
@@ -281,7 +296,7 @@ func (fs *AzureBlobFs) Rename(source, target string) error {
 	for copyStatus == azblob.CopyStatusTypePending {
 		// Poll until the copy is complete.
 		time.Sleep(500 * time.Millisecond)
-		resp, err := dstBlob.GetProperties(ctx, &azblob.GetBlobPropertiesOptions{
+		resp, err := dstBlob.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{
 			BlobAccessConditions: &azblob.BlobAccessConditions{},
 		})
 		if err != nil {
@@ -303,16 +318,7 @@ func (fs *AzureBlobFs) Rename(source, target string) error {
 	}
 	metric.AZCopyObjectCompleted(nil)
-	if plugin.Handler.HasMetadater() {
-		if !fi.IsDir() {
-			err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
-				util.GetTimeAsMsSinceEpoch(fi.ModTime()))
-			if err != nil {
-				fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %#v -> %#v: %+v",
-					source, target, err)
-			}
-		}
-	}
+	fs.preserveModificationTime(source, target, fi)
 	return fs.Remove(source, fi.IsDir())
 }
@@ -327,11 +333,14 @@ func (fs *AzureBlobFs) Remove(name string, isDir bool) error {
return fmt.Errorf("cannot remove non empty directory: %#v", name)
}
}
blobBlock := fs.containerClient.NewBlockBlobClient(name)
blobBlock, err := fs.containerClient.NewBlockBlobClient(name)
if err != nil {
return err
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
_, err := blobBlock.Delete(ctx, &azblob.DeleteBlobOptions{
_, err = blobBlock.Delete(ctx, &azblob.BlobDeleteOptions{
DeleteSnapshots: azblob.DeleteSnapshotsOptionTypeInclude.ToPtr(),
})
metric.AZDeleteObjectCompleted(err)
@@ -415,11 +424,9 @@ func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 	}
 	prefixes := make(map[string]bool)
 
-	timeout := int32(fs.ctxTimeout / time.Second)
-	pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobHierarchySegmentOptions{
+	pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobsHierarchyOptions{
 		Include: []azblob.ListBlobsIncludeItem{},
 		Prefix:  &prefix,
-		Timeout: &timeout,
 	})
 
 	hasNext := true
@@ -430,7 +437,7 @@ func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 		if hasNext = pager.NextPage(ctx); hasNext {
 			resp := pager.PageResponse()
-			for _, blobPrefix := range resp.ContainerListBlobHierarchySegmentResult.Segment.BlobPrefixes {
+			for _, blobPrefix := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobPrefixes {
 				name := util.GetStringFromPointer(blobPrefix.Name)
 				// we don't support prefixes == "/" this will be sent if a key starts with "/"
 				if name == "" || name == "/" {
@@ -445,7 +452,7 @@ func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
 				prefixes[strings.TrimSuffix(name, "/")] = true
 			}
 
-			for _, blobItem := range resp.ContainerListBlobHierarchySegmentResult.Segment.BlobItems {
+			for _, blobItem := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobItems {
 				name := util.GetStringFromPointer(blobItem.Name)
 				name = strings.TrimPrefix(name, prefix)
 				size := int64(0)
@@ -551,10 +558,8 @@ func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) {
 	numFiles := 0
 	size := int64(0)
-	timeout := int32(fs.ctxTimeout / time.Second)
-	pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobFlatSegmentOptions{
-		Prefix:  &fs.config.KeyPrefix,
-		Timeout: &timeout,
+	pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{
+		Prefix: &fs.config.KeyPrefix,
 	})
 
 	hasNext := true
@@ -564,7 +569,7 @@ func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) {
 		if hasNext = pager.NextPage(ctx); hasNext {
 			resp := pager.PageResponse()
-			for _, blobItem := range resp.ContainerListBlobFlatSegmentResult.Segment.BlobItems {
+			for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems {
 				if blobItem.Properties != nil {
 					contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
 					isDir := (contentType == dirMimeType)
@@ -592,11 +597,9 @@ func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, e
 		prefix = strings.TrimPrefix(fsPrefix, "/")
 	}
 
-	timeout := int32(fs.ctxTimeout / time.Second)
-	pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobHierarchySegmentOptions{
+	pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobsHierarchyOptions{
 		Include: []azblob.ListBlobsIncludeItem{},
 		Prefix:  &prefix,
-		Timeout: &timeout,
 	})
 
 	hasNext := true
@@ -606,7 +609,7 @@ func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, e
 		if hasNext = pager.NextPage(ctx); hasNext {
 			resp := pager.PageResponse()
-			for _, blobItem := range resp.ContainerListBlobHierarchySegmentResult.Segment.BlobItems {
+			for _, blobItem := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobItems {
 				name := util.GetStringFromPointer(blobItem.Name)
 				name = strings.TrimPrefix(name, prefix)
 				if blobItem.Properties != nil {
@@ -670,10 +673,8 @@ func (fs *AzureBlobFs) GetRelativePath(name string) string {
 // directory in the tree, including root
 func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
 	prefix := fs.getPrefix(root)
-	timeout := int32(fs.ctxTimeout / time.Second)
-	pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobFlatSegmentOptions{
-		Prefix:  &prefix,
-		Timeout: &timeout,
+	pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{
+		Prefix: &prefix,
 	})
 
 	hasNext := true
@@ -683,7 +684,7 @@ func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
 		if hasNext = pager.NextPage(ctx); hasNext {
 			resp := pager.PageResponse()
-			for _, blobItem := range resp.ContainerListBlobFlatSegmentResult.Segment.BlobItems {
+			for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems {
 				name := util.GetStringFromPointer(blobItem.Name)
 				if fs.isEqual(name, prefix) {
 					continue
@@ -736,12 +737,15 @@ func (fs *AzureBlobFs) ResolvePath(virtualPath string) (string, error) {
 	return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
 }
 
-func (fs *AzureBlobFs) headObject(name string) (azblob.GetBlobPropertiesResponse, error) {
+func (fs *AzureBlobFs) headObject(name string) (azblob.BlobGetPropertiesResponse, error) {
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
 
-	blobClient := fs.containerClient.NewBlockBlobClient(name)
-	resp, err := blobClient.GetProperties(ctx, &azblob.GetBlobPropertiesOptions{
+	blobClient, err := fs.containerClient.NewBlockBlobClient(name)
+	if err != nil {
+		return azblob.BlobGetPropertiesResponse{}, err
+	}
+	resp, err := blobClient.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{
 		BlobAccessConditions: &azblob.BlobAccessConditions{},
 	})
 	metric.AZHeadObjectCompleted(err)
@@ -819,7 +823,7 @@ func (fs *AzureBlobFs) checkIfBucketExists() error {
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
 	defer cancelFn()
 
-	_, err := fs.containerClient.GetProperties(ctx, &azblob.GetPropertiesOptionsContainer{})
+	_, err := fs.containerClient.GetProperties(ctx, &azblob.ContainerGetPropertiesOptions{})
 	metric.AZHeadContainerCompleted(err)
 	return err
 }
@@ -829,11 +833,9 @@ func (fs *AzureBlobFs) hasContents(name string) (bool, error) {
 	prefix := fs.getPrefix(name)
 	maxResults := int32(1)
-	timeout := int32(fs.ctxTimeout / time.Second)
-	pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobFlatSegmentOptions{
-		Maxresults: &maxResults,
+	pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{
+		MaxResults: &maxResults,
 		Prefix:     &prefix,
-		Timeout:    &timeout,
 	})
 
 	ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
@@ -841,7 +843,7 @@ func (fs *AzureBlobFs) hasContents(name string) (bool, error) {
 	if pager.NextPage(ctx) {
 		resp := pager.PageResponse()
-		result = len(resp.ContainerListBlobFlatSegmentResult.Segment.BlobItems) > 0
+		result = len(resp.ListBlobsFlatSegmentResponse.Segment.BlobItems) > 0
 	}
 
 	err := pager.Err()
@@ -849,13 +851,13 @@ func (fs *AzureBlobFs) hasContents(name string) (bool, error) {
 	return result, err
 }
 
-func (fs *AzureBlobFs) downloadPart(ctx context.Context, blockBlob azblob.BlockBlobClient, buf []byte,
+func (fs *AzureBlobFs) downloadPart(ctx context.Context, blockBlob *azblob.BlockBlobClient, buf []byte,
 	w io.WriterAt, offset, count, writeOffset int64,
 ) error {
 	if count == 0 {
 		return nil
 	}
-	resp, err := blockBlob.Download(ctx, &azblob.DownloadBlobOptions{
+	resp, err := blockBlob.Download(ctx, &azblob.BlobDownloadOptions{
 		Offset: &offset,
 		Count:  &count,
 	})
@@ -874,10 +876,10 @@ func (fs *AzureBlobFs) downloadPart(ctx context.Context, blockBlob azblob.BlockB
 	return err
 }
 
-func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob azblob.BlockBlobClient,
+func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *azblob.BlockBlobClient,
 	offset int64, writer io.WriterAt,
 ) error {
-	props, err := blockBlob.GetProperties(ctx, &azblob.GetBlobPropertiesOptions{
+	props, err := blockBlob.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{
 		BlobAccessConditions: &azblob.BlobAccessConditions{},
 	})
 	if err != nil {
@@ -955,7 +957,7 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob az
 }
 
 func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader,
-	blockBlob azblob.BlockBlobClient, httpHeaders *azblob.BlobHTTPHeaders,
+	blockBlob *azblob.BlockBlobClient, httpHeaders *azblob.BlobHTTPHeaders,
 ) error {
 	partSize := fs.config.UploadPartSize
 	guard := make(chan struct{}, fs.config.UploadConcurrency)
@@ -1016,7 +1018,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
 			innerCtx, cancelFn := context.WithDeadline(poolCtx, time.Now().Add(blockCtxTimeout))
 			defer cancelFn()
 
-			_, err := blockBlob.StageBlock(innerCtx, blockID, bufferReader, &azblob.StageBlockOptions{})
+			_, err := blockBlob.StageBlock(innerCtx, blockID, bufferReader, &azblob.BlockBlobStageBlockOptions{})
 			if err != nil {
 				errOnce.Do(func() {
 					poolError = err
@@ -1035,7 +1037,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
 		return poolError
 	}
 
-	commitOptions := azblob.CommitBlockListOptions{
+	commitOptions := azblob.BlockBlobCommitBlockListOptions{
 		BlobHTTPHeaders: httpHeaders,
 	}
 	if fs.config.AccessTier != "" {
@@ -1080,8 +1082,21 @@ func (*AzureBlobFs) incrementBlockID(blockID []byte) {
 	}
 }
 
-func (fs *AzureBlobFs) getCopyOptions() *azblob.StartCopyBlobOptions {
-	copyOptions := &azblob.StartCopyBlobOptions{}
+func (fs *AzureBlobFs) preserveModificationTime(source, target string, fi os.FileInfo) {
+	if plugin.Handler.HasMetadater() {
+		if !fi.IsDir() {
+			err := plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
+				util.GetTimeAsMsSinceEpoch(fi.ModTime()))
+			if err != nil {
+				fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %#v -> %#v: %+v",
+					source, target, err)
+			}
+		}
+	}
+}
+
+func (fs *AzureBlobFs) getCopyOptions() *azblob.BlobStartCopyOptions {
+	copyOptions := &azblob.BlobStartCopyOptions{}
 	if fs.config.AccessTier != "" {
 		copyOptions.Tier = (*azblob.AccessTier)(&fs.config.AccessTier)
 	}
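
For the listing changes above (ContainerListBlobsFlatOptions / ContainerListBlobsHierarchyOptions and the renamed ListBlobsFlatSegmentResponse / ListBlobsHierarchySegmentResponse page fields), here is a minimal sketch of the flat-listing pattern the hunks converge on. The helper name and its imports (context, azblob) are assumptions for illustration, not part of the commit:

// listBlobNames is a hypothetical helper showing the pager pattern used above:
// ContainerListBlobsFlatOptions no longer carries a Timeout field, and the page
// payload is exposed as ListBlobsFlatSegmentResponse.
func listBlobNames(ctx context.Context, containerClient *azblob.ContainerClient, prefix string) ([]string, error) {
	var names []string
	pager := containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{
		Prefix: &prefix,
	})
	for pager.NextPage(ctx) {
		resp := pager.PageResponse()
		for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems {
			if blobItem.Name != nil {
				names = append(names, *blobItem.Name)
			}
		}
	}
	// NextPage returns false both when iteration completes and when it fails,
	// so the pager error must be checked afterwards, as hasContents does above.
	return names, pager.Err()
}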