@@ -7,24 +7,63 @@ import (
7
7
"log"
8
8
"net/http"
9
9
"path/filepath"
10
+ "sync"
10
11
11
12
"github.com/fsnotify/fsnotify"
12
13
"github.com/oklog/run"
13
14
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
14
15
"github.com/prometheus/client_golang/prometheus"
15
16
"github.com/prometheus/client_golang/prometheus/promhttp"
17
+ "github.com/prometheus/prometheus/pkg/labels"
18
+ "github.com/prometheus/prometheus/promql/parser"
16
19
"github.com/pyrra-dev/pyrra/kubernetes/api/v1alpha1"
17
20
"github.com/pyrra-dev/pyrra/openapi"
18
21
openapiserver "github.com/pyrra-dev/pyrra/openapi/server/go"
19
22
"github.com/pyrra-dev/pyrra/slo"
20
23
"sigs.k8s.io/yaml"
21
24
)
22
25
23
- var objectives = map [string ]slo.Objective {}
26
+ type Objectives struct {
27
+ mu sync.RWMutex
28
+ objectives map [string ]slo.Objective
29
+ }
24
30
25
- func cmdFilesystem (configFiles , prometheusFolder string ) {
26
- var gr run.Group
31
+ func (os * Objectives ) Set (o slo.Objective ) {
32
+ os .mu .Lock ()
33
+ os .objectives [o .Labels .String ()] = o
34
+ os .mu .Unlock ()
35
+ }
36
+
37
+ func (os * Objectives ) Match (ms []* labels.Matcher ) []slo.Objective {
38
+ if ms == nil || len (ms ) == 0 {
39
+ os .mu .RLock ()
40
+ objectives := make ([]slo.Objective , 0 , len (os .objectives ))
41
+ for _ , o := range os .objectives {
42
+ objectives = append (objectives , o )
43
+ }
44
+ os .mu .RUnlock ()
45
+ return objectives
46
+ }
47
+
48
+ os .mu .RLock ()
49
+ defer os .mu .RUnlock ()
50
+
51
+ var objectives []slo.Objective
27
52
53
+ for _ , o := range os .objectives {
54
+ for _ , m := range ms {
55
+ v := o .Labels .Get (m .Name )
56
+ if ! m .Matches (v ) {
57
+ continue
58
+ }
59
+ objectives = append (objectives , o )
60
+ }
61
+ }
62
+
63
+ return objectives
64
+ }
65
+
66
+ func cmdFilesystem (configFiles , prometheusFolder string ) {
28
67
reg := prometheus .NewRegistry ()
29
68
30
69
reconcilesTotal := prometheus .NewCounter (prometheus.CounterOpts {
@@ -43,8 +82,10 @@ func cmdFilesystem(configFiles, prometheusFolder string) {
43
82
)
44
83
45
84
ctx , cancel := context .WithCancel (context .Background ())
85
+ objectives := & Objectives {objectives : map [string ]slo.Objective {}}
46
86
files := make (chan string , 16 )
47
87
88
+ var gr run.Group
48
89
{
49
90
gr .Add (func () error {
50
91
// Initially read all files and send them to be processed and added to the in memory store.
@@ -143,7 +184,7 @@ func cmdFilesystem(configFiles, prometheusFolder string) {
143
184
return err
144
185
}
145
186
146
- objectives [ objective . Labels . String ()] = objective
187
+ objectives . Set ( objective )
147
188
}
148
189
}
149
190
}, func (err error ) {
@@ -152,7 +193,9 @@ func cmdFilesystem(configFiles, prometheusFolder string) {
152
193
}
153
194
{
154
195
router := openapiserver .NewRouter (
155
- openapiserver .NewObjectivesApiController (& FilesystemObjectiveServer {}),
196
+ openapiserver .NewObjectivesApiController (& FilesystemObjectiveServer {
197
+ objectives : objectives ,
198
+ }),
156
199
)
157
200
router .Handle ("/metrics" , promhttp .HandlerFor (reg , promhttp.HandlerOpts {}))
158
201
@@ -172,12 +215,24 @@ func cmdFilesystem(configFiles, prometheusFolder string) {
172
215
}
173
216
}
174
217
175
- type FilesystemObjectiveServer struct {}
218
+ type FilesystemObjectiveServer struct {
219
+ objectives * Objectives
220
+ }
176
221
177
- func (f FilesystemObjectiveServer ) ListObjectives (ctx context.Context ) (openapiserver.ImplResponse , error ) {
222
+ func (f FilesystemObjectiveServer ) ListObjectives (ctx context.Context , query string ) (openapiserver.ImplResponse , error ) {
223
+ var matchers []* labels.Matcher
224
+ if query != "" {
225
+ var err error
226
+ matchers , err = parser .ParseMetricSelector (query )
227
+ if err != nil {
228
+ return openapiserver.ImplResponse {Code : http .StatusBadRequest }, err
229
+ }
230
+ }
231
+
232
+ objectives := f .objectives .Match (matchers )
178
233
list := make ([]openapiserver.Objective , 0 , len (objectives ))
179
- for _ , objective := range objectives {
180
- list = append (list , openapi .ServerFromInternal (objective ))
234
+ for _ , o := range objectives {
235
+ list = append (list , openapi .ServerFromInternal (o ))
181
236
}
182
237
183
238
return openapiserver.ImplResponse {
@@ -188,7 +243,10 @@ func (f FilesystemObjectiveServer) ListObjectives(ctx context.Context) (openapis
188
243
189
244
func (f FilesystemObjectiveServer ) GetObjective (ctx context.Context , expr string ) (openapiserver.ImplResponse , error ) {
190
245
// TODO: Parse expr to match properly
191
- objective , ok := objectives [expr ]
246
+
247
+ f .objectives .mu .RLock ()
248
+ objective , ok := f .objectives .objectives [expr ]
249
+ f .objectives .mu .RUnlock ()
192
250
if ! ok {
193
251
return openapiserver.ImplResponse {Code : http .StatusNotFound }, nil
194
252
}
0 commit comments