eventmanager: add data retention reports

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2022-09-06 19:09:23 +02:00
parent f264b005ff
commit 3e5cf56460
13 changed files with 509 additions and 58 deletions

View File

@@ -17,6 +17,7 @@ package common
import (
"bytes"
"context"
"encoding/csv"
"errors"
"fmt"
"io"
@@ -28,11 +29,13 @@ import (
"os"
"os/exec"
"path"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/klauspost/compress/zip"
"github.com/robfig/cron/v3"
"github.com/rs/xid"
"github.com/sftpgo/sdk"
@@ -47,8 +50,8 @@ import (
)
const (
ipBlockedEventName = "IP Blocked"
emailAttachmentsMaxSize = int64(10 * 1024 * 1024)
ipBlockedEventName = "IP Blocked"
maxAttachmentsSize = int64(10 * 1024 * 1024)
)
var (
@@ -412,6 +415,12 @@ func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
}
}
type executedRetentionCheck struct {
Username string
ActionName string
Results []folderRetentionCheckResult
}
// EventParams defines the supported event parameters
type EventParams struct {
Name string
@@ -432,12 +441,25 @@ type EventParams struct {
sender string
updateStatusFromError bool
errors []string
retentionChecks []executedRetentionCheck
}
func (p *EventParams) getACopy() *EventParams {
params := *p
params.errors = make([]string, len(p.errors))
copy(params.errors, p.errors)
retentionChecks := make([]executedRetentionCheck, 0, len(p.retentionChecks))
for _, c := range p.retentionChecks {
executedCheck := executedRetentionCheck{
Username: c.Username,
ActionName: c.ActionName,
}
executedCheck.Results = make([]folderRetentionCheckResult, len(c.Results))
copy(executedCheck.Results, c.Results)
retentionChecks = append(retentionChecks, executedCheck)
}
params.retentionChecks = retentionChecks
return &params
}
@@ -498,6 +520,52 @@ func (p *EventParams) getFolders() ([]vfs.BaseVirtualFolder, error) {
return []vfs.BaseVirtualFolder{folder}, nil
}
func (p *EventParams) getCompressedDataRetentionReport() ([]byte, error) {
if len(p.retentionChecks) == 0 {
return nil, errors.New("no data retention report available")
}
var b bytes.Buffer
wr := zip.NewWriter(&b)
for _, check := range p.retentionChecks {
if size := int64(len(b.Bytes())); size > maxAttachmentsSize {
eventManagerLog(logger.LevelError, "unable to get retention report, size too large: %s", util.ByteCountIEC(size))
return nil, fmt.Errorf("unable to get retention report, size too large: %s", util.ByteCountIEC(size))
}
data, err := getCSVRetentionReport(check.Results)
if err != nil {
return nil, fmt.Errorf("unable to get CSV report: %w", err)
}
fh := &zip.FileHeader{
Name: fmt.Sprintf("%s-%s.csv", check.ActionName, check.Username),
Method: zip.Deflate,
Modified: time.Now().UTC(),
}
f, err := wr.CreateHeader(fh)
if err != nil {
return nil, fmt.Errorf("unable to create zip header for file %q: %w", fh.Name, err)
}
_, err = io.Copy(f, bytes.NewBuffer(data))
if err != nil {
return nil, fmt.Errorf("unable to write content to zip file %q: %w", fh.Name, err)
}
}
if err := wr.Close(); err != nil {
return nil, fmt.Errorf("unable to close zip writer: %w", err)
}
return b.Bytes(), nil
}
func (p *EventParams) getRetentionReportsAsMailAttachment() (mail.File, error) {
var result mail.File
data, err := p.getCompressedDataRetentionReport()
if err != nil {
return result, err
}
result.Name = "retention-reports.zip"
result.Data = data
return result, nil
}
func (p *EventParams) getStringReplacements(addObjectData bool) []string {
replacements := []string{
"{{Name}}", p.Name,
@@ -530,6 +598,29 @@ func (p *EventParams) getStringReplacements(addObjectData bool) []string {
return replacements
}
func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error) {
var b bytes.Buffer
csvWriter := csv.NewWriter(&b)
err := csvWriter.Write([]string{"path", "retention (hours)", "deleted files", "deleted size (bytes)",
"elapsed (ms)", "info", "error"})
if err != nil {
return nil, err
}
for _, result := range results {
err = csvWriter.Write([]string{result.Path, strconv.Itoa(result.Retention), strconv.Itoa(result.DeletedFiles),
strconv.FormatInt(result.DeletedSize, 10), strconv.FormatInt(result.Elapsed.Milliseconds(), 10),
result.Info, result.Error})
if err != nil {
return nil, err
}
}
csvWriter.Flush()
err = csvWriter.Error()
return b.Bytes(), err
}
func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
if err != nil {
@@ -600,7 +691,7 @@ func getMailAttachments(user dataprovider.User, attachments []string, replacer *
return nil, fmt.Errorf("cannot attach non regular file %q", virtualPath)
}
totalSize += info.Size()
if totalSize > emailAttachmentsMaxSize {
if totalSize > maxAttachmentsSize {
return nil, fmt.Errorf("unable to send files as attachment, size too large: %s", util.ByteCountIEC(totalSize))
}
data, err := getFileContent(conn, virtualPath, int(info.Size()))
@@ -682,7 +773,7 @@ func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *s
}
func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.MIMEHeader,
conn *BaseConnection, replacer *strings.Replacer,
conn *BaseConnection, replacer *strings.Replacer, params *EventParams,
) error {
partWriter, err := m.CreatePart(h)
if err != nil {
@@ -697,6 +788,18 @@ func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.
}
return nil
}
if part.Filepath == dataprovider.RetentionReportPlaceHolder {
data, err := params.getCompressedDataRetentionReport()
if err != nil {
return err
}
_, err = partWriter.Write(data)
if err != nil {
eventManagerLog(logger.LevelError, "unable to write part %q, err: %v", part.Name, err)
return err
}
return nil
}
err = writeFileContent(conn, util.CleanPath(replacer.Replace(part.Filepath)), partWriter)
if err != nil {
eventManagerLog(logger.LevelError, "unable to write file part %q, err: %v", part.Name, err)
@@ -706,13 +809,20 @@ func writeHTTPPart(m *multipart.Writer, part dataprovider.HTTPPart, h textproto.
}
func getHTTPRuleActionBody(c dataprovider.EventActionHTTPConfig, replacer *strings.Replacer,
cancel context.CancelFunc, user dataprovider.User,
cancel context.CancelFunc, user dataprovider.User, params *EventParams,
) (io.ReadCloser, string, error) {
var body io.ReadCloser
if c.Method == http.MethodGet {
return body, "", nil
}
if c.Body != "" {
if c.Body == dataprovider.RetentionReportPlaceHolder {
data, err := params.getCompressedDataRetentionReport()
if err != nil {
return body, "", err
}
return io.NopCloser(bytes.NewBuffer(data)), "", nil
}
return io.NopCloser(bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))), "", nil
}
if len(c.Parts) > 0 {
@@ -745,11 +855,10 @@ func getHTTPRuleActionBody(c dataprovider.EventActionHTTPConfig, replacer *strin
if part.Body != "" {
h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"`, multipartQuoteEscaper.Replace(part.Name)))
} else {
filePath := util.CleanPath(replacer.Replace(part.Filepath))
h.Set("Content-Disposition",
fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
multipartQuoteEscaper.Replace(part.Name), multipartQuoteEscaper.Replace(path.Base(filePath))))
contentType := mime.TypeByExtension(path.Ext(filePath))
multipartQuoteEscaper.Replace(part.Name), multipartQuoteEscaper.Replace(path.Base(part.Filepath))))
contentType := mime.TypeByExtension(path.Ext(part.Filepath))
if contentType == "" {
contentType = "application/octet-stream"
}
@@ -758,7 +867,7 @@ func getHTTPRuleActionBody(c dataprovider.EventActionHTTPConfig, replacer *strin
for _, keyVal := range part.Headers {
h.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
}
if err := writeHTTPPart(m, part, h, conn, replacer); err != nil {
if err := writeHTTPPart(m, part, h, conn, replacer, params); err != nil {
cancel()
return
}
@@ -791,13 +900,13 @@ func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventPa
defer cancel()
var user dataprovider.User
if c.HasMultipartFile() {
if c.HasMultipartFiles() {
user, err = params.getUserFromSender()
if err != nil {
return err
}
}
body, contentType, err := getHTTPRuleActionBody(c, replacer, cancel, user)
body, contentType, err := getHTTPRuleActionBody(c, replacer, cancel, user, params)
if err != nil {
return err
}
@@ -888,15 +997,28 @@ func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *Event
subject := replaceWithReplacer(c.Subject, replacer)
startTime := time.Now()
var files []mail.File
if len(c.Attachments) > 0 {
fileAttachments := make([]string, 0, len(c.Attachments))
for _, attachment := range c.Attachments {
if attachment == dataprovider.RetentionReportPlaceHolder {
f, err := params.getRetentionReportsAsMailAttachment()
if err != nil {
return err
}
files = append(files, f)
continue
}
fileAttachments = append(fileAttachments, attachment)
}
if len(fileAttachments) > 0 {
user, err := params.getUserFromSender()
if err != nil {
return err
}
files, err = getMailAttachments(user, c.Attachments, replacer)
res, err := getMailAttachments(user, fileAttachments, replacer)
if err != nil {
return err
}
files = append(files, res...)
}
err := smtp.SendEmail(c.Recipients, subject, body, smtp.EmailContentTypeTextPlain, files...)
eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
@@ -1369,7 +1491,9 @@ func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOption
return nil
}
func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention) error {
func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprovider.FolderRetention,
params *EventParams, actionName string,
) error {
if err := user.LoadAndApplyGroupSettings(); err != nil {
eventManagerLog(logger.LevelDebug, "skipping scheduled retention check for user %s, cannot apply group settings: %v",
user.Username, err)
@@ -1383,6 +1507,13 @@ func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprov
eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
return fmt.Errorf("another retention check is in progress for user %q", user.Username)
}
defer func() {
params.retentionChecks = append(params.retentionChecks, executedRetentionCheck{
Username: user.Username,
ActionName: actionName,
Results: c.results,
})
}()
if err := c.Start(); err != nil {
eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
@@ -1391,7 +1522,7 @@ func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprov
}
func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
conditions dataprovider.ConditionOptions, params *EventParams,
conditions dataprovider.ConditionOptions, params *EventParams, actionName string,
) error {
users, err := params.getUsers()
if err != nil {
@@ -1414,7 +1545,7 @@ func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRete
}
}
executed++
if err = executeDataRetentionCheckForUser(user, config.Folders); err != nil {
if err = executeDataRetentionCheckForUser(user, config.Folders, params, actionName); err != nil {
failedChecks = append(failedChecks, user.Username)
params.AddError(err)
continue
@@ -1449,7 +1580,7 @@ func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams,
case dataprovider.ActionTypeTransferQuotaReset:
err = executeTransferQuotaResetRuleAction(conditions, params)
case dataprovider.ActionTypeDataRetentionCheck:
err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params)
err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params, action.Name)
case dataprovider.ActionTypeFilesystem:
err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
default: