replace fnv with sha256

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
This commit is contained in:
Nicola Murino
2024-12-14 14:42:43 +01:00
parent 599ee5a58f
commit c56be285a5
2 changed files with 18 additions and 16 deletions

View File

@@ -17,10 +17,11 @@ package dataprovider
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/sha256"
"encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"hash/fnv"
"io" "io"
"net/http" "net/http"
"strconv" "strconv"
@@ -108,12 +109,12 @@ func (n *NodeData) validate() error {
} }
func (n *NodeData) getNodeName() string { func (n *NodeData) getNodeName() string {
h := fnv.New64a() h := sha256.New()
var b bytes.Buffer var b bytes.Buffer
b.WriteString(fmt.Sprintf("%s:%d", n.Host, n.Port)) b.WriteString(fmt.Sprintf("%s:%d", n.Host, n.Port))
h.Write(b.Bytes()) h.Write(b.Bytes())
return strconv.FormatUint(h.Sum64(), 10) return hex.EncodeToString(h.Sum(nil))
} }
// Node defines a cluster node // Node defines a cluster node

View File

@@ -18,9 +18,10 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"crypto/rsa" "crypto/rsa"
"crypto/sha256"
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
"hash/fnv"
"io" "io"
"io/fs" "io/fs"
"net" "net"
@@ -283,8 +284,8 @@ func (c *SFTPFsConfig) ValidateAndEncryptCredentials(additionalData string) erro
} }
// getUniqueID returns an hash of the settings used to connect to the SFTP server // getUniqueID returns an hash of the settings used to connect to the SFTP server
func (c *SFTPFsConfig) getUniqueID(partition int) uint64 { func (c *SFTPFsConfig) getUniqueID(partition int) string {
h := fnv.New64a() h := sha256.New()
var b bytes.Buffer var b bytes.Buffer
b.WriteString(c.Endpoint) b.WriteString(c.Endpoint)
@@ -301,7 +302,7 @@ func (c *SFTPFsConfig) getUniqueID(partition int) uint64 {
b.WriteString(strconv.Itoa(partition)) b.WriteString(strconv.Itoa(partition))
h.Write(b.Bytes()) h.Write(b.Bytes())
return h.Sum64() return hex.EncodeToString(h.Sum(nil))
} }
// SFTPFs is a Fs implementation for SFTP backends // SFTPFs is a Fs implementation for SFTP backends
@@ -1145,13 +1146,13 @@ func (c *sftpConnection) GetLastActivity() time.Time {
type sftpConnectionsCache struct { type sftpConnectionsCache struct {
scheduler *cron.Cron scheduler *cron.Cron
sync.RWMutex sync.RWMutex
items map[uint64]*sftpConnection items map[string]*sftpConnection
} }
func newSFTPConnectionCache() *sftpConnectionsCache { func newSFTPConnectionCache() *sftpConnectionsCache {
c := &sftpConnectionsCache{ c := &sftpConnectionsCache{
scheduler: cron.New(cron.WithLocation(time.UTC), cron.WithLogger(cron.DiscardLogger)), scheduler: cron.New(cron.WithLocation(time.UTC), cron.WithLogger(cron.DiscardLogger)),
items: make(map[uint64]*sftpConnection), items: make(map[string]*sftpConnection),
} }
_, err := c.scheduler.AddFunc("@every 1m", c.Cleanup) _, err := c.scheduler.AddFunc("@every 1m", c.Cleanup)
util.PanicOnError(err) util.PanicOnError(err)
@@ -1166,13 +1167,13 @@ func (c *sftpConnectionsCache) Get(config *SFTPFsConfig, sessionID string) (*sft
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
var oldKey uint64 var oldKey string
for { for {
if val, ok := c.items[key]; ok { if val, ok := c.items[key]; ok {
activeSessions := val.ActiveSessions() activeSessions := val.ActiveSessions()
if activeSessions < maxSessionsPerConnection || key == oldKey { if activeSessions < maxSessionsPerConnection || key == oldKey {
logger.Debug(logSenderSFTPCache, "", logger.Debug(logSenderSFTPCache, "",
"reusing connection for session ID %q, key: %d, active sessions %d, active connections: %d", "reusing connection for session ID %q, key %s, active sessions %d, active connections: %d",
sessionID, key, activeSessions+1, len(c.items)) sessionID, key, activeSessions+1, len(c.items))
val.AddSession(sessionID) val.AddSession(sessionID)
return val, nil return val, nil
@@ -1181,7 +1182,7 @@ func (c *sftpConnectionsCache) Get(config *SFTPFsConfig, sessionID string) (*sft
oldKey = key oldKey = key
key = config.getUniqueID(partition) key = config.getUniqueID(partition)
logger.Debug(logSenderSFTPCache, "", logger.Debug(logSenderSFTPCache, "",
"connection full, generated new key for partition: %d, active sessions: %d, key: %d, old key: %d", "connection full, generated new key for partition: %d, active sessions: %d, key: %s, old key: %s",
partition, activeSessions, oldKey, key) partition, activeSessions, oldKey, key)
} else { } else {
conn := newSFTPConnection(config, sessionID) conn := newSFTPConnection(config, sessionID)
@@ -1192,20 +1193,20 @@ func (c *sftpConnectionsCache) Get(config *SFTPFsConfig, sessionID string) (*sft
conn.signer = signer conn.signer = signer
c.items[key] = conn c.items[key] = conn
logger.Debug(logSenderSFTPCache, "", logger.Debug(logSenderSFTPCache, "",
"adding new connection for session ID %q, partition: %d, key: %d, active connections: %d", "adding new connection for session ID %q, partition: %d, key: %s, active connections: %d",
sessionID, partition, key, len(c.items)) sessionID, partition, key, len(c.items))
return conn, nil return conn, nil
} }
} }
} }
func (c *sftpConnectionsCache) Remove(key uint64) { func (c *sftpConnectionsCache) Remove(key string) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
if conn, ok := c.items[key]; ok { if conn, ok := c.items[key]; ok {
delete(c.items, key) delete(c.items, key)
logger.Debug(logSenderSFTPCache, "", "removed connection with key %d, active connections: %d", key, len(c.items)) logger.Debug(logSenderSFTPCache, "", "removed connection with key %s, active connections: %d", key, len(c.items))
defer conn.Close() defer conn.Close()
} }
@@ -1218,7 +1219,7 @@ func (c *sftpConnectionsCache) Cleanup() {
if val := conn.GetLastActivity(); val.Before(time.Now().Add(-30 * time.Second)) { if val := conn.GetLastActivity(); val.Before(time.Now().Add(-30 * time.Second)) {
logger.Debug(conn.logSender, "", "removing inactive connection, last activity %s", val) logger.Debug(conn.logSender, "", "removing inactive connection, last activity %s", val)
defer func(key uint64) { defer func(key string) {
c.Remove(key) c.Remove(key)
}(k) }(k)
} }