add support for log events

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2023-05-12 18:34:59 +02:00
parent 43d011f125
commit 4eded56d5f
34 changed files with 856 additions and 139 deletions

View File

@@ -33,6 +33,7 @@ type NotifierConfig struct {
FsEvents []string `json:"fs_events" mapstructure:"fs_events"`
ProviderEvents []string `json:"provider_events" mapstructure:"provider_events"`
ProviderObjects []string `json:"provider_objects" mapstructure:"provider_objects"`
LogEvents []int `json:"log_events" mapstructure:"log_events"`
RetryMaxTime int `json:"retry_max_time" mapstructure:"retry_max_time"`
RetryQueueMaxSize int `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"`
}
@@ -51,6 +52,7 @@ type eventsQueue struct {
sync.RWMutex
fsEvents []*notifier.FsEvent
providerEvents []*notifier.ProviderEvent
logEvents []*notifier.LogEvent
}
func (q *eventsQueue) addFsEvent(event *notifier.FsEvent) {
@@ -67,6 +69,13 @@ func (q *eventsQueue) addProviderEvent(event *notifier.ProviderEvent) {
q.providerEvents = append(q.providerEvents, event)
}
func (q *eventsQueue) addLogEvent(event *notifier.LogEvent) {
q.Lock()
defer q.Unlock()
q.logEvents = append(q.logEvents, event)
}
func (q *eventsQueue) popFsEvent() *notifier.FsEvent {
q.Lock()
defer q.Unlock()
@@ -97,11 +106,26 @@ func (q *eventsQueue) popProviderEvent() *notifier.ProviderEvent {
return ev
}
func (q *eventsQueue) popLogEvent() *notifier.LogEvent {
q.Lock()
defer q.Unlock()
if len(q.logEvents) == 0 {
return nil
}
truncLen := len(q.logEvents) - 1
ev := q.logEvents[truncLen]
q.logEvents[truncLen] = nil
q.logEvents = q.logEvents[:truncLen]
return ev
}
func (q *eventsQueue) getSize() int {
q.RLock()
defer q.RUnlock()
return len(q.providerEvents) + len(q.fsEvents)
return len(q.providerEvents) + len(q.fsEvents) + len(q.logEvents)
}
type notifierPlugin struct {
@@ -225,6 +249,19 @@ func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, obj
}()
}
func (p *notifierPlugin) notifyLogEvent(event *notifier.LogEvent) {
if !util.Contains(p.config.NotifierOptions.LogEvents, int(event.Event)) {
return
}
go func() {
Handler.addTask()
defer Handler.removeTask()
p.sendLogEvent(event)
}()
}
func (p *notifierPlugin) sendFsEvent(event *notifier.FsEvent) {
if err := p.notifier.NotifyFsEvent(event); err != nil {
logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
@@ -243,6 +280,15 @@ func (p *notifierPlugin) sendProviderEvent(event *notifier.ProviderEvent) {
}
}
func (p *notifierPlugin) sendLogEvent(event *notifier.LogEvent) {
if err := p.notifier.NotifyLogEvent(event); err != nil {
logger.Warn(logSender, "", "unable to send log event to plugin %v: %v", p.config.Cmd, err)
if p.canQueueEvent(event.Timestamp) {
p.queue.addLogEvent(event)
}
}
}
func (p *notifierPlugin) sendQueuedEvents() {
queueSize := p.queue.getSize()
if queueSize == 0 {
@@ -264,5 +310,12 @@ func (p *notifierPlugin) sendQueuedEvents() {
}(providerEv)
providerEv = p.queue.popProviderEvent()
}
logEv := p.queue.popLogEvent()
for logEv != nil {
go func(ev *notifier.LogEvent) {
p.sendLogEvent(ev)
}(logEv)
logEv = p.queue.popLogEvent()
}
logger.Debug(logSender, "", "queued events sent for notifier %q, new events size: %v", p.config.Cmd, p.queue.getSize())
}