Skip to content

Commit 6d350cc

Browse files
authored
feat(federation): do not allocate local services for load balancing (#3337)
* refactor: extract proxy into functions * feat(federation): do not allocate services, directly connect with libp2p Signed-off-by: Ettore Di Giacinto <[email protected]> --------- Signed-off-by: Ettore Di Giacinto <[email protected]>
1 parent bcd3c1d commit 6d350cc

File tree

4 files changed

+97
-110
lines changed

4 files changed

+97
-110
lines changed

core/p2p/federated.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ func (fs *FederatedServer) RandomServer() string {
4040
var tunnelAddresses []string
4141
for _, v := range GetAvailableNodes(fs.service) {
4242
if v.IsOnline() {
43-
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
43+
tunnelAddresses = append(tunnelAddresses, v.ID)
4444
} else {
45-
delete(fs.requestTable, v.TunnelAddress) // make sure it's not tracked
45+
delete(fs.requestTable, v.ID) // make sure it's not tracked
4646
log.Info().Msgf("Node %s is offline", v.ID)
4747
}
4848
}
@@ -61,8 +61,8 @@ func (fs *FederatedServer) syncTableStatus() {
6161

6262
for _, v := range GetAvailableNodes(fs.service) {
6363
if v.IsOnline() {
64-
fs.ensureRecordExist(v.TunnelAddress)
65-
currentTunnels[v.TunnelAddress] = struct{}{}
64+
fs.ensureRecordExist(v.ID)
65+
currentTunnels[v.ID] = struct{}{}
6666
}
6767
}
6868

core/p2p/federated_server.go

Lines changed: 16 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,12 @@ import (
88
"errors"
99
"fmt"
1010
"net"
11-
"time"
1211

1312
"github.com/mudler/edgevpn/pkg/node"
14-
"github.com/mudler/edgevpn/pkg/protocol"
15-
"github.com/mudler/edgevpn/pkg/types"
1613
"github.com/rs/zerolog/log"
1714
)
1815

1916
func (f *FederatedServer) Start(ctx context.Context) error {
20-
2117
n, err := NewNode(f.p2ptoken)
2218
if err != nil {
2319
return fmt.Errorf("creating a new node: %w", err)
@@ -29,7 +25,7 @@ func (f *FederatedServer) Start(ctx context.Context) error {
2925

3026
if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) {
3127
log.Debug().Msgf("Discovered node: %s", tunnel.ID)
32-
}, true); err != nil {
28+
}, false); err != nil {
3329
return err
3430
}
3531

@@ -50,21 +46,8 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
5046
<-ctx.Done()
5147
l.Close()
5248
}()
53-
ledger, _ := node.Ledger()
54-
55-
// Announce ourselves so nodes accepts our connection
56-
ledger.Announce(
57-
ctx,
58-
10*time.Second,
59-
func() {
60-
updatedMap := map[string]interface{}{}
61-
updatedMap[node.Host().ID().String()] = &types.User{
62-
PeerID: node.Host().ID().String(),
63-
Timestamp: time.Now().String(),
64-
}
65-
ledger.Add(protocol.UsersLedgerKey, updatedMap)
66-
},
67-
)
49+
50+
nodeAnnounce(ctx, node)
6851

6952
defer l.Close()
7053
for {
@@ -82,52 +65,36 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
8265

8366
// Handle connections in a new goroutine, forwarding to the p2p service
8467
go func() {
85-
tunnelAddr := ""
86-
68+
workerID := ""
8769
if fs.workerTarget != "" {
88-
for _, v := range GetAvailableNodes(fs.service) {
89-
if v.ID == fs.workerTarget {
90-
tunnelAddr = v.TunnelAddress
91-
break
92-
}
93-
}
70+
workerID = fs.workerTarget
9471
} else if fs.loadBalanced {
9572
log.Debug().Msgf("Load balancing request")
9673

97-
tunnelAddr = fs.SelectLeastUsedServer()
98-
if tunnelAddr == "" {
74+
workerID = fs.SelectLeastUsedServer()
75+
if workerID == "" {
9976
log.Debug().Msgf("Least used server not found, selecting random")
100-
tunnelAddr = fs.RandomServer()
77+
workerID = fs.RandomServer()
10178
}
102-
10379
} else {
104-
tunnelAddr = fs.RandomServer()
80+
workerID = fs.RandomServer()
10581
}
10682

107-
if tunnelAddr == "" {
83+
if workerID == "" {
10884
log.Error().Msg("No available nodes yet")
10985
return
11086
}
11187

112-
log.Debug().Msgf("Selected tunnel %s", tunnelAddr)
113-
114-
tunnelConn, err := net.Dial("tcp", tunnelAddr)
115-
if err != nil {
116-
log.Error().Err(err).Msg("Error connecting to tunnel")
88+
log.Debug().Msgf("Selected node %s", workerID)
89+
nodeData, exists := GetNode(fs.service, workerID)
90+
if !exists {
91+
log.Error().Msgf("Node %s not found", workerID)
11792
return
11893
}
11994

120-
log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
121-
closer := make(chan struct{}, 2)
122-
go copyStream(closer, tunnelConn, conn)
123-
go copyStream(closer, conn, tunnelConn)
124-
<-closer
125-
126-
tunnelConn.Close()
127-
conn.Close()
128-
95+
proxyP2PConnection(ctx, node, nodeData.ServiceID, conn)
12996
if fs.loadBalanced {
130-
fs.RecordRequest(tunnelAddr)
97+
fs.RecordRequest(workerID)
13198
}
13299
}()
133100
}

core/p2p/node.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type NodeData struct {
1414
Name string
1515
ID string
1616
TunnelAddress string
17+
ServiceID string
1718
LastSeen time.Time
1819
}
1920

@@ -39,6 +40,19 @@ func GetAvailableNodes(serviceID string) []NodeData {
3940
return availableNodes
4041
}
4142

43+
func GetNode(serviceID, nodeID string) (NodeData, bool) {
44+
if serviceID == "" {
45+
serviceID = defaultServicesID
46+
}
47+
mu.Lock()
48+
defer mu.Unlock()
49+
if _, ok := nodes[serviceID]; !ok {
50+
return NodeData{}, false
51+
}
52+
nd, exists := nodes[serviceID][nodeID]
53+
return nd, exists
54+
}
55+
4256
func AddNode(serviceID string, node NodeData) {
4357
if serviceID == "" {
4458
serviceID = defaultServicesID

core/p2p/p2p.go

Lines changed: 63 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -66,22 +66,7 @@ func nodeID(s string) string {
6666
return fmt.Sprintf("%s-%s", hostname, s)
6767
}
6868

69-
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
70-
71-
zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
72-
// Open local port for listening
73-
l, err := net.Listen("tcp", listenAddr)
74-
if err != nil {
75-
zlog.Error().Err(err).Msg("Error listening")
76-
return err
77-
}
78-
go func() {
79-
<-ctx.Done()
80-
l.Close()
81-
}()
82-
83-
// ll.Info("Binding local port on", srcaddr)
84-
69+
func nodeAnnounce(ctx context.Context, node *node.Node) {
8570
ledger, _ := node.Ledger()
8671

8772
// Announce ourselves so nodes accepts our connection
@@ -97,6 +82,66 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
9782
ledger.Add(protocol.UsersLedgerKey, updatedMap)
9883
},
9984
)
85+
}
86+
87+
func proxyP2PConnection(ctx context.Context, node *node.Node, serviceID string, conn net.Conn) {
88+
ledger, _ := node.Ledger()
89+
// Retrieve current ID for ip in the blockchain
90+
existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
91+
service := &types.Service{}
92+
existingValue.Unmarshal(service)
93+
// If mismatch, update the blockchain
94+
if !found {
95+
zlog.Error().Msg("Service not found on blockchain")
96+
conn.Close()
97+
// ll.Debugf("service '%s' not found on blockchain", serviceID)
98+
return
99+
}
100+
101+
// Decode the Peer
102+
d, err := peer.Decode(service.PeerID)
103+
if err != nil {
104+
zlog.Error().Msg("cannot decode peer")
105+
106+
conn.Close()
107+
// ll.Debugf("could not decode peer '%s'", service.PeerID)
108+
return
109+
}
110+
111+
// Open a stream
112+
stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
113+
if err != nil {
114+
zlog.Error().Err(err).Msg("cannot open stream peer")
115+
116+
conn.Close()
117+
// ll.Debugf("could not open stream '%s'", err.Error())
118+
return
119+
}
120+
// ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
121+
zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String())
122+
closer := make(chan struct{}, 2)
123+
go copyStream(closer, stream, conn)
124+
go copyStream(closer, conn, stream)
125+
<-closer
126+
127+
stream.Close()
128+
conn.Close()
129+
}
130+
131+
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
132+
zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
133+
// Open local port for listening
134+
l, err := net.Listen("tcp", listenAddr)
135+
if err != nil {
136+
zlog.Error().Err(err).Msg("Error listening")
137+
return err
138+
}
139+
go func() {
140+
<-ctx.Done()
141+
l.Close()
142+
}()
143+
144+
nodeAnnounce(ctx, node)
100145

101146
defer l.Close()
102147
for {
@@ -114,47 +159,7 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
114159

115160
// Handle connections in a new goroutine, forwarding to the p2p service
116161
go func() {
117-
// Retrieve current ID for ip in the blockchain
118-
existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, service)
119-
service := &types.Service{}
120-
existingValue.Unmarshal(service)
121-
// If mismatch, update the blockchain
122-
if !found {
123-
zlog.Error().Msg("Service not found on blockchain")
124-
conn.Close()
125-
// ll.Debugf("service '%s' not found on blockchain", serviceID)
126-
return
127-
}
128-
129-
// Decode the Peer
130-
d, err := peer.Decode(service.PeerID)
131-
if err != nil {
132-
zlog.Error().Msg("cannot decode peer")
133-
134-
conn.Close()
135-
// ll.Debugf("could not decode peer '%s'", service.PeerID)
136-
return
137-
}
138-
139-
// Open a stream
140-
stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
141-
if err != nil {
142-
zlog.Error().Msg("cannot open stream peer")
143-
144-
conn.Close()
145-
// ll.Debugf("could not open stream '%s'", err.Error())
146-
return
147-
}
148-
// ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
149-
zlog.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), stream.Conn().RemoteMultiaddr().String())
150-
closer := make(chan struct{}, 2)
151-
go copyStream(closer, stream, conn)
152-
go copyStream(closer, conn, stream)
153-
<-closer
154-
155-
stream.Close()
156-
conn.Close()
157-
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
162+
proxyP2PConnection(ctx, node, service, conn)
158163
}()
159164
}
160165
}
@@ -258,6 +263,7 @@ var muservice sync.Mutex
258263
func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string, allocate bool) {
259264
muservice.Lock()
260265
defer muservice.Unlock()
266+
nd.ServiceID = sserv
261267
if ndService, found := service[nd.Name]; !found {
262268
if !nd.IsOnline() {
263269
// if node is offline and not present, do nothing

0 commit comments

Comments
 (0)