Skip to content

Commit 02de274

Browse files
authored
feat(federated): allow to pickup a specific worker, improve loadbalancing (#3243)
* feat(explorer): allow to specify a worker target Signed-off-by: Ettore Di Giacinto <[email protected]> * feat(explorer): correctly load balance requests Signed-off-by: Ettore Di Giacinto <[email protected]> * feat(explorer): mark load balanced by default Signed-off-by: Ettore Di Giacinto <[email protected]> * fix: make sure to delete tunnels that might not exist anymore If a worker goes off and on, its tunnel address might change, and we want to load balance only on the active tunnels. Signed-off-by: Ettore Di Giacinto <[email protected]> --------- Signed-off-by: Ettore Di Giacinto <[email protected]>
1 parent 7d92936 commit 02de274

File tree

4 files changed

+93
-31
lines changed

4 files changed

+93
-31
lines changed

core/cli/federated.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ import (
1010
type FederatedCLI struct {
1111
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
1212
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
13-
LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"`
13+
RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"`
1414
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"`
15+
TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"`
1516
}
1617

1718
func (f *FederatedCLI) Run(ctx *cliContext.Context) error {
1819

19-
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, f.LoadBalanced)
20+
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker)
2021

2122
return fs.Start(context.Background())
2223
}

core/p2p/federated.go

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
package p2p
22

3-
import "fmt"
3+
import (
4+
"fmt"
5+
"math/rand/v2"
6+
"sync"
7+
8+
"github.com/rs/zerolog/log"
9+
)
410

511
const FederatedID = "federated"
612

@@ -12,22 +18,70 @@ func NetworkID(networkID, serviceID string) string {
1218
}
1319

1420
// FederatedServer carries the configuration of the federated proxy together
// with the per-tunnel request counters used to pick a worker.
type FederatedServer struct {
	// Embedded mutex; methods that touch requestTable take it first.
	sync.Mutex

	// listenAddr, service and p2ptoken are copied verbatim from the
	// arguments given to NewFederatedServer.
	listenAddr string
	service    string
	p2ptoken   string

	// requestTable counts recorded requests, keyed by tunnel address.
	requestTable map[string]int
	// loadBalanced selects least-used routing instead of a random pick.
	loadBalanced bool
	// workerTarget, when non-empty, is the ID of the only node to route to.
	workerTarget string
}
1927

20-
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool) *FederatedServer {
28+
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string) *FederatedServer {
2129
return &FederatedServer{
2230
listenAddr: listenAddr,
2331
service: service,
2432
p2ptoken: p2pToken,
2533
requestTable: map[string]int{},
2634
loadBalanced: loadBalanced,
35+
workerTarget: workerTarget,
36+
}
37+
}
38+
39+
func (fs *FederatedServer) RandomServer() string {
40+
var tunnelAddresses []string
41+
for _, v := range GetAvailableNodes(fs.service) {
42+
if v.IsOnline() {
43+
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
44+
} else {
45+
delete(fs.requestTable, v.TunnelAddress) // make sure it's not tracked
46+
log.Info().Msgf("Node %s is offline", v.ID)
47+
}
48+
}
49+
50+
if len(tunnelAddresses) == 0 {
51+
return ""
52+
}
53+
54+
return tunnelAddresses[rand.IntN(len(tunnelAddresses))]
55+
}
56+
57+
func (fs *FederatedServer) syncTableStatus() {
58+
fs.Lock()
59+
defer fs.Unlock()
60+
currentTunnels := make(map[string]struct{})
61+
62+
for _, v := range GetAvailableNodes(fs.service) {
63+
if v.IsOnline() {
64+
fs.ensureRecordExist(v.TunnelAddress)
65+
currentTunnels[v.TunnelAddress] = struct{}{}
66+
}
67+
}
68+
69+
// delete tunnels that don't exist anymore
70+
for t := range fs.requestTable {
71+
if _, ok := currentTunnels[t]; !ok {
72+
delete(fs.requestTable, t)
73+
}
2774
}
2875
}
2976

3077
func (fs *FederatedServer) SelectLeastUsedServer() string {
78+
fs.syncTableStatus()
79+
80+
fs.Lock()
81+
defer fs.Unlock()
82+
83+
log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table")
84+
3185
// cycle over requestTable and find the entry with the lower number
3286
// if there are multiple entries with the same number, select one randomly
3387
// if there are no entries, return an empty string
@@ -39,18 +93,26 @@ func (fs *FederatedServer) SelectLeastUsedServer() string {
3993
minKey = k
4094
}
4195
}
96+
log.Debug().Any("requests_served", min).Msgf("Selected tunnel %s", minKey)
97+
4298
return minKey
4399
}
44100

