azblob: add support for the latest SDK

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2022-10-01 14:04:53 +02:00
parent 0e8c41bbd1
commit a42e9ffa6b
3 changed files with 1396 additions and 336 deletions

16
go.mod
View File

@@ -5,14 +5,14 @@ go 1.19
require ( require (
cloud.google.com/go/storage v1.27.0 cloud.google.com/go/storage v1.27.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.3 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.3
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.1 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.5.0
github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962 github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962
github.com/alexedwards/argon2id v0.0.0-20211130144151-3585854a6387 github.com/alexedwards/argon2id v0.0.0-20211130144151-3585854a6387
github.com/aws/aws-sdk-go-v2 v1.16.16 github.com/aws/aws-sdk-go-v2 v1.16.16
github.com/aws/aws-sdk-go-v2/config v1.17.7 github.com/aws/aws-sdk-go-v2/config v1.17.8
github.com/aws/aws-sdk-go-v2/credentials v1.12.20 github.com/aws/aws-sdk-go-v2/credentials v1.12.21
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.17 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.17
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.33 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.34
github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.13.19 github.com/aws/aws-sdk-go-v2/service/marketplacemetering v1.13.19
github.com/aws/aws-sdk-go-v2/service/s3 v1.27.11 github.com/aws/aws-sdk-go-v2/service/s3 v1.27.11
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.16.2 github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.16.2
@@ -52,7 +52,7 @@ require (
github.com/rs/xid v1.4.0 github.com/rs/xid v1.4.0
github.com/rs/zerolog v1.28.0 github.com/rs/zerolog v1.28.0
github.com/sftpgo/sdk v0.1.2-0.20220913155952-81743fa5ded5 github.com/sftpgo/sdk v0.1.2-0.20220913155952-81743fa5ded5
github.com/shirou/gopsutil/v3 v3.22.8 github.com/shirou/gopsutil/v3 v3.22.9
github.com/spf13/afero v1.9.2 github.com/spf13/afero v1.9.2
github.com/spf13/cobra v1.5.0 github.com/spf13/cobra v1.5.0
github.com/spf13/viper v1.13.0 github.com/spf13/viper v1.13.0
@@ -64,7 +64,7 @@ require (
github.com/yl2chen/cidranger v1.0.3-0.20210928021809-d1cb2c52f37a github.com/yl2chen/cidranger v1.0.3-0.20210928021809-d1cb2c52f37a
go.etcd.io/bbolt v1.3.6 go.etcd.io/bbolt v1.3.6
go.uber.org/automaxprocs v1.5.1 go.uber.org/automaxprocs v1.5.1
gocloud.dev v0.26.0 gocloud.dev v0.27.0
golang.org/x/crypto v0.0.0-20220924013350-4ba4fb4dd9e7 golang.org/x/crypto v0.0.0-20220924013350-4ba4fb4dd9e7
golang.org/x/net v0.0.0-20220923203811-8be639271d50 golang.org/x/net v0.0.0-20220923203811-8be639271d50
golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1 golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1
@@ -90,7 +90,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.17 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.17 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.17 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.23 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.11.23 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.5 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.6 // indirect
github.com/aws/smithy-go v1.13.3 // indirect github.com/aws/smithy-go v1.13.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/boombuler/barcode v1.0.1 // indirect github.com/boombuler/barcode v1.0.1 // indirect
@@ -156,7 +156,7 @@ require (
golang.org/x/tools v0.1.12 // indirect golang.org/x/tools v0.1.12 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220929141241-1ce7b20da813 // indirect google.golang.org/genproto v0.0.0-20220930163606-c98284e70a91 // indirect
google.golang.org/grpc v1.49.0 // indirect google.golang.org/grpc v1.49.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect

1267
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -26,6 +26,7 @@ import (
"io" "io"
"mime" "mime"
"net/http" "net/http"
"net/url"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
@@ -36,7 +37,11 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/eikenb/pipeat" "github.com/eikenb/pipeat"
"github.com/pkg/sftp" "github.com/pkg/sftp"
@@ -58,7 +63,7 @@ type AzureBlobFs struct {
// if not empty this fs is mouted as virtual folder in the specified path // if not empty this fs is mouted as virtual folder in the specified path
mountPath string mountPath string
config *AzBlobFsConfig config *AzBlobFsConfig
containerClient *azblob.ContainerClient containerClient *container.Client
ctxTimeout time.Duration ctxTimeout time.Duration
ctxLongTimeout time.Duration ctxLongTimeout time.Duration
} }
@@ -94,36 +99,8 @@ func NewAzBlobFs(connectionID, localTempDir, mountPath string, config AzBlobFsCo
fs.setConfigDefaults() fs.setConfigDefaults()
version := version.Get()
clientOptions := &azblob.ClientOptions{
Telemetry: policy.TelemetryOptions{
ApplicationID: fmt.Sprintf("SFTPGo-%v_%v", version.Version, version.CommitHash),
},
}
if fs.config.SASURL.GetPayload() != "" { if fs.config.SASURL.GetPayload() != "" {
parts, err := azblob.NewBlobURLParts(fs.config.SASURL.GetPayload()) return fs.initFsFromSASURL()
if err != nil {
return fs, fmt.Errorf("invalid SAS URL: %w", err)
}
svc, err := azblob.NewServiceClientWithNoCredential(fs.config.SASURL.GetPayload(), clientOptions)
if err != nil {
return fs, fmt.Errorf("invalid credentials: %v", err)
}
if parts.ContainerName != "" {
if fs.config.Container != "" && fs.config.Container != parts.ContainerName {
return fs, fmt.Errorf("container name in SAS URL %#v and container provided %#v do not match",
parts.ContainerName, fs.config.Container)
}
fs.config.Container = parts.ContainerName
fs.containerClient, err = svc.NewContainerClient("")
} else {
if fs.config.Container == "" {
return fs, errors.New("container is required with this SAS URL")
}
fs.containerClient, err = svc.NewContainerClient(fs.config.Container)
}
return fs, err
} }
credential, err := azblob.NewSharedKeyCredential(fs.config.AccountName, fs.config.AccountKey.GetPayload()) credential, err := azblob.NewSharedKeyCredential(fs.config.AccountName, fs.config.AccountKey.GetPayload())
@@ -136,14 +113,48 @@ func NewAzBlobFs(connectionID, localTempDir, mountPath string, config AzBlobFsCo
} else { } else {
endpoint = fmt.Sprintf("https://%s.%s/", fs.config.AccountName, fs.config.Endpoint) endpoint = fmt.Sprintf("https://%s.%s/", fs.config.AccountName, fs.config.Endpoint)
} }
svc, err := azblob.NewServiceClientWithSharedKey(endpoint, credential, clientOptions) containerURL := runtime.JoinPaths(endpoint, fs.config.Container)
svc, err := container.NewClientWithSharedKeyCredential(containerURL, credential, getAzContainerClientOptions())
if err != nil { if err != nil {
return fs, fmt.Errorf("invalid credentials: %v", err) return fs, fmt.Errorf("invalid credentials: %v", err)
} }
fs.containerClient, err = svc.NewContainerClient(fs.config.Container) fs.containerClient = svc
return fs, err return fs, err
} }
func (fs *AzureBlobFs) initFsFromSASURL() (Fs, error) {
parts, err := azblob.ParseURL(fs.config.SASURL.GetPayload())
if err != nil {
return fs, fmt.Errorf("invalid SAS URL: %w", err)
}
if parts.BlobName != "" {
return fs, fmt.Errorf("SAS URL with blob name not supported")
}
if parts.ContainerName != "" {
if fs.config.Container != "" && fs.config.Container != parts.ContainerName {
return fs, fmt.Errorf("container name in SAS URL %#v and container provided %#v do not match",
parts.ContainerName, fs.config.Container)
}
svc, err := container.NewClientWithNoCredential(fs.config.SASURL.GetPayload(), getAzContainerClientOptions())
if err != nil {
return fs, fmt.Errorf("invalid credentials: %v", err)
}
fs.config.Container = parts.ContainerName
fs.containerClient = svc
return fs, nil
}
if fs.config.Container == "" {
return fs, errors.New("container is required with this SAS URL")
}
sasURL := runtime.JoinPaths(fs.config.SASURL.GetPayload(), fs.config.Container)
svc, err := container.NewClientWithNoCredential(sasURL, getAzContainerClientOptions())
if err != nil {
return fs, fmt.Errorf("invalid credentials: %v", err)
}
fs.containerClient = svc
return fs, nil
}
// Name returns the name for the Fs implementation // Name returns the name for the Fs implementation
func (fs *AzureBlobFs) Name() string { func (fs *AzureBlobFs) Name() string {
if !fs.config.SASURL.IsEmpty() { if !fs.config.SASURL.IsEmpty() {
@@ -200,17 +211,12 @@ func (fs *AzureBlobFs) Open(name string, offset int64) (File, *pipeat.PipeReader
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
blockBlob, err := fs.containerClient.NewBlockBlobClient(name)
if err != nil {
r.Close()
w.Close()
return nil, nil, nil, err
}
ctx, cancelFn := context.WithCancel(context.Background()) ctx, cancelFn := context.WithCancel(context.Background())
go func() { go func() {
defer cancelFn() defer cancelFn()
blockBlob := fs.containerClient.NewBlockBlobClient(name)
err := fs.handleMultipartDownload(ctx, blockBlob, offset, w) err := fs.handleMultipartDownload(ctx, blockBlob, offset, w)
w.CloseWithError(err) //nolint:errcheck w.CloseWithError(err) //nolint:errcheck
fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %+v", name, w.GetWrittenBytes(), err) fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %+v", name, w.GetWrittenBytes(), err)
@@ -226,16 +232,10 @@ func (fs *AzureBlobFs) Create(name string, flag int) (File, *PipeWriter, func(),
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
blockBlob, err := fs.containerClient.NewBlockBlobClient(name)
if err != nil {
r.Close()
w.Close()
return nil, nil, nil, err
}
ctx, cancelFn := context.WithCancel(context.Background()) ctx, cancelFn := context.WithCancel(context.Background())
p := NewPipeWriter(w) p := NewPipeWriter(w)
headers := azblob.BlobHTTPHeaders{} headers := blob.HTTPHeaders{}
var contentType string var contentType string
if flag == -1 { if flag == -1 {
contentType = dirMimeType contentType = dirMimeType
@@ -249,6 +249,7 @@ func (fs *AzureBlobFs) Create(name string, flag int) (File, *PipeWriter, func(),
go func() { go func() {
defer cancelFn() defer cancelFn()
blockBlob := fs.containerClient.NewBlockBlobClient(name)
err := fs.handleMultipartUpload(ctx, r, blockBlob, &headers) err := fs.handleMultipartUpload(ctx, r, blockBlob, &headers)
r.CloseWithError(err) //nolint:errcheck r.CloseWithError(err) //nolint:errcheck
p.Done(err) p.Done(err)
@@ -281,31 +282,22 @@ func (fs *AzureBlobFs) Rename(source, target string) error {
return fmt.Errorf("cannot rename non empty directory: %#v", source) return fmt.Errorf("cannot rename non empty directory: %#v", source)
} }
} }
dstBlob, err := fs.containerClient.NewBlockBlobClient(target)
if err != nil {
return err
}
srcBlob, err := fs.containerClient.NewBlockBlobClient(source)
if err != nil {
return err
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout)) ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxLongTimeout))
defer cancelFn() defer cancelFn()
srcBlob := fs.containerClient.NewBlockBlobClient(url.PathEscape(source))
dstBlob := fs.containerClient.NewBlockBlobClient(target)
resp, err := dstBlob.StartCopyFromURL(ctx, srcBlob.URL(), fs.getCopyOptions()) resp, err := dstBlob.StartCopyFromURL(ctx, srcBlob.URL(), fs.getCopyOptions())
if err != nil { if err != nil {
metric.AZCopyObjectCompleted(err) metric.AZCopyObjectCompleted(err)
return err return err
} }
copyStatus := azblob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus))) copyStatus := blob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus)))
nErrors := 0 nErrors := 0
for copyStatus == azblob.CopyStatusTypePending { for copyStatus == blob.CopyStatusTypePending {
// Poll until the copy is complete. // Poll until the copy is complete.
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
resp, err := dstBlob.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{ resp, err := dstBlob.GetProperties(ctx, &blob.GetPropertiesOptions{})
BlobAccessConditions: &azblob.BlobAccessConditions{},
})
if err != nil { if err != nil {
// A GetProperties failure may be transient, so allow a couple // A GetProperties failure may be transient, so allow a couple
// of them before giving up. // of them before giving up.
@@ -315,10 +307,10 @@ func (fs *AzureBlobFs) Rename(source, target string) error {
return err return err
} }
} else { } else {
copyStatus = azblob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus))) copyStatus = blob.CopyStatusType(util.GetStringFromPointer((*string)(resp.CopyStatus)))
} }
} }
if copyStatus != azblob.CopyStatusTypeSuccess { if copyStatus != blob.CopyStatusTypeSuccess {
err := fmt.Errorf("copy failed with status: %s", copyStatus) err := fmt.Errorf("copy failed with status: %s", copyStatus)
metric.AZCopyObjectCompleted(err) metric.AZCopyObjectCompleted(err)
return err return err
@@ -340,15 +332,14 @@ func (fs *AzureBlobFs) Remove(name string, isDir bool) error {
return fmt.Errorf("cannot remove non empty directory: %#v", name) return fmt.Errorf("cannot remove non empty directory: %#v", name)
} }
} }
blobBlock, err := fs.containerClient.NewBlockBlobClient(name)
if err != nil {
return err
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn() defer cancelFn()
_, err = blobBlock.Delete(ctx, &azblob.BlobDeleteOptions{ blobBlock := fs.containerClient.NewBlockBlobClient(name)
DeleteSnapshots: azblob.DeleteSnapshotsOptionTypeInclude.ToPtr(), deletSnapshots := blob.DeleteSnapshotsOptionTypeInclude
_, err := blobBlock.Delete(ctx, &blob.DeleteOptions{
DeleteSnapshots: &deletSnapshots,
}) })
metric.AZDeleteObjectCompleted(err) metric.AZDeleteObjectCompleted(err)
if plugin.Handler.HasMetadater() && err == nil && !isDir { if plugin.Handler.HasMetadater() && err == nil && !isDir {
@@ -431,65 +422,63 @@ func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
} }
prefixes := make(map[string]bool) prefixes := make(map[string]bool)
pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobsHierarchyOptions{ pager := fs.containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
Include: []azblob.ListBlobsIncludeItem{}, Include: container.ListBlobsInclude{},
Prefix: &prefix, Prefix: &prefix,
}) })
hasNext := true for pager.More() {
for hasNext {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn() defer cancelFn()
if hasNext = pager.NextPage(ctx); hasNext { resp, err := pager.NextPage(ctx)
resp := pager.PageResponse() if err != nil {
metric.AZListObjectsCompleted(err)
for _, blobPrefix := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobPrefixes { return result, err
name := util.GetStringFromPointer(blobPrefix.Name) }
// we don't support prefixes == "/" this will be sent if a key starts with "/" for _, blobPrefix := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobPrefixes {
if name == "" || name == "/" { name := util.GetStringFromPointer(blobPrefix.Name)
continue // we don't support prefixes == "/" this will be sent if a key starts with "/"
} if name == "" || name == "/" {
// sometime we have duplicate prefixes, maybe an Azurite bug continue
name = strings.TrimPrefix(name, prefix)
if _, ok := prefixes[strings.TrimSuffix(name, "/")]; ok {
continue
}
result = append(result, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
prefixes[strings.TrimSuffix(name, "/")] = true
} }
// sometime we have duplicate prefixes, maybe an Azurite bug
name = strings.TrimPrefix(name, prefix)
if _, ok := prefixes[strings.TrimSuffix(name, "/")]; ok {
continue
}
result = append(result, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
prefixes[strings.TrimSuffix(name, "/")] = true
}
for _, blobItem := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobItems { for _, blobItem := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobItems {
name := util.GetStringFromPointer(blobItem.Name) name := util.GetStringFromPointer(blobItem.Name)
name = strings.TrimPrefix(name, prefix) name = strings.TrimPrefix(name, prefix)
size := int64(0) size := int64(0)
isDir := false isDir := false
modTime := time.Unix(0, 0) modTime := time.Unix(0, 0)
if blobItem.Properties != nil { if blobItem.Properties != nil {
size = util.GetIntFromPointer(blobItem.Properties.ContentLength) size = util.GetIntFromPointer(blobItem.Properties.ContentLength)
modTime = util.GetTimeFromPointer(blobItem.Properties.LastModified) modTime = util.GetTimeFromPointer(blobItem.Properties.LastModified)
contentType := util.GetStringFromPointer(blobItem.Properties.ContentType) contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
isDir = (contentType == dirMimeType) isDir = (contentType == dirMimeType)
if isDir { if isDir {
// check if the dir is already included, it will be sent as blob prefix if it contains at least one item // check if the dir is already included, it will be sent as blob prefix if it contains at least one item
if _, ok := prefixes[name]; ok { if _, ok := prefixes[name]; ok {
continue continue
}
prefixes[name] = true
} }
prefixes[name] = true
} }
if t, ok := modTimes[name]; ok {
modTime = util.GetTimeFromMsecSinceEpoch(t)
}
result = append(result, NewFileInfo(name, isDir, size, modTime, false))
} }
if t, ok := modTimes[name]; ok {
modTime = util.GetTimeFromMsecSinceEpoch(t)
}
result = append(result, NewFileInfo(name, isDir, size, modTime, false))
} }
} }
metric.AZListObjectsCompleted(nil)
err = pager.Err() return result, nil
metric.AZListObjectsCompleted(err)
return result, err
} }
// IsUploadResumeSupported returns true if resuming uploads is supported. // IsUploadResumeSupported returns true if resuming uploads is supported.
@@ -511,14 +500,9 @@ func (*AzureBlobFs) IsNotExist(err error) bool {
if err == nil { if err == nil {
return false return false
} }
var errStorage *azblob.StorageError var respErr *azcore.ResponseError
if errors.As(err, &errStorage) { if errors.As(err, &respErr) {
return errStorage.StatusCode() == http.StatusNotFound return respErr.StatusCode == http.StatusNotFound
}
var errResp *azcore.ResponseError
if errors.As(err, &errResp) {
return errResp.StatusCode == http.StatusNotFound
} }
// os.ErrNotExist can be returned internally by fs.Stat // os.ErrNotExist can be returned internally by fs.Stat
return errors.Is(err, os.ErrNotExist) return errors.Is(err, os.ErrNotExist)
@@ -530,17 +514,10 @@ func (*AzureBlobFs) IsPermission(err error) bool {
if err == nil { if err == nil {
return false return false
} }
var errStorage *azblob.StorageError var respErr *azcore.ResponseError
if errors.As(err, &errStorage) { if errors.As(err, &respErr) {
statusCode := errStorage.StatusCode() return respErr.StatusCode == http.StatusForbidden || respErr.StatusCode == http.StatusUnauthorized
return statusCode == http.StatusForbidden || statusCode == http.StatusUnauthorized
} }
var errResp *azcore.ResponseError
if errors.As(err, &errResp) {
return errResp.StatusCode == http.StatusForbidden || errResp.StatusCode == http.StatusUnauthorized
}
return false return false
} }
@@ -565,36 +542,35 @@ func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) {
numFiles := 0 numFiles := 0
size := int64(0) size := int64(0)
pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{ pager := fs.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
Prefix: &fs.config.KeyPrefix, Prefix: &fs.config.KeyPrefix,
}) })
hasNext := true for pager.More() {
for hasNext {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn() defer cancelFn()
if hasNext = pager.NextPage(ctx); hasNext { resp, err := pager.NextPage(ctx)
resp := pager.PageResponse() if err != nil {
for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems { metric.AZListObjectsCompleted(err)
if blobItem.Properties != nil { return numFiles, size, err
contentType := util.GetStringFromPointer(blobItem.Properties.ContentType) }
isDir := (contentType == dirMimeType) for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems {
blobSize := util.GetIntFromPointer(blobItem.Properties.ContentLength) if blobItem.Properties != nil {
if isDir && blobSize == 0 { contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
continue isDir := (contentType == dirMimeType)
} blobSize := util.GetIntFromPointer(blobItem.Properties.ContentLength)
numFiles++ if isDir && blobSize == 0 {
size += blobSize continue
} }
numFiles++
size += blobSize
} }
} }
} }
metric.AZListObjectsCompleted(nil)
err := pager.Err() return numFiles, size, nil
metric.AZListObjectsCompleted(err)
return numFiles, size, err
} }
func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) { func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
@@ -604,37 +580,36 @@ func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, e
prefix = strings.TrimPrefix(fsPrefix, "/") prefix = strings.TrimPrefix(fsPrefix, "/")
} }
pager := fs.containerClient.ListBlobsHierarchy("/", &azblob.ContainerListBlobsHierarchyOptions{ pager := fs.containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
Include: []azblob.ListBlobsIncludeItem{}, Include: container.ListBlobsInclude{},
Prefix: &prefix, Prefix: &prefix,
}) })
hasNext := true for pager.More() {
for hasNext {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn() defer cancelFn()
if hasNext = pager.NextPage(ctx); hasNext { resp, err := pager.NextPage(ctx)
resp := pager.PageResponse() if err != nil {
for _, blobItem := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobItems { metric.AZListObjectsCompleted(err)
name := util.GetStringFromPointer(blobItem.Name) return fileNames, err
name = strings.TrimPrefix(name, prefix) }
if blobItem.Properties != nil { for _, blobItem := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobItems {
contentType := util.GetStringFromPointer(blobItem.Properties.ContentType) name := util.GetStringFromPointer(blobItem.Name)
isDir := (contentType == dirMimeType) name = strings.TrimPrefix(name, prefix)
if isDir { if blobItem.Properties != nil {
continue contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
} isDir := (contentType == dirMimeType)
fileNames[name] = true if isDir {
continue
} }
fileNames[name] = true
} }
} }
} }
metric.AZListObjectsCompleted(nil)
err := pager.Err() return fileNames, nil
metric.AZListObjectsCompleted(err)
return fileNames, err
} }
// CheckMetadata checks the metadata consistency // CheckMetadata checks the metadata consistency
@@ -680,45 +655,40 @@ func (fs *AzureBlobFs) GetRelativePath(name string) string {
// directory in the tree, including root // directory in the tree, including root
func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error { func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
prefix := fs.getPrefix(root) prefix := fs.getPrefix(root)
pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{ pager := fs.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
Prefix: &prefix, Prefix: &fs.config.KeyPrefix,
}) })
hasNext := true for pager.More() {
for hasNext {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn() defer cancelFn()
if hasNext = pager.NextPage(ctx); hasNext { resp, err := pager.NextPage(ctx)
resp := pager.PageResponse() if err != nil {
for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems { metric.AZListObjectsCompleted(err)
name := util.GetStringFromPointer(blobItem.Name) return err
if fs.isEqual(name, prefix) { }
continue for _, blobItem := range resp.ListBlobsFlatSegmentResponse.Segment.BlobItems {
} name := util.GetStringFromPointer(blobItem.Name)
blobSize := int64(0) if fs.isEqual(name, prefix) {
lastModified := time.Unix(0, 0) continue
isDir := false }
if blobItem.Properties != nil { blobSize := int64(0)
contentType := util.GetStringFromPointer(blobItem.Properties.ContentType) lastModified := time.Unix(0, 0)
isDir = (contentType == dirMimeType) isDir := false
blobSize = util.GetIntFromPointer(blobItem.Properties.ContentLength) if blobItem.Properties != nil {
lastModified = util.GetTimeFromPointer(blobItem.Properties.LastModified) contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
} isDir = (contentType == dirMimeType)
err := walkFn(name, NewFileInfo(name, isDir, blobSize, lastModified, false), nil) blobSize = util.GetIntFromPointer(blobItem.Properties.ContentLength)
if err != nil { lastModified = util.GetTimeFromPointer(blobItem.Properties.LastModified)
return err }
} err := walkFn(name, NewFileInfo(name, isDir, blobSize, lastModified, false), nil)
if err != nil {
return err
} }
} }
} }
err := pager.Err()
if err != nil {
metric.AZListObjectsCompleted(err)
return err
}
metric.AZListObjectsCompleted(nil) metric.AZListObjectsCompleted(nil)
return walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil) return walkFn(root, NewFileInfo(root, true, 0, time.Unix(0, 0), false), nil)
} }
@@ -744,17 +714,12 @@ func (fs *AzureBlobFs) ResolvePath(virtualPath string) (string, error) {
return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil return fs.Join(fs.config.KeyPrefix, strings.TrimPrefix(virtualPath, "/")), nil
} }
func (fs *AzureBlobFs) headObject(name string) (azblob.BlobGetPropertiesResponse, error) { func (fs *AzureBlobFs) headObject(name string) (blob.GetPropertiesResponse, error) {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn() defer cancelFn()
blobClient, err := fs.containerClient.NewBlockBlobClient(name) resp, err := fs.containerClient.NewBlockBlobClient(name).GetProperties(ctx, &blob.GetPropertiesOptions{})
if err != nil {
return azblob.BlobGetPropertiesResponse{}, err
}
resp, err := blobClient.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{
BlobAccessConditions: &azblob.BlobAccessConditions{},
})
metric.AZHeadObjectCompleted(err) metric.AZHeadObjectCompleted(err)
return resp, err return resp, err
} }
@@ -831,41 +796,47 @@ func (fs *AzureBlobFs) hasContents(name string) (bool, error) {
prefix := fs.getPrefix(name) prefix := fs.getPrefix(name)
maxResults := int32(1) maxResults := int32(1)
pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobsFlatOptions{ pager := fs.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
MaxResults: &maxResults, MaxResults: &maxResults,
Prefix: &prefix, Prefix: &prefix,
}) })
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout)) if pager.More() {
defer cancelFn() ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
resp, err := pager.NextPage(ctx)
if err != nil {
metric.AZListObjectsCompleted(err)
return result, err
}
if pager.NextPage(ctx) {
resp := pager.PageResponse()
result = len(resp.ListBlobsFlatSegmentResponse.Segment.BlobItems) > 0 result = len(resp.ListBlobsFlatSegmentResponse.Segment.BlobItems) > 0
} }
err := pager.Err() metric.AZListObjectsCompleted(nil)
metric.AZListObjectsCompleted(err) return result, nil
return result, err
} }
func (fs *AzureBlobFs) downloadPart(ctx context.Context, blockBlob *azblob.BlockBlobClient, buf []byte, func (fs *AzureBlobFs) downloadPart(ctx context.Context, blockBlob *blockblob.Client, buf []byte,
w io.WriterAt, offset, count, writeOffset int64, w io.WriterAt, offset, count, writeOffset int64,
) error { ) error {
if count == 0 { if count == 0 {
return nil return nil
} }
resp, err := blockBlob.Download(ctx, &azblob.BlobDownloadOptions{
Offset: &offset, resp, err := blockBlob.DownloadStream(ctx, &blob.DownloadStreamOptions{
Count: &count, Range: blob.HTTPRange{
Offset: offset,
Count: count,
},
}) })
if err != nil { if err != nil {
return err return err
} }
body := resp.Body(&azblob.RetryReaderOptions{MaxRetryRequests: 2}) defer resp.BlobClientDownloadResponse.Body.Close()
defer body.Close()
_, err = io.ReadAtLeast(body, buf, int(count)) _, err = io.ReadAtLeast(resp.BlobClientDownloadResponse.Body, buf, int(count))
if err != nil { if err != nil {
return err return err
} }
@@ -874,12 +845,10 @@ func (fs *AzureBlobFs) downloadPart(ctx context.Context, blockBlob *azblob.Block
return err return err
} }
func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *azblob.BlockBlobClient, func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *blockblob.Client,
offset int64, writer io.WriterAt, offset int64, writer io.WriterAt,
) error { ) error {
props, err := blockBlob.GetProperties(ctx, &azblob.BlobGetPropertiesOptions{ props, err := blockBlob.GetProperties(ctx, &blob.GetPropertiesOptions{})
BlobAccessConditions: &azblob.BlobAccessConditions{},
})
if err != nil { if err != nil {
fsLog(fs, logger.LevelError, "unable to get blob properties, download aborted: %+v", err) fsLog(fs, logger.LevelError, "unable to get blob properties, download aborted: %+v", err)
return err return err
@@ -937,6 +906,7 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *a
defer cancelFn() defer cancelFn()
count := end - start count := end - start
err := fs.downloadPart(innerCtx, blockBlob, buf, writer, start, count, writeOffset) err := fs.downloadPart(innerCtx, blockBlob, buf, writer, start, count, writeOffset)
if err != nil { if err != nil {
errOnce.Do(func() { errOnce.Do(func() {
@@ -957,7 +927,7 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob *a
} }
func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader, func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Reader,
blockBlob *azblob.BlockBlobClient, httpHeaders *azblob.BlobHTTPHeaders, blockBlob *blockblob.Client, httpHeaders *blob.HTTPHeaders,
) error { ) error {
partSize := fs.config.UploadPartSize partSize := fs.config.UploadPartSize
guard := make(chan struct{}, fs.config.UploadConcurrency) guard := make(chan struct{}, fs.config.UploadConcurrency)
@@ -1019,7 +989,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
innerCtx, cancelFn := context.WithDeadline(poolCtx, time.Now().Add(blockCtxTimeout)) innerCtx, cancelFn := context.WithDeadline(poolCtx, time.Now().Add(blockCtxTimeout))
defer cancelFn() defer cancelFn()
_, err := blockBlob.StageBlock(innerCtx, blockID, bufferReader, &azblob.BlockBlobStageBlockOptions{}) _, err := blockBlob.StageBlock(innerCtx, blockID, bufferReader, &blockblob.StageBlockOptions{})
if err != nil { if err != nil {
errOnce.Do(func() { errOnce.Do(func() {
fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", err) fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", err)
@@ -1039,11 +1009,11 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
return poolError return poolError
} }
commitOptions := azblob.BlockBlobCommitBlockListOptions{ commitOptions := blockblob.CommitBlockListOptions{
BlobHTTPHeaders: httpHeaders, HTTPHeaders: httpHeaders,
} }
if fs.config.AccessTier != "" { if fs.config.AccessTier != "" {
commitOptions.Tier = (*azblob.AccessTier)(&fs.config.AccessTier) commitOptions.Tier = (*blob.AccessTier)(&fs.config.AccessTier)
} }
_, err := blockBlob.CommitBlockList(ctx, blocks, &commitOptions) _, err := blockBlob.CommitBlockList(ctx, blocks, &commitOptions)
@@ -1097,10 +1067,10 @@ func (fs *AzureBlobFs) preserveModificationTime(source, target string, fi os.Fil
} }
} }
func (fs *AzureBlobFs) getCopyOptions() *azblob.BlobStartCopyOptions { func (fs *AzureBlobFs) getCopyOptions() *blob.StartCopyFromURLOptions {
copyOptions := &azblob.BlobStartCopyOptions{} copyOptions := &blob.StartCopyFromURLOptions{}
if fs.config.AccessTier != "" { if fs.config.AccessTier != "" {
copyOptions.Tier = (*azblob.AccessTier)(&fs.config.AccessTier) copyOptions.Tier = (*blob.AccessTier)(&fs.config.AccessTier)
} }
return copyOptions return copyOptions
} }
@@ -1115,6 +1085,17 @@ func (fs *AzureBlobFs) getStorageID() string {
return fmt.Sprintf("azblob://%v", fs.config.Container) return fmt.Sprintf("azblob://%v", fs.config.Container)
} }
func getAzContainerClientOptions() *container.ClientOptions {
version := version.Get()
return &container.ClientOptions{
ClientOptions: azcore.ClientOptions{
Telemetry: policy.TelemetryOptions{
ApplicationID: fmt.Sprintf("SFTPGo-%v_%v", version.Version, version.CommitHash),
},
},
}
}
type bytesReaderWrapper struct { type bytesReaderWrapper struct {
*bytes.Reader *bytes.Reader
} }