refactor: move eventmanager to common package

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2022-08-01 18:48:54 +02:00
parent 3ca62d76d7
commit 9d2b5dc07d
24 changed files with 2030 additions and 1161 deletions

View File

@@ -15,16 +15,10 @@
package dataprovider
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
@@ -34,9 +28,7 @@ import (
"github.com/drakkan/sftpgo/v2/internal/kms"
"github.com/drakkan/sftpgo/v2/internal/logger"
"github.com/drakkan/sftpgo/v2/internal/smtp"
"github.com/drakkan/sftpgo/v2/internal/util"
"github.com/drakkan/sftpgo/v2/internal/vfs"
)
// Supported event actions
@@ -204,25 +196,8 @@ func (c *EventActionHTTPConfig) validate(additionalData string) error {
return nil
}
func (c *EventActionHTTPConfig) getEndpoint(replacer *strings.Replacer) (string, error) {
if len(c.QueryParameters) > 0 {
u, err := url.Parse(c.Endpoint)
if err != nil {
return "", fmt.Errorf("invalid endpoint: %w", err)
}
q := u.Query()
for _, keyVal := range c.QueryParameters {
q.Add(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
}
u.RawQuery = q.Encode()
return u.String(), nil
}
return c.Endpoint, nil
}
func (c *EventActionHTTPConfig) getHTTPClient() *http.Client {
// GetHTTPClient returns an HTTP client based on the config
func (c *EventActionHTTPConfig) GetHTTPClient() *http.Client {
client := &http.Client{
Timeout: time.Duration(c.Timeout) * time.Second,
}
@@ -241,63 +216,6 @@ func (c *EventActionHTTPConfig) getHTTPClient() *http.Client {
return client
}
func (c *EventActionHTTPConfig) execute(params EventParams) error {
if !c.Password.IsEmpty() {
if err := c.Password.TryDecrypt(); err != nil {
return fmt.Errorf("unable to decrypt password: %w", err)
}
}
addObjectData := false
if params.Object != nil {
if !addObjectData {
if strings.Contains(c.Body, "{{ObjectData}}") {
addObjectData = true
}
}
}
replacements := params.getStringReplacements(addObjectData)
replacer := strings.NewReplacer(replacements...)
endpoint, err := c.getEndpoint(replacer)
if err != nil {
return err
}
var body io.Reader
if c.Body != "" && c.Method != http.MethodGet {
body = bytes.NewBufferString(replaceWithReplacer(c.Body, replacer))
}
req, err := http.NewRequest(c.Method, endpoint, body)
if err != nil {
return err
}
if c.Username != "" {
req.SetBasicAuth(replaceWithReplacer(c.Username, replacer), c.Password.GetAdditionalData())
}
for _, keyVal := range c.Headers {
req.Header.Set(keyVal.Key, replaceWithReplacer(keyVal.Value, replacer))
}
client := c.getHTTPClient()
defer client.CloseIdleConnections()
startTime := time.Now()
resp, err := client.Do(req)
if err != nil {
eventManagerLog(logger.LevelDebug, "unable to send http notification, endpoint: %s, elapsed: %s, err: %v",
endpoint, time.Since(startTime), err)
return err
}
defer resp.Body.Close()
eventManagerLog(logger.LevelDebug, "http notification sent, endopoint: %s, elapsed: %s, status code: %d",
endpoint, time.Since(startTime), resp.StatusCode)
if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusNoContent {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
return nil
}
// EventActionCommandConfig defines the configuration for a command event target
type EventActionCommandConfig struct {
Cmd string `json:"cmd"`
@@ -323,43 +241,6 @@ func (c *EventActionCommandConfig) validate() error {
return nil
}
func (c *EventActionCommandConfig) getEnvVars(params EventParams) []string {
envVars := make([]string, 0, len(c.EnvVars))
addObjectData := false
if params.Object != nil {
for _, k := range c.EnvVars {
if strings.Contains(k.Value, "{{ObjectData}}") {
addObjectData = true
break
}
}
}
replacements := params.getStringReplacements(addObjectData)
replacer := strings.NewReplacer(replacements...)
for _, keyVal := range c.EnvVars {
envVars = append(envVars, fmt.Sprintf("%s=%s", keyVal.Key, replaceWithReplacer(keyVal.Value, replacer)))
}
return envVars
}
func (c *EventActionCommandConfig) execute(params EventParams) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, c.Cmd)
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, c.getEnvVars(params)...)
startTime := time.Now()
err := cmd.Run()
eventManagerLog(logger.LevelDebug, "executed command %q, elapsed: %s, error: %v",
c.Cmd, time.Since(startTime), err)
return err
}
// EventActionEmailConfig defines the configuration options for SMTP event actions
type EventActionEmailConfig struct {
Recipients []string `json:"recipients"`
@@ -391,24 +272,6 @@ func (o *EventActionEmailConfig) validate() error {
return nil
}
func (o *EventActionEmailConfig) execute(params EventParams) error {
addObjectData := false
if params.Object != nil {
if strings.Contains(o.Body, "{{ObjectData}}") {
addObjectData = true
}
}
replacements := params.getStringReplacements(addObjectData)
replacer := strings.NewReplacer(replacements...)
body := replaceWithReplacer(o.Body, replacer)
subject := replaceWithReplacer(o.Subject, replacer)
startTime := time.Now()
err := smtp.SendEmail(o.Recipients, subject, body, smtp.EmailContentTypeTextPlain)
eventManagerLog(logger.LevelDebug, "executed email notification action, elapsed: %s, error: %v",
time.Since(startTime), err)
return err
}
// BaseEventActionOptions defines the supported configuration options for a base event actions
type BaseEventActionOptions struct {
HTTPConfig EventActionHTTPConfig `json:"http_config"`
@@ -560,130 +423,6 @@ func (a *BaseEventAction) validate() error {
return a.Options.validate(a.Type, a.Name)
}
func (a *BaseEventAction) doUsersQuotaReset(conditions ConditionOptions) error {
users, err := provider.dumpUsers()
if err != nil {
return fmt.Errorf("unable to get users: %w", err)
}
var failedResets []string
for _, user := range users {
if !checkConditionPatterns(user.Username, conditions.Names) {
eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for user %s, name conditions don't match",
user.Username)
continue
}
if !QuotaScans.AddUserQuotaScan(user.Username) {
eventManagerLog(logger.LevelError, "another quota scan is already in progress for user %s", user.Username)
failedResets = append(failedResets, user.Username)
continue
}
numFiles, size, err := user.ScanQuota()
QuotaScans.RemoveUserQuotaScan(user.Username)
if err != nil {
eventManagerLog(logger.LevelError, "error scanning quota for user %s: %v", user.Username, err)
failedResets = append(failedResets, user.Username)
continue
}
err = UpdateUserQuota(&user, numFiles, size, true)
if err != nil {
eventManagerLog(logger.LevelError, "error updating quota for user %s: %v", user.Username, err)
failedResets = append(failedResets, user.Username)
continue
}
}
if len(failedResets) > 0 {
return fmt.Errorf("quota reset failed for users: %+v", failedResets)
}
return nil
}
func (a *BaseEventAction) doFoldersQuotaReset(conditions ConditionOptions) error {
folders, err := provider.dumpFolders()
if err != nil {
return fmt.Errorf("unable to get folders: %w", err)
}
var failedResets []string
for _, folder := range folders {
if !checkConditionPatterns(folder.Name, conditions.Names) {
eventManagerLog(logger.LevelDebug, "skipping scheduled quota reset for folder %s, name conditions don't match",
folder.Name)
continue
}
if !QuotaScans.AddVFolderQuotaScan(folder.Name) {
eventManagerLog(logger.LevelError, "another quota scan is already in progress for folder %s", folder.Name)
failedResets = append(failedResets, folder.Name)
continue
}
f := vfs.VirtualFolder{
BaseVirtualFolder: folder,
VirtualPath: "/",
}
numFiles, size, err := f.ScanQuota()
QuotaScans.RemoveVFolderQuotaScan(folder.Name)
if err != nil {
eventManagerLog(logger.LevelError, "error scanning quota for folder %s: %v", folder.Name, err)
failedResets = append(failedResets, folder.Name)
continue
}
err = UpdateVirtualFolderQuota(&folder, numFiles, size, true)
if err != nil {
eventManagerLog(logger.LevelError, "error updating quota for folder %s: %v", folder.Name, err)
failedResets = append(failedResets, folder.Name)
continue
}
}
if len(failedResets) > 0 {
return fmt.Errorf("quota reset failed for folders: %+v", failedResets)
}
return nil
}
func (a *BaseEventAction) doTransferQuotaReset(conditions ConditionOptions) error {
users, err := provider.dumpUsers()
if err != nil {
return fmt.Errorf("unable to get users: %w", err)
}
var failedResets []string
for _, user := range users {
if !checkConditionPatterns(user.Username, conditions.Names) {
eventManagerLog(logger.LevelDebug, "skipping scheduled transfer quota reset for user %s, name conditions don't match",
user.Username)
continue
}
err = UpdateUserTransferQuota(&user, 0, 0, true)
if err != nil {
eventManagerLog(logger.LevelError, "error updating transfer quota for user %s: %v", user.Username, err)
failedResets = append(failedResets, user.Username)
continue
}
}
if len(failedResets) > 0 {
return fmt.Errorf("transfer quota reset failed for users: %+v", failedResets)
}
return nil
}
func (a *BaseEventAction) execute(params EventParams, conditions ConditionOptions) error {
switch a.Type {
case ActionTypeHTTP:
return a.Options.HTTPConfig.execute(params)
case ActionTypeCommand:
return a.Options.CmdConfig.execute(params)
case ActionTypeEmail:
return a.Options.EmailConfig.execute(params)
case ActionTypeBackup:
return config.doBackup()
case ActionTypeUserQuotaReset:
return a.doUsersQuotaReset(conditions)
case ActionTypeFolderQuotaReset:
return a.doFoldersQuotaReset(conditions)
case ActionTypeTransferQuotaReset:
return a.doTransferQuotaReset(conditions)
default:
return fmt.Errorf("unsupported action type: %d", a.Type)
}
}
// EventActionOptions defines the supported configuration options for an event action
type EventActionOptions struct {
IsFailureAction bool `json:"is_failure_action"`
@@ -731,18 +470,6 @@ type ConditionPattern struct {
InverseMatch bool `json:"inverse_match,omitempty"`
}
func (p *ConditionPattern) match(name string) bool {
matched, err := path.Match(p.Pattern, name)
if err != nil {
eventManagerLog(logger.LevelError, "pattern matching error %q, err: %v", p.Pattern, err)
return false
}
if p.InverseMatch {
return !matched
}
return matched
}
func (p *ConditionPattern) validate() error {
if p.Pattern == "" {
return util.NewValidationError("empty condition pattern not allowed")
@@ -826,12 +553,13 @@ type Schedule struct {
Month string `json:"month"`
}
func (s *Schedule) getCronSpec() string {
// GetCronSpec returns the cron compatible schedule string
func (s *Schedule) GetCronSpec() string {
return fmt.Sprintf("0 %s %s %s %s", s.Hours, s.DayOfMonth, s.Month, s.DayOfWeek)
}
func (s *Schedule) validate() error {
_, err := cron.ParseStandard(s.getCronSpec())
_, err := cron.ParseStandard(s.GetCronSpec())
if err != nil {
return util.NewValidationError(fmt.Sprintf("invalid schedule, hour: %q, day of month: %q, month: %q, day of week: %q",
s.Hours, s.DayOfMonth, s.Month, s.DayOfWeek))
@@ -871,51 +599,6 @@ func (c *EventConditions) getACopy() EventConditions {
}
}
// ProviderEventMatch returns true if the specified provider event match
func (c *EventConditions) ProviderEventMatch(params EventParams) bool {
if !util.Contains(c.ProviderEvents, params.Event) {
return false
}
if !checkConditionPatterns(params.Name, c.Options.Names) {
return false
}
if len(c.Options.ProviderObjects) > 0 && !util.Contains(c.Options.ProviderObjects, params.ObjectType) {
return false
}
return true
}
// FsEventMatch returns true if the specified filesystem event match
func (c *EventConditions) FsEventMatch(params EventParams) bool {
if !util.Contains(c.FsEvents, params.Event) {
return false
}
if !checkConditionPatterns(params.Name, c.Options.Names) {
return false
}
if !checkConditionPatterns(params.VirtualPath, c.Options.FsPaths) {
if !checkConditionPatterns(params.ObjectName, c.Options.FsPaths) {
return false
}
}
if len(c.Options.Protocols) > 0 && !util.Contains(c.Options.Protocols, params.Protocol) {
return false
}
if params.Event == "upload" || params.Event == "download" {
if c.Options.MinFileSize > 0 {
if params.FileSize < c.Options.MinFileSize {
return false
}
}
if c.Options.MaxFileSize > 0 {
if params.FileSize > c.Options.MaxFileSize {
return false
}
}
}
return true
}
func (c *EventConditions) validate(trigger int) error {
switch trigger {
case EventTriggerFsEvent:
@@ -1015,7 +698,9 @@ func (r *EventRule) getACopy() EventRule {
}
}
func (r *EventRule) guardFromConcurrentExecution() bool {
// GuardFromConcurrentExecution returns true if the rule cannot be executed concurrently
// from multiple instances
func (r *EventRule) GuardFromConcurrentExecution() bool {
if config.IsShared == 0 {
return false
}
@@ -1102,6 +787,28 @@ func (r *EventRule) RenderAsJSON(reload bool) ([]byte, error) {
return json.Marshal(r)
}
func cloneKeyValues(keyVals []KeyValue) []KeyValue {
res := make([]KeyValue, 0, len(keyVals))
for _, kv := range keyVals {
res = append(res, KeyValue{
Key: kv.Key,
Value: kv.Value,
})
}
return res
}
func cloneConditionPatterns(patterns []ConditionPattern) []ConditionPattern {
res := make([]ConditionPattern, 0, len(patterns))
for _, p := range patterns {
res = append(res, ConditionPattern{
Pattern: p.Pattern,
InverseMatch: p.InverseMatch,
})
}
return res
}
// Task stores the state for a scheduled task
type Task struct {
Name string `json:"name"`