mirror of
https://github.com/drakkan/sftpgo.git
synced 2025-12-07 23:00:55 +03:00
notifiers plugin: replace params with a struct
Fixes #658 Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
@@ -12,7 +12,6 @@ import (
|
||||
|
||||
"github.com/drakkan/sftpgo/v2/logger"
|
||||
"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
|
||||
"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier/proto"
|
||||
"github.com/drakkan/sftpgo/v2/util"
|
||||
)
|
||||
|
||||
@@ -37,48 +36,25 @@ func (c *NotifierConfig) hasActions() bool {
|
||||
|
||||
type eventsQueue struct {
|
||||
sync.RWMutex
|
||||
fsEvents []*proto.FsEvent
|
||||
providerEvents []*proto.ProviderEvent
|
||||
fsEvents []*notifier.FsEvent
|
||||
providerEvents []*notifier.ProviderEvent
|
||||
}
|
||||
|
||||
func (q *eventsQueue) addFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip string,
|
||||
fileSize int64, status int,
|
||||
) {
|
||||
func (q *eventsQueue) addFsEvent(event *notifier.FsEvent) {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
q.fsEvents = append(q.fsEvents, &proto.FsEvent{
|
||||
Timestamp: timestamp,
|
||||
Action: action,
|
||||
Username: username,
|
||||
FsPath: fsPath,
|
||||
FsTargetPath: fsTargetPath,
|
||||
SshCmd: sshCmd,
|
||||
FileSize: fileSize,
|
||||
Protocol: protocol,
|
||||
Ip: ip,
|
||||
Status: int32(status),
|
||||
})
|
||||
q.fsEvents = append(q.fsEvents, event)
|
||||
}
|
||||
|
||||
func (q *eventsQueue) addProviderEvent(timestamp int64, action, username, objectType, objectName, ip string,
|
||||
objectAsJSON []byte,
|
||||
) {
|
||||
func (q *eventsQueue) addProviderEvent(event *notifier.ProviderEvent) {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
q.providerEvents = append(q.providerEvents, &proto.ProviderEvent{
|
||||
Timestamp: timestamp,
|
||||
Action: action,
|
||||
ObjectType: objectType,
|
||||
Username: username,
|
||||
Ip: ip,
|
||||
ObjectName: objectName,
|
||||
ObjectData: objectAsJSON,
|
||||
})
|
||||
q.providerEvents = append(q.providerEvents, event)
|
||||
}
|
||||
|
||||
func (q *eventsQueue) popFsEvent() *proto.FsEvent {
|
||||
func (q *eventsQueue) popFsEvent() *notifier.FsEvent {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
@@ -93,7 +69,7 @@ func (q *eventsQueue) popFsEvent() *proto.FsEvent {
|
||||
return ev
|
||||
}
|
||||
|
||||
func (q *eventsQueue) popProviderEvent() *proto.ProviderEvent {
|
||||
func (q *eventsQueue) popProviderEvent() *notifier.ProviderEvent {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
@@ -193,7 +169,7 @@ func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
|
||||
if p.config.NotifierOptions.RetryMaxTime == 0 {
|
||||
return false
|
||||
}
|
||||
if time.Now().After(util.GetTimeFromMsecSinceEpoch(timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
|
||||
if time.Now().After(time.Unix(0, timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
|
||||
return false
|
||||
}
|
||||
if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
|
||||
@@ -202,58 +178,47 @@ func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) notifyFsAction(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd,
|
||||
protocol, ip, virtualPath, virtualTargetPath, sessionID string, fileSize int64, errAction error) {
|
||||
if !util.IsStringInSlice(action, p.config.NotifierOptions.FsEvents) {
|
||||
func (p *notifierPlugin) notifyFsAction(event *notifier.FsEvent) {
|
||||
if !util.IsStringInSlice(event.Action, p.config.NotifierOptions.FsEvents) {
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
status := 1
|
||||
if errAction != nil {
|
||||
status = 0
|
||||
}
|
||||
p.sendFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip, virtualPath, virtualTargetPath,
|
||||
sessionID, fileSize, status)
|
||||
p.sendFsEvent(event)
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) notifyProviderAction(timestamp int64, action, username, objectType, objectName, ip string,
|
||||
object Renderer,
|
||||
) {
|
||||
if !util.IsStringInSlice(action, p.config.NotifierOptions.ProviderEvents) ||
|
||||
!util.IsStringInSlice(objectType, p.config.NotifierOptions.ProviderObjects) {
|
||||
func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, object Renderer) {
|
||||
if !util.IsStringInSlice(event.Action, p.config.NotifierOptions.ProviderEvents) ||
|
||||
!util.IsStringInSlice(event.ObjectType, p.config.NotifierOptions.ProviderObjects) {
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
objectAsJSON, err := object.RenderAsJSON(action != "delete")
|
||||
objectAsJSON, err := object.RenderAsJSON(event.Action != "delete")
|
||||
if err != nil {
|
||||
logger.Warn(logSender, "", "unable to render user as json for action %v: %v", action, err)
|
||||
logger.Warn(logSender, "", "unable to render user as json for action %v: %v", event.Action, err)
|
||||
return
|
||||
}
|
||||
p.sendProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON)
|
||||
event.ObjectData = objectAsJSON
|
||||
p.sendProviderEvent(event)
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) sendFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd,
|
||||
protocol, ip, virtualPath, virtualTargetPath, sessionID string, fileSize int64, status int) {
|
||||
if err := p.notifier.NotifyFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip,
|
||||
virtualPath, virtualTargetPath, sessionID, fileSize, status); err != nil {
|
||||
func (p *notifierPlugin) sendFsEvent(event *notifier.FsEvent) {
|
||||
if err := p.notifier.NotifyFsEvent(event); err != nil {
|
||||
logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
|
||||
if p.canQueueEvent(timestamp) {
|
||||
p.queue.addFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip, fileSize, status)
|
||||
if p.canQueueEvent(event.Timestamp) {
|
||||
p.queue.addFsEvent(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) sendProviderEvent(timestamp int64, action, username, objectType, objectName, ip string,
|
||||
objectAsJSON []byte,
|
||||
) {
|
||||
if err := p.notifier.NotifyProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON); err != nil {
|
||||
func (p *notifierPlugin) sendProviderEvent(event *notifier.ProviderEvent) {
|
||||
if err := p.notifier.NotifyProviderEvent(event); err != nil {
|
||||
logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
|
||||
if p.canQueueEvent(timestamp) {
|
||||
p.queue.addProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON)
|
||||
if p.canQueueEvent(event.Timestamp) {
|
||||
p.queue.addProviderEvent(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -266,16 +231,17 @@ func (p *notifierPlugin) sendQueuedEvents() {
|
||||
logger.Debug(logSender, "", "check queued events for notifier %#v, events size: %v", p.config.Cmd, queueSize)
|
||||
fsEv := p.queue.popFsEvent()
|
||||
for fsEv != nil {
|
||||
go p.sendFsEvent(fsEv.Timestamp, fsEv.Action, fsEv.Username, fsEv.FsPath, fsEv.FsTargetPath,
|
||||
fsEv.SshCmd, fsEv.Protocol, fsEv.Ip, fsEv.VirtualPath, fsEv.VirtualTargetPath, fsEv.SessionId,
|
||||
fsEv.FileSize, int(fsEv.Status))
|
||||
go func(ev *notifier.FsEvent) {
|
||||
p.sendFsEvent(ev)
|
||||
}(fsEv)
|
||||
fsEv = p.queue.popFsEvent()
|
||||
}
|
||||
|
||||
providerEv := p.queue.popProviderEvent()
|
||||
for providerEv != nil {
|
||||
go p.sendProviderEvent(providerEv.Timestamp, providerEv.Action, providerEv.Username, providerEv.ObjectType,
|
||||
providerEv.ObjectName, providerEv.Ip, providerEv.ObjectData)
|
||||
go func(ev *notifier.ProviderEvent) {
|
||||
p.sendProviderEvent(ev)
|
||||
}(providerEv)
|
||||
providerEv = p.queue.popProviderEvent()
|
||||
}
|
||||
logger.Debug(logSender, "", "queued events sent for notifier %#v, new events size: %v", p.config.Cmd, p.queue.getSize())
|
||||
|
||||
@@ -69,14 +69,35 @@ type GRPCServer struct {
|
||||
|
||||
// SendFsEvent implements the serve side fs notify method
|
||||
func (s *GRPCServer) SendFsEvent(ctx context.Context, req *proto.FsEvent) (*emptypb.Empty, error) {
|
||||
err := s.Impl.NotifyFsEvent(req.Timestamp, req.Action, req.Username, req.FsPath, req.FsTargetPath, req.SshCmd,
|
||||
req.Protocol, req.Ip, req.VirtualPath, req.VirtualTargetPath, req.SessionId, req.FileSize, int(req.Status))
|
||||
event := &FsEvent{
|
||||
Action: req.Action,
|
||||
Username: req.Username,
|
||||
Path: req.FsPath,
|
||||
TargetPath: req.FsTargetPath,
|
||||
VirtualPath: req.VirtualPath,
|
||||
SSHCmd: req.SshCmd,
|
||||
FileSize: req.FileSize,
|
||||
Status: int(req.Status),
|
||||
Protocol: req.Protocol,
|
||||
IP: req.Ip,
|
||||
SessionID: req.SessionId,
|
||||
Timestamp: req.Timestamp,
|
||||
}
|
||||
err := s.Impl.NotifyFsEvent(event)
|
||||
return &emptypb.Empty{}, err
|
||||
}
|
||||
|
||||
// SendProviderEvent implements the serve side provider event notify method
|
||||
func (s *GRPCServer) SendProviderEvent(ctx context.Context, req *proto.ProviderEvent) (*emptypb.Empty, error) {
|
||||
err := s.Impl.NotifyProviderEvent(req.Timestamp, req.Action, req.Username, req.ObjectType, req.ObjectName,
|
||||
req.Ip, req.ObjectData)
|
||||
event := &ProviderEvent{
|
||||
Action: req.Action,
|
||||
Username: req.Username,
|
||||
ObjectType: req.ObjectType,
|
||||
ObjectName: req.ObjectName,
|
||||
IP: req.Ip,
|
||||
ObjectData: req.ObjectData,
|
||||
Timestamp: req.Timestamp,
|
||||
}
|
||||
err := s.Impl.NotifyProviderEvent(event)
|
||||
return &emptypb.Empty{}, err
|
||||
}
|
||||
|
||||
@@ -30,11 +30,42 @@ var PluginMap = map[string]plugin.Plugin{
|
||||
PluginName: &Plugin{},
|
||||
}
|
||||
|
||||
// FsEvent defines a file system event
|
||||
type FsEvent struct {
|
||||
Action string `json:"action"`
|
||||
Username string `json:"username"`
|
||||
Path string `json:"path"`
|
||||
TargetPath string `json:"target_path,omitempty"`
|
||||
VirtualPath string `json:"virtual_path"`
|
||||
VirtualTargetPath string `json:"virtual_target_path,omitempty"`
|
||||
SSHCmd string `json:"ssh_cmd,omitempty"`
|
||||
FileSize int64 `json:"file_size,omitempty"`
|
||||
FsProvider int `json:"fs_provider"`
|
||||
Bucket string `json:"bucket,omitempty"`
|
||||
Endpoint string `json:"endpoint,omitempty"`
|
||||
Status int `json:"status"`
|
||||
Protocol string `json:"protocol"`
|
||||
IP string `json:"ip"`
|
||||
SessionID string `json:"session_id"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
OpenFlags int `json:"open_flags,omitempty"`
|
||||
}
|
||||
|
||||
// ProviderEvent defines a provider event
|
||||
type ProviderEvent struct {
|
||||
Action string
|
||||
Username string
|
||||
ObjectType string
|
||||
ObjectName string
|
||||
IP string
|
||||
ObjectData []byte
|
||||
Timestamp int64
|
||||
}
|
||||
|
||||
// Notifier defines the interface for notifiers plugins
|
||||
type Notifier interface {
|
||||
NotifyFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip,
|
||||
virtualPath, virtualTargetPath, sessionID string, fileSize int64, status int) error
|
||||
NotifyProviderEvent(timestamp int64, action, username, objectType, objectName, ip string, object []byte) error
|
||||
NotifyFsEvent(event *FsEvent) error
|
||||
NotifyProviderEvent(event *ProviderEvent) error
|
||||
}
|
||||
|
||||
// Plugin defines the implementation to serve/connect to a notifier plugin
|
||||
|
||||
@@ -95,6 +95,7 @@ type Manager struct {
|
||||
authScopes int
|
||||
hasSearcher bool
|
||||
hasMetadater bool
|
||||
hasNotifiers bool
|
||||
}
|
||||
|
||||
// Initialize initializes the configured plugins
|
||||
@@ -172,6 +173,7 @@ func (m *Manager) validateConfigs() error {
|
||||
kmsEncryptions := make(map[string]bool)
|
||||
m.hasSearcher = false
|
||||
m.hasMetadater = false
|
||||
m.hasNotifiers = false
|
||||
|
||||
for _, config := range m.Configs {
|
||||
if config.Type == kmsplugin.PluginName {
|
||||
@@ -196,32 +198,35 @@ func (m *Manager) validateConfigs() error {
|
||||
}
|
||||
m.hasMetadater = true
|
||||
}
|
||||
if config.Type == notifier.PluginName {
|
||||
m.hasNotifiers = true
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// HasNotifiers returns true if there is at least a notifier plugin
|
||||
func (m *Manager) HasNotifiers() bool {
|
||||
return m.hasNotifiers
|
||||
}
|
||||
|
||||
// NotifyFsEvent sends the fs event notifications using any defined notifier plugins
|
||||
func (m *Manager) NotifyFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip,
|
||||
virtualPath, virtualTargetPath, sessionID string, fileSize int64, err error,
|
||||
) {
|
||||
func (m *Manager) NotifyFsEvent(event *notifier.FsEvent) {
|
||||
m.notifLock.RLock()
|
||||
defer m.notifLock.RUnlock()
|
||||
|
||||
for _, n := range m.notifiers {
|
||||
n.notifyFsAction(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip, virtualPath,
|
||||
virtualTargetPath, sessionID, fileSize, err)
|
||||
n.notifyFsAction(event)
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyProviderEvent sends the provider event notifications using any defined notifier plugins
|
||||
func (m *Manager) NotifyProviderEvent(timestamp int64, action, username, objectType, objectName, ip string,
|
||||
object Renderer,
|
||||
) {
|
||||
func (m *Manager) NotifyProviderEvent(event *notifier.ProviderEvent, object Renderer) {
|
||||
m.notifLock.RLock()
|
||||
defer m.notifLock.RUnlock()
|
||||
|
||||
for _, n := range m.notifiers {
|
||||
n.notifyProviderAction(timestamp, action, username, objectType, objectName, ip, object)
|
||||
n.notifyProviderAction(event, object)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user