plugin: simplify notifiers queue handling

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2024-12-25 22:53:26 +01:00
parent 22f80b97f0
commit e689d52dca
2 changed files with 101 additions and 151 deletions

View File

@@ -50,97 +50,19 @@ func (c *NotifierConfig) hasActions() bool {
return false return false
} }
type eventsQueue struct { type notifierPlugin struct {
sync.RWMutex config Config
notifier notifier.Notifier
client *plugin.Client
mu sync.RWMutex
fsEvents []*notifier.FsEvent fsEvents []*notifier.FsEvent
providerEvents []*notifier.ProviderEvent providerEvents []*notifier.ProviderEvent
logEvents []*notifier.LogEvent logEvents []*notifier.LogEvent
} }
func (q *eventsQueue) addFsEvent(event *notifier.FsEvent) {
q.Lock()
defer q.Unlock()
q.fsEvents = append(q.fsEvents, event)
}
func (q *eventsQueue) addProviderEvent(event *notifier.ProviderEvent) {
q.Lock()
defer q.Unlock()
q.providerEvents = append(q.providerEvents, event)
}
func (q *eventsQueue) addLogEvent(event *notifier.LogEvent) {
q.Lock()
defer q.Unlock()
q.logEvents = append(q.logEvents, event)
}
func (q *eventsQueue) popFsEvent() *notifier.FsEvent {
q.Lock()
defer q.Unlock()
if len(q.fsEvents) == 0 {
return nil
}
truncLen := len(q.fsEvents) - 1
ev := q.fsEvents[truncLen]
q.fsEvents[truncLen] = nil
q.fsEvents = q.fsEvents[:truncLen]
return ev
}
func (q *eventsQueue) popProviderEvent() *notifier.ProviderEvent {
q.Lock()
defer q.Unlock()
if len(q.providerEvents) == 0 {
return nil
}
truncLen := len(q.providerEvents) - 1
ev := q.providerEvents[truncLen]
q.providerEvents[truncLen] = nil
q.providerEvents = q.providerEvents[:truncLen]
return ev
}
func (q *eventsQueue) popLogEvent() *notifier.LogEvent {
q.Lock()
defer q.Unlock()
if len(q.logEvents) == 0 {
return nil
}
truncLen := len(q.logEvents) - 1
ev := q.logEvents[truncLen]
q.logEvents[truncLen] = nil
q.logEvents = q.logEvents[:truncLen]
return ev
}
func (q *eventsQueue) getSize() int {
q.RLock()
defer q.RUnlock()
return len(q.providerEvents) + len(q.fsEvents) + len(q.logEvents)
}
type notifierPlugin struct {
config Config
notifier notifier.Notifier
client *plugin.Client
queue *eventsQueue
}
func newNotifierPlugin(config Config) (*notifierPlugin, error) { func newNotifierPlugin(config Config) (*notifierPlugin, error) {
p := &notifierPlugin{ p := &notifierPlugin{
config: config, config: config,
queue: &eventsQueue{},
} }
if err := p.initialize(); err != nil { if err := p.initialize(); err != nil {
logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config) logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config)
@@ -180,7 +102,7 @@ func (p *notifierPlugin) initialize() error {
Managed: false, Managed: false,
Logger: &logger.HCLogAdapter{ Logger: &logger.HCLogAdapter{
Logger: hclog.New(&hclog.LoggerOptions{ Logger: hclog.New(&hclog.LoggerOptions{
Name: fmt.Sprintf("%v.%v", logSender, notifier.PluginName), Name: fmt.Sprintf("%s.%s", logSender, notifier.PluginName),
Level: pluginsLogLevel, Level: pluginsLogLevel,
DisableTime: true, DisableTime: true,
}), }),
@@ -204,6 +126,34 @@ func (p *notifierPlugin) initialize() error {
return nil return nil
} }
func (p *notifierPlugin) queueSize() int {
p.mu.RLock()
defer p.mu.RUnlock()
return len(p.providerEvents) + len(p.fsEvents) + len(p.logEvents)
}
func (p *notifierPlugin) queueFsEvent(ev *notifier.FsEvent) {
p.mu.Lock()
defer p.mu.Unlock()
p.fsEvents = append(p.fsEvents, ev)
}
func (p *notifierPlugin) queueProviderEvent(ev *notifier.ProviderEvent) {
p.mu.Lock()
defer p.mu.Unlock()
p.providerEvents = append(p.providerEvents, ev)
}
func (p *notifierPlugin) queueLogEvent(ev *notifier.LogEvent) {
p.mu.Lock()
defer p.mu.Unlock()
p.logEvents = append(p.logEvents, ev)
}
func (p *notifierPlugin) canQueueEvent(timestamp int64) bool { func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
if p.config.NotifierOptions.RetryMaxTime == 0 { if p.config.NotifierOptions.RetryMaxTime == 0 {
return false return false
@@ -214,7 +164,7 @@ func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
return false return false
} }
if p.config.NotifierOptions.RetryQueueMaxSize > 0 { if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
return p.queue.getSize() < p.config.NotifierOptions.RetryQueueMaxSize return p.queueSize() < p.config.NotifierOptions.RetryQueueMaxSize
} }
return true return true
} }
@@ -223,13 +173,7 @@ func (p *notifierPlugin) notifyFsAction(event *notifier.FsEvent) {
if !slices.Contains(p.config.NotifierOptions.FsEvents, event.Action) { if !slices.Contains(p.config.NotifierOptions.FsEvents, event.Action) {
return return
} }
go func() {
Handler.addTask()
defer Handler.removeTask()
p.sendFsEvent(event) p.sendFsEvent(event)
}()
} }
func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, object Renderer) { func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, object Renderer) {
@@ -237,84 +181,88 @@ func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, obj
!slices.Contains(p.config.NotifierOptions.ProviderObjects, event.ObjectType) { !slices.Contains(p.config.NotifierOptions.ProviderObjects, event.ObjectType) {
return return
} }
p.sendProviderEvent(event, object)
go func() {
Handler.addTask()
defer Handler.removeTask()
objectAsJSON, err := object.RenderAsJSON(event.Action != "delete")
if err != nil {
logger.Warn(logSender, "", "unable to render user as json for action %v: %v", event.Action, err)
return
}
event.ObjectData = objectAsJSON
p.sendProviderEvent(event)
}()
} }
func (p *notifierPlugin) notifyLogEvent(event *notifier.LogEvent) { func (p *notifierPlugin) notifyLogEvent(event *notifier.LogEvent) {
go func() { p.sendLogEvent(event)
}
func (p *notifierPlugin) sendFsEvent(ev *notifier.FsEvent) {
go func(event *notifier.FsEvent) {
Handler.addTask() Handler.addTask()
defer Handler.removeTask() defer Handler.removeTask()
p.sendLogEvent(event)
}()
}
func (p *notifierPlugin) sendFsEvent(event *notifier.FsEvent) {
if err := p.notifier.NotifyFsEvent(event); err != nil { if err := p.notifier.NotifyFsEvent(event); err != nil {
logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err) logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
if p.canQueueEvent(event.Timestamp) { if p.canQueueEvent(event.Timestamp) {
p.queue.addFsEvent(event) p.queueFsEvent(event)
} }
} }
}(ev)
} }
func (p *notifierPlugin) sendProviderEvent(event *notifier.ProviderEvent) { func (p *notifierPlugin) sendProviderEvent(ev *notifier.ProviderEvent, object Renderer) {
go func(event *notifier.ProviderEvent) {
Handler.addTask()
defer Handler.removeTask()
if object != nil {
objectAsJSON, err := object.RenderAsJSON(event.Action != "delete")
if err != nil {
logger.Error(logSender, "", "unable to render user as json for action %q: %v", event.Action, err)
} else {
event.ObjectData = objectAsJSON
}
}
if err := p.notifier.NotifyProviderEvent(event); err != nil { if err := p.notifier.NotifyProviderEvent(event); err != nil {
logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err) logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
if p.canQueueEvent(event.Timestamp) { if p.canQueueEvent(event.Timestamp) {
p.queue.addProviderEvent(event) p.queueProviderEvent(event)
} }
} }
}(ev)
} }
func (p *notifierPlugin) sendLogEvent(event *notifier.LogEvent) { func (p *notifierPlugin) sendLogEvent(ev *notifier.LogEvent) {
go func(event *notifier.LogEvent) {
Handler.addTask()
defer Handler.removeTask()
if err := p.notifier.NotifyLogEvent(event); err != nil { if err := p.notifier.NotifyLogEvent(event); err != nil {
logger.Warn(logSender, "", "unable to send log event to plugin %v: %v", p.config.Cmd, err) logger.Warn(logSender, "", "unable to send log event to plugin %v: %v", p.config.Cmd, err)
if p.canQueueEvent(event.Timestamp) { if p.canQueueEvent(event.Timestamp) {
p.queue.addLogEvent(event) p.queueLogEvent(event)
} }
} }
}(ev)
} }
func (p *notifierPlugin) sendQueuedEvents() { func (p *notifierPlugin) sendQueuedEvents() {
queueSize := p.queue.getSize() queueSize := p.queueSize()
if queueSize == 0 { if queueSize == 0 {
return return
} }
logger.Debug(logSender, "", "check queued events for notifier %q, events size: %v", p.config.Cmd, queueSize) p.mu.Lock()
fsEv := p.queue.popFsEvent() defer p.mu.Unlock()
for fsEv != nil {
go func(ev *notifier.FsEvent) {
p.sendFsEvent(ev)
}(fsEv)
fsEv = p.queue.popFsEvent()
}
providerEv := p.queue.popProviderEvent() logger.Debug(logSender, "", "send queued events for notifier %q, events size: %v", p.config.Cmd, queueSize)
for providerEv != nil {
go func(ev *notifier.ProviderEvent) { for _, ev := range p.fsEvents {
p.sendProviderEvent(ev) p.sendFsEvent(ev)
}(providerEv)
providerEv = p.queue.popProviderEvent()
} }
logEv := p.queue.popLogEvent() p.fsEvents = nil
for logEv != nil {
go func(ev *notifier.LogEvent) { for _, ev := range p.providerEvents {
p.sendProviderEvent(ev, nil)
}
p.providerEvents = nil
for _, ev := range p.logEvents {
p.sendLogEvent(ev) p.sendLogEvent(ev)
}(logEv)
logEv = p.queue.popLogEvent()
} }
logger.Debug(logSender, "", "queued events sent for notifier %q, new events size: %v", p.config.Cmd, p.queue.getSize()) p.logEvents = nil
logger.Debug(logSender, "", "%d queued events sent for notifier %q,", queueSize, p.config.Cmd)
} }

View File

@@ -642,7 +642,9 @@ func (m *Manager) restartNotifierPlugin(config Config, idx int) {
} }
m.notifLock.Lock() m.notifLock.Lock()
plugin.queue = m.notifiers[idx].queue plugin.fsEvents = m.notifiers[idx].fsEvents
plugin.providerEvents = m.notifiers[idx].providerEvents
plugin.logEvents = m.notifiers[idx].logEvents
m.notifiers[idx] = plugin m.notifiers[idx] = plugin
m.notifLock.Unlock() m.notifLock.Unlock()
plugin.sendQueuedEvents() plugin.sendQueuedEvents()