mirror of
https://github.com/drakkan/sftpgo.git
synced 2025-12-08 07:10:56 +03:00
execute db migrations holding a database-level lock
so migrations cannot be executed concurrently if you run them from multiple SFTPGo instances at the same time. CockroachDB doesn't support database-level locks Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
@@ -156,7 +156,7 @@ You can also reset your provider by using the `resetprovider` sub-command. Take
|
|||||||
sftpgo resetprovider --help
|
sftpgo resetprovider --help
|
||||||
```
|
```
|
||||||
|
|
||||||
:warning: Please note that some data providers (e.g. MySQL and CockroachDB) do not support schema changes within a transaction, this means that you may end up with an inconsistent schema if migrations are forcibly aborted or if they are run concurrently by multiple instances.
|
:warning: Please note that some data providers (e.g. MySQL and CockroachDB) do not support schema changes within a transaction, this means that you may end up with an inconsistent schema if migrations are forcibly aborted. CockroachDB doesn't support database-level locks, so make sure you don't execute migrations concurrently.
|
||||||
|
|
||||||
## Create the first admin
|
## Create the first admin
|
||||||
|
|
||||||
|
|||||||
@@ -234,7 +234,7 @@ func getMySQLConnectionString(redactedPwd bool) (string, error) {
|
|||||||
return "", fmt.Errorf("unable to register tls config: %v", err)
|
return "", fmt.Errorf("unable to register tls config: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
connectionString = fmt.Sprintf("%v:%v@tcp([%v]:%v)/%v?charset=utf8mb4&interpolateParams=true&timeout=10s&parseTime=true&tls=%v&writeTimeout=10s&readTimeout=10s",
|
connectionString = fmt.Sprintf("%v:%v@tcp([%v]:%v)/%v?charset=utf8mb4&interpolateParams=true&timeout=10s&parseTime=true&tls=%v&writeTimeout=60s&readTimeout=60s",
|
||||||
config.Username, password, config.Host, config.Port, config.Name, sslMode)
|
config.Username, password, config.Host, config.Port, config.Name, sslMode)
|
||||||
} else {
|
} else {
|
||||||
connectionString = config.ConnectionString
|
connectionString = config.ConnectionString
|
||||||
|
|||||||
@@ -2911,10 +2911,11 @@ func sqlCommonGetAPIKeyRelatedIDs(apiKey *APIKey) (sql.NullInt64, sql.NullInt64,
|
|||||||
return userID, adminID, nil
|
return userID, adminID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func sqlCommonGetDatabaseVersion(dbHandle *sql.DB, showInitWarn bool) (schemaVersion, error) {
|
func sqlCommonGetDatabaseVersion(dbHandle sqlQuerier, showInitWarn bool) (schemaVersion, error) {
|
||||||
var result schemaVersion
|
var result schemaVersion
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
q := getDatabaseVersionQuery()
|
q := getDatabaseVersionQuery()
|
||||||
stmt, err := dbHandle.PrepareContext(ctx, q)
|
stmt, err := dbHandle.PrepareContext(ctx, q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -2943,9 +2944,23 @@ func sqlCommonUpdateDatabaseVersion(ctx context.Context, dbHandle sqlQuerier, ve
|
|||||||
}
|
}
|
||||||
|
|
||||||
func sqlCommonExecSQLAndUpdateDBVersion(dbHandle *sql.DB, sqlQueries []string, newVersion int) error {
|
func sqlCommonExecSQLAndUpdateDBVersion(dbHandle *sql.DB, sqlQueries []string, newVersion int) error {
|
||||||
|
if err := sqlAquireLock(dbHandle); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer sqlReleaseLock(dbHandle)
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), longSQLQueryTimeout)
|
ctx, cancel := context.WithTimeout(context.Background(), longSQLQueryTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
if newVersion > 0 {
|
||||||
|
currentVersion, err := sqlCommonGetDatabaseVersion(dbHandle, false)
|
||||||
|
if err == nil && currentVersion.Version >= newVersion {
|
||||||
|
providerLog(logger.LevelInfo, "current schema version: %v, requested: %v, did you execute simultaneous migrations?",
|
||||||
|
currentVersion.Version, newVersion)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return sqlCommonExecuteTx(ctx, dbHandle, func(tx *sql.Tx) error {
|
return sqlCommonExecuteTx(ctx, dbHandle, func(tx *sql.Tx) error {
|
||||||
for _, q := range sqlQueries {
|
for _, q := range sqlQueries {
|
||||||
if strings.TrimSpace(q) == "" {
|
if strings.TrimSpace(q) == "" {
|
||||||
@@ -2963,6 +2978,63 @@ func sqlCommonExecSQLAndUpdateDBVersion(dbHandle *sql.DB, sqlQueries []string, n
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func sqlAquireLock(dbHandle *sql.DB) error {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), longSQLQueryTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
switch config.Driver {
|
||||||
|
case PGSQLDataProviderName:
|
||||||
|
_, err := dbHandle.ExecContext(ctx, `SELECT pg_advisory_lock(101,1)`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to get advisory lock: %w", err)
|
||||||
|
}
|
||||||
|
providerLog(logger.LevelInfo, "acquired database lock")
|
||||||
|
case MySQLDataProviderName:
|
||||||
|
stmt, err := dbHandle.PrepareContext(ctx, `SELECT GET_LOCK('sftpgo.migration',30)`)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to get lock: %w", err)
|
||||||
|
}
|
||||||
|
defer stmt.Close()
|
||||||
|
|
||||||
|
var lockResult sql.NullInt64
|
||||||
|
err = stmt.QueryRowContext(ctx).Scan(&lockResult)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to get lock: %w", err)
|
||||||
|
}
|
||||||
|
if !lockResult.Valid {
|
||||||
|
return errors.New("unable to get lock: null value returned")
|
||||||
|
}
|
||||||
|
if lockResult.Int64 != 1 {
|
||||||
|
return fmt.Errorf("unable to get lock, result: %v", lockResult.Int64)
|
||||||
|
}
|
||||||
|
providerLog(logger.LevelInfo, "acquired database lock")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func sqlReleaseLock(dbHandle *sql.DB) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
switch config.Driver {
|
||||||
|
case PGSQLDataProviderName:
|
||||||
|
_, err := dbHandle.ExecContext(ctx, `SELECT pg_advisory_unlock(101,1)`)
|
||||||
|
if err != nil {
|
||||||
|
providerLog(logger.LevelWarn, "unable to release lock: %v", err)
|
||||||
|
} else {
|
||||||
|
providerLog(logger.LevelInfo, "released database lock")
|
||||||
|
}
|
||||||
|
case MySQLDataProviderName:
|
||||||
|
_, err := dbHandle.ExecContext(ctx, `SELECT RELEASE_LOCK('sftpgo.migration')`)
|
||||||
|
if err != nil {
|
||||||
|
providerLog(logger.LevelWarn, "unable to release lock: %v", err)
|
||||||
|
} else {
|
||||||
|
providerLog(logger.LevelInfo, "released database lock")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func sqlCommonExecuteTx(ctx context.Context, dbHandle *sql.DB, txFn func(*sql.Tx) error) error {
|
func sqlCommonExecuteTx(ctx context.Context, dbHandle *sql.DB, txFn func(*sql.Tx) error) error {
|
||||||
if config.Driver == CockroachDataProviderName {
|
if config.Driver == CockroachDataProviderName {
|
||||||
return crdb.ExecuteTx(ctx, dbHandle, nil, txFn)
|
return crdb.ExecuteTx(ctx, dbHandle, nil, txFn)
|
||||||
|
|||||||
Reference in New Issue
Block a user