eventmanager placeholders: add StatusString and ErrorString

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2022-08-29 19:03:31 +02:00
parent 37d98ca290
commit 56bf51277c
9 changed files with 285 additions and 150 deletions

View File

@@ -406,21 +406,50 @@ func (r *eventRulesContainer) handleCertificateEvent(params EventParams) {
// EventParams defines the supported event parameters
type EventParams struct {
Name string
Event string
Status int
VirtualPath string
FsPath string
VirtualTargetPath string
FsTargetPath string
ObjectName string
ObjectType string
FileSize int64
Protocol string
IP string
Timestamp int64
Object plugin.Renderer
sender string
Name string
Event string
Status int
VirtualPath string
FsPath string
VirtualTargetPath string
FsTargetPath string
ObjectName string
ObjectType string
FileSize int64
Protocol string
IP string
Timestamp int64
Object plugin.Renderer
sender string
updateStatusFromError bool
errors []string
}
func (p *EventParams) getACopy() *EventParams {
params := *p
params.errors = make([]string, len(p.errors))
copy(params.errors, p.errors)
return &params
}
// AddError adds a new error to the event params and update the status if needed
func (p *EventParams) AddError(err error) {
if err == nil {
return
}
if p.updateStatusFromError && p.Status == 1 {
p.Status = 2
}
p.errors = append(p.errors, err.Error())
}
func (p *EventParams) getStatusString() string {
switch p.Status {
case 1:
return "OK"
default:
return "KO"
}
}
// getUsers returns users with group settings not applied
@@ -469,11 +498,18 @@ func (p *EventParams) getStringReplacements(addObjectData bool) []string {
"{{Protocol}}", p.Protocol,
"{{IP}}", p.IP,
"{{Timestamp}}", fmt.Sprintf("%d", p.Timestamp),
"{{StatusString}}", p.getStatusString(),
}
if len(p.errors) > 0 {
replacements = append(replacements, "{{ErrorString}}", strings.Join(p.errors, ", "))
} else {
replacements = append(replacements, "{{ErrorString}}", "")
}
replacements = append(replacements, "{{ObjectData}}", "")
if addObjectData {
data, err := p.Object.RenderAsJSON(p.Event != operationDelete)
if err == nil {
replacements = append(replacements, "{{ObjectData}}", string(data))
replacements[len(replacements)-1] = string(data)
}
}
return replacements
@@ -516,7 +552,7 @@ func getMailAttachments(user dataprovider.User, attachments []string, replacer *
err = user.CheckFsRoot(connectionID)
defer user.CloseFs() //nolint:errcheck
if err != nil {
return nil, err
return nil, fmt.Errorf("error getting email attachments, unable to check root fs for user %q: %w", user.Username, err)
}
conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
totalSize := int64(0)
@@ -596,7 +632,7 @@ func getHTTPRuleActionEndpoint(c dataprovider.EventActionHTTPConfig, replacer *s
return c.Endpoint, nil
}
func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params EventParams) error {
func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params *EventParams) error {
if !c.Password.IsEmpty() {
if err := c.Password.TryDecrypt(); err != nil {
return fmt.Errorf("unable to decrypt password: %w", err)
@@ -653,7 +689,7 @@ func executeHTTPRuleAction(c dataprovider.EventActionHTTPConfig, params EventPar
return nil
}
func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params EventParams) error {
func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params *EventParams) error {
envVars := make([]string, 0, len(c.EnvVars))
addObjectData := false
if params.Object != nil {
@@ -686,7 +722,7 @@ func executeCommandRuleAction(c dataprovider.EventActionCommandConfig, params Ev
return err
}
func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params EventParams) error {
func executeEmailRuleAction(c dataprovider.EventActionEmailConfig, params *EventParams) error {
addObjectData := false
if params.Object != nil {
if strings.Contains(c.Body, "{{ObjectData}}") {
@@ -748,7 +784,7 @@ func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer,
err = user.CheckFsRoot(connectionID)
defer user.CloseFs() //nolint:errcheck
if err != nil {
return err
return fmt.Errorf("delete error, unable to check root fs for user %q: %w", user.Username, err)
}
conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
for _, item := range deletes {
@@ -775,7 +811,7 @@ func executeDeleteFsActionForUser(deletes []string, replacer *strings.Replacer,
}
func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
conditions dataprovider.ConditionOptions, params EventParams,
conditions dataprovider.ConditionOptions, params *EventParams,
) error {
users, err := params.getUsers()
if err != nil {
@@ -792,6 +828,7 @@ func executeDeleteFsRuleAction(deletes []string, replacer *strings.Replacer,
}
executed++
if err = executeDeleteFsActionForUser(deletes, replacer, user); err != nil {
params.AddError(err)
failures = append(failures, user.Username)
continue
}
@@ -815,7 +852,7 @@ func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, use
err = user.CheckFsRoot(connectionID)
defer user.CloseFs() //nolint:errcheck
if err != nil {
return err
return fmt.Errorf("mkdir error, unable to check root fs for user %q: %w", user.Username, err)
}
conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
for _, item := range dirs {
@@ -832,7 +869,7 @@ func executeMkDirsFsActionForUser(dirs []string, replacer *strings.Replacer, use
}
func executeMkdirFsRuleAction(dirs []string, replacer *strings.Replacer,
conditions dataprovider.ConditionOptions, params EventParams,
conditions dataprovider.ConditionOptions, params *EventParams,
) error {
users, err := params.getUsers()
if err != nil {
@@ -874,7 +911,7 @@ func executeRenameFsActionForUser(renames []dataprovider.KeyValue, replacer *str
err = user.CheckFsRoot(connectionID)
defer user.CloseFs() //nolint:errcheck
if err != nil {
return err
return fmt.Errorf("rename error, unable to check root fs for user %q: %w", user.Username, err)
}
conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
for _, item := range renames {
@@ -899,7 +936,7 @@ func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
err = user.CheckFsRoot(connectionID)
defer user.CloseFs() //nolint:errcheck
if err != nil {
return err
return fmt.Errorf("existence check error, unable to check root fs for user %q: %w", user.Username, err)
}
conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
for _, item := range exist {
@@ -913,7 +950,7 @@ func executeExistFsActionForUser(exist []string, replacer *strings.Replacer,
}
func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *strings.Replacer,
conditions dataprovider.ConditionOptions, params EventParams,
conditions dataprovider.ConditionOptions, params *EventParams,
) error {
users, err := params.getUsers()
if err != nil {
@@ -931,6 +968,7 @@ func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *string
executed++
if err = executeRenameFsActionForUser(renames, replacer, user); err != nil {
failures = append(failures, user.Username)
params.AddError(err)
continue
}
}
@@ -945,7 +983,7 @@ func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *string
}
func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
params EventParams,
params *EventParams,
) error {
users, err := params.getUsers()
if err != nil {
@@ -963,6 +1001,7 @@ func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, condit
executed++
if err = executeExistFsActionForUser(exist, replacer, user); err != nil {
failures = append(failures, user.Username)
params.AddError(err)
continue
}
}
@@ -977,7 +1016,7 @@ func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, condit
}
func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
params EventParams,
params *EventParams,
) error {
addObjectData := false
replacements := params.getStringReplacements(addObjectData)
@@ -1003,25 +1042,25 @@ func executeQuotaResetForUser(user dataprovider.User) error {
return err
}
if !QuotaScans.AddUserQuotaScan(user.Username) {
eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %s", user.Username)
return fmt.Errorf("another quota scan is in progress for user %s", user.Username)
eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %q", user.Username)
return fmt.Errorf("another quota scan is in progress for user %q", user.Username)
}
defer QuotaScans.RemoveUserQuotaScan(user.Username)
numFiles, size, err := user.ScanQuota()
if err != nil {
eventManagerLog(logger.LevelError, "error scanning quota for user %s: %v", user.Username, err)
return err
eventManagerLog(logger.LevelError, "error scanning quota for user %q: %v", user.Username, err)
return fmt.Errorf("error scanning quota for user %q: %w", user.Username, err)
}
err = dataprovider.UpdateUserQuota(&user, numFiles, size, true)
if err != nil {
eventManagerLog(logger.LevelError, "error updating quota for user %s: %v", user.Username, err)
return err
eventManagerLog(logger.LevelError, "error updating quota for user %q: %v", user.Username, err)
return fmt.Errorf("error updating quota for user %q: %w", user.Username, err)
}
return nil
}
func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
users, err := params.getUsers()
if err != nil {
return fmt.Errorf("unable to get users: %w", err)
@@ -1031,12 +1070,13 @@ func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions,
for _, user := range users {
// if sender is set, the conditions have already been evaluated
if params.sender == "" && !checkEventConditionPatterns(user.Username, conditions.Names) {
eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, name conditions don't match",
eventManagerLog(logger.LevelDebug, "skipping quota reset for user %q, name conditions don't match",
user.Username)
continue
}
executed++
if err = executeQuotaResetForUser(user); err != nil {
params.AddError(err)
failedResets = append(failedResets, user.Username)
continue
}
@@ -1051,7 +1091,7 @@ func executeUsersQuotaResetRuleAction(conditions dataprovider.ConditionOptions,
return nil
}
func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
folders, err := params.getFolders()
if err != nil {
return fmt.Errorf("unable to get folders: %w", err)
@@ -1066,7 +1106,8 @@ func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions
continue
}
if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %s", folder.Name)
eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %q", folder.Name)
params.AddError(fmt.Errorf("another quota scan is already in progress for folder %q", folder.Name))
failedResets = append(failedResets, folder.Name)
continue
}
@@ -1078,13 +1119,15 @@ func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions
numFiles, size, err := f.ScanQuota()
QuotaScans.RemoveVFolderQuotaScan(folder.Name)
if err != nil {
eventManagerLog(logger.LevelError, "error scanning quota for folder %s: %v", folder.Name, err)
eventManagerLog(logger.LevelError, "error scanning quota for folder %q: %v", folder.Name, err)
params.AddError(fmt.Errorf("error scanning quota for folder %q: %w", folder.Name, err))
failedResets = append(failedResets, folder.Name)
continue
}
err = dataprovider.UpdateVirtualFolderQuota(&folder, numFiles, size, true)
if err != nil {
eventManagerLog(logger.LevelError, "error updating quota for folder %s: %v", folder.Name, err)
eventManagerLog(logger.LevelError, "error updating quota for folder %q: %v", folder.Name, err)
params.AddError(fmt.Errorf("error updating quota for folder %q: %w", folder.Name, err))
failedResets = append(failedResets, folder.Name)
}
}
@@ -1098,7 +1141,7 @@ func executeFoldersQuotaResetRuleAction(conditions dataprovider.ConditionOptions
return nil
}
func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params EventParams) error {
func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOptions, params *EventParams) error {
users, err := params.getUsers()
if err != nil {
return fmt.Errorf("unable to get users: %w", err)
@@ -1115,7 +1158,8 @@ func executeTransferQuotaResetRuleAction(conditions dataprovider.ConditionOption
executed++
err = dataprovider.UpdateUserTransferQuota(&user, 0, 0, true)
if err != nil {
eventManagerLog(logger.LevelError, "error updating transfer quota for user %s: %v", user.Username, err)
eventManagerLog(logger.LevelError, "error updating transfer quota for user %q: %v", user.Username, err)
params.AddError(fmt.Errorf("error updating transfer quota for user %q: %w", user.Username, err))
failedResets = append(failedResets, user.Username)
}
}
@@ -1140,18 +1184,18 @@ func executeDataRetentionCheckForUser(user dataprovider.User, folders []dataprov
}
c := RetentionChecks.Add(check, &user)
if c == nil {
eventManagerLog(logger.LevelError, "another retention check is already in progress for user %s", user.Username)
return fmt.Errorf("another retention check is in progress for user %s", user.Username)
eventManagerLog(logger.LevelError, "another retention check is already in progress for user %q", user.Username)
return fmt.Errorf("another retention check is in progress for user %q", user.Username)
}
if err := c.Start(); err != nil {
eventManagerLog(logger.LevelError, "error checking retention for user %s: %v", user.Username, err)
return err
eventManagerLog(logger.LevelError, "error checking retention for user %q: %v", user.Username, err)
return fmt.Errorf("error checking retention for user %q: %w", user.Username, err)
}
return nil
}
func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRetentionConfig,
conditions dataprovider.ConditionOptions, params EventParams,
conditions dataprovider.ConditionOptions, params *EventParams,
) error {
users, err := params.getUsers()
if err != nil {
@@ -1169,6 +1213,7 @@ func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRete
executed++
if err = executeDataRetentionCheckForUser(user, config.Folders); err != nil {
failedChecks = append(failedChecks, user.Username)
params.AddError(err)
continue
}
}
@@ -1182,29 +1227,37 @@ func executeDataRetentionCheckRuleAction(config dataprovider.EventActionDataRete
return nil
}
func executeRuleAction(action dataprovider.BaseEventAction, params EventParams, conditions dataprovider.ConditionOptions) error {
func executeRuleAction(action dataprovider.BaseEventAction, params *EventParams, conditions dataprovider.ConditionOptions) error {
var err error
switch action.Type {
case dataprovider.ActionTypeHTTP:
return executeHTTPRuleAction(action.Options.HTTPConfig, params)
err = executeHTTPRuleAction(action.Options.HTTPConfig, params)
case dataprovider.ActionTypeCommand:
return executeCommandRuleAction(action.Options.CmdConfig, params)
err = executeCommandRuleAction(action.Options.CmdConfig, params)
case dataprovider.ActionTypeEmail:
return executeEmailRuleAction(action.Options.EmailConfig, params)
err = executeEmailRuleAction(action.Options.EmailConfig, params)
case dataprovider.ActionTypeBackup:
return dataprovider.ExecuteBackup()
err = dataprovider.ExecuteBackup()
case dataprovider.ActionTypeUserQuotaReset:
return executeUsersQuotaResetRuleAction(conditions, params)
err = executeUsersQuotaResetRuleAction(conditions, params)
case dataprovider.ActionTypeFolderQuotaReset:
return executeFoldersQuotaResetRuleAction(conditions, params)
err = executeFoldersQuotaResetRuleAction(conditions, params)
case dataprovider.ActionTypeTransferQuotaReset:
return executeTransferQuotaResetRuleAction(conditions, params)
err = executeTransferQuotaResetRuleAction(conditions, params)
case dataprovider.ActionTypeDataRetentionCheck:
return executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params)
err = executeDataRetentionCheckRuleAction(action.Options.RetentionConfig, conditions, params)
case dataprovider.ActionTypeFilesystem:
return executeFsRuleAction(action.Options.FsConfig, conditions, params)
err = executeFsRuleAction(action.Options.FsConfig, conditions, params)
default:
return fmt.Errorf("unsupported action type: %d", action.Type)
err = fmt.Errorf("unsupported action type: %d", action.Type)
}
if err != nil {
err = fmt.Errorf("action %q failed: %w", action.Name, err)
}
params.AddError(err)
return err
}
func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams) error {
@@ -1212,10 +1265,11 @@ func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams)
for _, rule := range rules {
var failedActions []string
paramsCopy := params.getACopy()
for _, action := range rule.Actions {
if !action.Options.IsFailureAction && action.Options.ExecuteSync {
startTime := time.Now()
if err := executeRuleAction(action.BaseEventAction, params, rule.Conditions.Options); err != nil {
if err := executeRuleAction(action.BaseEventAction, paramsCopy, rule.Conditions.Options); err != nil {
eventManagerLog(logger.LevelError, "unable to execute sync action %q for rule %q, elapsed %s, err: %v",
action.Name, rule.Name, time.Since(startTime), err)
failedActions = append(failedActions, action.Name)
@@ -1231,7 +1285,7 @@ func executeSyncRulesActions(rules []dataprovider.EventRule, params EventParams)
}
}
// execute async actions if any, including failure actions
go executeRuleAsyncActions(rule, params, failedActions)
go executeRuleAsyncActions(rule, paramsCopy, failedActions)
}
return errRes
@@ -1242,11 +1296,11 @@ func executeAsyncRulesActions(rules []dataprovider.EventRule, params EventParams
defer eventManager.removeAsyncTask()
for _, rule := range rules {
executeRuleAsyncActions(rule, params, nil)
executeRuleAsyncActions(rule, params.getACopy(), nil)
}
}
func executeRuleAsyncActions(rule dataprovider.EventRule, params EventParams, failedActions []string) {
func executeRuleAsyncActions(rule dataprovider.EventRule, params *EventParams, failedActions []string) {
for _, action := range rule.Actions {
if !action.Options.IsFailureAction && !action.Options.ExecuteSync {
startTime := time.Now()
@@ -1361,9 +1415,9 @@ func (j *eventCronJob) Run() {
}
}(task.Name)
executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{})
executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
} else {
executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{})
executeAsyncRulesActions([]dataprovider.EventRule{rule}, EventParams{Status: 1, updateStatusFromError: true})
}
eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
}