s3fs: migrate to AWS SDK V2

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2022-03-15 19:16:50 +01:00
parent 7e7f662a23
commit 6f8b71b89f
5 changed files with 440 additions and 295 deletions

View File

@@ -199,7 +199,7 @@ func (fs *AzureBlobFs) Open(name string, offset int64) (File, *pipeat.PipeReader
err := fs.handleMultipartDownload(ctx, blockBlob, offset, w)
w.CloseWithError(err) //nolint:errcheck
fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, w.GetWrittenBytes(), err)
fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %+v", name, w.GetWrittenBytes(), err)
metric.AZTransferCompleted(w.GetWrittenBytes(), 1, err)
}()
@@ -233,7 +233,7 @@ func (fs *AzureBlobFs) Create(name string, flag int) (File, *PipeWriter, func(),
err := fs.handleMultipartUpload(ctx, r, blockBlob, &headers)
r.CloseWithError(err) //nolint:errcheck
p.Done(err)
fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %v", name, r.GetReadedBytes(), err)
fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %+v", name, r.GetReadedBytes(), err)
metric.AZTransferCompleted(r.GetReadedBytes(), 0, err)
}()
@@ -305,7 +305,7 @@ func (fs *AzureBlobFs) Rename(source, target string) error {
err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
util.GetTimeAsMsSinceEpoch(fi.ModTime()))
if err != nil {
fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %#v -> %#v: %v",
fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %#v -> %#v: %+v",
source, target, err)
}
}
@@ -334,7 +334,7 @@ func (fs *AzureBlobFs) Remove(name string, isDir bool) error {
metric.AZDeleteObjectCompleted(err)
if plugin.Handler.HasMetadater() && err == nil && !isDir {
if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %#v: %v", name, errMetadata)
fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %#v: %+v", name, errMetadata)
}
}
return err
@@ -499,7 +499,7 @@ func (*AzureBlobFs) IsNotExist(err error) bool {
return errResp.StatusCode() == http.StatusNotFound
}
return strings.Contains(err.Error(), "404")
return false
}
// IsPermission returns a boolean indicating whether the error is known to
@@ -510,10 +510,11 @@ func (*AzureBlobFs) IsPermission(err error) bool {
}
var errResp *azblob.StorageError
if errors.As(err, &errResp) {
return errResp.StatusCode() == http.StatusForbidden
statusCode := errResp.StatusCode()
return statusCode == http.StatusForbidden || statusCode == http.StatusUnauthorized
}
return strings.Contains(err.Error(), "403")
return false
}
// IsNotSupported returns true if the error indicate an unsupported operation
@@ -655,14 +656,7 @@ func (fs *AzureBlobFs) GetRelativePath(name string) string {
// Walk walks the file tree rooted at root, calling walkFn for each file or
// directory in the tree, including root
func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
prefix := ""
if root != "" && root != "." {
prefix = strings.TrimPrefix(root, "/")
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
}
prefix := fs.getPrefix(root)
timeout := int32(fs.ctxTimeout / time.Second)
pager := fs.containerClient.ListBlobsFlat(&azblob.ContainerListBlobFlatSegmentOptions{
Prefix: &prefix,
@@ -819,13 +813,7 @@ func (fs *AzureBlobFs) checkIfBucketExists() error {
func (fs *AzureBlobFs) hasContents(name string) (bool, error) {
result := false
prefix := ""
if name != "" && name != "." {
prefix = strings.TrimPrefix(name, "/")
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
}
prefix := fs.getPrefix(name)
maxResults := int32(1)
timeout := int32(fs.ctxTimeout / time.Second)
@@ -939,7 +927,7 @@ func (fs *AzureBlobFs) handleMultipartDownload(ctx context.Context, blockBlob az
if err != nil {
errOnce.Do(func() {
poolError = err
fsLog(fs, logger.LevelError, "multipart download error: %v", poolError)
fsLog(fs, logger.LevelError, "multipart download error: %+v", poolError)
poolCancel()
})
}
@@ -1019,7 +1007,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
if err != nil {
errOnce.Do(func() {
poolError = err
fsLog(fs, logger.LevelDebug, "multipart upload error: %v", poolError)
fsLog(fs, logger.LevelDebug, "multipart upload error: %+v", poolError)
poolCancel()
})
}