1
+ use influx_db_client as influxdb;
2
+ use metrics;
3
+ use std:: env;
4
+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
5
+ use timing;
6
+
7
+ const DEFAULT_METRICS_RATE : usize = 100 ;
8
+
9
+ pub struct Counter {
10
+ pub name : & ' static str ,
11
+
12
+ pub counts : AtomicUsize ,
13
+ pub times : AtomicUsize ,
14
+
15
+ pub lastlog : AtomicUsize ,
16
+ pub lograte : AtomicUsize ,
17
+ }
18
+
19
+ macro_rules! create_counter {
20
+ ( $name: expr, $lograte: expr) => {
21
+ Counter {
22
+ name: $name,
23
+ counts: AtomicUsize :: new( 0 ) ,
24
+ times: AtomicUsize :: new( 0 ) ,
25
+ lastlog: AtomicUsize :: new( 0 ) ,
26
+ lograte: AtomicUsize :: new( $lograte) ,
27
+ }
28
+ } ;
29
+ }
30
+
31
+ macro_rules! inc_counter {
32
+ ( $name: expr, $count: expr) => {
33
+ unsafe { $name. inc( $count) } ;
34
+ } ;
35
+ }
36
+
37
+ macro_rules! inc_new_counter_info {
38
+ ( $name: expr, $count: expr) => { {
39
+ inc_new_counter!( $name, $count, Level :: Info , 0 ) ;
40
+ } } ;
41
+ ( $name: expr, $count: expr, $lograte: expr) => { {
42
+ inc_new_counter!( $name, $count, Level :: Info , $lograte) ;
43
+ } } ;
44
+ }
45
+
46
+ macro_rules! inc_new_counter {
47
+ ( $name: expr, $count: expr, $level: expr, $lograte: expr) => { {
48
+ if log_enabled!( $level) {
49
+ static mut INC_NEW_COUNTER : Counter = create_counter!( $name, $lograte) ;
50
+ inc_counter!( INC_NEW_COUNTER , $count) ;
51
+ }
52
+ } } ;
53
+ }
54
+
55
+ impl Counter {
56
+ fn default_log_rate ( ) -> usize {
57
+ let v = env:: var ( "XPZ_DEFAULT_METRICS_RATE" )
58
+ . map ( |x| x. parse ( ) . unwrap_or ( DEFAULT_METRICS_RATE ) )
59
+ . unwrap_or ( DEFAULT_METRICS_RATE ) ;
60
+ if v == 0 {
61
+ DEFAULT_METRICS_RATE
62
+ } else {
63
+ v
64
+ }
65
+ }
66
+ pub fn inc ( & mut self , events : usize ) {
67
+ let counts = self . counts . fetch_add ( events, Ordering :: Relaxed ) ;
68
+ let times = self . times . fetch_add ( 1 , Ordering :: Relaxed ) ;
69
+ let mut lograte = self . lograte . load ( Ordering :: Relaxed ) ;
70
+ if lograte == 0 {
71
+ lograte = Counter :: default_log_rate ( ) ;
72
+ self . lograte . store ( lograte, Ordering :: Relaxed ) ;
73
+ }
74
+ if times % lograte == 0 && times > 0 {
75
+ let lastlog = self . lastlog . load ( Ordering :: Relaxed ) ;
76
+ info ! (
77
+ "COUNTER:{{\" name\" : \" {}\" , \" counts\" : {}, \" samples\" : {}, \" now\" : {}, \" events\" : {}}}" ,
78
+ self . name,
79
+ counts + events,
80
+ times,
81
+ timing:: timestamp( ) ,
82
+ events,
83
+ ) ;
84
+ metrics:: submit (
85
+ influxdb:: Point :: new ( & format ! ( "counter-{}" , self . name) )
86
+ . add_field (
87
+ "count" ,
88
+ influxdb:: Value :: Integer ( counts as i64 - lastlog as i64 ) ,
89
+ ) . to_owned ( ) ,
90
+ ) ;
91
+ self . lastlog
92
+ . compare_and_swap ( lastlog, counts, Ordering :: Relaxed ) ;
93
+ }
94
+ }
95
+ }
96
+ #[ cfg( test) ]
97
+ mod tests {
98
+ use counter:: { Counter , DEFAULT_METRICS_RATE } ;
99
+ use log:: Level ;
100
+ use std:: env;
101
+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
102
+ use std:: sync:: { Once , RwLock , ONCE_INIT } ;
103
+
104
+ fn get_env_lock ( ) -> & ' static RwLock < ( ) > {
105
+ static mut ENV_LOCK : Option < RwLock < ( ) > > = None ;
106
+ static INIT_HOOK : Once = ONCE_INIT ;
107
+
108
+ unsafe {
109
+ INIT_HOOK . call_once ( || {
110
+ ENV_LOCK = Some ( RwLock :: new ( ( ) ) ) ;
111
+ } ) ;
112
+ & ENV_LOCK . as_ref ( ) . unwrap ( )
113
+ }
114
+ }
115
+
116
+ #[ test]
117
+ fn test_counter ( ) {
118
+ let _readlock = get_env_lock ( ) . read ( ) ;
119
+ static mut COUNTER : Counter = create_counter ! ( "test" , 100 ) ;
120
+ let count = 1 ;
121
+ inc_counter ! ( COUNTER , count) ;
122
+ unsafe {
123
+ assert_eq ! ( COUNTER . counts. load( Ordering :: Relaxed ) , 1 ) ;
124
+ assert_eq ! ( COUNTER . times. load( Ordering :: Relaxed ) , 1 ) ;
125
+ assert_eq ! ( COUNTER . lograte. load( Ordering :: Relaxed ) , 100 ) ;
126
+ assert_eq ! ( COUNTER . lastlog. load( Ordering :: Relaxed ) , 0 ) ;
127
+ assert_eq ! ( COUNTER . name, "test" ) ;
128
+ }
129
+ for _ in 0 ..199 {
130
+ inc_counter ! ( COUNTER , 2 ) ;
131
+ }
132
+ unsafe {
133
+ assert_eq ! ( COUNTER . lastlog. load( Ordering :: Relaxed ) , 199 ) ;
134
+ }
135
+ inc_counter ! ( COUNTER , 2 ) ;
136
+ unsafe {
137
+ assert_eq ! ( COUNTER . lastlog. load( Ordering :: Relaxed ) , 399 ) ;
138
+ }
139
+ }
140
+ #[ test]
141
+ fn test_inc_new_counter ( ) {
142
+ let _readlock = get_env_lock ( ) . read ( ) ;
143
+ inc_new_counter_info ! ( "counter-1" , 1 ) ;
144
+ inc_new_counter_info ! ( "counter-2" , 1 , 2 ) ;
145
+ }
146
+ #[ test]
147
+ fn test_lograte ( ) {
148
+ let _readlock = get_env_lock ( ) . read ( ) ;
149
+ assert_eq ! (
150
+ Counter :: default_log_rate( ) ,
151
+ DEFAULT_METRICS_RATE ,
152
+ "default_log_rate() is {}, expected {}, XPZ_DEFAULT_METRICS_RATE environment variable set?" ,
153
+ Counter :: default_log_rate( ) ,
154
+ DEFAULT_METRICS_RATE ,
155
+ ) ;
156
+ static mut COUNTER : Counter = create_counter ! ( "test_lograte" , 0 ) ;
157
+ inc_counter ! ( COUNTER , 2 ) ;
158
+ unsafe {
159
+ assert_eq ! (
160
+ COUNTER . lograte. load( Ordering :: Relaxed ) ,
161
+ DEFAULT_METRICS_RATE
162
+ ) ;
163
+ }
164
+ }
165
+
166
+ #[ test]
167
+ fn test_lograte_env ( ) {
168
+ assert_ne ! ( DEFAULT_METRICS_RATE , 0 ) ;
169
+ let _writelock = get_env_lock ( ) . write ( ) ;
170
+ static mut COUNTER : Counter = create_counter ! ( "test_lograte_env" , 0 ) ;
171
+ env:: set_var ( "XPZ_DEFAULT_METRICS_RATE" , "50" ) ;
172
+ inc_counter ! ( COUNTER , 2 ) ;
173
+ unsafe {
174
+ assert_eq ! ( COUNTER . lograte. load( Ordering :: Relaxed ) , 50 ) ;
175
+ }
176
+
177
+ static mut COUNTER2 : Counter = create_counter ! ( "test_lograte_env" , 0 ) ;
178
+ env:: set_var ( "XPZ_DEFAULT_METRICS_RATE" , "0" ) ;
179
+ inc_counter ! ( COUNTER2 , 2 ) ;
180
+ unsafe {
181
+ assert_eq ! (
182
+ COUNTER2 . lograte. load( Ordering :: Relaxed ) ,
183
+ DEFAULT_METRICS_RATE
184
+ ) ;
185
+ }
186
+ }
187
+ }
188
+
189
+ 01
0 commit comments