Skip to content

Commit af09520

Browse files
authored
fix(p2p): avoid starting the node twice (#3349)
* fix(p2p): avoid starting the node twice Signed-off-by: Ettore Di Giacinto <[email protected]> * fix(p2p): keep exposing service if we don't start the llama.cpp runner Signed-off-by: Ettore Di Giacinto <[email protected]> --------- Signed-off-by: Ettore Di Giacinto <[email protected]>
1 parent 70e53bc commit af09520

File tree

3 files changed

+35
-34
lines changed

3 files changed

+35
-34
lines changed

core/cli/run.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,15 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
120120
if err != nil {
121121
return err
122122
}
123+
nodeContext := context.Background()
124+
125+
err = node.Start(nodeContext)
126+
if err != nil {
127+
return fmt.Errorf("starting new node: %w", err)
128+
}
123129

124130
log.Info().Msg("Starting P2P server discovery...")
125-
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
131+
if err := p2p.ServiceDiscoverer(nodeContext, node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
126132
var tunnelAddresses []string
127133
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) {
128134
if v.IsOnline() {
@@ -146,6 +152,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
146152
return err
147153
}
148154
fedCtx := context.Background()
155+
149156
node, err := p2p.ExposeService(fedCtx, "localhost", port, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID))
150157
if err != nil {
151158
return err

core/cli/worker/worker_p2p.go

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -65,44 +65,42 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
6565
return err
6666
}
6767
log.Info().Msgf("You need to start llama-cpp-rpc-server on '%s:%s'", address, p)
68+
} else {
69+
// Start llama.cpp directly from the version we have pre-packaged
70+
go func() {
71+
for {
72+
log.Info().Msgf("Starting llama-cpp-rpc-server on '%s:%d'", address, port)
6873

69-
return nil
70-
}
71-
72-
// Start llama.cpp directly from the version we have pre-packaged
73-
go func() {
74-
for {
75-
log.Info().Msgf("Starting llama-cpp-rpc-server on '%s:%d'", address, port)
74+
grpcProcess := assets.ResolvePath(
75+
r.BackendAssetsPath,
76+
"util",
77+
"llama-cpp-rpc-server",
78+
)
7679

77-
grpcProcess := assets.ResolvePath(
78-
r.BackendAssetsPath,
79-
"util",
80-
"llama-cpp-rpc-server",
81-
)
80+
args := append([]string{"--host", address, "--port", fmt.Sprint(port)}, r.ExtraLLamaCPPArgs...)
81+
args, grpcProcess = library.LoadLDSO(r.BackendAssetsPath, args, grpcProcess)
8282

83-
args := append([]string{"--host", address, "--port", fmt.Sprint(port)}, r.ExtraLLamaCPPArgs...)
84-
args, grpcProcess = library.LoadLDSO(r.BackendAssetsPath, args, grpcProcess)
83+
cmd := exec.Command(
84+
grpcProcess, args...,
85+
)
8586

86-
cmd := exec.Command(
87-
grpcProcess, args...,
88-
)
87+
cmd.Env = os.Environ()
8988

90-
cmd.Env = os.Environ()
89+
cmd.Stderr = os.Stdout
90+
cmd.Stdout = os.Stdout
9191

92-
cmd.Stderr = os.Stdout
93-
cmd.Stdout = os.Stdout
92+
if err := cmd.Start(); err != nil {
93+
log.Error().Any("grpcProcess", grpcProcess).Any("args", args).Err(err).Msg("Failed to start llama-cpp-rpc-server")
94+
}
9495

95-
if err := cmd.Start(); err != nil {
96-
log.Error().Any("grpcProcess", grpcProcess).Any("args", args).Err(err).Msg("Failed to start llama-cpp-rpc-server")
96+
cmd.Wait()
9797
}
98+
}()
9899

99-
cmd.Wait()
100+
_, err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
101+
if err != nil {
102+
return err
100103
}
101-
}()
102-
103-
_, err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
104-
if err != nil {
105-
return err
106104
}
107105

108106
for {

core/p2p/p2p.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -202,13 +202,9 @@ func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID stri
202202
func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan NodeData, error) {
203203
tunnels := make(chan NodeData)
204204

205-
err := n.Start(ctx)
206-
if err != nil {
207-
return nil, fmt.Errorf("creating a new node: %w", err)
208-
}
209205
ledger, err := n.Ledger()
210206
if err != nil {
211-
return nil, fmt.Errorf("creating a new node: %w", err)
207+
return nil, fmt.Errorf("getting the ledger: %w", err)
212208
}
213209
// get new services, allocate and return to the channel
214210

0 commit comments

Comments
 (0)