file actions: add bucket and endpoint to notifications

The HTTP notifications are now invoked as POST and the notification is
a JSON inside the POST body.

This is a backward incompatible change but this way the actions can be
extended more easily, sorry for the trouble

Fixes #101
This commit is contained in:
Nicola Murino
2020-03-25 18:36:33 +01:00
parent e22d377203
commit 4759254e10
7 changed files with 149 additions and 72 deletions

View File

@@ -4,7 +4,9 @@
package sftpd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
@@ -127,6 +129,57 @@ type sshSubsystemExecMsg struct {
Command string
}
type actionNotification struct {
Action string `json:"action"`
Username string `json:"username"`
Path string `json:"path"`
TargetPath string `json:"target_path,omitempty"`
SSHCmd string `json:"ssh_cmd,omitempty"`
FileSize int64 `json:"file_size,omitempty"`
FsProvider int `json:"fs_provider"`
Bucket string `json:"bucket,omitempty"`
Endpoint string `json:"endpoint,omitempty"`
}
func newActionNotification(user dataprovider.User, operation, filePath, target, sshCmd string, fileSize int64) actionNotification {
bucket := ""
endpoint := ""
if user.FsConfig.Provider == 1 {
bucket = user.FsConfig.S3Config.Bucket
endpoint = user.FsConfig.S3Config.Endpoint
} else if user.FsConfig.Provider == 2 {
bucket = user.FsConfig.GCSConfig.Bucket
}
return actionNotification{
Action: operation,
Username: user.Username,
Path: filePath,
TargetPath: target,
SSHCmd: sshCmd,
FileSize: fileSize,
FsProvider: user.FsConfig.Provider,
Bucket: bucket,
Endpoint: endpoint,
}
}
func (a *actionNotification) AsJSON() []byte {
res, _ := json.Marshal(a)
return res
}
func (a *actionNotification) AsEnvVars() []string {
return []string{fmt.Sprintf("SFTPGO_ACTION=%v", a.Action),
fmt.Sprintf("SFTPGO_ACTION_USERNAME=%v", a.Username),
fmt.Sprintf("SFTPGO_ACTION_PATH=%v", a.Path),
fmt.Sprintf("SFTPGO_ACTION_TARGET=%v", a.TargetPath),
fmt.Sprintf("SFTPGO_ACTION_SSH_CMD=%v", a.SSHCmd),
fmt.Sprintf("SFTPGO_ACTION_FILE_SIZE=%v", a.FileSize),
fmt.Sprintf("SFTPGO_ACTION_FS_PROVIDER=%v", a.FsProvider),
fmt.Sprintf("SFTPGO_ACTION_BUCKET=%v", a.Bucket),
fmt.Sprintf("SFTPGO_ACTION_ENDPOINT=%v", a.Endpoint)}
}
func init() {
openConnections = make(map[string]Connection)
idleConnectionTicker = time.NewTicker(5 * time.Minute)
@@ -415,80 +468,53 @@ func isAtomicUploadEnabled() bool {
return uploadMode == uploadModeAtomic || uploadMode == uploadModeAtomicWithResume
}
func executeNotificationCommand(operation, username, path, target, sshCmd, fileSize, isLocalFile string) error {
func executeNotificationCommand(a actionNotification) error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, actions.Command, operation, username, path, target, sshCmd)
cmd.Env = append(os.Environ(),
fmt.Sprintf("SFTPGO_ACTION=%v", operation),
fmt.Sprintf("SFTPGO_ACTION_USERNAME=%v", username),
fmt.Sprintf("SFTPGO_ACTION_PATH=%v", path),
fmt.Sprintf("SFTPGO_ACTION_TARGET=%v", target),
fmt.Sprintf("SFTPGO_ACTION_SSH_CMD=%v", sshCmd),
fmt.Sprintf("SFTPGO_ACTION_FILE_SIZE=%v", fileSize),
fmt.Sprintf("SFTPGO_ACTION_LOCAL_FILE=%v", isLocalFile),
)
cmd := exec.CommandContext(ctx, actions.Command, a.Action, a.Username, a.Path, a.TargetPath, a.SSHCmd)
cmd.Env = append(os.Environ(), a.AsEnvVars()...)
startTime := time.Now()
err := cmd.Run()
logger.Debug(logSender, "", "executed command %#v with arguments: %#v, %#v, %#v, %#v, %#v, elapsed: %v, error: %v",
actions.Command, operation, username, path, target, sshCmd, time.Since(startTime), err)
actions.Command, a.Action, a.Username, a.Path, a.TargetPath, a.SSHCmd, time.Since(startTime), err)
return err
}
// executed in a goroutine
func executeAction(operation, username, path, target, sshCmd string, fileSize int64, isLocalFile bool) error {
if !utils.IsStringInSlice(operation, actions.ExecuteOn) {
func executeAction(a actionNotification) error {
if !utils.IsStringInSlice(a.Action, actions.ExecuteOn) {
return nil
}
var err error
size := ""
if fileSize > 0 {
size = fmt.Sprintf("%v", fileSize)
}
if len(actions.Command) > 0 && filepath.IsAbs(actions.Command) {
// we are in a goroutine but if we have to send an HTTP notification we don't want to wait for the
// end of the command
if len(actions.HTTPNotificationURL) > 0 {
go executeNotificationCommand(operation, username, path, target, sshCmd, size, fmt.Sprintf("%t", isLocalFile))
go executeNotificationCommand(a)
} else {
err = executeNotificationCommand(operation, username, path, target, sshCmd, size, fmt.Sprintf("%t", isLocalFile))
err = executeNotificationCommand(a)
}
}
if len(actions.HTTPNotificationURL) > 0 {
var url *url.URL
url, err = url.Parse(actions.HTTPNotificationURL)
if err == nil {
q := url.Query()
q.Add("action", operation)
q.Add("username", username)
q.Add("path", path)
if len(target) > 0 {
q.Add("target_path", target)
}
if len(sshCmd) > 0 {
q.Add("ssh_cmd", sshCmd)
}
if len(size) > 0 {
q.Add("file_size", size)
}
q.Add("local_file", fmt.Sprintf("%t", isLocalFile))
url.RawQuery = q.Encode()
startTime := time.Now()
httpClient := &http.Client{
Timeout: 15 * time.Second,
}
resp, err := httpClient.Get(url.String())
respCode := 0
if err == nil {
respCode = resp.StatusCode
resp.Body.Close()
}
logger.Debug(logSender, "", "notified operation %#v to URL: %v status code: %v, elapsed: %v err: %v",
operation, url.String(), respCode, time.Since(startTime), err)
} else {
if err != nil {
logger.Warn(logSender, "", "Invalid http_notification_url %#v for operation %#v: %v", actions.HTTPNotificationURL,
operation, err)
a.Action, err)
return err
}
startTime := time.Now()
httpClient := &http.Client{
Timeout: 15 * time.Second,
}
resp, err := httpClient.Post(url.String(), "application/json", bytes.NewBuffer(a.AsJSON()))
respCode := 0
if err == nil {
respCode = resp.StatusCode
resp.Body.Close()
}
logger.Debug(logSender, "", "notified operation %#v to URL: %v status code: %v, elapsed: %v err: %v",
a.Action, url.String(), respCode, time.Since(startTime), err)
}
return err
}