// Mirror of https://github.com/drakkan/sftpgo.git, synced 2025-12-06 22:30:56 +03:00 (269 lines, 7.4 KiB, Go).
// Copyright (C) 2019 Nicola Murino
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, version 3.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
package plugin
|
|
|
|
import (
|
|
"fmt"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
"github.com/hashicorp/go-plugin"
|
|
"github.com/sftpgo/sdk/plugin/notifier"
|
|
|
|
"github.com/drakkan/sftpgo/v2/internal/logger"
|
|
)
|
|
|
|
// NotifierConfig defines configuration parameters for notifiers plugins
type NotifierConfig struct {
	// FsEvents lists the filesystem actions to forward: notifyFsAction
	// drops any event whose Action is not contained in this list.
	FsEvents []string `json:"fs_events" mapstructure:"fs_events"`
	// ProviderEvents lists the provider actions to forward. It is only
	// effective together with ProviderObjects: notifyProviderAction requires
	// both the action and the object type to be listed.
	ProviderEvents []string `json:"provider_events" mapstructure:"provider_events"`
	// ProviderObjects lists the provider object types to forward.
	ProviderObjects []string `json:"provider_objects" mapstructure:"provider_objects"`
	// LogEvents lists the log event identifiers of interest.
	// NOTE(review): in this file it is only consulted by hasActions; the
	// per-event filtering presumably happens in the caller - confirm.
	LogEvents []int `json:"log_events" mapstructure:"log_events"`
	// RetryMaxTime is the maximum age, in seconds, of a failed event that may
	// still be queued for a later retry. Zero disables queueing.
	RetryMaxTime int `json:"retry_max_time" mapstructure:"retry_max_time"`
	// RetryQueueMaxSize caps the number of queued events when greater than
	// zero; otherwise the retry queue is not size-limited.
	RetryQueueMaxSize int `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"`
}
|
|
|
|
func (c *NotifierConfig) hasActions() bool {
|
|
if len(c.FsEvents) > 0 {
|
|
return true
|
|
}
|
|
if len(c.ProviderEvents) > 0 && len(c.ProviderObjects) > 0 {
|
|
return true
|
|
}
|
|
if len(c.LogEvents) > 0 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
type notifierPlugin struct {
|
|
config Config
|
|
notifier notifier.Notifier
|
|
client *plugin.Client
|
|
mu sync.RWMutex
|
|
fsEvents []*notifier.FsEvent
|
|
providerEvents []*notifier.ProviderEvent
|
|
logEvents []*notifier.LogEvent
|
|
}
|
|
|
|
func newNotifierPlugin(config Config) (*notifierPlugin, error) {
|
|
p := ¬ifierPlugin{
|
|
config: config,
|
|
}
|
|
if err := p.initialize(); err != nil {
|
|
logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config)
|
|
return nil, err
|
|
}
|
|
return p, nil
|
|
}
|
|
|
|
func (p *notifierPlugin) exited() bool {
|
|
return p.client.Exited()
|
|
}
|
|
|
|
func (p *notifierPlugin) cleanup() {
|
|
p.client.Kill()
|
|
}
|
|
|
|
func (p *notifierPlugin) initialize() error {
|
|
killProcess(p.config.Cmd)
|
|
logger.Debug(logSender, "", "create new notifier plugin %q", p.config.Cmd)
|
|
if !p.config.NotifierOptions.hasActions() {
|
|
return fmt.Errorf("no actions defined for the notifier plugin %q", p.config.Cmd)
|
|
}
|
|
secureConfig, err := p.config.getSecureConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
client := plugin.NewClient(&plugin.ClientConfig{
|
|
HandshakeConfig: notifier.Handshake,
|
|
Plugins: notifier.PluginMap,
|
|
Cmd: p.config.getCommand(),
|
|
SkipHostEnv: true,
|
|
AllowedProtocols: []plugin.Protocol{
|
|
plugin.ProtocolGRPC,
|
|
},
|
|
AutoMTLS: p.config.AutoMTLS,
|
|
SecureConfig: secureConfig,
|
|
Managed: false,
|
|
Logger: &logger.HCLogAdapter{
|
|
Logger: hclog.New(&hclog.LoggerOptions{
|
|
Name: fmt.Sprintf("%s.%s", logSender, notifier.PluginName),
|
|
Level: pluginsLogLevel,
|
|
DisableTime: true,
|
|
}),
|
|
},
|
|
})
|
|
rpcClient, err := client.Client()
|
|
if err != nil {
|
|
logger.Debug(logSender, "", "unable to get rpc client for plugin %q: %v", p.config.Cmd, err)
|
|
return err
|
|
}
|
|
raw, err := rpcClient.Dispense(notifier.PluginName)
|
|
if err != nil {
|
|
logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %q: %v",
|
|
notifier.PluginName, p.config.Cmd, err)
|
|
return err
|
|
}
|
|
|
|
p.client = client
|
|
p.notifier = raw.(notifier.Notifier)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *notifierPlugin) queueSize() int {
|
|
p.mu.RLock()
|
|
defer p.mu.RUnlock()
|
|
|
|
return len(p.providerEvents) + len(p.fsEvents) + len(p.logEvents)
|
|
}
|
|
|
|
func (p *notifierPlugin) queueFsEvent(ev *notifier.FsEvent) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
p.fsEvents = append(p.fsEvents, ev)
|
|
}
|
|
|
|
func (p *notifierPlugin) queueProviderEvent(ev *notifier.ProviderEvent) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
p.providerEvents = append(p.providerEvents, ev)
|
|
}
|
|
|
|
func (p *notifierPlugin) queueLogEvent(ev *notifier.LogEvent) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
p.logEvents = append(p.logEvents, ev)
|
|
}
|
|
|
|
func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
|
|
if p.config.NotifierOptions.RetryMaxTime == 0 {
|
|
return false
|
|
}
|
|
if time.Now().After(time.Unix(0, timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
|
|
logger.Warn(logSender, "", "dropping too late event for plugin %v, event timestamp: %v",
|
|
p.config.Cmd, time.Unix(0, timestamp))
|
|
return false
|
|
}
|
|
if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
|
|
return p.queueSize() < p.config.NotifierOptions.RetryQueueMaxSize
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (p *notifierPlugin) notifyFsAction(event *notifier.FsEvent) {
|
|
if !slices.Contains(p.config.NotifierOptions.FsEvents, event.Action) {
|
|
return
|
|
}
|
|
p.sendFsEvent(event)
|
|
}
|
|
|
|
func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, object Renderer) {
|
|
if !slices.Contains(p.config.NotifierOptions.ProviderEvents, event.Action) ||
|
|
!slices.Contains(p.config.NotifierOptions.ProviderObjects, event.ObjectType) {
|
|
return
|
|
}
|
|
p.sendProviderEvent(event, object)
|
|
}
|
|
|
|
func (p *notifierPlugin) notifyLogEvent(event *notifier.LogEvent) {
|
|
p.sendLogEvent(event)
|
|
}
|
|
|
|
func (p *notifierPlugin) sendFsEvent(ev *notifier.FsEvent) {
|
|
go func(event *notifier.FsEvent) {
|
|
Handler.addTask()
|
|
defer Handler.removeTask()
|
|
|
|
if err := p.notifier.NotifyFsEvent(event); err != nil {
|
|
logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
|
|
if p.canQueueEvent(event.Timestamp) {
|
|
p.queueFsEvent(event)
|
|
}
|
|
}
|
|
}(ev)
|
|
}
|
|
|
|
func (p *notifierPlugin) sendProviderEvent(ev *notifier.ProviderEvent, object Renderer) {
|
|
go func(event *notifier.ProviderEvent) {
|
|
Handler.addTask()
|
|
defer Handler.removeTask()
|
|
|
|
if object != nil {
|
|
objectAsJSON, err := object.RenderAsJSON(event.Action != "delete")
|
|
if err != nil {
|
|
logger.Error(logSender, "", "unable to render user as json for action %q: %v", event.Action, err)
|
|
} else {
|
|
event.ObjectData = objectAsJSON
|
|
}
|
|
}
|
|
|
|
if err := p.notifier.NotifyProviderEvent(event); err != nil {
|
|
logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
|
|
if p.canQueueEvent(event.Timestamp) {
|
|
p.queueProviderEvent(event)
|
|
}
|
|
}
|
|
}(ev)
|
|
}
|
|
|
|
func (p *notifierPlugin) sendLogEvent(ev *notifier.LogEvent) {
|
|
go func(event *notifier.LogEvent) {
|
|
Handler.addTask()
|
|
defer Handler.removeTask()
|
|
|
|
if err := p.notifier.NotifyLogEvent(event); err != nil {
|
|
logger.Warn(logSender, "", "unable to send log event to plugin %v: %v", p.config.Cmd, err)
|
|
if p.canQueueEvent(event.Timestamp) {
|
|
p.queueLogEvent(event)
|
|
}
|
|
}
|
|
}(ev)
|
|
}
|
|
|
|
func (p *notifierPlugin) sendQueuedEvents() {
|
|
queueSize := p.queueSize()
|
|
if queueSize == 0 {
|
|
return
|
|
}
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
logger.Debug(logSender, "", "send queued events for notifier %q, events size: %v", p.config.Cmd, queueSize)
|
|
|
|
for _, ev := range p.fsEvents {
|
|
p.sendFsEvent(ev)
|
|
}
|
|
p.fsEvents = nil
|
|
|
|
for _, ev := range p.providerEvents {
|
|
p.sendProviderEvent(ev, nil)
|
|
}
|
|
p.providerEvents = nil
|
|
|
|
for _, ev := range p.logEvents {
|
|
p.sendLogEvent(ev)
|
|
}
|
|
p.logEvents = nil
|
|
|
|
logger.Debug(logSender, "", "%d queued events sent for notifier %q,", queueSize, p.config.Cmd)
|
|
}
|