add sftpfs storage backend

Fixes #224
This commit is contained in:
Nicola Murino
2020-12-12 10:31:09 +01:00
parent 4d5494912d
commit a6985075b9
43 changed files with 3556 additions and 1767 deletions

View File

@@ -57,6 +57,7 @@ type Fs interface {
Join(elem ...string) string
HasVirtualFolders() bool
GetMimeType(name string) (string, error)
Close() error
}
// File defines an interface representing a SFTPGo file
@@ -129,6 +130,66 @@ type S3FsConfig struct {
UploadConcurrency int `json:"upload_concurrency,omitempty"`
}
func (c *S3FsConfig) checkCredentials() error {
if c.AccessKey == "" && !c.AccessSecret.IsEmpty() {
return errors.New("access_key cannot be empty with access_secret not empty")
}
if c.AccessSecret.IsEmpty() && c.AccessKey != "" {
return errors.New("access_secret cannot be empty with access_key not empty")
}
if c.AccessSecret.IsEncrypted() && !c.AccessSecret.IsValid() {
return errors.New("invalid encrypted access_secret")
}
if !c.AccessSecret.IsEmpty() && !c.AccessSecret.IsValidInput() {
return errors.New("invalid access_secret")
}
return nil
}
// EncryptCredentials encrypts access secret if it is in plain text
func (c *S3FsConfig) EncryptCredentials(additionalData string) error {
if c.AccessSecret.IsPlain() {
c.AccessSecret.SetAdditionalData(additionalData)
err := c.AccessSecret.Encrypt()
if err != nil {
return err
}
}
return nil
}
// Validate returns an error if the configuration is not valid
func (c *S3FsConfig) Validate() error {
if c.AccessSecret == nil {
c.AccessSecret = kms.NewEmptySecret()
}
if c.Bucket == "" {
return errors.New("bucket cannot be empty")
}
if c.Region == "" {
return errors.New("region cannot be empty")
}
if err := c.checkCredentials(); err != nil {
return err
}
if c.KeyPrefix != "" {
if strings.HasPrefix(c.KeyPrefix, "/") {
return errors.New("key_prefix cannot start with /")
}
c.KeyPrefix = path.Clean(c.KeyPrefix)
if !strings.HasSuffix(c.KeyPrefix, "/") {
c.KeyPrefix += "/"
}
}
if c.UploadPartSize != 0 && (c.UploadPartSize < 5 || c.UploadPartSize > 5000) {
return errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)")
}
if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
return fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency)
}
return nil
}
// GCSFsConfig defines the configuration for Google Cloud Storage based filesystem
type GCSFsConfig struct {
Bucket string `json:"bucket,omitempty"`
@@ -146,6 +207,38 @@ type GCSFsConfig struct {
StorageClass string `json:"storage_class,omitempty"`
}
// Validate returns an error if the configuration is not valid
func (c *GCSFsConfig) Validate(credentialsFilePath string) error {
if c.Credentials == nil {
c.Credentials = kms.NewEmptySecret()
}
if c.Bucket == "" {
return errors.New("bucket cannot be empty")
}
if c.KeyPrefix != "" {
if strings.HasPrefix(c.KeyPrefix, "/") {
return errors.New("key_prefix cannot start with /")
}
c.KeyPrefix = path.Clean(c.KeyPrefix)
if !strings.HasSuffix(c.KeyPrefix, "/") {
c.KeyPrefix += "/"
}
}
if c.Credentials.IsEncrypted() && !c.Credentials.IsValid() {
return errors.New("invalid encrypted credentials")
}
if !c.Credentials.IsValidInput() && c.AutomaticCredentials == 0 {
fi, err := os.Stat(credentialsFilePath)
if err != nil {
return fmt.Errorf("invalid credentials %v", err)
}
if fi.Size() == 0 {
return errors.New("credentials cannot be empty")
}
}
return nil
}
// AzBlobFsConfig defines the configuration for Azure Blob Storage based filesystem
type AzBlobFsConfig struct {
Container string `json:"container,omitempty"`
@@ -183,11 +276,93 @@ type AzBlobFsConfig struct {
AccessTier string `json:"access_tier,omitempty"`
}
// EncryptCredentials encrypts access secret if it is in plain text
func (c *AzBlobFsConfig) EncryptCredentials(additionalData string) error {
if c.AccountKey.IsPlain() {
c.AccountKey.SetAdditionalData(additionalData)
if err := c.AccountKey.Encrypt(); err != nil {
return err
}
}
return nil
}
func (c *AzBlobFsConfig) checkCredentials() error {
if c.AccountName == "" || !c.AccountKey.IsValidInput() {
return errors.New("credentials cannot be empty or invalid")
}
if c.AccountKey.IsEncrypted() && !c.AccountKey.IsValid() {
return errors.New("invalid encrypted account_key")
}
return nil
}
// Validate returns an error if the configuration is not valid
func (c *AzBlobFsConfig) Validate() error {
if c.AccountKey == nil {
c.AccountKey = kms.NewEmptySecret()
}
if c.SASURL != "" {
_, err := url.Parse(c.SASURL)
return err
}
if c.Container == "" {
return errors.New("container cannot be empty")
}
if err := c.checkCredentials(); err != nil {
return err
}
if c.KeyPrefix != "" {
if strings.HasPrefix(c.KeyPrefix, "/") {
return errors.New("key_prefix cannot start with /")
}
c.KeyPrefix = path.Clean(c.KeyPrefix)
if !strings.HasSuffix(c.KeyPrefix, "/") {
c.KeyPrefix += "/"
}
}
if c.UploadPartSize < 0 || c.UploadPartSize > 100 {
return fmt.Errorf("invalid upload part size: %v", c.UploadPartSize)
}
if c.UploadConcurrency < 0 || c.UploadConcurrency > 64 {
return fmt.Errorf("invalid upload concurrency: %v", c.UploadConcurrency)
}
if !utils.IsStringInSlice(c.AccessTier, validAzAccessTier) {
return fmt.Errorf("invalid access tier %#v, valid values: \"''%v\"", c.AccessTier, strings.Join(validAzAccessTier, ", "))
}
return nil
}
// CryptFsConfig defines the configuration to store local files as encrypted
type CryptFsConfig struct {
Passphrase *kms.Secret `json:"passphrase,omitempty"`
}
// EncryptCredentials encrypts access secret if it is in plain text
func (c *CryptFsConfig) EncryptCredentials(additionalData string) error {
if c.Passphrase.IsPlain() {
c.Passphrase.SetAdditionalData(additionalData)
if err := c.Passphrase.Encrypt(); err != nil {
return err
}
}
return nil
}
// Validate returns an error if the configuration is not valid
func (c *CryptFsConfig) Validate() error {
if c.Passphrase == nil || c.Passphrase.IsEmpty() {
return errors.New("invalid passphrase")
}
if !c.Passphrase.IsValidInput() {
return errors.New("passphrase cannot be empty or invalid")
}
if c.Passphrase.IsEncrypted() && !c.Passphrase.IsValid() {
return errors.New("invalid encrypted passphrase")
}
return nil
}
// PipeWriter defines a wrapper for pipeat.PipeWriterAt.
type PipeWriter struct {
writer *pipeat.PipeWriterAt
@@ -247,149 +422,22 @@ func IsCryptOsFs(fs Fs) bool {
return fs.Name() == cryptFsName
}
func checkS3Credentials(config *S3FsConfig) error {
if config.AccessKey == "" && !config.AccessSecret.IsEmpty() {
return errors.New("access_key cannot be empty with access_secret not empty")
}
if config.AccessSecret.IsEmpty() && config.AccessKey != "" {
return errors.New("access_secret cannot be empty with access_key not empty")
}
if config.AccessSecret.IsEncrypted() && !config.AccessSecret.IsValid() {
return errors.New("invalid encrypted access_secret")
}
if !config.AccessSecret.IsEmpty() && !config.AccessSecret.IsValidInput() {
return errors.New("invalid access_secret")
}
return nil
// IsSFTPFs returns true if fs is a SFTP filesystem
func IsSFTPFs(fs Fs) bool {
return strings.HasPrefix(fs.Name(), sftpFsName)
}
// ValidateS3FsConfig returns nil if the specified s3 config is valid, otherwise an error
func ValidateS3FsConfig(config *S3FsConfig) error {
if config.AccessSecret == nil {
config.AccessSecret = kms.NewEmptySecret()
}
if config.Bucket == "" {
return errors.New("bucket cannot be empty")
}
if config.Region == "" {
return errors.New("region cannot be empty")
}
if err := checkS3Credentials(config); err != nil {
return err
}
if config.KeyPrefix != "" {
if strings.HasPrefix(config.KeyPrefix, "/") {
return errors.New("key_prefix cannot start with /")
}
config.KeyPrefix = path.Clean(config.KeyPrefix)
if !strings.HasSuffix(config.KeyPrefix, "/") {
config.KeyPrefix += "/"
}
}
if config.UploadPartSize != 0 && (config.UploadPartSize < 5 || config.UploadPartSize > 5000) {
return errors.New("upload_part_size cannot be != 0, lower than 5 (MB) or greater than 5000 (MB)")
}
if config.UploadConcurrency < 0 || config.UploadConcurrency > 64 {
return fmt.Errorf("invalid upload concurrency: %v", config.UploadConcurrency)
}
return nil
}
// ValidateGCSFsConfig returns nil if the specified GCS config is valid, otherwise an error
func ValidateGCSFsConfig(config *GCSFsConfig, credentialsFilePath string) error {
if config.Credentials == nil {
config.Credentials = kms.NewEmptySecret()
}
if config.Bucket == "" {
return errors.New("bucket cannot be empty")
}
if config.KeyPrefix != "" {
if strings.HasPrefix(config.KeyPrefix, "/") {
return errors.New("key_prefix cannot start with /")
}
config.KeyPrefix = path.Clean(config.KeyPrefix)
if !strings.HasSuffix(config.KeyPrefix, "/") {
config.KeyPrefix += "/"
}
}
if config.Credentials.IsEncrypted() && !config.Credentials.IsValid() {
return errors.New("invalid encrypted credentials")
}
if !config.Credentials.IsValidInput() && config.AutomaticCredentials == 0 {
fi, err := os.Stat(credentialsFilePath)
if err != nil {
return fmt.Errorf("invalid credentials %v", err)
}
if fi.Size() == 0 {
return errors.New("credentials cannot be empty")
}
}
return nil
}
func checkAzCredentials(config *AzBlobFsConfig) error {
if config.AccountName == "" || !config.AccountKey.IsValidInput() {
return errors.New("credentials cannot be empty or invalid")
}
if config.AccountKey.IsEncrypted() && !config.AccountKey.IsValid() {
return errors.New("invalid encrypted account_key")
}
return nil
}
// ValidateAzBlobFsConfig returns nil if the specified Azure Blob config is valid, otherwise an error
func ValidateAzBlobFsConfig(config *AzBlobFsConfig) error {
if config.AccountKey == nil {
config.AccountKey = kms.NewEmptySecret()
}
if config.SASURL != "" {
_, err := url.Parse(config.SASURL)
return err
}
if config.Container == "" {
return errors.New("container cannot be empty")
}
if err := checkAzCredentials(config); err != nil {
return err
}
if config.KeyPrefix != "" {
if strings.HasPrefix(config.KeyPrefix, "/") {
return errors.New("key_prefix cannot start with /")
}
config.KeyPrefix = path.Clean(config.KeyPrefix)
if !strings.HasSuffix(config.KeyPrefix, "/") {
config.KeyPrefix += "/"
}
}
if config.UploadPartSize < 0 || config.UploadPartSize > 100 {
return fmt.Errorf("invalid upload part size: %v", config.UploadPartSize)
}
if config.UploadConcurrency < 0 || config.UploadConcurrency > 64 {
return fmt.Errorf("invalid upload concurrency: %v", config.UploadConcurrency)
}
if !utils.IsStringInSlice(config.AccessTier, validAzAccessTier) {
return fmt.Errorf("invalid access tier %#v, valid values: \"''%v\"", config.AccessTier, strings.Join(validAzAccessTier, ", "))
}
return nil
}
// ValidateCryptFsConfig returns nil if the specified CryptFs config is valid, otherwise an error
func ValidateCryptFsConfig(config *CryptFsConfig) error {
if config.Passphrase == nil || config.Passphrase.IsEmpty() {
return errors.New("invalid passphrase")
}
if !config.Passphrase.IsValidInput() {
return errors.New("passphrase cannot be empty or invalid")
}
if config.Passphrase.IsEncrypted() && !config.Passphrase.IsValid() {
return errors.New("invalid encrypted passphrase")
}
return nil
// IsLocalOrSFTPFs returns true if fs is local or SFTP
func IsLocalOrSFTPFs(fs Fs) bool {
return IsLocalOsFs(fs) || IsSFTPFs(fs)
}
// SetPathPermissions calls fs.Chown.
// It does nothing for local filesystem on windows
func SetPathPermissions(fs Fs, path string, uid int, gid int) {
if uid == -1 && gid == -1 {
return
}
if IsLocalOsFs(fs) {
if runtime.GOOS == "windows" {
return