mirror of
https://github.com/drakkan/sftpgo.git
synced 2025-12-09 08:15:13 +03:00
move plugin handling outside the sdk package
Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
180
plugin/auth.go
Normal file
180
plugin/auth.go
Normal file
@@ -0,0 +1,180 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-plugin"
|
||||
|
||||
"github.com/drakkan/sftpgo/v2/logger"
|
||||
"github.com/drakkan/sftpgo/v2/sdk/plugin/auth"
|
||||
)
|
||||
|
||||
// Supported auth scopes
|
||||
const (
|
||||
AuthScopePassword = 1
|
||||
AuthScopePublicKey = 2
|
||||
AuthScopeKeyboardInteractive = 4
|
||||
AuthScopeTLSCertificate = 8
|
||||
)
|
||||
|
||||
// KeyboardAuthRequest defines the request for a keyboard interactive authentication step
|
||||
type KeyboardAuthRequest struct {
|
||||
RequestID string `json:"request_id"`
|
||||
Step int `json:"step"`
|
||||
Username string `json:"username,omitempty"`
|
||||
IP string `json:"ip,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
Answers []string `json:"answers,omitempty"`
|
||||
Questions []string `json:"questions,omitempty"`
|
||||
}
|
||||
|
||||
// KeyboardAuthResponse defines the response for a keyboard interactive authentication step
|
||||
type KeyboardAuthResponse struct {
|
||||
Instruction string `json:"instruction"`
|
||||
Questions []string `json:"questions"`
|
||||
Echos []bool `json:"echos"`
|
||||
AuthResult int `json:"auth_result"`
|
||||
CheckPwd int `json:"check_password"`
|
||||
}
|
||||
|
||||
// Validate returns an error if the KeyboardAuthResponse is invalid
|
||||
func (r *KeyboardAuthResponse) Validate() error {
|
||||
if len(r.Questions) == 0 {
|
||||
err := errors.New("interactive auth error: response does not contain questions")
|
||||
return err
|
||||
}
|
||||
if len(r.Questions) != len(r.Echos) {
|
||||
err := fmt.Errorf("interactive auth error: response questions don't match echos: %v %v",
|
||||
len(r.Questions), len(r.Echos))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AuthConfig defines configuration parameters for auth plugins
|
||||
type AuthConfig struct {
|
||||
// Scope defines the scope for the authentication plugin.
|
||||
// - 1 means passwords only
|
||||
// - 2 means public keys only
|
||||
// - 4 means keyboard interactive only
|
||||
// - 8 means TLS certificates only
|
||||
// you can combine the scopes, for example 3 means password and public key, 5 password and keyboard
|
||||
// interactive and so on
|
||||
Scope int `json:"scope" mapstructure:"scope"`
|
||||
}
|
||||
|
||||
func (c *AuthConfig) validate() error {
|
||||
authScopeMax := AuthScopePassword + AuthScopePublicKey + AuthScopeKeyboardInteractive + AuthScopeTLSCertificate
|
||||
if c.Scope == 0 || c.Scope > authScopeMax {
|
||||
return fmt.Errorf("invalid auth scope: %v", c.Scope)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type authPlugin struct {
|
||||
config Config
|
||||
service auth.Authenticator
|
||||
client *plugin.Client
|
||||
}
|
||||
|
||||
func newAuthPlugin(config Config) (*authPlugin, error) {
|
||||
p := &authPlugin{
|
||||
config: config,
|
||||
}
|
||||
if err := p.initialize(); err != nil {
|
||||
logger.Warn(logSender, "", "unable to create auth plugin: %v, config %+v", err, config)
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *authPlugin) initialize() error {
|
||||
killProcess(p.config.Cmd)
|
||||
logger.Debug(logSender, "", "create new auth plugin %#v", p.config.Cmd)
|
||||
if err := p.config.AuthOptions.validate(); err != nil {
|
||||
return fmt.Errorf("invalid options for auth plugin %#v: %v", p.config.Cmd, err)
|
||||
}
|
||||
|
||||
var secureConfig *plugin.SecureConfig
|
||||
if p.config.SHA256Sum != "" {
|
||||
secureConfig.Checksum = []byte(p.config.SHA256Sum)
|
||||
secureConfig.Hash = sha256.New()
|
||||
}
|
||||
client := plugin.NewClient(&plugin.ClientConfig{
|
||||
HandshakeConfig: auth.Handshake,
|
||||
Plugins: auth.PluginMap,
|
||||
Cmd: exec.Command(p.config.Cmd, p.config.Args...),
|
||||
AllowedProtocols: []plugin.Protocol{
|
||||
plugin.ProtocolGRPC,
|
||||
},
|
||||
AutoMTLS: p.config.AutoMTLS,
|
||||
SecureConfig: secureConfig,
|
||||
Managed: false,
|
||||
Logger: &logger.HCLogAdapter{
|
||||
Logger: hclog.New(&hclog.LoggerOptions{
|
||||
Name: fmt.Sprintf("%v.%v", logSender, auth.PluginName),
|
||||
Level: pluginsLogLevel,
|
||||
DisableTime: true,
|
||||
}),
|
||||
},
|
||||
})
|
||||
rpcClient, err := client.Client()
|
||||
if err != nil {
|
||||
logger.Debug(logSender, "", "unable to get rpc client for kms plugin %#v: %v", p.config.Cmd, err)
|
||||
return err
|
||||
}
|
||||
raw, err := rpcClient.Dispense(auth.PluginName)
|
||||
if err != nil {
|
||||
logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %#v: %v",
|
||||
auth.PluginName, p.config.Cmd, err)
|
||||
return err
|
||||
}
|
||||
|
||||
p.service = raw.(auth.Authenticator)
|
||||
p.client = client
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *authPlugin) exited() bool {
|
||||
return p.client.Exited()
|
||||
}
|
||||
|
||||
func (p *authPlugin) cleanup() {
|
||||
p.client.Kill()
|
||||
}
|
||||
|
||||
func (p *authPlugin) checkUserAndPass(username, password, ip, protocol string, userAsJSON []byte) ([]byte, error) {
|
||||
return p.service.CheckUserAndPass(username, password, ip, protocol, userAsJSON)
|
||||
}
|
||||
|
||||
func (p *authPlugin) checkUserAndTLSCertificate(username, tlsCert, ip, protocol string, userAsJSON []byte) ([]byte, error) {
|
||||
return p.service.CheckUserAndTLSCert(username, tlsCert, ip, protocol, userAsJSON)
|
||||
}
|
||||
|
||||
func (p *authPlugin) checkUserAndPublicKey(username, pubKey, ip, protocol string, userAsJSON []byte) ([]byte, error) {
|
||||
return p.service.CheckUserAndPublicKey(username, pubKey, ip, protocol, userAsJSON)
|
||||
}
|
||||
|
||||
func (p *authPlugin) checkUserAndKeyboardInteractive(username, ip, protocol string, userAsJSON []byte) ([]byte, error) {
|
||||
return p.service.CheckUserAndKeyboardInteractive(username, ip, protocol, userAsJSON)
|
||||
}
|
||||
|
||||
func (p *authPlugin) sendKeyboardIteractiveRequest(req *KeyboardAuthRequest) (*KeyboardAuthResponse, error) {
|
||||
instructions, questions, echos, authResult, checkPassword, err := p.service.SendKeyboardAuthRequest(
|
||||
req.RequestID, req.Username, req.Password, req.IP, req.Answers, req.Questions, int32(req.Step))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &KeyboardAuthResponse{
|
||||
Instruction: instructions,
|
||||
Questions: questions,
|
||||
Echos: echos,
|
||||
AuthResult: authResult,
|
||||
CheckPwd: checkPassword,
|
||||
}, nil
|
||||
}
|
||||
181
plugin/kms.go
Normal file
181
plugin/kms.go
Normal file
@@ -0,0 +1,181 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-plugin"
|
||||
|
||||
"github.com/drakkan/sftpgo/v2/logger"
|
||||
"github.com/drakkan/sftpgo/v2/sdk/kms"
|
||||
kmsplugin "github.com/drakkan/sftpgo/v2/sdk/plugin/kms"
|
||||
"github.com/drakkan/sftpgo/v2/util"
|
||||
)
|
||||
|
||||
var (
|
||||
validKMSSchemes = []string{kms.SchemeAWS, kms.SchemeGCP, kms.SchemeVaultTransit, kms.SchemeAzureKeyVault}
|
||||
validKMSEncryptedStatuses = []string{kms.SecretStatusVaultTransit, kms.SecretStatusAWS, kms.SecretStatusGCP,
|
||||
kms.SecretStatusAzureKeyVault}
|
||||
)
|
||||
|
||||
// KMSConfig defines configuration parameters for kms plugins
|
||||
type KMSConfig struct {
|
||||
Scheme string `json:"scheme" mapstructure:"scheme"`
|
||||
EncryptedStatus string `json:"encrypted_status" mapstructure:"encrypted_status"`
|
||||
}
|
||||
|
||||
func (c *KMSConfig) validate() error {
|
||||
if !util.IsStringInSlice(c.Scheme, validKMSSchemes) {
|
||||
return fmt.Errorf("invalid kms scheme: %v", c.Scheme)
|
||||
}
|
||||
if !util.IsStringInSlice(c.EncryptedStatus, validKMSEncryptedStatuses) {
|
||||
return fmt.Errorf("invalid kms encrypted status: %v", c.EncryptedStatus)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type kmsPlugin struct {
|
||||
config Config
|
||||
service kmsplugin.Service
|
||||
client *plugin.Client
|
||||
}
|
||||
|
||||
func newKMSPlugin(config Config) (*kmsPlugin, error) {
|
||||
p := &kmsPlugin{
|
||||
config: config,
|
||||
}
|
||||
if err := p.initialize(); err != nil {
|
||||
logger.Warn(logSender, "", "unable to create kms plugin: %v, config %+v", err, config)
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *kmsPlugin) initialize() error {
|
||||
killProcess(p.config.Cmd)
|
||||
logger.Debug(logSender, "", "create new kms plugin %#v", p.config.Cmd)
|
||||
if err := p.config.KMSOptions.validate(); err != nil {
|
||||
return fmt.Errorf("invalid options for kms plugin %#v: %v", p.config.Cmd, err)
|
||||
}
|
||||
var secureConfig *plugin.SecureConfig
|
||||
if p.config.SHA256Sum != "" {
|
||||
secureConfig.Checksum = []byte(p.config.SHA256Sum)
|
||||
secureConfig.Hash = sha256.New()
|
||||
}
|
||||
client := plugin.NewClient(&plugin.ClientConfig{
|
||||
HandshakeConfig: kmsplugin.Handshake,
|
||||
Plugins: kmsplugin.PluginMap,
|
||||
Cmd: exec.Command(p.config.Cmd, p.config.Args...),
|
||||
AllowedProtocols: []plugin.Protocol{
|
||||
plugin.ProtocolGRPC,
|
||||
},
|
||||
AutoMTLS: p.config.AutoMTLS,
|
||||
SecureConfig: secureConfig,
|
||||
Managed: false,
|
||||
Logger: &logger.HCLogAdapter{
|
||||
Logger: hclog.New(&hclog.LoggerOptions{
|
||||
Name: fmt.Sprintf("%v.%v", logSender, kmsplugin.PluginName),
|
||||
Level: pluginsLogLevel,
|
||||
DisableTime: true,
|
||||
}),
|
||||
},
|
||||
})
|
||||
rpcClient, err := client.Client()
|
||||
if err != nil {
|
||||
logger.Debug(logSender, "", "unable to get rpc client for kms plugin %#v: %v", p.config.Cmd, err)
|
||||
return err
|
||||
}
|
||||
raw, err := rpcClient.Dispense(kmsplugin.PluginName)
|
||||
if err != nil {
|
||||
logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %#v: %v",
|
||||
kmsplugin.PluginName, p.config.Cmd, err)
|
||||
return err
|
||||
}
|
||||
|
||||
p.client = client
|
||||
p.service = raw.(kmsplugin.Service)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *kmsPlugin) exited() bool {
|
||||
return p.client.Exited()
|
||||
}
|
||||
|
||||
func (p *kmsPlugin) cleanup() {
|
||||
p.client.Kill()
|
||||
}
|
||||
|
||||
func (p *kmsPlugin) Encrypt(secret kms.BaseSecret, url string, masterKey string) (string, string, int32, error) {
|
||||
return p.service.Encrypt(secret.Payload, secret.AdditionalData, url, masterKey)
|
||||
}
|
||||
|
||||
func (p *kmsPlugin) Decrypt(secret kms.BaseSecret, url string, masterKey string) (string, error) {
|
||||
return p.service.Decrypt(secret.Payload, secret.Key, secret.AdditionalData, secret.Mode, url, masterKey)
|
||||
}
|
||||
|
||||
type kmsPluginSecretProvider struct {
|
||||
kms.BaseSecret
|
||||
URL string
|
||||
MasterKey string
|
||||
config *Config
|
||||
}
|
||||
|
||||
func (s *kmsPluginSecretProvider) Name() string {
|
||||
return fmt.Sprintf("KMSPlugin_%v_%v_%v", filepath.Base(s.config.Cmd), s.config.KMSOptions.Scheme, s.config.kmsID)
|
||||
}
|
||||
|
||||
func (s *kmsPluginSecretProvider) IsEncrypted() bool {
|
||||
return s.Status == s.config.KMSOptions.EncryptedStatus
|
||||
}
|
||||
|
||||
func (s *kmsPluginSecretProvider) Encrypt() error {
|
||||
if s.Status != kms.SecretStatusPlain {
|
||||
return kms.ErrWrongSecretStatus
|
||||
}
|
||||
if s.Payload == "" {
|
||||
return kms.ErrInvalidSecret
|
||||
}
|
||||
|
||||
payload, key, mode, err := Handler.kmsEncrypt(s.BaseSecret, s.URL, s.MasterKey, s.config.kmsID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.Status = s.config.KMSOptions.EncryptedStatus
|
||||
s.Payload = payload
|
||||
s.Key = key
|
||||
s.Mode = int(mode)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *kmsPluginSecretProvider) Decrypt() error {
|
||||
if !s.IsEncrypted() {
|
||||
return kms.ErrWrongSecretStatus
|
||||
}
|
||||
payload, err := Handler.kmsDecrypt(s.BaseSecret, s.URL, s.MasterKey, s.config.kmsID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.Status = kms.SecretStatusPlain
|
||||
s.Payload = payload
|
||||
s.Key = ""
|
||||
s.AdditionalData = ""
|
||||
s.Mode = 0
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *kmsPluginSecretProvider) Clone() kms.SecretProvider {
|
||||
baseSecret := kms.BaseSecret{
|
||||
Status: s.Status,
|
||||
Payload: s.Payload,
|
||||
Key: s.Key,
|
||||
AdditionalData: s.AdditionalData,
|
||||
Mode: s.Mode,
|
||||
}
|
||||
return s.config.newKMSPluginSecretProvider(baseSecret, s.URL, s.MasterKey)
|
||||
}
|
||||
82
plugin/metadata.go
Normal file
82
plugin/metadata.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-plugin"
|
||||
|
||||
"github.com/drakkan/sftpgo/v2/logger"
|
||||
"github.com/drakkan/sftpgo/v2/sdk/plugin/metadata"
|
||||
)
|
||||
|
||||
type metadataPlugin struct {
|
||||
config Config
|
||||
metadater metadata.Metadater
|
||||
client *plugin.Client
|
||||
}
|
||||
|
||||
func newMetadaterPlugin(config Config) (*metadataPlugin, error) {
|
||||
p := &metadataPlugin{
|
||||
config: config,
|
||||
}
|
||||
if err := p.initialize(); err != nil {
|
||||
logger.Warn(logSender, "", "unable to create metadata plugin: %v, config %+v", err, config)
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *metadataPlugin) exited() bool {
|
||||
return p.client.Exited()
|
||||
}
|
||||
|
||||
func (p *metadataPlugin) cleanup() {
|
||||
p.client.Kill()
|
||||
}
|
||||
|
||||
func (p *metadataPlugin) initialize() error {
|
||||
killProcess(p.config.Cmd)
|
||||
logger.Debug(logSender, "", "create new metadata plugin %#v", p.config.Cmd)
|
||||
var secureConfig *plugin.SecureConfig
|
||||
if p.config.SHA256Sum != "" {
|
||||
secureConfig.Checksum = []byte(p.config.SHA256Sum)
|
||||
secureConfig.Hash = sha256.New()
|
||||
}
|
||||
client := plugin.NewClient(&plugin.ClientConfig{
|
||||
HandshakeConfig: metadata.Handshake,
|
||||
Plugins: metadata.PluginMap,
|
||||
Cmd: exec.Command(p.config.Cmd, p.config.Args...),
|
||||
AllowedProtocols: []plugin.Protocol{
|
||||
plugin.ProtocolGRPC,
|
||||
},
|
||||
Managed: false,
|
||||
AutoMTLS: p.config.AutoMTLS,
|
||||
SecureConfig: secureConfig,
|
||||
Logger: &logger.HCLogAdapter{
|
||||
Logger: hclog.New(&hclog.LoggerOptions{
|
||||
Name: fmt.Sprintf("%v.%v", logSender, metadata.PluginName),
|
||||
Level: pluginsLogLevel,
|
||||
DisableTime: true,
|
||||
}),
|
||||
},
|
||||
})
|
||||
rpcClient, err := client.Client()
|
||||
if err != nil {
|
||||
logger.Debug(logSender, "", "unable to get rpc client for plugin %#v: %v", p.config.Cmd, err)
|
||||
return err
|
||||
}
|
||||
raw, err := rpcClient.Dispense(metadata.PluginName)
|
||||
if err != nil {
|
||||
logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %#v: %v",
|
||||
metadata.PluginName, p.config.Cmd, err)
|
||||
return err
|
||||
}
|
||||
|
||||
p.client = client
|
||||
p.metadater = raw.(metadata.Metadater)
|
||||
|
||||
return nil
|
||||
}
|
||||
250
plugin/notifier.go
Normal file
250
plugin/notifier.go
Normal file
@@ -0,0 +1,250 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-plugin"
|
||||
|
||||
"github.com/drakkan/sftpgo/v2/logger"
|
||||
"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
|
||||
"github.com/drakkan/sftpgo/v2/util"
|
||||
)
|
||||
|
||||
// NotifierConfig defines configuration parameters for notifiers plugins
|
||||
type NotifierConfig struct {
|
||||
FsEvents []string `json:"fs_events" mapstructure:"fs_events"`
|
||||
ProviderEvents []string `json:"provider_events" mapstructure:"provider_events"`
|
||||
ProviderObjects []string `json:"provider_objects" mapstructure:"provider_objects"`
|
||||
RetryMaxTime int `json:"retry_max_time" mapstructure:"retry_max_time"`
|
||||
RetryQueueMaxSize int `json:"retry_queue_max_size" mapstructure:"retry_queue_max_size"`
|
||||
}
|
||||
|
||||
func (c *NotifierConfig) hasActions() bool {
|
||||
if len(c.FsEvents) > 0 {
|
||||
return true
|
||||
}
|
||||
if len(c.ProviderEvents) > 0 && len(c.ProviderObjects) > 0 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type eventsQueue struct {
|
||||
sync.RWMutex
|
||||
fsEvents []*notifier.FsEvent
|
||||
providerEvents []*notifier.ProviderEvent
|
||||
}
|
||||
|
||||
func (q *eventsQueue) addFsEvent(event *notifier.FsEvent) {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
q.fsEvents = append(q.fsEvents, event)
|
||||
}
|
||||
|
||||
func (q *eventsQueue) addProviderEvent(event *notifier.ProviderEvent) {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
q.providerEvents = append(q.providerEvents, event)
|
||||
}
|
||||
|
||||
func (q *eventsQueue) popFsEvent() *notifier.FsEvent {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
if len(q.fsEvents) == 0 {
|
||||
return nil
|
||||
}
|
||||
truncLen := len(q.fsEvents) - 1
|
||||
ev := q.fsEvents[truncLen]
|
||||
q.fsEvents[truncLen] = nil
|
||||
q.fsEvents = q.fsEvents[:truncLen]
|
||||
|
||||
return ev
|
||||
}
|
||||
|
||||
func (q *eventsQueue) popProviderEvent() *notifier.ProviderEvent {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
if len(q.providerEvents) == 0 {
|
||||
return nil
|
||||
}
|
||||
truncLen := len(q.providerEvents) - 1
|
||||
ev := q.providerEvents[truncLen]
|
||||
q.providerEvents[truncLen] = nil
|
||||
q.providerEvents = q.providerEvents[:truncLen]
|
||||
|
||||
return ev
|
||||
}
|
||||
|
||||
func (q *eventsQueue) getSize() int {
|
||||
q.RLock()
|
||||
defer q.RUnlock()
|
||||
|
||||
return len(q.providerEvents) + len(q.fsEvents)
|
||||
}
|
||||
|
||||
type notifierPlugin struct {
|
||||
config Config
|
||||
notifier notifier.Notifier
|
||||
client *plugin.Client
|
||||
queue *eventsQueue
|
||||
}
|
||||
|
||||
func newNotifierPlugin(config Config) (*notifierPlugin, error) {
|
||||
p := ¬ifierPlugin{
|
||||
config: config,
|
||||
queue: &eventsQueue{},
|
||||
}
|
||||
if err := p.initialize(); err != nil {
|
||||
logger.Warn(logSender, "", "unable to create notifier plugin: %v, config %+v", err, config)
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) exited() bool {
|
||||
return p.client.Exited()
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) cleanup() {
|
||||
p.client.Kill()
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) initialize() error {
|
||||
killProcess(p.config.Cmd)
|
||||
logger.Debug(logSender, "", "create new notifier plugin %#v", p.config.Cmd)
|
||||
if !p.config.NotifierOptions.hasActions() {
|
||||
return fmt.Errorf("no actions defined for the notifier plugin %#v", p.config.Cmd)
|
||||
}
|
||||
var secureConfig *plugin.SecureConfig
|
||||
if p.config.SHA256Sum != "" {
|
||||
secureConfig.Checksum = []byte(p.config.SHA256Sum)
|
||||
secureConfig.Hash = sha256.New()
|
||||
}
|
||||
client := plugin.NewClient(&plugin.ClientConfig{
|
||||
HandshakeConfig: notifier.Handshake,
|
||||
Plugins: notifier.PluginMap,
|
||||
Cmd: exec.Command(p.config.Cmd, p.config.Args...),
|
||||
AllowedProtocols: []plugin.Protocol{
|
||||
plugin.ProtocolGRPC,
|
||||
},
|
||||
AutoMTLS: p.config.AutoMTLS,
|
||||
SecureConfig: secureConfig,
|
||||
Managed: false,
|
||||
Logger: &logger.HCLogAdapter{
|
||||
Logger: hclog.New(&hclog.LoggerOptions{
|
||||
Name: fmt.Sprintf("%v.%v", logSender, notifier.PluginName),
|
||||
Level: pluginsLogLevel,
|
||||
DisableTime: true,
|
||||
}),
|
||||
},
|
||||
})
|
||||
rpcClient, err := client.Client()
|
||||
if err != nil {
|
||||
logger.Debug(logSender, "", "unable to get rpc client for plugin %#v: %v", p.config.Cmd, err)
|
||||
return err
|
||||
}
|
||||
raw, err := rpcClient.Dispense(notifier.PluginName)
|
||||
if err != nil {
|
||||
logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %#v: %v",
|
||||
notifier.PluginName, p.config.Cmd, err)
|
||||
return err
|
||||
}
|
||||
|
||||
p.client = client
|
||||
p.notifier = raw.(notifier.Notifier)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) canQueueEvent(timestamp int64) bool {
|
||||
if p.config.NotifierOptions.RetryMaxTime == 0 {
|
||||
return false
|
||||
}
|
||||
if time.Now().After(time.Unix(0, timestamp).Add(time.Duration(p.config.NotifierOptions.RetryMaxTime) * time.Second)) {
|
||||
logger.Warn(logSender, "", "dropping too late event for plugin %v, event timestamp: %v",
|
||||
p.config.Cmd, time.Unix(0, timestamp))
|
||||
return false
|
||||
}
|
||||
if p.config.NotifierOptions.RetryQueueMaxSize > 0 {
|
||||
return p.queue.getSize() < p.config.NotifierOptions.RetryQueueMaxSize
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) notifyFsAction(event *notifier.FsEvent) {
|
||||
if !util.IsStringInSlice(event.Action, p.config.NotifierOptions.FsEvents) {
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
p.sendFsEvent(event)
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) notifyProviderAction(event *notifier.ProviderEvent, object Renderer) {
|
||||
if !util.IsStringInSlice(event.Action, p.config.NotifierOptions.ProviderEvents) ||
|
||||
!util.IsStringInSlice(event.ObjectType, p.config.NotifierOptions.ProviderObjects) {
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
objectAsJSON, err := object.RenderAsJSON(event.Action != "delete")
|
||||
if err != nil {
|
||||
logger.Warn(logSender, "", "unable to render user as json for action %v: %v", event.Action, err)
|
||||
return
|
||||
}
|
||||
event.ObjectData = objectAsJSON
|
||||
p.sendProviderEvent(event)
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) sendFsEvent(event *notifier.FsEvent) {
|
||||
if err := p.notifier.NotifyFsEvent(event); err != nil {
|
||||
logger.Warn(logSender, "", "unable to send fs action notification to plugin %v: %v", p.config.Cmd, err)
|
||||
if p.canQueueEvent(event.Timestamp) {
|
||||
p.queue.addFsEvent(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) sendProviderEvent(event *notifier.ProviderEvent) {
|
||||
if err := p.notifier.NotifyProviderEvent(event); err != nil {
|
||||
logger.Warn(logSender, "", "unable to send user action notification to plugin %v: %v", p.config.Cmd, err)
|
||||
if p.canQueueEvent(event.Timestamp) {
|
||||
p.queue.addProviderEvent(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *notifierPlugin) sendQueuedEvents() {
|
||||
queueSize := p.queue.getSize()
|
||||
if queueSize == 0 {
|
||||
return
|
||||
}
|
||||
logger.Debug(logSender, "", "check queued events for notifier %#v, events size: %v", p.config.Cmd, queueSize)
|
||||
fsEv := p.queue.popFsEvent()
|
||||
for fsEv != nil {
|
||||
go func(ev *notifier.FsEvent) {
|
||||
p.sendFsEvent(ev)
|
||||
}(fsEv)
|
||||
fsEv = p.queue.popFsEvent()
|
||||
}
|
||||
|
||||
providerEv := p.queue.popProviderEvent()
|
||||
for providerEv != nil {
|
||||
go func(ev *notifier.ProviderEvent) {
|
||||
p.sendProviderEvent(ev)
|
||||
}(providerEv)
|
||||
providerEv = p.queue.popProviderEvent()
|
||||
}
|
||||
logger.Debug(logSender, "", "queued events sent for notifier %#v, new events size: %v", p.config.Cmd, p.queue.getSize())
|
||||
}
|
||||
666
plugin/plugin.go
Normal file
666
plugin/plugin.go
Normal file
@@ -0,0 +1,666 @@
|
||||
// Package plugin provides support for the SFTPGo plugin system
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/drakkan/sftpgo/v2/logger"
|
||||
"github.com/drakkan/sftpgo/v2/sdk/kms"
|
||||
"github.com/drakkan/sftpgo/v2/sdk/plugin/auth"
|
||||
"github.com/drakkan/sftpgo/v2/sdk/plugin/eventsearcher"
|
||||
kmsplugin "github.com/drakkan/sftpgo/v2/sdk/plugin/kms"
|
||||
"github.com/drakkan/sftpgo/v2/sdk/plugin/metadata"
|
||||
"github.com/drakkan/sftpgo/v2/sdk/plugin/notifier"
|
||||
"github.com/drakkan/sftpgo/v2/util"
|
||||
)
|
||||
|
||||
const (
|
||||
logSender = "plugins"
|
||||
)
|
||||
|
||||
var (
|
||||
// Handler defines the plugins manager
|
||||
Handler Manager
|
||||
pluginsLogLevel = hclog.Debug
|
||||
// ErrNoSearcher defines the error to return for events searches if no plugin is configured
|
||||
ErrNoSearcher = errors.New("no events searcher plugin defined")
|
||||
// ErrNoMetadater returns the error to return for metadata methods if no plugin is configured
|
||||
ErrNoMetadater = errors.New("no metadata plugin defined")
|
||||
)
|
||||
|
||||
// Renderer defines the interface for generic objects rendering
|
||||
type Renderer interface {
|
||||
RenderAsJSON(reload bool) ([]byte, error)
|
||||
}
|
||||
|
||||
// Config defines a plugin configuration
|
||||
type Config struct {
|
||||
// Plugin type
|
||||
Type string `json:"type" mapstructure:"type"`
|
||||
// NotifierOptions defines options for notifiers plugins
|
||||
NotifierOptions NotifierConfig `json:"notifier_options" mapstructure:"notifier_options"`
|
||||
// KMSOptions defines options for a KMS plugin
|
||||
KMSOptions KMSConfig `json:"kms_options" mapstructure:"kms_options"`
|
||||
// AuthOptions defines options for authentication plugins
|
||||
AuthOptions AuthConfig `json:"auth_options" mapstructure:"auth_options"`
|
||||
// Path to the plugin executable
|
||||
Cmd string `json:"cmd" mapstructure:"cmd"`
|
||||
// Args to pass to the plugin executable
|
||||
Args []string `json:"args" mapstructure:"args"`
|
||||
// SHA256 checksum for the plugin executable.
|
||||
// If not empty it will be used to verify the integrity of the executable
|
||||
SHA256Sum string `json:"sha256sum" mapstructure:"sha256sum"`
|
||||
// If enabled the client and the server automatically negotiate mTLS for
|
||||
// transport authentication. This ensures that only the original client will
|
||||
// be allowed to connect to the server, and all other connections will be
|
||||
// rejected. The client will also refuse to connect to any server that isn't
|
||||
// the original instance started by the client.
|
||||
AutoMTLS bool `json:"auto_mtls" mapstructure:"auto_mtls"`
|
||||
// unique identifier for kms plugins
|
||||
kmsID int
|
||||
}
|
||||
|
||||
func (c *Config) newKMSPluginSecretProvider(base kms.BaseSecret, url, masterKey string) kms.SecretProvider {
|
||||
return &kmsPluginSecretProvider{
|
||||
BaseSecret: base,
|
||||
URL: url,
|
||||
MasterKey: masterKey,
|
||||
config: c,
|
||||
}
|
||||
}
|
||||
|
||||
// Manager handles enabled plugins
|
||||
type Manager struct {
|
||||
closed int32
|
||||
done chan bool
|
||||
// List of configured plugins
|
||||
Configs []Config `json:"plugins" mapstructure:"plugins"`
|
||||
notifLock sync.RWMutex
|
||||
notifiers []*notifierPlugin
|
||||
kmsLock sync.RWMutex
|
||||
kms []*kmsPlugin
|
||||
authLock sync.RWMutex
|
||||
auths []*authPlugin
|
||||
searcherLock sync.RWMutex
|
||||
searcher *searcherPlugin
|
||||
metadaterLock sync.RWMutex
|
||||
metadater *metadataPlugin
|
||||
authScopes int
|
||||
hasSearcher bool
|
||||
hasMetadater bool
|
||||
hasNotifiers bool
|
||||
}
|
||||
|
||||
// Initialize initializes the configured plugins
|
||||
func Initialize(configs []Config, logVerbose bool) error {
|
||||
logger.Debug(logSender, "", "initialize")
|
||||
Handler = Manager{
|
||||
Configs: configs,
|
||||
done: make(chan bool),
|
||||
closed: 0,
|
||||
authScopes: -1,
|
||||
}
|
||||
setLogLevel(logVerbose)
|
||||
if len(configs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := Handler.validateConfigs(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
kmsID := 0
|
||||
for idx, config := range Handler.Configs {
|
||||
switch config.Type {
|
||||
case notifier.PluginName:
|
||||
plugin, err := newNotifierPlugin(config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
Handler.notifiers = append(Handler.notifiers, plugin)
|
||||
case kmsplugin.PluginName:
|
||||
plugin, err := newKMSPlugin(config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
Handler.kms = append(Handler.kms, plugin)
|
||||
Handler.Configs[idx].kmsID = kmsID
|
||||
kmsID++
|
||||
kms.RegisterSecretProvider(config.KMSOptions.Scheme, config.KMSOptions.EncryptedStatus,
|
||||
Handler.Configs[idx].newKMSPluginSecretProvider)
|
||||
logger.Debug(logSender, "", "registered secret provider for scheme: %v, encrypted status: %v",
|
||||
config.KMSOptions.Scheme, config.KMSOptions.EncryptedStatus)
|
||||
case auth.PluginName:
|
||||
plugin, err := newAuthPlugin(config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
Handler.auths = append(Handler.auths, plugin)
|
||||
if Handler.authScopes == -1 {
|
||||
Handler.authScopes = config.AuthOptions.Scope
|
||||
} else {
|
||||
Handler.authScopes |= config.AuthOptions.Scope
|
||||
}
|
||||
case eventsearcher.PluginName:
|
||||
plugin, err := newSearcherPlugin(config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
Handler.searcher = plugin
|
||||
case metadata.PluginName:
|
||||
plugin, err := newMetadaterPlugin(config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
Handler.metadater = plugin
|
||||
default:
|
||||
return fmt.Errorf("unsupported plugin type: %v", config.Type)
|
||||
}
|
||||
}
|
||||
startCheckTicker()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) validateConfigs() error {
|
||||
kmsSchemes := make(map[string]bool)
|
||||
kmsEncryptions := make(map[string]bool)
|
||||
m.hasSearcher = false
|
||||
m.hasMetadater = false
|
||||
m.hasNotifiers = false
|
||||
|
||||
for _, config := range m.Configs {
|
||||
if config.Type == kmsplugin.PluginName {
|
||||
if _, ok := kmsSchemes[config.KMSOptions.Scheme]; ok {
|
||||
return fmt.Errorf("invalid KMS configuration, duplicated scheme %#v", config.KMSOptions.Scheme)
|
||||
}
|
||||
if _, ok := kmsEncryptions[config.KMSOptions.EncryptedStatus]; ok {
|
||||
return fmt.Errorf("invalid KMS configuration, duplicated encrypted status %#v", config.KMSOptions.EncryptedStatus)
|
||||
}
|
||||
kmsSchemes[config.KMSOptions.Scheme] = true
|
||||
kmsEncryptions[config.KMSOptions.EncryptedStatus] = true
|
||||
}
|
||||
if config.Type == eventsearcher.PluginName {
|
||||
if m.hasSearcher {
|
||||
return errors.New("only one eventsearcher plugin can be defined")
|
||||
}
|
||||
m.hasSearcher = true
|
||||
}
|
||||
if config.Type == metadata.PluginName {
|
||||
if m.hasMetadater {
|
||||
return errors.New("only one metadata plugin can be defined")
|
||||
}
|
||||
m.hasMetadater = true
|
||||
}
|
||||
if config.Type == notifier.PluginName {
|
||||
m.hasNotifiers = true
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// HasNotifiers returns true if there is at least a notifier plugin
|
||||
func (m *Manager) HasNotifiers() bool {
|
||||
return m.hasNotifiers
|
||||
}
|
||||
|
||||
// NotifyFsEvent sends the fs event notifications using any defined notifier plugins
|
||||
func (m *Manager) NotifyFsEvent(event *notifier.FsEvent) {
|
||||
m.notifLock.RLock()
|
||||
defer m.notifLock.RUnlock()
|
||||
|
||||
for _, n := range m.notifiers {
|
||||
n.notifyFsAction(event)
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyProviderEvent sends the provider event notifications using any defined notifier plugins
|
||||
func (m *Manager) NotifyProviderEvent(event *notifier.ProviderEvent, object Renderer) {
|
||||
m.notifLock.RLock()
|
||||
defer m.notifLock.RUnlock()
|
||||
|
||||
for _, n := range m.notifiers {
|
||||
n.notifyProviderAction(event, object)
|
||||
}
|
||||
}
|
||||
|
||||
// SearchFsEvents returns the filesystem events matching the specified filters
|
||||
func (m *Manager) SearchFsEvents(searchFilters *eventsearcher.FsEventSearch) ([]byte, []string, []string, error) {
|
||||
if !m.hasSearcher {
|
||||
return nil, nil, nil, ErrNoSearcher
|
||||
}
|
||||
m.searcherLock.RLock()
|
||||
plugin := m.searcher
|
||||
m.searcherLock.RUnlock()
|
||||
|
||||
return plugin.searchear.SearchFsEvents(searchFilters)
|
||||
}
|
||||
|
||||
// SearchProviderEvents returns the provider events matching the specified filters
|
||||
func (m *Manager) SearchProviderEvents(searchFilters *eventsearcher.ProviderEventSearch) ([]byte, []string, []string, error) {
|
||||
if !m.hasSearcher {
|
||||
return nil, nil, nil, ErrNoSearcher
|
||||
}
|
||||
m.searcherLock.RLock()
|
||||
plugin := m.searcher
|
||||
m.searcherLock.RUnlock()
|
||||
|
||||
return plugin.searchear.SearchProviderEvents(searchFilters)
|
||||
}
|
||||
|
||||
// HasMetadater returns true if a metadata plugin is defined
|
||||
func (m *Manager) HasMetadater() bool {
|
||||
return m.hasMetadater
|
||||
}
|
||||
|
||||
// SetModificationTime sets the modification time for the specified object
|
||||
func (m *Manager) SetModificationTime(storageID, objectPath string, mTime int64) error {
|
||||
if !m.hasMetadater {
|
||||
return ErrNoMetadater
|
||||
}
|
||||
m.metadaterLock.RLock()
|
||||
plugin := m.metadater
|
||||
m.metadaterLock.RUnlock()
|
||||
|
||||
return plugin.metadater.SetModificationTime(storageID, objectPath, mTime)
|
||||
}
|
||||
|
||||
// GetModificationTime returns the modification time for the specified path
|
||||
func (m *Manager) GetModificationTime(storageID, objectPath string, isDir bool) (int64, error) {
|
||||
if !m.hasMetadater {
|
||||
return 0, ErrNoMetadater
|
||||
}
|
||||
m.metadaterLock.RLock()
|
||||
plugin := m.metadater
|
||||
m.metadaterLock.RUnlock()
|
||||
|
||||
return plugin.metadater.GetModificationTime(storageID, objectPath)
|
||||
}
|
||||
|
||||
// GetModificationTimes returns the modification times for all the files within the specified folder
|
||||
func (m *Manager) GetModificationTimes(storageID, objectPath string) (map[string]int64, error) {
|
||||
if !m.hasMetadater {
|
||||
return nil, ErrNoMetadater
|
||||
}
|
||||
m.metadaterLock.RLock()
|
||||
plugin := m.metadater
|
||||
m.metadaterLock.RUnlock()
|
||||
|
||||
return plugin.metadater.GetModificationTimes(storageID, objectPath)
|
||||
}
|
||||
|
||||
// RemoveMetadata deletes the metadata stored for the specified object
|
||||
func (m *Manager) RemoveMetadata(storageID, objectPath string) error {
|
||||
if !m.hasMetadater {
|
||||
return ErrNoMetadater
|
||||
}
|
||||
m.metadaterLock.RLock()
|
||||
plugin := m.metadater
|
||||
m.metadaterLock.RUnlock()
|
||||
|
||||
return plugin.metadater.RemoveMetadata(storageID, objectPath)
|
||||
}
|
||||
|
||||
// GetMetadataFolders returns the folders that metadata is associated with
|
||||
func (m *Manager) GetMetadataFolders(storageID, from string, limit int) ([]string, error) {
|
||||
if !m.hasMetadater {
|
||||
return nil, ErrNoMetadater
|
||||
}
|
||||
m.metadaterLock.RLock()
|
||||
plugin := m.metadater
|
||||
m.metadaterLock.RUnlock()
|
||||
|
||||
return plugin.metadater.GetFolders(storageID, limit, from)
|
||||
}
|
||||
|
||||
func (m *Manager) kmsEncrypt(secret kms.BaseSecret, url string, masterKey string, kmsID int) (string, string, int32, error) {
|
||||
m.kmsLock.RLock()
|
||||
plugin := m.kms[kmsID]
|
||||
m.kmsLock.RUnlock()
|
||||
|
||||
return plugin.Encrypt(secret, url, masterKey)
|
||||
}
|
||||
|
||||
func (m *Manager) kmsDecrypt(secret kms.BaseSecret, url string, masterKey string, kmsID int) (string, error) {
|
||||
m.kmsLock.RLock()
|
||||
plugin := m.kms[kmsID]
|
||||
m.kmsLock.RUnlock()
|
||||
|
||||
return plugin.Decrypt(secret, url, masterKey)
|
||||
}
|
||||
|
||||
// HasAuthScope returns true if there is an auth plugin that support the specified scope
|
||||
func (m *Manager) HasAuthScope(scope int) bool {
|
||||
if m.authScopes == -1 {
|
||||
return false
|
||||
}
|
||||
return m.authScopes&scope != 0
|
||||
}
|
||||
|
||||
// Authenticate tries to authenticate the specified user using an external plugin
|
||||
func (m *Manager) Authenticate(username, password, ip, protocol string, pkey string,
|
||||
tlsCert *x509.Certificate, authScope int, userAsJSON []byte,
|
||||
) ([]byte, error) {
|
||||
switch authScope {
|
||||
case AuthScopePassword:
|
||||
return m.checkUserAndPass(username, password, ip, protocol, userAsJSON)
|
||||
case AuthScopePublicKey:
|
||||
return m.checkUserAndPublicKey(username, pkey, ip, protocol, userAsJSON)
|
||||
case AuthScopeKeyboardInteractive:
|
||||
return m.checkUserAndKeyboardInteractive(username, ip, protocol, userAsJSON)
|
||||
case AuthScopeTLSCertificate:
|
||||
cert, err := util.EncodeTLSCertToPem(tlsCert)
|
||||
if err != nil {
|
||||
logger.Error(logSender, "", "unable to encode tls certificate to pem: %v", err)
|
||||
return nil, fmt.Errorf("unable to encode tls cert to pem: %w", err)
|
||||
}
|
||||
return m.checkUserAndTLSCert(username, cert, ip, protocol, userAsJSON)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported auth scope: %v", authScope)
|
||||
}
|
||||
}
|
||||
|
||||
// ExecuteKeyboardInteractiveStep executes a keyboard interactive step
|
||||
func (m *Manager) ExecuteKeyboardInteractiveStep(req *KeyboardAuthRequest) (*KeyboardAuthResponse, error) {
|
||||
var plugin *authPlugin
|
||||
|
||||
m.authLock.Lock()
|
||||
for _, p := range m.auths {
|
||||
if p.config.AuthOptions.Scope&AuthScopePassword != 0 {
|
||||
plugin = p
|
||||
break
|
||||
}
|
||||
}
|
||||
m.authLock.Unlock()
|
||||
|
||||
if plugin == nil {
|
||||
return nil, errors.New("no auth plugin configured for keyaboard interactive authentication step")
|
||||
}
|
||||
|
||||
return plugin.sendKeyboardIteractiveRequest(req)
|
||||
}
|
||||
|
||||
func (m *Manager) checkUserAndPass(username, password, ip, protocol string, userAsJSON []byte) ([]byte, error) {
|
||||
var plugin *authPlugin
|
||||
|
||||
m.authLock.Lock()
|
||||
for _, p := range m.auths {
|
||||
if p.config.AuthOptions.Scope&AuthScopePassword != 0 {
|
||||
plugin = p
|
||||
break
|
||||
}
|
||||
}
|
||||
m.authLock.Unlock()
|
||||
|
||||
if plugin == nil {
|
||||
return nil, errors.New("no auth plugin configured for password checking")
|
||||
}
|
||||
|
||||
return plugin.checkUserAndPass(username, password, ip, protocol, userAsJSON)
|
||||
}
|
||||
|
||||
func (m *Manager) checkUserAndPublicKey(username, pubKey, ip, protocol string, userAsJSON []byte) ([]byte, error) {
|
||||
var plugin *authPlugin
|
||||
|
||||
m.authLock.Lock()
|
||||
for _, p := range m.auths {
|
||||
if p.config.AuthOptions.Scope&AuthScopePublicKey != 0 {
|
||||
plugin = p
|
||||
break
|
||||
}
|
||||
}
|
||||
m.authLock.Unlock()
|
||||
|
||||
if plugin == nil {
|
||||
return nil, errors.New("no auth plugin configured for public key checking")
|
||||
}
|
||||
|
||||
return plugin.checkUserAndPublicKey(username, pubKey, ip, protocol, userAsJSON)
|
||||
}
|
||||
|
||||
func (m *Manager) checkUserAndTLSCert(username, tlsCert, ip, protocol string, userAsJSON []byte) ([]byte, error) {
|
||||
var plugin *authPlugin
|
||||
|
||||
m.authLock.Lock()
|
||||
for _, p := range m.auths {
|
||||
if p.config.AuthOptions.Scope&AuthScopeTLSCertificate != 0 {
|
||||
plugin = p
|
||||
break
|
||||
}
|
||||
}
|
||||
m.authLock.Unlock()
|
||||
|
||||
if plugin == nil {
|
||||
return nil, errors.New("no auth plugin configured for TLS certificate checking")
|
||||
}
|
||||
|
||||
return plugin.checkUserAndTLSCertificate(username, tlsCert, ip, protocol, userAsJSON)
|
||||
}
|
||||
|
||||
func (m *Manager) checkUserAndKeyboardInteractive(username, ip, protocol string, userAsJSON []byte) ([]byte, error) {
|
||||
var plugin *authPlugin
|
||||
|
||||
m.authLock.Lock()
|
||||
for _, p := range m.auths {
|
||||
if p.config.AuthOptions.Scope&AuthScopeKeyboardInteractive != 0 {
|
||||
plugin = p
|
||||
break
|
||||
}
|
||||
}
|
||||
m.authLock.Unlock()
|
||||
|
||||
if plugin == nil {
|
||||
return nil, errors.New("no auth plugin configured for keyboard interactive checking")
|
||||
}
|
||||
|
||||
return plugin.checkUserAndKeyboardInteractive(username, ip, protocol, userAsJSON)
|
||||
}
|
||||
|
||||
func (m *Manager) checkCrashedPlugins() {
|
||||
m.notifLock.RLock()
|
||||
for idx, n := range m.notifiers {
|
||||
if n.exited() {
|
||||
defer func(cfg Config, index int) {
|
||||
Handler.restartNotifierPlugin(cfg, index)
|
||||
}(n.config, idx)
|
||||
} else {
|
||||
n.sendQueuedEvents()
|
||||
}
|
||||
}
|
||||
m.notifLock.RUnlock()
|
||||
|
||||
m.kmsLock.RLock()
|
||||
for idx, k := range m.kms {
|
||||
if k.exited() {
|
||||
defer func(cfg Config, index int) {
|
||||
Handler.restartKMSPlugin(cfg, index)
|
||||
}(k.config, idx)
|
||||
}
|
||||
}
|
||||
m.kmsLock.RUnlock()
|
||||
|
||||
m.authLock.RLock()
|
||||
for idx, a := range m.auths {
|
||||
if a.exited() {
|
||||
defer func(cfg Config, index int) {
|
||||
Handler.restartAuthPlugin(cfg, index)
|
||||
}(a.config, idx)
|
||||
}
|
||||
}
|
||||
m.authLock.RUnlock()
|
||||
|
||||
if m.hasSearcher {
|
||||
m.searcherLock.RLock()
|
||||
if m.searcher.exited() {
|
||||
defer func(cfg Config) {
|
||||
Handler.restartSearcherPlugin(cfg)
|
||||
}(m.searcher.config)
|
||||
}
|
||||
m.searcherLock.RUnlock()
|
||||
}
|
||||
|
||||
if m.hasMetadater {
|
||||
m.metadaterLock.RLock()
|
||||
if m.metadater.exited() {
|
||||
defer func(cfg Config) {
|
||||
Handler.restartMetadaterPlugin(cfg)
|
||||
}(m.metadater.config)
|
||||
}
|
||||
m.metadaterLock.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) restartNotifierPlugin(config Config, idx int) {
|
||||
if atomic.LoadInt32(&m.closed) == 1 {
|
||||
return
|
||||
}
|
||||
logger.Info(logSender, "", "try to restart crashed notifier plugin %#v, idx: %v", config.Cmd, idx)
|
||||
plugin, err := newNotifierPlugin(config)
|
||||
if err != nil {
|
||||
logger.Error(logSender, "", "unable to restart notifier plugin %#v, err: %v", config.Cmd, err)
|
||||
return
|
||||
}
|
||||
|
||||
m.notifLock.Lock()
|
||||
plugin.queue = m.notifiers[idx].queue
|
||||
m.notifiers[idx] = plugin
|
||||
m.notifLock.Unlock()
|
||||
plugin.sendQueuedEvents()
|
||||
}
|
||||
|
||||
func (m *Manager) restartKMSPlugin(config Config, idx int) {
|
||||
if atomic.LoadInt32(&m.closed) == 1 {
|
||||
return
|
||||
}
|
||||
logger.Info(logSender, "", "try to restart crashed kms plugin %#v, idx: %v", config.Cmd, idx)
|
||||
plugin, err := newKMSPlugin(config)
|
||||
if err != nil {
|
||||
logger.Error(logSender, "", "unable to restart kms plugin %#v, err: %v", config.Cmd, err)
|
||||
return
|
||||
}
|
||||
|
||||
m.kmsLock.Lock()
|
||||
m.kms[idx] = plugin
|
||||
m.kmsLock.Unlock()
|
||||
}
|
||||
|
||||
func (m *Manager) restartAuthPlugin(config Config, idx int) {
|
||||
if atomic.LoadInt32(&m.closed) == 1 {
|
||||
return
|
||||
}
|
||||
logger.Info(logSender, "", "try to restart crashed auth plugin %#v, idx: %v", config.Cmd, idx)
|
||||
plugin, err := newAuthPlugin(config)
|
||||
if err != nil {
|
||||
logger.Error(logSender, "", "unable to restart auth plugin %#v, err: %v", config.Cmd, err)
|
||||
return
|
||||
}
|
||||
|
||||
m.authLock.Lock()
|
||||
m.auths[idx] = plugin
|
||||
m.authLock.Unlock()
|
||||
}
|
||||
|
||||
func (m *Manager) restartSearcherPlugin(config Config) {
|
||||
if atomic.LoadInt32(&m.closed) == 1 {
|
||||
return
|
||||
}
|
||||
logger.Info(logSender, "", "try to restart crashed searcher plugin %#v", config.Cmd)
|
||||
plugin, err := newSearcherPlugin(config)
|
||||
if err != nil {
|
||||
logger.Error(logSender, "", "unable to restart searcher plugin %#v, err: %v", config.Cmd, err)
|
||||
return
|
||||
}
|
||||
|
||||
m.searcherLock.Lock()
|
||||
m.searcher = plugin
|
||||
m.searcherLock.Unlock()
|
||||
}
|
||||
|
||||
func (m *Manager) restartMetadaterPlugin(config Config) {
|
||||
if atomic.LoadInt32(&m.closed) == 1 {
|
||||
return
|
||||
}
|
||||
logger.Info(logSender, "", "try to restart crashed metadater plugin %#v", config.Cmd)
|
||||
plugin, err := newMetadaterPlugin(config)
|
||||
if err != nil {
|
||||
logger.Error(logSender, "", "unable to restart metadater plugin %#v, err: %v", config.Cmd, err)
|
||||
return
|
||||
}
|
||||
|
||||
m.metadaterLock.Lock()
|
||||
m.metadater = plugin
|
||||
m.metadaterLock.Unlock()
|
||||
}
|
||||
|
||||
// Cleanup releases all the active plugins
|
||||
func (m *Manager) Cleanup() {
|
||||
logger.Debug(logSender, "", "cleanup")
|
||||
atomic.StoreInt32(&m.closed, 1)
|
||||
close(m.done)
|
||||
m.notifLock.Lock()
|
||||
for _, n := range m.notifiers {
|
||||
logger.Debug(logSender, "", "cleanup notifier plugin %v", n.config.Cmd)
|
||||
n.cleanup()
|
||||
}
|
||||
m.notifLock.Unlock()
|
||||
|
||||
m.kmsLock.Lock()
|
||||
for _, k := range m.kms {
|
||||
logger.Debug(logSender, "", "cleanup kms plugin %v", k.config.Cmd)
|
||||
k.cleanup()
|
||||
}
|
||||
m.kmsLock.Unlock()
|
||||
|
||||
m.authLock.Lock()
|
||||
for _, a := range m.auths {
|
||||
logger.Debug(logSender, "", "cleanup auth plugin %v", a.config.Cmd)
|
||||
a.cleanup()
|
||||
}
|
||||
m.authLock.Unlock()
|
||||
|
||||
if m.hasSearcher {
|
||||
m.searcherLock.Lock()
|
||||
logger.Debug(logSender, "", "cleanup searcher plugin %v", m.searcher.config.Cmd)
|
||||
m.searcher.cleanup()
|
||||
m.searcherLock.Unlock()
|
||||
}
|
||||
|
||||
if m.hasMetadater {
|
||||
m.metadaterLock.Lock()
|
||||
logger.Debug(logSender, "", "cleanup metadater plugin %v", m.metadater.config.Cmd)
|
||||
m.metadater.cleanup()
|
||||
m.metadaterLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func setLogLevel(logVerbose bool) {
|
||||
if logVerbose {
|
||||
pluginsLogLevel = hclog.Debug
|
||||
} else {
|
||||
pluginsLogLevel = hclog.Info
|
||||
}
|
||||
}
|
||||
|
||||
func startCheckTicker() {
|
||||
logger.Debug(logSender, "", "start plugins checker")
|
||||
checker := time.NewTicker(30 * time.Second)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-Handler.done:
|
||||
logger.Debug(logSender, "", "handler done, stop plugins checker")
|
||||
checker.Stop()
|
||||
return
|
||||
case <-checker.C:
|
||||
Handler.checkCrashedPlugins()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
82
plugin/searcher.go
Normal file
82
plugin/searcher.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-plugin"
|
||||
|
||||
"github.com/drakkan/sftpgo/v2/logger"
|
||||
"github.com/drakkan/sftpgo/v2/sdk/plugin/eventsearcher"
|
||||
)
|
||||
|
||||
type searcherPlugin struct {
|
||||
config Config
|
||||
searchear eventsearcher.Searcher
|
||||
client *plugin.Client
|
||||
}
|
||||
|
||||
func newSearcherPlugin(config Config) (*searcherPlugin, error) {
|
||||
p := &searcherPlugin{
|
||||
config: config,
|
||||
}
|
||||
if err := p.initialize(); err != nil {
|
||||
logger.Warn(logSender, "", "unable to create events searcher plugin: %v, config %+v", err, config)
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *searcherPlugin) exited() bool {
|
||||
return p.client.Exited()
|
||||
}
|
||||
|
||||
func (p *searcherPlugin) cleanup() {
|
||||
p.client.Kill()
|
||||
}
|
||||
|
||||
func (p *searcherPlugin) initialize() error {
|
||||
killProcess(p.config.Cmd)
|
||||
logger.Debug(logSender, "", "create new searcher plugin %#v", p.config.Cmd)
|
||||
var secureConfig *plugin.SecureConfig
|
||||
if p.config.SHA256Sum != "" {
|
||||
secureConfig.Checksum = []byte(p.config.SHA256Sum)
|
||||
secureConfig.Hash = sha256.New()
|
||||
}
|
||||
client := plugin.NewClient(&plugin.ClientConfig{
|
||||
HandshakeConfig: eventsearcher.Handshake,
|
||||
Plugins: eventsearcher.PluginMap,
|
||||
Cmd: exec.Command(p.config.Cmd, p.config.Args...),
|
||||
AllowedProtocols: []plugin.Protocol{
|
||||
plugin.ProtocolGRPC,
|
||||
},
|
||||
AutoMTLS: p.config.AutoMTLS,
|
||||
SecureConfig: secureConfig,
|
||||
Managed: false,
|
||||
Logger: &logger.HCLogAdapter{
|
||||
Logger: hclog.New(&hclog.LoggerOptions{
|
||||
Name: fmt.Sprintf("%v.%v", logSender, eventsearcher.PluginName),
|
||||
Level: pluginsLogLevel,
|
||||
DisableTime: true,
|
||||
}),
|
||||
},
|
||||
})
|
||||
rpcClient, err := client.Client()
|
||||
if err != nil {
|
||||
logger.Debug(logSender, "", "unable to get rpc client for plugin %#v: %v", p.config.Cmd, err)
|
||||
return err
|
||||
}
|
||||
raw, err := rpcClient.Dispense(eventsearcher.PluginName)
|
||||
if err != nil {
|
||||
logger.Debug(logSender, "", "unable to get plugin %v from rpc client for command %#v: %v",
|
||||
eventsearcher.PluginName, p.config.Cmd, err)
|
||||
return err
|
||||
}
|
||||
|
||||
p.client = client
|
||||
p.searchear = raw.(eventsearcher.Searcher)
|
||||
|
||||
return nil
|
||||
}
|
||||
25
plugin/util.go
Normal file
25
plugin/util.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"github.com/shirou/gopsutil/v3/process"
|
||||
|
||||
"github.com/drakkan/sftpgo/v2/logger"
|
||||
)
|
||||
|
||||
func killProcess(processPath string) {
|
||||
procs, err := process.Processes()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, p := range procs {
|
||||
cmdLine, err := p.Exe()
|
||||
if err == nil {
|
||||
if cmdLine == processPath {
|
||||
err = p.Kill()
|
||||
logger.Debug(logSender, "killed process %v, pid %v, err %v", cmdLine, p.Pid, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.Debug(logSender, "no match for plugin process %v", processPath)
|
||||
}
|
||||
Reference in New Issue
Block a user