expose KMS as plugin

This commit is contained in:
Nicola Murino
2021-07-16 18:22:42 +02:00
parent 776dffcf12
commit 6d313f6d8f
16 changed files with 1136 additions and 84 deletions

View File

@@ -4,10 +4,14 @@ package plugin
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-hclog"
"github.com/drakkan/sftpgo/v2/kms"
"github.com/drakkan/sftpgo/v2/logger"
kmsplugin "github.com/drakkan/sftpgo/v2/sdk/plugin/kms"
"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
)
@@ -30,8 +34,10 @@ type Renderer interface {
type Config struct {
// Plugin type
Type string `json:"type" mapstructure:"type"`
// NotifierOptions defines additional options for notifiers plugins
// NotifierOptions defines options for notifiers plugins
NotifierOptions NotifierConfig `json:"notifier_options" mapstructure:"notifier_options"`
// KMSOptions defines options for a KMS plugin
KMSOptions KMSConfig `json:"kms_options" mapstructure:"kms_options"`
// Path to the plugin executable
Cmd string `json:"cmd" mapstructure:"cmd"`
// Args to pass to the plugin executable
@@ -45,20 +51,43 @@ type Config struct {
// rejected. The client will also refuse to connect to any server that isn't
// the original instance started by the client.
AutoMTLS bool `json:"auto_mtls" mapstructure:"auto_mtls"`
// unique identifier for kms plugins
kmsID int
}
func (c *Config) newKMSPluginSecretProvider(base kms.BaseSecret, url, masterKey string) kms.SecretProvider {
return &kmsPluginSecretProvider{
BaseSecret: base,
URL: url,
MasterKey: masterKey,
config: c,
}
}
// Manager handles enabled plugins
type Manager struct {
closed int32
done chan bool
// List of configured plugins
Configs []Config `json:"plugins" mapstructure:"plugins"`
mu sync.RWMutex
notifLock sync.RWMutex
notifiers []*notifierPlugin
kmsLock sync.RWMutex
kms []*kmsPlugin
}
// Initialize initializes the configured plugins
func Initialize(configs []Config, logVerbose bool) error {
Handler = Manager{
Configs: configs,
done: make(chan bool),
closed: 0,
}
if len(configs) == 0 {
return nil
}
if err := Handler.validateConfigs(); err != nil {
return err
}
if logVerbose {
@@ -67,7 +96,8 @@ func Initialize(configs []Config, logVerbose bool) error {
pluginsLogLevel = hclog.Info
}
for _, config := range configs {
kmsID := 0
for idx, config := range Handler.Configs {
switch config.Type {
case notifier.PluginName:
plugin, err := newNotifierPlugin(config)
@@ -75,92 +105,172 @@ func Initialize(configs []Config, logVerbose bool) error {
return err
}
Handler.notifiers = append(Handler.notifiers, plugin)
case kmsplugin.PluginName:
plugin, err := newKMSPlugin(config)
if err != nil {
return err
}
Handler.kms = append(Handler.kms, plugin)
Handler.Configs[idx].kmsID = kmsID
kmsID++
kms.RegisterSecretProvider(config.KMSOptions.Scheme, config.KMSOptions.EncryptedStatus,
Handler.Configs[idx].newKMSPluginSecretProvider)
logger.Debug(logSender, "", "registered secret provider for scheme: %v, encrypted status: %v",
config.KMSOptions.Scheme, config.KMSOptions.EncryptedStatus)
default:
return fmt.Errorf("unsupported plugin type: %v", config.Type)
}
}
startCheckTicker()
return nil
}
func (m *Manager) validateConfigs() error {
kmsSchemes := make(map[string]bool)
kmsEncryptions := make(map[string]bool)
for _, config := range m.Configs {
if config.Type == kmsplugin.PluginName {
if _, ok := kmsSchemes[config.KMSOptions.Scheme]; ok {
return fmt.Errorf("invalid KMS configuration, duplicated scheme %#v", config.KMSOptions.Scheme)
}
if _, ok := kmsEncryptions[config.KMSOptions.EncryptedStatus]; ok {
return fmt.Errorf("invalid KMS configuration, duplicated encrypted status %#v", config.KMSOptions.EncryptedStatus)
}
kmsSchemes[config.KMSOptions.Scheme] = true
kmsEncryptions[config.KMSOptions.EncryptedStatus] = true
}
}
return nil
}
// NotifyFsEvent sends the fs event notifications using any defined notifier plugins
func (m *Manager) NotifyFsEvent(action, username, fsPath, fsTargetPath, sshCmd, protocol string, fileSize int64, err error) {
m.mu.RLock()
m.notifLock.RLock()
defer m.notifLock.RUnlock()
var crashedIdxs []int
for idx, n := range m.notifiers {
for _, n := range m.notifiers {
if n.exited() {
crashedIdxs = append(crashedIdxs, idx)
} else {
n.notifyFsAction(action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, err)
}
}
m.mu.RUnlock()
if len(crashedIdxs) > 0 {
m.restartCrashedNotifiers(crashedIdxs)
m.mu.RLock()
defer m.mu.RUnlock()
for idx := range crashedIdxs {
if !m.notifiers[idx].exited() {
m.notifiers[idx].notifyFsAction(action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, err)
}
logger.Warn(logSender, "", "notifer plugin %v is not active, unable to send fs event", n.config.Cmd)
continue
}
n.notifyFsAction(action, username, fsPath, fsTargetPath, sshCmd, protocol, fileSize, err)
}
}
// NotifyUserEvent sends the user event notifications using any defined notifier plugins
func (m *Manager) NotifyUserEvent(action string, user Renderer) {
m.mu.RLock()
m.notifLock.RLock()
defer m.notifLock.RUnlock()
var crashedIdxs []int
for idx, n := range m.notifiers {
for _, n := range m.notifiers {
if n.exited() {
crashedIdxs = append(crashedIdxs, idx)
} else {
n.notifyUserAction(action, user)
}
}
m.mu.RUnlock()
if len(crashedIdxs) > 0 {
m.restartCrashedNotifiers(crashedIdxs)
m.mu.RLock()
defer m.mu.RUnlock()
for idx := range crashedIdxs {
if !m.notifiers[idx].exited() {
m.notifiers[idx].notifyUserAction(action, user)
}
logger.Warn(logSender, "", "notifer plugin %v is not active, unable to send user event", n.config.Cmd)
continue
}
n.notifyUserAction(action, user)
}
}
func (m *Manager) restartCrashedNotifiers(crashedIdxs []int) {
for _, idx := range crashedIdxs {
m.mu.Lock()
defer m.mu.Unlock()
func (m *Manager) kmsEncrypt(secret kms.BaseSecret, url string, masterKey string, kmsID int) (string, string, int32, error) {
m.kmsLock.RLock()
plugin := m.kms[kmsID]
m.kmsLock.RUnlock()
if m.notifiers[idx].exited() {
logger.Info(logSender, "", "try to restart crashed plugin %v", m.Configs[idx].Cmd)
plugin, err := newNotifierPlugin(m.Configs[idx])
if err == nil {
m.notifiers[idx] = plugin
} else {
logger.Warn(logSender, "", "plugin %v crashed and restart failed: %v", m.Configs[idx].Cmd, err)
}
return plugin.Encrypt(secret, url, masterKey)
}
func (m *Manager) kmsDecrypt(secret kms.BaseSecret, url string, masterKey string, kmsID int) (string, error) {
m.kmsLock.RLock()
plugin := m.kms[kmsID]
m.kmsLock.RUnlock()
return plugin.Decrypt(secret, url, masterKey)
}
func (m *Manager) checkCrashedPlugins() {
m.notifLock.RLock()
for idx, n := range m.notifiers {
if n.exited() {
defer Handler.restartNotifierPlugin(n.config, idx)
}
}
m.notifLock.RUnlock()
m.kmsLock.RLock()
for idx, k := range m.kms {
if k.exited() {
defer Handler.restartKMSPlugin(k.config, idx)
}
}
m.kmsLock.RUnlock()
}
func (m *Manager) restartNotifierPlugin(config Config, idx int) {
if atomic.LoadInt32(&m.closed) == 1 {
return
}
logger.Info(logSender, "", "try to restart notifier crashed plugin %#v, idx: %v", config.Cmd, idx)
plugin, err := newNotifierPlugin(config)
if err != nil {
logger.Warn(logSender, "", "unable to restart notifier plugin %#v, err: %v", config.Cmd, err)
return
}
m.notifLock.Lock()
m.notifiers[idx] = plugin
m.notifLock.Unlock()
}
func (m *Manager) restartKMSPlugin(config Config, idx int) {
if atomic.LoadInt32(&m.closed) == 1 {
return
}
logger.Info(logSender, "", "try to restart crashed kms plugin %#v, idx: %v", config.Cmd, idx)
plugin, err := newKMSPlugin(config)
if err != nil {
logger.Warn(logSender, "", "unable to restart kms plugin %#v, err: %v", config.Cmd, err)
return
}
m.kmsLock.Lock()
m.kms[idx] = plugin
m.kmsLock.Unlock()
}
// Cleanup releases all the active plugins
func (m *Manager) Cleanup() {
atomic.StoreInt32(&m.closed, 1)
close(m.done)
m.notifLock.Lock()
for _, n := range m.notifiers {
logger.Debug(logSender, "", "cleanup plugin %v", n.config.Cmd)
logger.Debug(logSender, "", "cleanup notifier plugin %v", n.config.Cmd)
n.cleanup()
}
m.notifLock.Unlock()
m.kmsLock.Lock()
for _, k := range m.kms {
logger.Debug(logSender, "", "cleanup kms plugin %v", k.config.Cmd)
k.cleanup()
}
m.kmsLock.Unlock()
}
func startCheckTicker() {
logger.Debug(logSender, "", "start plugins checker")
checker := time.NewTicker(30 * time.Second)
go func() {
for {
select {
case <-Handler.done:
logger.Debug(logSender, "", "handler done, stop plugins checker")
checker.Stop()
return
case <-checker.C:
Handler.checkCrashedPlugins()
}
}
}()
}