add support for metadata plugins

Nicola Murino
2021-12-16 18:18:36 +01:00
parent 1472a0f415
commit a587228cf0
37 changed files with 2283 additions and 132 deletions
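
The hunks below are from the Azure Blob Storage backend. Modification times, which blob storage cannot persist on its own, are now read back from the configured metadata plugin in Stat and ReadDir, kept in sync by Rename and Remove, and written through the plugin by Chtimes, which previously always returned ErrVfsUnsupported.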


@@ -26,6 +26,8 @@ import (
     "github.com/drakkan/sftpgo/v2/logger"
     "github.com/drakkan/sftpgo/v2/metric"
+    "github.com/drakkan/sftpgo/v2/sdk/plugin"
+    "github.com/drakkan/sftpgo/v2/util"
     "github.com/drakkan/sftpgo/v2/version"
 )
@@ -104,6 +106,7 @@ func NewAzBlobFs(connectionID, localTempDir, mountPath string, config AzBlobFsCo
             return fs, fmt.Errorf("container name in SAS URL %#v and container provided %#v do not match",
                 parts.ContainerName, fs.config.Container)
         }
+        fs.config.Container = parts.ContainerName
         fs.svc = nil
         fs.containerURL = azblob.NewContainerURL(*u, pipeline)
     } else {
@@ -168,17 +171,18 @@ func (fs *AzureBlobFs) Stat(name string) (os.FileInfo, error) {
                 return nil, err
             }
         }
-        return NewFileInfo(name, true, 0, time.Now(), false), nil
+        return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
     }
     if fs.config.KeyPrefix == name+"/" {
-        return NewFileInfo(name, true, 0, time.Now(), false), nil
+        return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
     }
     attrs, err := fs.headObject(name)
     if err == nil {
         isDir := (attrs.ContentType() == dirMimeType)
         metric.AZListObjectsCompleted(nil)
-        return NewFileInfo(name, isDir, attrs.ContentLength(), attrs.LastModified(), false), nil
+        return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, isDir, attrs.ContentLength(),
+            attrs.LastModified(), false))
     }
     if !fs.IsNotExist(err) {
         return nil, err
@@ -189,7 +193,7 @@ func (fs *AzureBlobFs) Stat(name string) (os.FileInfo, error) {
         return nil, err
     }
     if hasContents {
-        return NewFileInfo(name, true, 0, time.Now(), false), nil
+        return updateFileInfoModTime(fs.getStorageID(), name, NewFileInfo(name, true, 0, time.Now(), false))
     }
     return nil, errors.New("404 no such file or directory")
 }
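
Stat no longer returns the freshly built FileInfo directly: it routes it through updateFileInfoModTime, a helper added by this commit in the shared vfs code and not visible in this file. A minimal sketch of the idea, assuming a GetModificationTime accessor on the plugin handler (that name and signature are assumptions, not taken from this diff):

func updateFileInfoModTime(storageID, name string, info *FileInfo) (os.FileInfo, error) {
    // no metadata plugin configured, or a directory: keep the object store's timestamp
    if !plugin.Handler.HasMetadater() || info.IsDir() {
        return info, nil
    }
    // assumed plugin call: stored modification time in milliseconds since the epoch
    mTime, err := plugin.Handler.GetModificationTime(storageID, ensureAbsPath(name))
    if err != nil {
        // nothing stored for this object: fall back to the provider's LastModified
        return info, nil
    }
    return NewFileInfo(info.Name(), info.IsDir(), info.Size(),
        util.GetTimeFromMsecSinceEpoch(mTime), false), nil
}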
@@ -335,6 +339,16 @@ func (fs *AzureBlobFs) Rename(source, target string) error {
         return err
     }
     metric.AZCopyObjectCompleted(nil)
+    if plugin.Handler.HasMetadater() {
+        if !fi.IsDir() {
+            err = plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(target),
+                util.GetTimeAsMsSinceEpoch(fi.ModTime()))
+            if err != nil {
+                fsLog(fs, logger.LevelWarn, "unable to preserve modification time after renaming %#v -> %#v: %v",
+                    source, target, err)
+            }
+        }
+    }
     return fs.Remove(source, fi.IsDir())
 }
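
The server-side copy behind Rename creates a new blob whose LastModified is the copy time, so a rename would otherwise appear to reset the file's modification time. Re-recording the stored time under the target path, before the source is deleted, keeps the plugin entry attached to the file at its new location; a failure here is only logged so the rename itself still completes.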
@@ -355,6 +369,11 @@ func (fs *AzureBlobFs) Remove(name string, isDir bool) error {
     _, err := blobBlockURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
     metric.AZDeleteObjectCompleted(err)
+    if plugin.Handler.HasMetadater() && err == nil && !isDir {
+        if errMetadata := plugin.Handler.RemoveMetadata(fs.getStorageID(), ensureAbsPath(name)); errMetadata != nil {
+            fsLog(fs, logger.LevelWarn, "unable to remove metadata for path %#v: %v", name, errMetadata)
+        }
+    }
     return err
 }
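
Symmetrically, deleting a blob also removes its plugin entry so stale modification times do not accumulate. The cleanup error is logged rather than returned: the object is already gone, and a leftover record can presumably be reconciled later by the CheckMetadata consistency check added further down.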
@@ -397,8 +416,22 @@ func (*AzureBlobFs) Chmod(name string, mode os.FileMode) error {
 }
 
 // Chtimes changes the access and modification times of the named file.
-func (*AzureBlobFs) Chtimes(name string, atime, mtime time.Time) error {
-    return ErrVfsUnsupported
+func (fs *AzureBlobFs) Chtimes(name string, atime, mtime time.Time, isUploading bool) error {
+    if !plugin.Handler.HasMetadater() {
+        return ErrVfsUnsupported
+    }
+    if !isUploading {
+        info, err := fs.Stat(name)
+        if err != nil {
+            return err
+        }
+        if info.IsDir() {
+            return ErrVfsUnsupported
+        }
+    }
+    return plugin.Handler.SetModificationTime(fs.getStorageID(), ensureAbsPath(name),
+        util.GetTimeAsMsSinceEpoch(mtime))
 }
 
 // Truncate changes the size of the named file.
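
With a metadata plugin configured, Chtimes now works for regular files instead of unconditionally failing; the new isUploading flag lets callers skip the Stat round trip when they already know the target is a file being written. A hypothetical caller sketch (path and timestamps are made up for illustration):

// preserve a client-requested modification time on an existing blob
requestedMtime := time.Date(2021, 12, 1, 10, 0, 0, 0, time.UTC)
err := fs.Chtimes("/docs/report.pdf", time.Now(), requestedMtime, false)
if err == ErrVfsUnsupported {
    // no metadata plugin configured: same behavior as before this commit
    fsLog(fs, logger.LevelDebug, "setting times is not supported without a metadata plugin")
} else if err != nil {
    // Stat failed or the plugin rejected the update
    fsLog(fs, logger.LevelWarn, "unable to set modification time: %v", err)
}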
@@ -413,14 +446,12 @@ func (*AzureBlobFs) Truncate(name string, size int64) error {
 func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
     var result []os.FileInfo
     // dirname must be already cleaned
-    prefix := ""
-    if dirname != "" && dirname != "." {
-        prefix = strings.TrimPrefix(dirname, "/")
-        if !strings.HasSuffix(prefix, "/") {
-            prefix += "/"
-        }
-    }
+    prefix := fs.getPrefix(dirname)
+    modTimes, err := getFolderModTimes(fs.getStorageID(), dirname)
+    if err != nil {
+        return result, err
+    }
 
     prefixes := make(map[string]bool)
     for marker := (azblob.Marker{}); marker.NotDone(); {
@@ -473,7 +504,11 @@ func (fs *AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
                     prefixes[name] = true
                 }
             }
-            result = append(result, NewFileInfo(name, isDir, size, blobInfo.Properties.LastModified, false))
+            modTime := blobInfo.Properties.LastModified
+            if t, ok := modTimes[name]; ok {
+                modTime = util.GetTimeFromMsecSinceEpoch(t)
+            }
+            result = append(result, NewFileInfo(name, isDir, size, modTime, false))
         }
     }
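
ReadDir now asks the plugin, in one call via getFolderModTimes, for every modification time stored under the listed folder and overlays those values on the blob listing. getFolderModTimes lives in the shared vfs code added by this commit; a rough sketch of what it is expected to do, where GetModificationTimes is an assumed bulk accessor whose real name and signature may differ:

// Sketch: map of file name -> stored modification time (ms since epoch) for one folder.
func getFolderModTimes(storageID, dirName string) (map[string]int64, error) {
    modTimes := make(map[string]int64)
    if !plugin.Handler.HasMetadater() {
        // no plugin: ReadDir keeps the LastModified reported by Azure
        return modTimes, nil
    }
    // assumed plugin call returning every entry stored below the folder
    return plugin.Handler.GetModificationTimes(storageID, ensureAbsPath(dirName))
}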
@@ -596,6 +631,54 @@ func (fs *AzureBlobFs) ScanRootDirContents() (int, int64, error) {
     return numFiles, size, nil
 }
 
+func (fs *AzureBlobFs) getFileNamesInPrefix(fsPrefix string) (map[string]bool, error) {
+    fileNames := make(map[string]bool)
+    prefix := ""
+    if fsPrefix != "/" {
+        prefix = strings.TrimPrefix(fsPrefix, "/")
+    }
+
+    for marker := (azblob.Marker{}); marker.NotDone(); {
+        ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(fs.ctxTimeout))
+        defer cancelFn()
+
+        listBlob, err := fs.containerURL.ListBlobsHierarchySegment(ctx, marker, "/", azblob.ListBlobsSegmentOptions{
+            Details: azblob.BlobListingDetails{
+                Copy:             false,
+                Metadata:         false,
+                Snapshots:        false,
+                UncommittedBlobs: false,
+                Deleted:          false,
+            },
+            Prefix: prefix,
+        })
+        if err != nil {
+            metric.AZListObjectsCompleted(err)
+            return fileNames, err
+        }
+        marker = listBlob.NextMarker
+        for idx := range listBlob.Segment.BlobItems {
+            blobInfo := &listBlob.Segment.BlobItems[idx]
+            name := strings.TrimPrefix(blobInfo.Name, prefix)
+            if blobInfo.Properties.ContentType != nil {
+                if *blobInfo.Properties.ContentType == dirMimeType {
+                    continue
+                }
+            }
+            fileNames[name] = true
+        }
+    }
+
+    metric.AZListObjectsCompleted(nil)
+    return fileNames, nil
+}
+
+// CheckMetadata checks the metadata consistency
+func (fs *AzureBlobFs) CheckMetadata() error {
+    return fsMetadataCheck(fs, fs.getStorageID(), fs.config.KeyPrefix)
+}
+
 // GetDirSize returns the number of files and the size for a folder
 // including any subfolders
 func (*AzureBlobFs) GetDirSize(dirname string) (int, int64, error) {
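
CheckMetadata only wires this backend into the shared fsMetadataCheck routine, which is not part of this file. Together with getFileNamesInPrefix above, the consistency check presumably walks the folders the plugin knows about, lists the real files under each prefix, and drops plugin entries whose backing blob no longer exists, so records left behind by out-of-band deletions do not linger.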
@@ -733,6 +816,17 @@ func (*AzureBlobFs) GetAvailableDiskSize(dirName string) (*sftp.StatVFS, error)
     return nil, ErrStorageSizeUnavailable
 }
 
+func (*AzureBlobFs) getPrefix(name string) string {
+    prefix := ""
+    if name != "" && name != "." {
+        prefix = strings.TrimPrefix(name, "/")
+        if !strings.HasSuffix(prefix, "/") {
+            prefix += "/"
+        }
+    }
+    return prefix
+}
+
 func (fs *AzureBlobFs) isEqual(key string, virtualName string) bool {
     if key == virtualName {
         return true
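
getPrefix simply factors out the prefix normalization that ReadDir used to inline, so the listing helpers share one rule. From the code above, for example:

fs.getPrefix("")        // ""        (container root)
fs.getPrefix(".")       // ""        (container root)
fs.getPrefix("/photos") // "photos/"
fs.getPrefix("photos/") // "photos/"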
@@ -905,6 +999,16 @@ func (fs *AzureBlobFs) incrementBlockID(blockID []byte) {
     }
 }
 
+func (fs *AzureBlobFs) getStorageID() string {
+    if fs.config.Endpoint != "" {
+        if !strings.HasSuffix(fs.config.Endpoint, "/") {
+            return fmt.Sprintf("azblob://%v/%v", fs.config.Endpoint, fs.config.Container)
+        }
+        return fmt.Sprintf("azblob://%v%v", fs.config.Endpoint, fs.config.Container)
+    }
+    return fmt.Sprintf("azblob://%v", fs.config.Container)
+}
+
 type bufferAllocator struct {
     sync.Mutex
     available [][]byte
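
The getStorageID value added above is the key under which the metadata plugin files its records, so it has to identify the backing container unambiguously regardless of how the endpoint is written. From the code, a few illustrative values (endpoint and container names are examples):

// Endpoint: "",  Container: "mycontainer"
//   -> "azblob://mycontainer"
// Endpoint: "https://myaccount.blob.core.windows.net",  Container: "data"
//   -> "azblob://https://myaccount.blob.core.windows.net/data"
// Endpoint: "https://myaccount.blob.core.windows.net/", Container: "data"
//   -> "azblob://https://myaccount.blob.core.windows.net/data"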