
Commit 9fec886

cheyang authored and k8s-ci-robot committed
Fix long latency of 'arena list' (kubeflow#93)
* add profile rate
* add set profile
* update document with how to profile
* add tracing
* support trace
* fix long latency when using arena list
* fix cache
1 parent af5dfe9 · commit 9fec886

11 files changed (+85, -27 lines)


README.md

Lines changed: 14 additions & 0 deletions
@@ -53,6 +53,20 @@ make
 
 `arena` binary is located in directory `arena/bin`. You may want to add the directory to `$PATH`.
 
+## CPU Profiling
+
+```
+# set profile rate (HZ)
+export PROFILE_RATE=1000
+
+# arena {command} --pprof
+arena list --pprof
+INFO[0000] Dump cpu profile file into /tmp/cpu_profile
+```
+
+Then you can analyze the profile by following [Go CPU profiling: pprof and speedscope](https://coder.today/go-profiling-pprof-and-speedscope-b05b812cc429)
+
+
 ## CLI Document
 
 Please refer to [arena.md](docs/cli/arena.md)
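
The README addition above documents the new `--pprof` flag and the `PROFILE_RATE` environment variable (a sampling rate in Hz). As a rough sketch of how such a flag can dump a profile to /tmp/cpu_profile with Go's standard runtime/pprof package — the helper name and the wiring are assumptions, not the actual arena code:

```
package main

import (
    "log"
    "os"
    "runtime"
    "runtime/pprof"
    "strconv"
)

// startCPUProfile is a hypothetical helper: it honors PROFILE_RATE (Hz) and
// writes a CPU profile to /tmp/cpu_profile, returning a stop function.
func startCPUProfile() func() {
    if rate, err := strconv.Atoi(os.Getenv("PROFILE_RATE")); err == nil && rate > 0 {
        // Must be set before StartCPUProfile; StartCPUProfile will then log a
        // benign warning because it tries to apply its own 100 Hz default.
        runtime.SetCPUProfileRate(rate)
    }
    f, err := os.Create("/tmp/cpu_profile")
    if err != nil {
        log.Fatalf("could not create profile file: %v", err)
    }
    if err := pprof.StartCPUProfile(f); err != nil {
        log.Fatalf("could not start CPU profile: %v", err)
    }
    return func() {
        pprof.StopCPUProfile()
        f.Close()
        log.Printf("Dump cpu profile file into /tmp/cpu_profile")
    }
}

func main() {
    stop := startCPUProfile()
    defer stop()
    // ... run the arena command here ...
}
```

The resulting file can be inspected with `go tool pprof /tmp/cpu_profile` or converted for speedscope as described in the linked article.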

cmd/arena/commands/common.go

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ var (
     // To reduce client-go API call, for 'arena list' scenario
     allPods []v1.Pod
     allJobs []batchv1.Job
+    useCache bool
     name string
     namespace string
     arenaNamespace string // the system namespace of arena

cmd/arena/commands/list.go

Lines changed: 5 additions & 3 deletions
@@ -20,11 +20,12 @@ import (
     "strings"
     "text/tabwriter"
 
+    "io"
+
     "github.com/kubeflow/arena/util"
     "github.com/kubeflow/arena/util/helm"
     log "github.com/sirupsen/logrus"
     "github.com/spf13/cobra"
-    "io"
 )
 
 func NewListCommand() *cobra.Command {
@@ -46,7 +47,8 @@ func NewListCommand() *cobra.Command {
                 fmt.Println(err)
                 os.Exit(1)
             }
-
+            // determine use cache
+            useCache = true
             allPods, err = acquireAllPods(client)
             if err != nil {
                 fmt.Println(err)
@@ -111,7 +113,7 @@ func displayTrainingJobList(jobInfoList []TrainingJob, displayGPU bool) {
     _ = w.Flush()
 }
 
-func PrintLine(w io.Writer, fields ...string) {
+func PrintLine(w io.Writer, fields ...string) {
     //w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
     buffer := strings.Join(fields, "\t")
     fmt.Fprintln(w, buffer)
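
For context on the latency fix itself: `arena list` now marks `useCache = true` and fills the shared caches with one bulk List call per resource type, so the trainers no longer issue per-job API calls. Below is a self-contained sketch of that producer side, written against the context-free client-go List signature visible in this diff; the kubeconfig handling is an assumption added only to make the example runnable, not the exact arena code.

```
package main

import (
    "fmt"
    "os"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

// Package-level cache shared by the trainers, as in common.go.
var (
    allPods  []v1.Pod
    useCache bool
)

// acquireAllPods sketches the bulk fetch: one List across all namespaces
// instead of one API round-trip per training job.
func acquireAllPods(client *kubernetes.Clientset) ([]v1.Pod, error) {
    podList, err := client.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        return nil, err
    }
    return podList.Items, nil
}

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    client, err := kubernetes.NewForConfig(config)
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }

    useCache = true // mark the cache as authoritative before the trainers run
    allPods, err = acquireAllPods(client)
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    fmt.Printf("cached %d pods\n", len(allPods))
}
```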

cmd/arena/commands/root.go

Lines changed: 2 additions & 0 deletions
@@ -35,6 +35,7 @@ var (
     loadingRules *clientcmd.ClientConfigLoadingRules
     logLevel string
     enablePProf bool
+    enableTrace bool
 )
 
 // NewCommand returns a new instance of an Arena command
@@ -56,6 +57,7 @@ func NewCommand() *cobra.Command {
     // enable logging
     command.PersistentFlags().StringVar(&logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error")
     command.PersistentFlags().BoolVar(&enablePProf, "pprof", false, "enable cpu profile")
+    command.PersistentFlags().BoolVar(&enableTrace, "trace", false, "enable trace")
     command.PersistentFlags().StringVar(&arenaNamespace, "arenaNamespace", "arena-system", "The namespace of arena system service, like TFJob")
 
     command.AddCommand(NewSubmitCommand())
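
This commit only registers the `--trace` flag here. As a hedged illustration, Go's standard runtime/trace package is the usual counterpart to the `--pprof` path, and one plausible way to hook such a flag into a cobra root command looks like the sketch below; the /tmp/trace.out path and the PersistentPreRun/PersistentPostRun placement are assumptions, not necessarily arena's wiring.

```
package main

import (
    "log"
    "os"
    "runtime/trace"

    "github.com/spf13/cobra"
)

var enableTrace bool

func main() {
    var traceFile *os.File

    command := &cobra.Command{
        Use: "arena",
        PersistentPreRun: func(cmd *cobra.Command, args []string) {
            if enableTrace {
                var err error
                traceFile, err = os.Create("/tmp/trace.out") // hypothetical output path
                if err != nil {
                    log.Fatalf("cannot create trace file: %v", err)
                }
                if err := trace.Start(traceFile); err != nil {
                    log.Fatalf("cannot start trace: %v", err)
                }
            }
        },
        PersistentPostRun: func(cmd *cobra.Command, args []string) {
            if enableTrace {
                trace.Stop()
                traceFile.Close()
            }
        },
        Run: func(cmd *cobra.Command, args []string) {},
    }
    command.PersistentFlags().BoolVar(&enableTrace, "trace", false, "enable trace")

    if err := command.Execute(); err != nil {
        log.Fatal(err)
    }
}
```

A file produced this way can be viewed with `go tool trace /tmp/trace.out`.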

cmd/arena/commands/top_job.go

Lines changed: 11 additions & 11 deletions
@@ -21,11 +21,12 @@ import (
     log "github.com/sirupsen/logrus"
     "github.com/spf13/cobra"
 
-    "github.com/kubeflow/arena/util/helm"
     "strconv"
     "text/tabwriter"
-    "k8s.io/api/core/v1"
     "time"
+
+    "github.com/kubeflow/arena/util/helm"
+    "k8s.io/api/core/v1"
 )
 
 func NewTopJobCommand() *cobra.Command {
@@ -41,7 +42,7 @@ func NewTopJobCommand() *cobra.Command {
                 fmt.Println(err)
                 os.Exit(1)
             }
-        printStart:
+        printStart:
             releaseMap, err := helm.ListReleaseMap()
             // log.Printf("releaseMap %v", releaseMap)
             if err != nil {
@@ -54,7 +55,7 @@ func NewTopJobCommand() *cobra.Command {
             // fmt.Println(err)
             // os.Exit(1)
             // }
-
+            useCache = true
             allPods, err = acquireAllPods(client)
             if err != nil {
                 fmt.Println(err)
@@ -110,12 +111,11 @@ func NewTopJobCommand() *cobra.Command {
         },
     }
 
-    command.Flags().BoolVarP(&printNotStop, "refresh", "r", false, "Display continuously")
-    command.Flags().StringVarP(&instanceName, "instance", "i", "", "Display instance top info")
+    command.Flags().BoolVarP(&printNotStop, "refresh", "r", false, "Display continuously")
+    command.Flags().StringVarP(&instanceName, "instance", "i", "", "Display instance top info")
     return command
 }
 
-
 func topTrainingJob(jobInfoList []TrainingJob, showSpecificJobMetric bool, instanceName string, notStop bool) {
     w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
     var (
@@ -161,7 +161,7 @@ func topTrainingJob(jobInfoList []TrainingJob, showSpecificJobMetric bool, insta
                         status,
                         hostIP,
                     )
-                }else {
+                } else {
                     index := 0
                     keys := SortMapKeys(podMetric)
                     for _, gid := range keys {
@@ -175,16 +175,16 @@ func topTrainingJob(jobInfoList []TrainingJob, showSpecificJobMetric bool, insta
                            podName,
                            guid,
                            fmt.Sprintf("%.0f%%", gpuMetric.GpuDutyCycle),
-                           fmt.Sprintf("%.1fMiB / %.1fMiB ", fromByteToMiB(gpuMetric.GpuMemoryUsed) , fromByteToMiB(gpuMetric.GpuMemoryTotal) ),
+                           fmt.Sprintf("%.1fMiB / %.1fMiB ", fromByteToMiB(gpuMetric.GpuMemoryUsed), fromByteToMiB(gpuMetric.GpuMemoryTotal)),
                            status,
                            hostIP,
                         )
-                        index ++
+                        index++
                     }
                 }
             }
         }
-    }else {
+    } else {
         for _, jobInfo := range jobInfoList {
 
             hostIP := jobInfo.HostIPOfChief()

cmd/arena/commands/top_node.go

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ func NewTopNodeCommand() *cobra.Command {
                 fmt.Println(err)
                 os.Exit(1)
             }
-            allPods, err := acquireAllActivePods(client)
+            allPods, err = acquireAllActivePods(client)
             if err != nil {
                 fmt.Println(err)
                 os.Exit(1)
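
This one-character change fixes a shadowing bug: with `:=`, a new local `allPods` was declared inside the command, so the package-level cache from common.go presumably stayed empty for anything reading the shared slice. A minimal standalone illustration of the difference, with hypothetical names:

```
package main

import "fmt"

var allPods []string // package-level cache, as in common.go

func fetch() ([]string, error) { return []string{"pod-a"}, nil }

// buggy: `:=` declares a NEW local allPods that shadows the package variable.
func buggy() {
    allPods, err := fetch()
    _, _ = allPods, err
}

// fixed: `=` assigns to the package-level cache (err is a named return here
// only so that plain assignment works in this tiny example).
func fixed() (err error) {
    allPods, err = fetch()
    return err
}

func main() {
    buggy()
    fmt.Println("after buggy:", allPods) // after buggy: []
    _ = fixed()
    fmt.Println("after fixed:", allPods) // after fixed: [pod-a]
}
```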

cmd/arena/commands/trainer_horovod.go

Lines changed: 2 additions & 2 deletions
@@ -106,7 +106,7 @@ func NewHorovodJobTrainer(client *kubernetes.Clientset) Trainer {
 func (m *HorovodJobTrainer) IsSupported(name, ns string) bool {
     isHorovod := false
 
-    if len(allJobs) > 0 {
+    if useCache {
         for _, job := range allJobs {
             if isHorovodJob(name, ns, job) {
                 isHorovod = true
@@ -138,7 +138,7 @@ func (m *HorovodJobTrainer) Type() string {
 }
 
 func (m *HorovodJobTrainer) GetTrainingJob(name, namespace string) (tj TrainingJob, err error) {
-    if len(allPods) > 0 {
+    if useCache {
         tj, err = m.getTrainingJobFromCache(name, namespace)
     } else {
         tj, err = m.getTrainingJob(name, namespace)
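
The same substitution appears in every trainer below. Switching from `len(allJobs) > 0` (or `len(allPods) > 0`) to the explicit `useCache` flag matters because an empty-but-valid cache was previously indistinguishable from "no cache", pushing each lookup onto the slow per-resource API path — which is what made `arena list` slow. A small runnable sketch of the difference, with simplified types and hypothetical helper names:

```
package main

import "fmt"

var (
    allJobs  []string // simplified stand-in for the cached job list
    useCache bool     // set once the bulk List has run (see list.go above)
)

func findInCache(name string) bool {
    for _, j := range allJobs {
        if j == name {
            return true
        }
    }
    return false
}

// queryAPIServer stands in for a per-job client-go call, i.e. the slow path.
func queryAPIServer(name string) bool {
    fmt.Println("extra API round-trip for", name)
    return false
}

// Old behavior: cache length as a proxy, so an empty cache falls through.
func isSupportedOld(name string) bool {
    if len(allJobs) > 0 {
        return findInCache(name)
    }
    return queryAPIServer(name)
}

// New behavior: trust the explicit flag; no fallback once the List has run.
func isSupportedNew(name string) bool {
    if useCache {
        return findInCache(name)
    }
    return queryAPIServer(name)
}

func main() {
    useCache = true            // the bulk List ran...
    allJobs = nil              // ...but found nothing matching
    isSupportedOld("tf-job-1") // prints the extra round-trip
    isSupportedNew("tf-job-1") // stays silent: the empty cache is trusted
}
```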

cmd/arena/commands/trainer_mpi.go

Lines changed: 2 additions & 2 deletions
@@ -201,7 +201,7 @@ func NewMPIJobTrainer(client *kubernetes.Clientset) Trainer {
         }
     }
     // allPods have been cached, we do the same to allMPIjobs
-    if len(allPods) > 0 {
+    if useCache {
         mpijobList, err := mpijobClient.KubeflowV1alpha1().MPIJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
         // mpijobList, err := mpijobClient.KubeflowV1alpha2().mpijob(metav1.NamespaceAll).List(metav1.ListOptions{})
         if err != nil {
@@ -238,7 +238,7 @@ func (tt *MPIJobTrainer) IsSupported(name, ns string) bool {
 
     isMPI := false
 
-    if len(allMPIjobs) > 0 {
+    if useCache {
         for _, job := range allMPIjobs {
             if tt.isMPIJob(name, ns, job) {
                 isMPI = true

cmd/arena/commands/trainer_standalone.go

Lines changed: 2 additions & 2 deletions
@@ -85,7 +85,7 @@ func (s *StandaloneJobTrainer) Type() string {
 func (s *StandaloneJobTrainer) IsSupported(name, ns string) bool {
     supported := false
 
-    if len(allJobs) > 0 {
+    if useCache {
         for _, job := range allJobs {
             if isStandaloneJob(name, ns, job) {
                 supported = true
@@ -113,7 +113,7 @@ func (s *StandaloneJobTrainer) IsSupported(name, ns string) bool {
 }
 
 func (s *StandaloneJobTrainer) GetTrainingJob(name, namespace string) (tj TrainingJob, err error) {
-    if len(allPods) > 0 {
+    if useCache {
         tj, err = s.getTrainingJobFromCache(name, namespace)
     } else {
         tj, err = s.getTrainingJob(name, namespace)

cmd/arena/commands/trainer_tensorflow.go

Lines changed: 5 additions & 6 deletions
@@ -197,7 +197,7 @@ func NewTensorFlowJobTrainer(client *kubernetes.Clientset) Trainer {
         }
     }
     // allPods have been cached, we do the same to allTfjobs
-    if len(allPods) > 0 {
+    if useCache {
         tfjobList, err := tfjobClient.KubeflowV1alpha2().TFJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
         if err != nil {
             log.Debugf("unsupported tfjobs due to %v", err)
@@ -232,7 +232,7 @@ func (tt *TensorFlowJobTrainer) IsSupported(name, ns string) bool {
 
     isTensorFlow := false
 
-    if len(allTfjobs) > 0 {
+    if useCache {
         for _, job := range allTfjobs {
             if tt.isTensorFlowJob(name, ns, job) {
                 isTensorFlow = true
@@ -258,7 +258,7 @@ func (tt *TensorFlowJobTrainer) IsSupported(name, ns string) bool {
 }
 
 func (tt *TensorFlowJobTrainer) GetTrainingJob(name, namespace string) (tj TrainingJob, err error) {
-    if len(allTfjobs) > 0 {
+    if useCache {
         tj, err = tt.getTrainingJobFromCache(name, namespace)
     } else {
         tj, err = tt.getTrainingJob(name, namespace)
@@ -269,7 +269,7 @@ func (tt *TensorFlowJobTrainer) GetTrainingJob(name, namespace string) (tj Train
 
 func (tt *TensorFlowJobTrainer) getTrainingJob(name, namespace string) (TrainingJob, error) {
     var (
-        tfjob tfv1alpha2.TFJob
+        tfjob tfv1alpha2.TFJob
     )
 
     // 1. Get the batchJob of training Job
@@ -318,7 +318,7 @@ func (tt *TensorFlowJobTrainer) getTrainingJob(name, namespace string) (Training
 func (tt *TensorFlowJobTrainer) getTrainingJobFromCache(name, ns string) (TrainingJob, error) {
 
     var (
-        tfjob tfv1alpha2.TFJob
+        tfjob tfv1alpha2.TFJob
     )
 
     // 1. Find the batch job
@@ -329,7 +329,6 @@ func (tt *TensorFlowJobTrainer) getTrainingJobFromCache(name, ns string) (Traini
         }
     }
 
-
     // 2. Find the pods, and determine the pod of the job
     pods, chiefPod := getPodsOfTFJob(tt, tfjob, allPods)
