add DirLister interface

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2024-02-15 20:53:56 +01:00
parent c60eb050ef
commit 1ff55bbfa7
35 changed files with 1362 additions and 669 deletions

View File

@@ -56,6 +56,10 @@ const (
azFolderKey = "hdi_isfolder"
)
// azureBlobDefaultPageSize is the page size passed as MaxResults to the
// blob listing pagers. It is a variable rather than a constant because the
// pager options take its address.
var azureBlobDefaultPageSize int32 = 5000
// AzureBlobFs is a Fs implementation for Azure Blob storage.
type AzureBlobFs struct {
connectionID string
@@ -308,7 +312,7 @@ func (fs *AzureBlobFs) Rename(source, target string) (int, int64, error) {
if err != nil {
return -1, -1, err
}
return fs.renameInternal(source, target, fi)
return fs.renameInternal(source, target, fi, 0)
}
// Remove removes the named file or (empty) directory.
@@ -408,76 +412,23 @@ func (*AzureBlobFs) Truncate(_ string, _ int64) error {
// ReadDir reads the directory named by dirname and returns
// a list of directory entries.
func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
var result []os.FileInfo
func (fs *AzureBlobFs) ReadDir(dirname string) (DirLister, error) {
// dirname must be already cleaned
prefix := fs.getPrefix(dirname)
modTimes, err := getFolderModTimes(fs.getStorageID(), dirname)
if err != nil {
return result, err
}
prefixes := make(map[string]bool)
pager := fs.containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
Include: container.ListBlobsInclude{
//Metadata: true,
},
Prefix: &prefix,
Prefix: &prefix,
MaxResults: &azureBlobDefaultPageSize,
})
for pager.More() {
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
defer cancelFn()
resp, err := pager.NextPage(ctx)
if err != nil {
metric.AZListObjectsCompleted(err)
return result, err
}
for _, blobPrefix := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobPrefixes {
name := util.GetStringFromPointer(blobPrefix.Name)
// we don't support prefixes == "/" this will be sent if a key starts with "/"
if name == "" || name == "/" {
continue
}
// sometime we have duplicate prefixes, maybe an Azurite bug
name = strings.TrimPrefix(name, prefix)
if _, ok := prefixes[strings.TrimSuffix(name, "/")]; ok {
continue
}
result = append(result, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
prefixes[strings.TrimSuffix(name, "/")] = true
}
for _, blobItem := range resp.ListBlobsHierarchySegmentResponse.Segment.BlobItems {
name := util.GetStringFromPointer(blobItem.Name)
name = strings.TrimPrefix(name, prefix)
size := int64(0)
isDir := false
modTime := time.Unix(0, 0)
if blobItem.Properties != nil {
size = util.GetIntFromPointer(blobItem.Properties.ContentLength)
modTime = util.GetTimeFromPointer(blobItem.Properties.LastModified)
contentType := util.GetStringFromPointer(blobItem.Properties.ContentType)
isDir = checkDirectoryMarkers(contentType, blobItem.Metadata)
if isDir {
// check if the dir is already included, it will be sent as blob prefix if it contains at least one item
if _, ok := prefixes[name]; ok {
continue
}
prefixes[name] = true
}
}
if t, ok := modTimes[name]; ok {
modTime = util.GetTimeFromMsecSinceEpoch(t)
}
result = append(result, NewFileInfo(name, isDir, size, modTime, false))
}
}
metric.AZListObjectsCompleted(nil)
return result, nil
return &azureBlobDirLister{
paginator: pager,
timeout: fs.ctxTimeout,
prefix: prefix,
prefixes: make(map[string]bool),
}, nil
}
// IsUploadResumeSupported returns true if resuming uploads is supported.
@@ -569,7 +520,8 @@ func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, e
Include: container.ListBlobsInclude{
//Metadata: true,
},
Prefix: &prefix,
Prefix: &prefix,
MaxResults: &azureBlobDefaultPageSize,
})
for pager.More() {
@@ -615,7 +567,8 @@ func (fs *AzureBlobFs) GetDirSize(dirname string) (int, int64, error) {
Include: container.ListBlobsInclude{
Metadata: true,
},
Prefix: &prefix,
Prefix: &prefix,
MaxResults: &azureBlobDefaultPageSize,
})
for pager.More() {
@@ -684,7 +637,8 @@ func (fs *AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
Include: container.ListBlobsInclude{
Metadata: true,
},
Prefix: &prefix,
Prefix: &prefix,
MaxResults: &azureBlobDefaultPageSize,
})
for pager.More() {
@@ -863,7 +817,7 @@ func (fs *AzureBlobFs) copyFileInternal(source, target string) error {
return nil
}
func (fs *AzureBlobFs) renameInternal(source, target string, fi os.FileInfo) (int, int64, error) {
func (fs *AzureBlobFs) renameInternal(source, target string, fi os.FileInfo, recursion int) (int, int64, error) {
var numFiles int
var filesSize int64
@@ -881,24 +835,12 @@ func (fs *AzureBlobFs) renameInternal(source, target string, fi os.FileInfo) (in
return numFiles, filesSize, err
}
if renameMode == 1 {
entries, err := fs.ReadDir(source)
files, size, err := doRecursiveRename(fs, source, target, fs.renameInternal, recursion)
numFiles += files
filesSize += size
if err != nil {
return numFiles, filesSize, err
}
for _, info := range entries {
sourceEntry := fs.Join(source, info.Name())
targetEntry := fs.Join(target, info.Name())
files, size, err := fs.renameInternal(sourceEntry, targetEntry, info)
if err != nil {
if fs.IsNotExist(err) {
fsLog(fs, logger.LevelInfo, "skipping rename for %q: %v", sourceEntry, err)
continue
}
return numFiles, filesSize, err
}
numFiles += files
filesSize += size
}
}
} else {
if err := fs.copyFileInternal(source, target); err != nil {
@@ -1312,3 +1254,80 @@ func (b *bufferAllocator) free() {
b.available = nil
b.finalized = true
}
// azureBlobDirLister lists the entries of an Azure Blob "directory" one
// page at a time, keeping the state needed between calls to Next.
type azureBlobDirLister struct {
// embedded helper: it supplies the cache slice filled by Next and the
// Close method chained by Close (returnFromCache presumably lives here
// too - confirm against its definition)
baseDirLister
// paginator is the hierarchy listing pager polled by Next via More/NextPage
paginator *runtime.Pager[container.ListBlobsHierarchyResponse]
// timeout bounds each single NextPage request
timeout time.Duration
// prefix is stripped from every returned blob/prefix name
prefix string
// prefixes records the directory names already emitted so that the same
// directory is not returned twice
prefixes map[string]bool
// metricUpdated is set once the successful completion metric has been
// reported, so that it is reported only once
metricUpdated bool
}
// Next returns directory entries taken from the internal cache, requesting
// at most one additional page from the pager when the cache holds fewer
// than limit entries.
//
// A non-positive limit is rejected with errInvalidDirListerLimit. When the
// pager has no more pages the remaining cached entries are returned together
// with io.EOF. If fetching a page fails, the error is reported to the metrics
// and returned along with the current cache content.
func (l *azureBlobDirLister) Next(limit int) ([]os.FileInfo, error) {
	if limit <= 0 {
		return nil, errInvalidDirListerLimit
	}
	// enough entries are already buffered, no need to hit the service
	if limit <= len(l.cache) {
		return l.returnFromCache(limit), nil
	}
	if !l.paginator.More() {
		// report the successful listing only once, even if the caller keeps
		// calling Next after the last page
		if !l.metricUpdated {
			l.metricUpdated = true
			metric.AZListObjectsCompleted(nil)
		}
		return l.returnFromCache(limit), io.EOF
	}

	ctx, cancel := context.WithTimeout(context.Background(), l.timeout)
	defer cancel()

	resp, err := l.paginator.NextPage(ctx)
	if err != nil {
		metric.AZListObjectsCompleted(err)
		return l.cache, err
	}
	segment := resp.ListBlobsHierarchySegmentResponse.Segment

	// virtual directories are reported as blob prefixes
	for _, dirPrefix := range segment.BlobPrefixes {
		name := util.GetStringFromPointer(dirPrefix.Name)
		// we don't support prefixes == "/" this will be sent if a key starts with "/"
		if name == "" || name == "/" {
			continue
		}
		name = strings.TrimPrefix(name, l.prefix)
		// sometime we have duplicate prefixes, maybe an Azurite bug
		key := strings.TrimSuffix(name, "/")
		if _, seen := l.prefixes[key]; seen {
			continue
		}
		l.cache = append(l.cache, NewFileInfo(name, true, 0, time.Unix(0, 0), false))
		l.prefixes[key] = true
	}

	for _, item := range segment.BlobItems {
		name := strings.TrimPrefix(util.GetStringFromPointer(item.Name), l.prefix)
		var (
			size  int64
			isDir bool
		)
		modTime := time.Unix(0, 0)
		if props := item.Properties; props != nil {
			size = util.GetIntFromPointer(props.ContentLength)
			modTime = util.GetTimeFromPointer(props.LastModified)
			isDir = checkDirectoryMarkers(util.GetStringFromPointer(props.ContentType), item.Metadata)
			if isDir {
				// check if the dir is already included, it will be sent as blob prefix if it contains at least one item
				if _, seen := l.prefixes[name]; seen {
					continue
				}
				l.prefixes[name] = true
			}
		}
		l.cache = append(l.cache, NewFileInfo(name, isDir, size, modTime, false))
	}

	return l.returnFromCache(limit), nil
}
// Close empties the map of already seen directory names and then returns
// the result of the embedded baseDirLister Close.
func (l *azureBlobDirLister) Close() error {
// drop every tracked directory name before delegating to the embedded lister
clear(l.prefixes)
return l.baseDirLister.Close()
}