add support for inter-node communications

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2022-09-25 19:48:55 +02:00
parent a538255034
commit 76e89d07d4
25 changed files with 847 additions and 59 deletions

View File

@@ -34,7 +34,7 @@ import (
)
const (
sqlDatabaseVersion = 22
sqlDatabaseVersion = 23
defaultSQLQueryTimeout = 10 * time.Second
longSQLQueryTimeout = 60 * time.Second
)
@@ -77,6 +77,7 @@ func sqlReplaceAll(sql string) string {
sql = strings.ReplaceAll(sql, "{{events_rules}}", sqlTableEventsRules)
sql = strings.ReplaceAll(sql, "{{rules_actions_mapping}}", sqlTableRulesActionsMapping)
sql = strings.ReplaceAll(sql, "{{tasks}}", sqlTableTasks)
sql = strings.ReplaceAll(sql, "{{nodes}}", sqlTableNodes)
sql = strings.ReplaceAll(sql, "{{prefix}}", config.SQLTablesPrefix)
return sql
}
@@ -3250,6 +3251,101 @@ func sqlCommonDeleteTask(name string, dbHandle sqlQuerier) error {
return err
}
func sqlCommonAddNode(dbHandle *sql.DB) error {
if err := currentNode.validate(); err != nil {
return fmt.Errorf("unable to register cluster node: %w", err)
}
data, err := json.Marshal(currentNode.Data)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout)
defer cancel()
q := getAddNodeQuery()
_, err = dbHandle.ExecContext(ctx, q, currentNode.Name, string(data), util.GetTimeAsMsSinceEpoch(time.Now()),
util.GetTimeAsMsSinceEpoch(time.Now()))
if err != nil {
return fmt.Errorf("unable to register cluster node: %w", err)
}
providerLog(logger.LevelInfo, "registered as cluster node %q, port: %d, proto: %s",
currentNode.Name, currentNode.Data.Port, currentNode.Data.Proto)
return nil
}
func sqlCommonGetNodeByName(name string, dbHandle *sql.DB) (Node, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout)
defer cancel()
var data []byte
var node Node
q := getNodeByNameQuery()
row := dbHandle.QueryRowContext(ctx, q, name, util.GetTimeAsMsSinceEpoch(time.Now().Add(activeNodeTimeDiff)))
err := row.Scan(&node.Name, &data, &node.CreatedAt, &node.UpdatedAt)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return node, util.NewRecordNotFoundError(err.Error())
}
return node, err
}
err = json.Unmarshal(data, &node.Data)
return node, err
}
func sqlCommonGetNodes(dbHandle *sql.DB) ([]Node, error) {
var nodes []Node
ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout)
defer cancel()
q := getNodesQuery()
rows, err := dbHandle.QueryContext(ctx, q, currentNode.Name,
util.GetTimeAsMsSinceEpoch(time.Now().Add(activeNodeTimeDiff)))
if err != nil {
return nodes, err
}
defer rows.Close()
for rows.Next() {
var node Node
var data []byte
err = rows.Scan(&node.Name, &data, &node.CreatedAt, &node.UpdatedAt)
if err != nil {
return nodes, err
}
err = json.Unmarshal(data, &node.Data)
if err != nil {
return nodes, err
}
nodes = append(nodes, node)
}
return nodes, rows.Err()
}
func sqlCommonUpdateNodeTimestamp(dbHandle *sql.DB) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout)
defer cancel()
q := getUpdateNodeTimestampQuery()
res, err := dbHandle.ExecContext(ctx, q, util.GetTimeAsMsSinceEpoch(time.Now()), currentNode.Name)
if err != nil {
return err
}
return sqlCommonRequireRowAffected(res)
}
func sqlCommonCleanupNodes(dbHandle *sql.DB) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout)
defer cancel()
q := getCleanupNodesQuery()
_, err := dbHandle.ExecContext(ctx, q, util.GetTimeAsMsSinceEpoch(time.Now().Add(10*activeNodeTimeDiff)))
return err
}
func sqlCommonGetDatabaseVersion(dbHandle sqlQuerier, showInitWarn bool) (schemaVersion, error) {
var result schemaVersion
ctx, cancel := context.WithTimeout(context.Background(), defaultSQLQueryTimeout)