extend virtual folders support to all storage backends

Fixes #241
This commit is contained in:
Nicola Murino
2021-03-21 19:15:47 +01:00
parent 0286da2356
commit d6dc3a507e
70 changed files with 6825 additions and 3740 deletions

View File

@@ -36,8 +36,10 @@ var maxTryTimeout = time.Hour * 24 * 365
// AzureBlobFs is a Fs implementation for Azure Blob storage.
type AzureBlobFs struct {
connectionID string
localTempDir string
connectionID string
localTempDir string
// if not empty this fs is mouted as virtual folder in the specified path
mountPath string
config *AzBlobFsConfig
svc *azblob.ServiceURL
containerURL azblob.ContainerURL
@@ -50,10 +52,14 @@ func init() {
}
// NewAzBlobFs returns an AzBlobFs object that allows to interact with Azure Blob storage
func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs, error) {
func NewAzBlobFs(connectionID, localTempDir, mountPath string, config AzBlobFsConfig) (Fs, error) {
if localTempDir == "" {
localTempDir = filepath.Clean(os.TempDir())
}
fs := &AzureBlobFs{
connectionID: connectionID,
localTempDir: localTempDir,
mountPath: mountPath,
config: &config,
ctxTimeout: 30 * time.Second,
ctxLongTimeout: 300 * time.Second,
@@ -89,7 +95,7 @@ func NewAzBlobFs(connectionID, localTempDir string, config AzBlobFsConfig) (Fs,
parts := azblob.NewBlobURLParts(*u)
if parts.ContainerName != "" {
if fs.config.Container != "" && fs.config.Container != parts.ContainerName {
return fs, fmt.Errorf("Container name in SAS URL %#v and container provided %#v do not match",
return fs, fmt.Errorf("container name in SAS URL %#v and container provided %#v do not match",
parts.ContainerName, fs.config.Container)
}
fs.svc = nil
@@ -282,7 +288,7 @@ func (fs *AzureBlobFs) Rename(source, target string) error {
return err
}
if hasContents {
return fmt.Errorf("Cannot rename non empty directory: %#v", source)
return fmt.Errorf("cannot rename non empty directory: %#v", source)
}
}
dstBlobURL := fs.containerURL.NewBlobURL(target)
@@ -318,7 +324,7 @@ func (fs *AzureBlobFs) Rename(source, target string) error {
}
}
if copyStatus != azblob.CopyStatusSuccess {
err := fmt.Errorf("Copy failed with status: %s", copyStatus)
err := fmt.Errorf("copy failed with status: %s", copyStatus)
metrics.AZCopyObjectCompleted(err)
return err
}
@@ -334,7 +340,7 @@ func (fs *AzureBlobFs) Remove(name string, isDir bool) error {
return err
}
if hasContents {
return fmt.Errorf("Cannot remove non empty directory: %#v", name)
return fmt.Errorf("cannot remove non empty directory: %#v", name)
}
}
blobBlockURL := fs.containerURL.NewBlockBlobURL(name)
@@ -359,6 +365,11 @@ func (fs *AzureBlobFs) Mkdir(name string) error {
return w.Close()
}
// MkdirAll does nothing, we don't have folder
func (*AzureBlobFs) MkdirAll(name string, uid int, gid int) error {
return nil
}
// Symlink creates source as a symbolic link to target.
func (*AzureBlobFs) Symlink(source, target string) error {
return ErrVfsUnsupported
@@ -528,7 +539,7 @@ func (*AzureBlobFs) IsNotSupported(err error) bool {
// CheckRootPath creates the specified local root directory if it does not exists
func (fs *AzureBlobFs) CheckRootPath(username string, uid int, gid int) bool {
// we need a local directory for temporary files
osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, nil)
osFs := NewOsFs(fs.ConnectionID(), fs.localTempDir, "")
return osFs.CheckRootPath(username, uid, gid)
}
@@ -607,6 +618,9 @@ func (fs *AzureBlobFs) GetRelativePath(name string) string {
}
rel = path.Clean("/" + strings.TrimPrefix(rel, "/"+fs.config.KeyPrefix))
}
if fs.mountPath != "" {
rel = path.Join(fs.mountPath, rel)
}
return rel
}
@@ -675,6 +689,9 @@ func (*AzureBlobFs) HasVirtualFolders() bool {
// ResolvePath returns the matching filesystem path for the specified sftp path
func (fs *AzureBlobFs) ResolvePath(virtualPath string) (string, error) {
if fs.mountPath != "" {
virtualPath = strings.TrimPrefix(virtualPath, fs.mountPath)
}
if !path.IsAbs(virtualPath) {
virtualPath = path.Clean("/" + virtualPath)
}