plugins: improve notifier and searcher

This commit is contained in:
Nicola Murino
2021-10-20 19:39:49 +02:00
parent 3bbe67571f
commit 97d0a48557
18 changed files with 861 additions and 388 deletions

View File

@@ -10,7 +10,6 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/drakkan/sftpgo/v2/logger"
"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
@@ -43,14 +42,14 @@ type eventsQueue struct {
providerEvents []*proto.ProviderEvent
}
func (q *eventsQueue) addFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip string,
func (q *eventsQueue) addFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip string,
fileSize int64, status int,
) {
q.Lock()
defer q.Unlock()
q.fsEvents = append(q.fsEvents, &proto.FsEvent{
Timestamp: timestamppb.New(timestamp),
Timestamp: timestamp,
Action: action,
Username: username,
FsPath: fsPath,
@@ -63,14 +62,14 @@ func (q *eventsQueue) addFsEvent(timestamp time.Time, action, username, fsPath,
})
}
func (q *eventsQueue) addProviderEvent(timestamp time.Time, action, username, objectType, objectName, ip string,
func (q *eventsQueue) addProviderEvent(timestamp int64, action, username, objectType, objectName, ip string,
objectAsJSON []byte,
) {
q.Lock()
defer q.Unlock()
q.providerEvents = append(q.providerEvents, &proto.ProviderEvent{
Timestamp: timestamppb.New(timestamp),
Timestamp: timestamp,
Action: action,
ObjectType: objectType,
Username: username,
@@ -191,11 +190,11 @@ func (p *notifierPlugin) initialize() error {
return nil
}
func (p *notifierPlugin) canQueueEvent(timestamp time.Time) bool {
func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
if p.config.NotifierOptions.RetryMaxTime == 0 {
return false
}
if time.Now().After(timestamp.Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
if time.Now().After(util.GetTimeFromMsecSinceEpoch(timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
return false
}
if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
@@ -204,7 +203,7 @@ func (p *notifierPlugin) canQueueEvent(timestamp time.Time) bool {
return true
}
func (p *notifierPlugin) notifyFsAction(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd,
func (p *notifierPlugin) notifyFsAction(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd,
protocol, ip, virtualPath, virtualTargetPath string, fileSize int64, errAction error) {
if !util.IsStringInSlice(action, p.config.NotifierOptions.FsEvents) {
return
@@ -220,7 +219,7 @@ func (p *notifierPlugin) notifyFsAction(timestamp time.Time, action, username, f
}()
}
func (p *notifierPlugin) notifyProviderAction(timestamp time.Time, action, username, objectType, objectName, ip string,
func (p *notifierPlugin) notifyProviderAction(timestamp int64, action, username, objectType, objectName, ip string,
object Renderer,
) {
if !util.IsStringInSlice(action, p.config.NotifierOptions.ProviderEvents) ||
@@ -238,7 +237,7 @@ func (p *notifierPlugin) notifyProviderAction(timestamp time.Time, action, usern
}()
}
func (p *notifierPlugin) sendFsEvent(timestamp time.Time, action, username, fsPath, fsTargetPath, sshCmd,
func (p *notifierPlugin) sendFsEvent(timestamp int64, action, username, fsPath, fsTargetPath, sshCmd,
protocol, ip, virtualPath, virtualTargetPath string, fileSize int64, status int) {
if err := p.notifier.NotifyFsEvent(timestamp, action, username, fsPath, fsTargetPath, sshCmd, protocol, ip,
virtualPath, virtualTargetPath, fileSize, status); err != nil {
@@ -249,7 +248,7 @@ func (p *notifierPlugin) sendFsEvent(timestamp time.Time, action, username, fsPa
}
}
func (p *notifierPlugin) sendProviderEvent(timestamp time.Time, action, username, objectType, objectName, ip string,
func (p *notifierPlugin) sendProviderEvent(timestamp int64, action, username, objectType, objectName, ip string,
objectAsJSON []byte,
) {
if err := p.notifier.NotifyProviderEvent(timestamp, action, username, objectType, objectName, ip, objectAsJSON); err != nil {
@@ -268,14 +267,14 @@ func (p *notifierPlugin) sendQueuedEvents() {
logger.Debug(logSender, "", "check queued events for notifier %#v, events size: %v", p.config.Cmd, queueSize)
fsEv := p.queue.popFsEvent()
for fsEv != nil {
go p.sendFsEvent(fsEv.Timestamp.AsTime(), fsEv.Action, fsEv.Username, fsEv.FsPath, fsEv.FsTargetPath,
go p.sendFsEvent(fsEv.Timestamp, fsEv.Action, fsEv.Username, fsEv.FsPath, fsEv.FsTargetPath,
fsEv.SshCmd, fsEv.Protocol, fsEv.Ip, fsEv.VirtualPath, fsEv.VirtualTargetPath, fsEv.FileSize, int(fsEv.Status))
fsEv = p.queue.popFsEvent()
}
providerEv := p.queue.popProviderEvent()
for providerEv != nil {
go p.sendProviderEvent(providerEv.Timestamp.AsTime(), providerEv.Action, providerEv.Username, providerEv.ObjectType,
go p.sendProviderEvent(providerEv.Timestamp, providerEv.Action, providerEv.Username, providerEv.ObjectType,
providerEv.ObjectName, providerEv.Ip, providerEv.ObjectData)
providerEv = p.queue.popProviderEvent()
}