eventmanager: add support for file/directory compression

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2022-10-10 18:53:58 +02:00
parent a417df60b3
commit 3e44a1dd2d
14 changed files with 919 additions and 42 deletions

View File

@@ -621,6 +621,161 @@ func getCSVRetentionReport(results []folderRetentionCheckResult) ([]byte, error)
return b.Bytes(), err
}
func closeWriterAndUpdateQuota(w io.WriteCloser, conn *BaseConnection, virtualPath string, numFiles int,
truncatedSize int64, errTransfer error,
) error {
errWrite := w.Close()
info, err := conn.doStatInternal(virtualPath, 0, false, false)
if err == nil {
updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, info.Size()-truncatedSize)
_, fsPath, errFs := conn.GetFsAndResolvedPath(virtualPath)
if errFs == nil {
if errTransfer == nil {
errTransfer = errWrite
}
ExecuteActionNotification(conn, operationUpload, fsPath, virtualPath, "", "", "", info.Size(), errTransfer) //nolint:errcheck
}
} else {
eventManagerLog(logger.LevelWarn, "unable to update quota after writing %q: %v", virtualPath, err)
}
return errWrite
}
func updateUserQuotaAfterFileWrite(conn *BaseConnection, virtualPath string, numFiles int, fileSize int64) {
vfolder, err := conn.User.GetVirtualFolderForPath(path.Dir(virtualPath))
if err != nil {
dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
return
}
dataprovider.UpdateVirtualFolderQuota(&vfolder.BaseVirtualFolder, numFiles, fileSize, false) //nolint:errcheck
if vfolder.IsIncludedInUserQuota() {
dataprovider.UpdateUserQuota(&conn.User, numFiles, fileSize, false) //nolint:errcheck
}
}
func getFileWriter(conn *BaseConnection, virtualPath string) (io.WriteCloser, int, int64, func(), error) {
fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
if err != nil {
return nil, 0, 0, nil, err
}
var truncatedSize, fileSize int64
numFiles := 1
isFileOverwrite := false
info, err := fs.Lstat(fsPath)
if err == nil {
fileSize = info.Size()
if info.IsDir() {
return nil, numFiles, truncatedSize, nil, fmt.Errorf("cannot write to a directory: %q", virtualPath)
}
if info.Mode().IsRegular() {
isFileOverwrite = true
truncatedSize = fileSize
}
numFiles = 0
}
if err != nil && !fs.IsNotExist(err) {
return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
}
f, w, cancelFn, err := fs.Create(fsPath, 0)
if err != nil {
return nil, numFiles, truncatedSize, nil, conn.GetFsError(fs, err)
}
vfs.SetPathPermissions(fs, fsPath, conn.User.GetUID(), conn.User.GetGID())
if isFileOverwrite {
if vfs.HasTruncateSupport(fs) || vfs.IsCryptOsFs(fs) {
updateUserQuotaAfterFileWrite(conn, virtualPath, numFiles, -fileSize)
truncatedSize = 0
}
}
if cancelFn == nil {
cancelFn = func() {}
}
if f != nil {
return f, numFiles, truncatedSize, cancelFn, nil
}
return w, numFiles, truncatedSize, cancelFn, nil
}
func addZipEntry(wr *zipWriterWrapper, conn *BaseConnection, entryPath, baseDir string) error {
if entryPath == wr.Name {
// skip the archive itself
return nil
}
info, err := conn.DoStat(entryPath, 1, false)
if err != nil {
eventManagerLog(logger.LevelError, "unable to add zip entry %#v, stat error: %v", entryPath, err)
return err
}
entryName, err := getZipEntryName(entryPath, baseDir)
if err != nil {
eventManagerLog(logger.LevelError, "unable to get zip entry name: %v", err)
return err
}
if _, ok := wr.Entries[entryName]; ok {
eventManagerLog(logger.LevelInfo, "skipping duplicate zip entry %q, is dir %t", entryPath, info.IsDir())
return nil
}
wr.Entries[entryName] = true
if info.IsDir() {
_, err = wr.Writer.CreateHeader(&zip.FileHeader{
Name: entryName + "/",
Method: zip.Deflate,
Modified: info.ModTime(),
})
if err != nil {
eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
}
contents, err := conn.ListDir(entryPath)
if err != nil {
eventManagerLog(logger.LevelError, "unable to add zip entry %q, read dir error: %v", entryPath, err)
return fmt.Errorf("unable to add zip entry %q: %w", entryPath, err)
}
for _, info := range contents {
fullPath := util.CleanPath(path.Join(entryPath, info.Name()))
if err := addZipEntry(wr, conn, fullPath, baseDir); err != nil {
eventManagerLog(logger.LevelError, "unable to add zip entry: %v", err)
return err
}
}
return nil
}
if !info.Mode().IsRegular() {
// we only allow regular files
eventManagerLog(logger.LevelInfo, "skipping zip entry for non regular file %q", entryPath)
return nil
}
reader, cancelFn, err := getFileReader(conn, entryPath)
if err != nil {
eventManagerLog(logger.LevelError, "unable to add zip entry %q, cannot open file: %v", entryPath, err)
return fmt.Errorf("unable to open %q: %w", entryPath, err)
}
defer cancelFn()
defer reader.Close()
f, err := wr.Writer.CreateHeader(&zip.FileHeader{
Name: entryName,
Method: zip.Deflate,
Modified: info.ModTime(),
})
if err != nil {
eventManagerLog(logger.LevelError, "unable to create zip entry %q: %v", entryPath, err)
return fmt.Errorf("unable to create zip entry %q: %w", entryPath, err)
}
_, err = io.Copy(f, reader)
return err
}
func getZipEntryName(entryPath, baseDir string) (string, error) {
if !strings.HasPrefix(entryPath, baseDir) {
return "", fmt.Errorf("entry path %q is outside base dir %q", entryPath, baseDir)
}
entryPath = strings.TrimPrefix(entryPath, baseDir)
return strings.TrimPrefix(entryPath, "/"), nil
}
func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, func(), error) {
fs, fsPath, err := conn.GetFsAndResolvedPath(virtualPath)
if err != nil {
@@ -628,7 +783,7 @@ func getFileReader(conn *BaseConnection, virtualPath string) (io.ReadCloser, fun
}
f, r, cancelFn, err := fs.Open(fsPath, 0)
if err != nil {
return nil, nil, err
return nil, nil, conn.GetFsError(fs, err)
}
if cancelFn == nil {
cancelFn = func() {}
@@ -1035,8 +1190,13 @@ func getUserForEventAction(user dataprovider.User) (dataprovider.User, error) {
eventManagerLog(logger.LevelError, "unable to get group for user %q: %+v", user.Username, err)
return dataprovider.User{}, fmt.Errorf("unable to get groups for user %q", user.Username)
}
user.UploadDataTransfer = 0
user.UploadBandwidth = 0
user.DownloadBandwidth = 0
user.Filters.DisableFsChecks = false
user.Filters.FilePatterns = nil
user.Filters.BandwidthLimits = nil
user.Filters.DataTransferLimits = nil
for k := range user.Permissions {
user.Permissions[k] = []string{dataprovider.PermAny}
}
@@ -1279,6 +1439,72 @@ func executeRenameFsRuleAction(renames []dataprovider.KeyValue, replacer *string
return nil
}
func getArchiveBaseDir(paths []string) string {
var parentDirs []string
for _, p := range paths {
parentDirs = append(parentDirs, path.Dir(p))
}
parentDirs = util.RemoveDuplicates(parentDirs, false)
baseDir := "/"
if len(parentDirs) == 1 {
baseDir = parentDirs[0]
}
return baseDir
}
func executeCompressFsActionForUser(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
user dataprovider.User,
) error {
user, err := getUserForEventAction(user)
if err != nil {
return err
}
connectionID := fmt.Sprintf("%s_%s", protocolEventAction, xid.New().String())
err = user.CheckFsRoot(connectionID)
defer user.CloseFs() //nolint:errcheck
if err != nil {
return fmt.Errorf("compress error, unable to check root fs for user %q: %w", user.Username, err)
}
conn := NewBaseConnection(connectionID, protocolEventAction, "", "", user)
name := util.CleanPath(replaceWithReplacer(c.Name, replacer))
paths := make([]string, 0, len(c.Paths))
for idx := range c.Paths {
p := util.CleanPath(replaceWithReplacer(c.Paths[idx], replacer))
if p == name {
return fmt.Errorf("cannot compress the archive to create: %q", name)
}
paths = append(paths, p)
}
writer, numFiles, truncatedSize, cancelFn, err := getFileWriter(conn, name)
if err != nil {
eventManagerLog(logger.LevelError, "unable to create archive %q: %v", name, err)
return fmt.Errorf("unable to create archive: %w", err)
}
defer cancelFn()
paths = util.RemoveDuplicates(paths, false)
baseDir := getArchiveBaseDir(paths)
eventManagerLog(logger.LevelDebug, "creating archive %q for paths %+v", name, paths)
zipWriter := &zipWriterWrapper{
Name: name,
Writer: zip.NewWriter(writer),
Entries: make(map[string]bool),
}
for _, item := range paths {
if err := addZipEntry(zipWriter, conn, item, baseDir); err != nil {
closeWriterAndUpdateQuota(writer, conn, name, numFiles, truncatedSize, err) //nolint:errcheck
return err
}
}
if err := zipWriter.Writer.Close(); err != nil {
eventManagerLog(logger.LevelError, "unable to close zip file %q: %v", name, err)
closeWriterAndUpdateQuota(writer, conn, name, numFiles, truncatedSize, err) //nolint:errcheck
return fmt.Errorf("unable to close zip file %q: %w", name, err)
}
return closeWriterAndUpdateQuota(writer, conn, name, numFiles, truncatedSize, err)
}
func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, conditions dataprovider.ConditionOptions,
params *EventParams,
) error {
@@ -1319,6 +1545,46 @@ func executeExistFsRuleAction(exist []string, replacer *strings.Replacer, condit
return nil
}
func executeCompressFsRuleAction(c dataprovider.EventActionFsCompress, replacer *strings.Replacer,
conditions dataprovider.ConditionOptions, params *EventParams,
) error {
users, err := params.getUsers()
if err != nil {
return fmt.Errorf("unable to get users: %w", err)
}
var failures []string
executed := 0
for _, user := range users {
// if sender is set, the conditions have already been evaluated
if params.sender == "" {
if !checkEventConditionPatterns(user.Username, conditions.Names) {
eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, name conditions don't match",
user.Username)
continue
}
if !checkEventGroupConditionPatters(user.Groups, conditions.GroupNames) {
eventManagerLog(logger.LevelDebug, "skipping fs compress for user %s, group name conditions don't match",
user.Username)
continue
}
}
executed++
if err = executeCompressFsActionForUser(c, replacer, user); err != nil {
failures = append(failures, user.Username)
params.AddError(err)
continue
}
}
if len(failures) > 0 {
return fmt.Errorf("fs compress failed for users: %+v", failures)
}
if executed == 0 {
eventManagerLog(logger.LevelError, "no file/folder compressed")
return errors.New("no file/folder compressed")
}
return nil
}
func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions dataprovider.ConditionOptions,
params *EventParams,
) error {
@@ -1334,6 +1600,8 @@ func executeFsRuleAction(c dataprovider.EventActionFilesystemConfig, conditions
return executeMkdirFsRuleAction(c.MkDirs, replacer, conditions, params)
case dataprovider.FilesystemActionExist:
return executeExistFsRuleAction(c.Exist, replacer, conditions, params)
case dataprovider.FilesystemActionCompress:
return executeCompressFsRuleAction(c.Compress, replacer, conditions, params)
default:
return fmt.Errorf("unsupported filesystem action %d", c.Type)
}
@@ -1818,6 +2086,12 @@ func (j *eventCronJob) Run() {
eventManagerLog(logger.LevelDebug, "execution for scheduled rule %q finished", j.ruleName)
}
type zipWriterWrapper struct {
Name string
Entries map[string]bool
Writer *zip.Writer
}
func eventManagerLog(level logger.LogLevel, format string, v ...any) {
logger.Log(level, "eventmanager", "", format, v...)
}