mirror of
https://github.com/drakkan/sftpgo.git
synced 2025-12-06 06:10:54 +03:00
34
examples/data-retention/README.md
Normal file
34
examples/data-retention/README.md
Normal file
@@ -0,0 +1,34 @@
|
||||
# File retention policies
|
||||
|
||||
The `checkretention` example script shows how to use the SFTPGo REST API to manage data retention.
|
||||
|
||||
:warning: Deleting files is an irreversible action, please make sure you fully understand what you are doing before using this feature, you may have users with overlapping home directories or virtual folders shared between multiple users, it is relatively easy to inadvertently delete files you need.
|
||||
|
||||
The example shows how to setup a really simple retention policy, for each user it sends this request:
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"path": "/",
|
||||
"retention": 168,
|
||||
"delete_empty_dirs": true,
|
||||
"ignore_user_permissions": false
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
so alls files with modification time older than 168 hours (7 days) will be deleted. Empty directories will be removed and the check will respect user's permissions, so if the user cannot delete a file/folder it will be skipped.
|
||||
|
||||
You can define different retention policies per-user and per-folder and you can exclude a folder setting the retention to `0`.
|
||||
|
||||
You can use this script as a starting point, please edit it according to your needs.
|
||||
|
||||
The script is written in Python and has the following requirements:
|
||||
|
||||
- python3 or python2
|
||||
- python [Requests](https://requests.readthedocs.io/en/master/) module
|
||||
|
||||
The provided example tries to connect to an SFTPGo instance running on `127.0.0.1:8080` using the following credentials:
|
||||
|
||||
- username: `admin`
|
||||
- password: `password`
|
||||
115
examples/data-retention/checkretention
Executable file
115
examples/data-retention/checkretention
Executable file
@@ -0,0 +1,115 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
from datetime import datetime
|
||||
import sys
|
||||
import time
|
||||
|
||||
import pytz
|
||||
import requests
|
||||
|
||||
try:
|
||||
import urllib.parse as urlparse
|
||||
except ImportError:
|
||||
import urlparse
|
||||
|
||||
# change base_url to point to your SFTPGo installation
|
||||
base_url = "http://127.0.0.1:8080"
|
||||
# set to False if you want to skip TLS certificate validation
|
||||
verify_tls_cert = True
|
||||
# set the credentials for a valid admin here
|
||||
admin_user = "admin"
|
||||
admin_password = "password"
|
||||
|
||||
|
||||
class CheckRetention:
|
||||
|
||||
def __init__(self):
|
||||
self.limit = 100
|
||||
self.offset = 0
|
||||
self.access_token = ""
|
||||
self.access_token_expiration = None
|
||||
|
||||
def printLog(self, message):
|
||||
print("{} - {}".format(datetime.now(), message))
|
||||
|
||||
def checkAccessToken(self):
|
||||
if self.access_token != "" and self.access_token_expiration:
|
||||
expire_diff = self.access_token_expiration - datetime.now(tz=pytz.UTC)
|
||||
# we don't use total_seconds to be python 2 compatible
|
||||
seconds_to_expire = expire_diff.days * 86400 + expire_diff.seconds
|
||||
if seconds_to_expire > 180:
|
||||
return
|
||||
|
||||
auth = requests.auth.HTTPBasicAuth(admin_user, admin_password)
|
||||
r = requests.get(urlparse.urljoin(base_url, "api/v2/token"), auth=auth, verify=verify_tls_cert, timeout=10)
|
||||
if r.status_code != 200:
|
||||
self.printLog("error getting access token: {}".format(r.text))
|
||||
sys.exit(1)
|
||||
self.access_token = r.json()["access_token"]
|
||||
self.access_token_expiration = pytz.timezone("UTC").localize(datetime.strptime(r.json()["expires_at"],
|
||||
"%Y-%m-%dT%H:%M:%SZ"))
|
||||
|
||||
def getAuthHeader(self):
|
||||
self.checkAccessToken()
|
||||
return {"Authorization": "Bearer " + self.access_token}
|
||||
|
||||
def waitForRentionCheck(self, username):
|
||||
while True:
|
||||
auth_header = self.getAuthHeader()
|
||||
r = requests.get(urlparse.urljoin(base_url, "api/v2/retention/users/checks"), headers=auth_header, verify=verify_tls_cert,
|
||||
timeout=10)
|
||||
if r.status_code != 200:
|
||||
self.printLog("error getting retention checks while waiting for {}: {}".format(username, r.text))
|
||||
sys.exit(1)
|
||||
|
||||
checking = False
|
||||
for check in r.json():
|
||||
if check["username"] == username:
|
||||
checking = True
|
||||
if not checking:
|
||||
break
|
||||
self.printLog("waiting for the retention check to complete for user {}".format(username))
|
||||
time.sleep(2)
|
||||
|
||||
self.printLog("retention check for user {} finished".format(username))
|
||||
|
||||
def checkUserRetention(self, username):
|
||||
self.printLog("starting retention check for user {}".format(username))
|
||||
auth_header = self.getAuthHeader()
|
||||
retention = [
|
||||
{
|
||||
"path": "/",
|
||||
"retention": 168,
|
||||
"delete_empty_dirs": True,
|
||||
"ignore_user_permissions": False
|
||||
}
|
||||
]
|
||||
r = requests.post(urlparse.urljoin(base_url, "api/v2/retention/users/" + username + "/check"), headers=auth_header,
|
||||
json=retention, verify=verify_tls_cert, timeout=10)
|
||||
if r.status_code != 202:
|
||||
self.printLog("error starting retention check for user {}: {}".format(username, r.text))
|
||||
sys.exit(1)
|
||||
self.waitForRentionCheck(username)
|
||||
|
||||
def checkUsersRetention(self):
|
||||
while True:
|
||||
self.printLog("get users, limit {} offset {}".format(self.limit, self.offset))
|
||||
auth_header = self.getAuthHeader()
|
||||
payload = {"limit":self.limit, "offset":self.offset}
|
||||
r = requests.get(urlparse.urljoin(base_url, "api/v2/users"), headers=auth_header, params=payload,
|
||||
verify=verify_tls_cert, timeout=10)
|
||||
if r.status_code != 200:
|
||||
self.printLog("error getting users: {}".format(r.text))
|
||||
sys.exit(1)
|
||||
users = r.json()
|
||||
for user in users:
|
||||
self.checkUserRetention(user["username"])
|
||||
|
||||
self.offset += len(users)
|
||||
if len(users) < self.limit:
|
||||
break
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
c = CheckRetention()
|
||||
c.checkUsersRetention()
|
||||
@@ -6,7 +6,7 @@ The stored quota may be incorrect for several reasons, such as an unexpected shu
|
||||
|
||||
A quota scan updates the number of files and their total size for the specified user and the virtual folders, if any, included in his quota.
|
||||
|
||||
If you want to track quotas, a scheduled quota scan is recommended. You could use this example as a starting point.
|
||||
If you want to track quotas, a scheduled quota scan is recommended. You can use this example as a starting point.
|
||||
|
||||
The script is written in Python and has the following requirements:
|
||||
|
||||
|
||||
@@ -65,7 +65,7 @@ class UpdateQuota:
|
||||
def waitForQuotaUpdate(self, username):
|
||||
while True:
|
||||
auth_header = self.getAuthHeader()
|
||||
r = requests.get(urlparse.urljoin(base_url, "api/v2/quota-scans"), headers=auth_header, verify=verify_tls_cert,
|
||||
r = requests.get(urlparse.urljoin(base_url, "api/v2/quotas/users/scans"), headers=auth_header, verify=verify_tls_cert,
|
||||
timeout=10)
|
||||
if r.status_code != 200:
|
||||
self.printLog("error getting quota scans while waiting for {}: {}".format(username, r.text))
|
||||
@@ -85,8 +85,8 @@ class UpdateQuota:
|
||||
def updateUserQuota(self, username):
|
||||
self.printLog("starting quota update for user {}".format(username))
|
||||
auth_header = self.getAuthHeader()
|
||||
r = requests.post(urlparse.urljoin(base_url, "api/v2/quota-scans"), headers=auth_header,
|
||||
json={"username":username}, verify=verify_tls_cert, timeout=10)
|
||||
r = requests.post(urlparse.urljoin(base_url, "api/v2/quotas/users/" + username + "/scan"), headers=auth_header,
|
||||
verify=verify_tls_cert, timeout=10)
|
||||
if r.status_code != 202:
|
||||
self.printLog("error starting quota scan for user {}: {}".format(username, r.text))
|
||||
sys.exit(1)
|
||||
|
||||
Reference in New Issue
Block a user