mirror of
https://github.com/drakkan/sftpgo.git
synced 2025-12-06 22:30:56 +03:00
add metrics for Azure Blob storage
This commit is contained in:
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/eikenb/pipeat"
|
||||
|
||||
"github.com/drakkan/sftpgo/logger"
|
||||
"github.com/drakkan/sftpgo/metrics"
|
||||
"github.com/drakkan/sftpgo/utils"
|
||||
"github.com/drakkan/sftpgo/version"
|
||||
)
|
||||
@@ -185,11 +186,13 @@ func (fs AzureBlobFs) Stat(name string) (os.FileInfo, error) {
|
||||
Prefix: prefix,
|
||||
})
|
||||
if err != nil {
|
||||
metrics.AZListObjectsCompleted(err)
|
||||
return nil, err
|
||||
}
|
||||
marker = listBlob.NextMarker
|
||||
for _, blobPrefix := range listBlob.Segment.BlobPrefixes {
|
||||
if fs.isEqual(blobPrefix.Name, name) {
|
||||
metrics.AZListObjectsCompleted(nil)
|
||||
return NewFileInfo(name, true, 0, time.Now(), false), nil
|
||||
}
|
||||
}
|
||||
@@ -203,11 +206,13 @@ func (fs AzureBlobFs) Stat(name string) (os.FileInfo, error) {
|
||||
if blobInfo.Properties.ContentLength != nil {
|
||||
size = *blobInfo.Properties.ContentLength
|
||||
}
|
||||
metrics.AZListObjectsCompleted(nil)
|
||||
return NewFileInfo(name, isDir, size, blobInfo.Properties.LastModified, false), nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metrics.AZListObjectsCompleted(nil)
|
||||
return nil, errors.New("404 no such file or directory")
|
||||
}
|
||||
|
||||
@@ -242,6 +247,7 @@ func (fs AzureBlobFs) Open(name string, offset int64) (*os.File, *pipeat.PipeRea
|
||||
n, err := io.Copy(w, body)
|
||||
w.CloseWithError(err) //nolint:errcheck
|
||||
fsLog(fs, logger.LevelDebug, "download completed, path: %#v size: %v, err: %v", name, n, err)
|
||||
metrics.AZTransferCompleted(n, 1, err)
|
||||
}()
|
||||
|
||||
return nil, r, cancelFn, nil
|
||||
@@ -284,6 +290,7 @@ func (fs AzureBlobFs) Create(name string, flag int) (*os.File, *PipeWriter, func
|
||||
r.CloseWithError(err) //nolint:errcheck
|
||||
p.Done(err)
|
||||
fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %v", name, r.GetReadedBytes(), err)
|
||||
metrics.AZTransferCompleted(r.GetReadedBytes(), 0, err)
|
||||
}()
|
||||
|
||||
return nil, p, cancelFn, nil
|
||||
@@ -322,6 +329,7 @@ func (fs AzureBlobFs) Rename(source, target string) error {
|
||||
|
||||
resp, err := dstBlobURL.StartCopyFromURL(ctx, srcURL, md, mac, bac)
|
||||
if err != nil {
|
||||
metrics.AZCopyObjectCompleted(err)
|
||||
return err
|
||||
}
|
||||
copyStatus := resp.CopyStatus()
|
||||
@@ -335,6 +343,7 @@ func (fs AzureBlobFs) Rename(source, target string) error {
|
||||
// of them before giving up.
|
||||
nErrors++
|
||||
if ctx.Err() != nil || nErrors == 3 {
|
||||
metrics.AZCopyObjectCompleted(err)
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
@@ -342,8 +351,11 @@ func (fs AzureBlobFs) Rename(source, target string) error {
|
||||
}
|
||||
}
|
||||
if copyStatus != azblob.CopyStatusSuccess {
|
||||
return fmt.Errorf("Copy failed with status: %s", copyStatus)
|
||||
err := fmt.Errorf("Copy failed with status: %s", copyStatus)
|
||||
metrics.AZCopyObjectCompleted(err)
|
||||
return err
|
||||
}
|
||||
metrics.AZCopyObjectCompleted(nil)
|
||||
return fs.Remove(source, fi.IsDir())
|
||||
}
|
||||
|
||||
@@ -363,6 +375,7 @@ func (fs AzureBlobFs) Remove(name string, isDir bool) error {
|
||||
defer cancelFn()
|
||||
|
||||
_, err := blobBlockURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
|
||||
metrics.AZDeleteObjectCompleted(err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -444,6 +457,7 @@ func (fs AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
|
||||
Prefix: prefix,
|
||||
})
|
||||
if err != nil {
|
||||
metrics.AZListObjectsCompleted(err)
|
||||
return nil, err
|
||||
}
|
||||
marker = listBlob.NextMarker
|
||||
@@ -481,6 +495,7 @@ func (fs AzureBlobFs) ReadDir(dirname string) ([]os.FileInfo, error) {
|
||||
}
|
||||
}
|
||||
|
||||
metrics.AZListObjectsCompleted(nil)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -565,6 +580,7 @@ func (fs AzureBlobFs) ScanRootDirContents() (int, int64, error) {
|
||||
Prefix: fs.config.KeyPrefix,
|
||||
})
|
||||
if err != nil {
|
||||
metrics.AZListObjectsCompleted(err)
|
||||
return numFiles, size, err
|
||||
}
|
||||
marker = listBlob.NextMarker
|
||||
@@ -585,6 +601,7 @@ func (fs AzureBlobFs) ScanRootDirContents() (int, int64, error) {
|
||||
}
|
||||
}
|
||||
|
||||
metrics.AZListObjectsCompleted(nil)
|
||||
return numFiles, size, nil
|
||||
}
|
||||
|
||||
@@ -644,6 +661,7 @@ func (fs AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
|
||||
Prefix: prefix,
|
||||
})
|
||||
if err != nil {
|
||||
metrics.AZListObjectsCompleted(err)
|
||||
return err
|
||||
}
|
||||
marker = listBlob.NextMarker
|
||||
@@ -667,6 +685,7 @@ func (fs AzureBlobFs) Walk(root string, walkFn filepath.WalkFunc) error {
|
||||
}
|
||||
}
|
||||
|
||||
metrics.AZListObjectsCompleted(nil)
|
||||
return walkFn(root, NewFileInfo(root, true, 0, time.Now(), false), nil)
|
||||
}
|
||||
|
||||
@@ -695,6 +714,7 @@ func (fs AzureBlobFs) GetMimeType(name string) (string, error) {
|
||||
|
||||
blobBlockURL := fs.containerURL.NewBlockBlobURL(name)
|
||||
response, err := blobBlockURL.GetProperties(ctx, azblob.BlobAccessConditions{})
|
||||
metrics.AZHeadObjectCompleted(err)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -719,6 +739,7 @@ func (fs *AzureBlobFs) checkIfBucketExists() error {
|
||||
defer cancelFn()
|
||||
|
||||
_, err := fs.containerURL.GetProperties(ctx, azblob.LeaseAccessConditions{})
|
||||
metrics.AZHeadContainerCompleted(err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -790,7 +811,6 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
|
||||
defer cancelFn()
|
||||
|
||||
_, err := blockBlobURL.StageBlock(innerCtx, blockID, bufferReader, azblob.LeaseAccessConditions{}, nil)
|
||||
pool.releaseBuffer(buf)
|
||||
if err != nil {
|
||||
errOnce.Do(func() {
|
||||
poolError = err
|
||||
@@ -798,6 +818,7 @@ func (fs *AzureBlobFs) handleMultipartUpload(ctx context.Context, reader io.Read
|
||||
poolCancel()
|
||||
})
|
||||
}
|
||||
pool.releaseBuffer(buf)
|
||||
<-guard
|
||||
}(blockID, buf, n)
|
||||
}
|
||||
@@ -840,11 +861,13 @@ type bufferAllocator struct {
|
||||
sync.Mutex
|
||||
available [][]byte
|
||||
bufferSize int
|
||||
finalized bool
|
||||
}
|
||||
|
||||
func newBufferAllocator(size int) *bufferAllocator {
|
||||
return &bufferAllocator{
|
||||
bufferSize: size,
|
||||
finalized: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -871,7 +894,7 @@ func (b *bufferAllocator) releaseBuffer(buf []byte) {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
if len(buf) != b.bufferSize {
|
||||
if b.finalized || len(buf) != b.bufferSize {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -883,4 +906,5 @@ func (b *bufferAllocator) free() {
|
||||
defer b.Unlock()
|
||||
|
||||
b.available = nil
|
||||
b.finalized = true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user