mirror of https://github.com/drakkan/sftpgo.git
azblob: use UUIDs as block IDs
Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
@@ -38,11 +38,11 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
-	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
 	"github.com/eikenb/pipeat"
+	"github.com/google/uuid"
 	"github.com/pkg/sftp"

 	"github.com/drakkan/sftpgo/v2/internal/logger"
@@ -103,7 +103,7 @@ func NewAzBlobFs(connectionID, localTempDir, mountPath string, config AzBlobFsCo
 		return fs.initFromSASURL()
 	}

-	credential, err := azblob.NewSharedKeyCredential(fs.config.AccountName, fs.config.AccountKey.GetPayload())
+	credential, err := blob.NewSharedKeyCredential(fs.config.AccountName, fs.config.AccountKey.GetPayload())
 	if err != nil {
 		return fs, fmt.Errorf("invalid credentials: %v", err)
 	}
@@ -123,7 +123,7 @@ func NewAzBlobFs(connectionID, localTempDir, mountPath string, config AzBlobFsCo
 }

 func (fs *AzureBlobFs) initFromSASURL() (Fs, error) {
-	parts, err := azblob.ParseURL(fs.config.SASURL.GetPayload())
+	parts, err := blob.ParseURL(fs.config.SASURL.GetPayload())
 	if err != nil {
 		return fs, fmt.Errorf("invalid SAS URL: %w", err)
 	}
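Both hunks above make the same mechanical change: NewSharedKeyCredential and ParseURL are taken from the blob sub-package of the Azure SDK for Go instead of the top-level azblob package, which lets the azblob import be dropped. A minimal, self-contained sketch of the same initialization flow, with placeholder account values and a hypothetical container URL (not SFTPGo's actual wiring):

package main

import (
	"fmt"
	"log"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {
	accountName, accountKey := "myaccount", "bXlrZXk=" // placeholder values

	// blob.NewSharedKeyCredential is the sub-package equivalent of the
	// azblob.NewSharedKeyCredential call removed in this commit.
	credential, err := blob.NewSharedKeyCredential(accountName, accountKey)
	if err != nil {
		log.Fatalf("invalid credentials: %v", err)
	}

	// Build a client scoped to a single container.
	containerURL := fmt.Sprintf("https://%s.blob.core.windows.net/mycontainer", accountName)
	client, err := container.NewClientWithSharedKeyCredential(containerURL, credential, nil)
	if err != nil {
		log.Fatalf("unable to create container client: %v", err)
	}
	_ = client

	// blob.ParseURL splits a (SAS) URL into its parts: host, container,
	// blob name, SAS query parameters, and so on.
	parts, err := blob.ParseURL(containerURL + "?sv=placeholder")
	if err != nil {
		log.Fatalf("invalid SAS URL: %v", err)
	}
	fmt.Println(parts.ContainerName)
}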
@@ -937,7 +937,6 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
 	// we only need to recycle few byte slices
 	pool := newBufferAllocator(int(partSize))
 	finished := false
-	binaryBlockID := make([]byte, 8)
 	var blocks []string
 	var wg sync.WaitGroup
 	var errOnce sync.Once
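newBufferAllocator is an SFTPGo-internal helper; the point of the comment above is that the multipart upload only needs to recycle a handful of part-sized buffers rather than allocate one per block. A hypothetical stand-in built on a channel-backed free list could look like the following; only releaseBuffer and free appear in this diff, the getBuffer method and the capacity of 8 are assumptions for illustration:

package blobpool

// bufferAllocator recycles part-sized byte slices across upload
// goroutines. Hypothetical sketch, not SFTPGo's implementation.
type bufferAllocator struct {
	buffers chan []byte
	size    int
}

func newBufferAllocator(size int) *bufferAllocator {
	// Bounded free list; getBuffer allocates when it is empty.
	return &bufferAllocator{buffers: make(chan []byte, 8), size: size}
}

func (a *bufferAllocator) getBuffer() []byte {
	select {
	case buf := <-a.buffers:
		return buf // reuse a previously released buffer
	default:
		return make([]byte, a.size)
	}
}

func (a *bufferAllocator) releaseBuffer(buf []byte) {
	select {
	case a.buffers <- buf:
	default: // free list is full, let the GC reclaim the buffer
	}
}

// free drops every buffer still held by the allocator.
func (a *bufferAllocator) free() {
	for {
		select {
		case <-a.buffers:
		default:
			return
		}
	}
}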
@@ -964,13 +963,20 @@
 			return err
 		}

-		fs.incrementBlockID(binaryBlockID)
-		blockID := base64.StdEncoding.EncodeToString(binaryBlockID)
+		// Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
+		// at the same time causing CommitBlockList to get a mix of blocks from all the clients.
+		generatedUUID, err := uuid.NewRandom()
+		if err != nil {
+			pool.releaseBuffer(buf)
+			pool.free()
+			return fmt.Errorf("unable to generate block ID: %w", err)
+		}
+		blockID := base64.StdEncoding.EncodeToString([]byte(generatedUUID.String()))
 		blocks = append(blocks, blockID)

 		guard <- struct{}{}
 		if hasError.Load() {
-			fsLog(fs, logger.LevelError, "pool error, upload for part %v not started", part)
+			fsLog(fs, logger.LevelError, "pool error, upload for part %d not started", part)
 			pool.releaseBuffer(buf)
 			break
 		}
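With this change every staged block gets a fresh random ID, so two clients writing the same blob concurrently can no longer stage blocks under colliding IDs, and CommitBlockList commits an unambiguous list. Two properties matter for Azure block blobs: block IDs must be valid base64 strings, and all IDs within one blob must have the same length. A UUID string is always 36 bytes, so every encoded ID comes out the same length. A standalone sketch of exactly the ID construction used above (the surrounding program is illustrative):

package main

import (
	"encoding/base64"
	"fmt"
	"log"

	"github.com/google/uuid"
)

// makeBlockID builds a block ID the same way the new code does: a random
// UUID rendered as its 36-byte text form, then base64-encoded, so every
// ID has the same length (48 bytes) and is unique per staged block.
func makeBlockID() (string, error) {
	generatedUUID, err := uuid.NewRandom()
	if err != nil {
		return "", fmt.Errorf("unable to generate block ID: %w", err)
	}
	return base64.StdEncoding.EncodeToString([]byte(generatedUUID.String())), nil
}

func main() {
	for i := 0; i < 3; i++ {
		id, err := makeBlockID()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("block %d: %s (len=%d)\n", i, id, len(id))
	}
}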
@@ -1042,18 +1048,6 @@ func (*AzureBlobFs) readFill(r io.Reader, buf []byte) (n int, err error) {
 	return n, err
 }

-// copied from rclone
-func (*AzureBlobFs) incrementBlockID(blockID []byte) {
-	for i, digit := range blockID {
-		newDigit := digit + 1
-		blockID[i] = newDigit
-		if newDigit >= digit {
-			// exit if no carry
-			break
-		}
-	}
-}
-
 func (fs *AzureBlobFs) preserveModificationTime(source, target string, fi os.FileInfo) {
 	if plugin.Handler.HasMetadater() {
 		if !fi.IsDir() {
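For reference, the removed helper treated the 8-byte slice as a little-endian counter: it bumps the lowest byte and only moves to the next byte on overflow (the newDigit >= digit check fails exactly when the addition wrapped past 0xff). The flaw motivating this commit is that every upload started the counter from the same all-zero value, so two clients staging blocks to the same blob concurrently would use identical IDs. A small runnable demo of the carry behavior:

package main

import "fmt"

// incrementBlockID is the removed helper: it treats blockID as a
// little-endian counter, carrying into the next byte only when the
// current byte wraps around from 0xff to 0x00.
func incrementBlockID(blockID []byte) {
	for i, digit := range blockID {
		newDigit := digit + 1
		blockID[i] = newDigit
		if newDigit >= digit {
			// exit if no carry
			break
		}
	}
}

func main() {
	id := []byte{0xfe, 0x00}
	incrementBlockID(id)   // 0xfe -> 0xff, no carry
	fmt.Printf("%x\n", id) // ff00
	incrementBlockID(id)   // 0xff wraps to 0x00, carry into the next byte
	fmt.Printf("%x\n", id) // 0001
}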