// Copyright (C) 2019 Nicola Murino // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published // by the Free Software Foundation, version 3. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . package plugin import ( "fmt" "slices" "sync" "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" "github.com/sftpgo/sdk/plugin/notifier" "github.com/drakkan/sftpgo/v2/internal/logger" ) // NotifierConfig defines configuration parameters for notifiers plugins type NotifierConfig struct { FsEvents []string `json:"fs_events" mapstructure:"fs_events"` ProviderEvents []string `json:"provider_events" mapstructure:"provider_events"` ProviderObjects []string `json:"provider_objects" mapstructure:"provider_objects"` LogEvents []int `json:"log_events" mapstructure:"log_events"` RetryMaxTime int `json:"retry_max_time" mapstructure:"retry_max_time"` RetryQueueMaxSize int `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"` } func (c *NotifierConfig) hasActions() bool { if len(c.FsEvents) > 0 { return true } if len(c.ProviderEvents) > 0 && len(c.ProviderObjects) > 0 { return true } if len(c.LogEvents) > 0 { return true } return false } type notifierPlugin struct { config Config notifier notifier.Notifier client *plugin.Client mu sync.RWMutex fsEvents []*notifier.FsEvent providerEvents []*notifier.ProviderEvent logEvents []*notifier.LogEvent } func newNotifierPlugin(config Config) (*notifierPlugin, error) { p := ¬ifierPlugin{ config: config, } if err := p.initialize(); err != nil { logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config) return nil, err } return p, nil } func (p *notifierPlugin) exited() bool { return p.client.Exited() } func (p *notifierPlugin) cleanup() { p.client.Kill() } func (p *notifierPlugin) initialize() error { killProcess(p.config.Cmd) logger.Debug(logSender, "", "create new notifier plugin %q", p.config.Cmd) if !p.config.NotifierOptions.hasActions() { return fmt.Errorf("no actions defined for the notifier plugin %q", p.config.Cmd) } secureConfig, err := p.config.getSecureConfig() if err != nil { return err } client := plugin.NewClient(&plugin.ClientConfig{ HandshakeConfig: notifier.Handshake, Plugins: notifier.PluginMap, Cmd: p.config.getCommand(), SkipHostEnv: true, AllowedProtocols: []plugin.Protocol{ plugin.ProtocolGRPC, }, AutoMTLS: p.config.AutoMTLS, SecureConfig: secureConfig, Managed: false, Logger: &logger.HCLogAdapter{ Logger: hclog.New(&hclog.LoggerOptions{ Name: fmt.Sprintf("%s.%s", logSender, notifier.PluginName), Level: pluginsLogLevel, DisableTime: true, }), }, }) rpcClient, err := client.Client() if err != nil { logger.Debug(logSender, "", "unable to get rpc client for plugin %q: %v", p.config.Cmd, err) return err } raw, err := rpcClient.Dispense(notifier.PluginName) if err != nil { logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %q: %v", notifier.PluginName, p.config.Cmd, err) return err } p.client = client p.notifier = raw.(notifier.Notifier) return nil } func (p *notifierPlugin) queueSize() int { p.mu.RLock() defer p.mu.RUnlock() return len(p.providerEvents) + len(p.fsEvents) + len(p.logEvents) } func (p *notifierPlugin) queueFsEvent(ev *notifier.FsEvent) { p.mu.Lock() defer p.mu.Unlock() p.fsEvents = append(p.fsEvents, ev) } func (p *notifierPlugin) queueProviderEvent(ev *notifier.ProviderEvent) { p.mu.Lock() defer p.mu.Unlock() p.providerEvents = append(p.providerEvents, ev) } func (p *notifierPlugin) queueLogEvent(ev *notifier.LogEvent) { p.mu.Lock() defer p.mu.Unlock() p.logEvents = append(p.logEvents, ev) } func (p *notifierPlugin) canQueueEvent(timestamp int64) bool { if p.config.NotifierOptions.RetryMaxTime == 0 { return false } if time.Now().After(time.Unix(0, timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) { logger.Warn(logSender, "", "dropping too late event for plugin %v, event timestamp: %v", p.config.Cmd, time.Unix(0, timestamp)) return false } if p.config.NotifierOptions.RetryQueueMaxSize > 0 { return p.queueSize() < p.config.NotifierOptions.RetryQueueMaxSize } return true } func (p *notifierPlugin) notifyFsAction(event *notifier.FsEvent) { if !slices.Contains(p.config.NotifierOptions.FsEvents, event.Action) { return } p.sendFsEvent(event) } func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, object Renderer) { if !slices.Contains(p.config.NotifierOptions.ProviderEvents, event.Action) || !slices.Contains(p.config.NotifierOptions.ProviderObjects, event.ObjectType) { return } p.sendProviderEvent(event, object) } func (p *notifierPlugin) notifyLogEvent(event *notifier.LogEvent) { p.sendLogEvent(event) } func (p *notifierPlugin) sendFsEvent(ev *notifier.FsEvent) { go func(event *notifier.FsEvent) { Handler.addTask() defer Handler.removeTask() if err := p.notifier.NotifyFsEvent(event); err != nil { logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err) if p.canQueueEvent(event.Timestamp) { p.queueFsEvent(event) } } }(ev) } func (p *notifierPlugin) sendProviderEvent(ev *notifier.ProviderEvent, object Renderer) { go func(event *notifier.ProviderEvent) { Handler.addTask() defer Handler.removeTask() if object != nil { objectAsJSON, err := object.RenderAsJSON(event.Action != "delete") if err != nil { logger.Error(logSender, "", "unable to render user as json for action %q: %v", event.Action, err) } else { event.ObjectData = objectAsJSON } } if err := p.notifier.NotifyProviderEvent(event); err != nil { logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err) if p.canQueueEvent(event.Timestamp) { p.queueProviderEvent(event) } } }(ev) } func (p *notifierPlugin) sendLogEvent(ev *notifier.LogEvent) { go func(event *notifier.LogEvent) { Handler.addTask() defer Handler.removeTask() if err := p.notifier.NotifyLogEvent(event); err != nil { logger.Warn(logSender, "", "unable to send log event to plugin %v: %v", p.config.Cmd, err) if p.canQueueEvent(event.Timestamp) { p.queueLogEvent(event) } } }(ev) } func (p *notifierPlugin) sendQueuedEvents() { queueSize := p.queueSize() if queueSize == 0 { return } p.mu.Lock() defer p.mu.Unlock() logger.Debug(logSender, "", "send queued events for notifier %q, events size: %v", p.config.Cmd, queueSize) for _, ev := range p.fsEvents { p.sendFsEvent(ev) } p.fsEvents = nil for _, ev := range p.providerEvents { p.sendProviderEvent(ev, nil) } p.providerEvents = nil for _, ev := range p.logEvents { p.sendLogEvent(ev) } p.logEvents = nil logger.Debug(logSender, "", "%d queued events sent for notifier %q,", queueSize, p.config.Cmd) }