45101
func (fs *FederatedServer) RecordRequest(nodeID string) {
102+
fs.Lock()
103+
defer fs.Unlock()
46104
// increment the counter for the nodeID in the requestTable
47105
fs.requestTable[nodeID]++
106+
107+
log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table")
48108
}
49109

50-
func (fs *FederatedServer) EnsureRecordExist(nodeID string) {
110+
// ensureRecordExist makes sure nodeID has an entry in the request table,
// inserting a zero counter when it is missing, and then logs the table at
// debug level. It does not lock; syncTableStatus calls it with the mutex
// already held.
func (fs *FederatedServer) ensureRecordExist(nodeID string) {
	if _, tracked := fs.requestTable[nodeID]; !tracked {
		fs.requestTable[nodeID] = 0
	}

	log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table")
}

core/p2p/federated_server.go

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
1010
"net"
1111
"time"
1212

13-
"math/rand/v2"
14-
1513
"github.com/mudler/edgevpn/pkg/node"
1614
"github.com/mudler/edgevpn/pkg/protocol"
1715
"github.com/mudler/edgevpn/pkg/types"
@@ -76,7 +74,7 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
7674
case <-ctx.Done():
7775
return errors.New("context canceled")
7876
default:
79-
log.Debug().Msg("New for connection")
77+
log.Debug().Msgf("New connection from %s", l.Addr().String())
8078
// Listen for an incoming connection.
8179
conn, err := l.Accept()
8280
if err != nil {
@@ -86,38 +84,34 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
8684

8785
// Handle connections in a new goroutine, forwarding to the p2p service
8886
go func() {
89-
var tunnelAddresses []string
90-
for _, v := range GetAvailableNodes(fs.service) {
91-
if v.IsOnline() {
92-
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
93-
} else {
94-
log.Info().Msgf("Node %s is offline", v.ID)
95-
}
96-
}
97-
98-
if len(tunnelAddresses) == 0 {
99-
log.Error().Msg("No available nodes yet")
100-
return
101-
}
102-
10387
tunnelAddr := ""
10488

105-
if fs.loadBalanced {
106-
for _, t := range tunnelAddresses {
107-
fs.EnsureRecordExist(t)
89+
if fs.workerTarget != "" {
90+
for _, v := range GetAvailableNodes(fs.service) {
91+
if v.ID == fs.workerTarget {
92+
tunnelAddr = v.TunnelAddress
93+
break
94+
}
10895
}
96+
} else if fs.loadBalanced {
97+
log.Debug().Msgf("Load balancing request")
10998

11099
tunnelAddr = fs.SelectLeastUsedServer()
111-
log.Debug().Msgf("Selected tunnel %s", tunnelAddr)
112100
if tunnelAddr == "" {
113-
tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))]
101+
tunnelAddr = fs.RandomServer()
114102
}
115103

116-
fs.RecordRequest(tunnelAddr)
117104
} else {
118-
tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))]
105+
tunnelAddr = fs.RandomServer()
119106
}
120107

108+
if tunnelAddr == "" {
109+
log.Error().Msg("No available nodes yet")
110+
return
111+
}
112+
113+
log.Debug().Msgf("Selected tunnel %s", tunnelAddr)
114+
121115
tunnelConn, err := net.Dial("tcp", tunnelAddr)
122116
if err != nil {
123117
log.Error().Err(err).Msg("Error connecting to tunnel")
@@ -132,7 +126,10 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
132126

133127
tunnelConn.Close()
134128
conn.Close()
135-
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
129+
130+
if fs.loadBalanced {
131+
fs.RecordRequest(tunnelAddr)
132+
}
136133
}()
137134
}
138135
}

core/p2p/p2p.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin
181181
if err != nil {
182182
return nil, fmt.Errorf("creating a new node: %w", err)
183183
}
184-
185184
// get new services, allocate and return to the channel
186185

187186
// TODO:
@@ -201,6 +200,9 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin
201200
zlog.Debug().Msg("Searching for workers")
202201

203202
data := ledger.LastBlock().Storage[servicesID]
203+
204+
zlog.Debug().Any("data", ledger.LastBlock().Storage).Msg("Ledger data")
205+
204206
for k, v := range data {
205207
zlog.Info().Msgf("Found worker %s", k)
206208
nd := &NodeData{}

0 commit comments

Comments
 (0)