Skip to content

Commit 1f3e304

Browse files
committed
TUN-8701: Add metrics and adjust logs for datagram v3
Closes TUN-8701
1 parent 952622a commit 1f3e304

File tree

11 files changed

+189
-62
lines changed

11 files changed

+189
-62
lines changed

connection/quic_datagram_v3.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/quic-go/quic-go"
1111
"github.com/rs/zerolog"
1212

13+
"github.com/cloudflare/cloudflared/management"
1314
cfdquic "github.com/cloudflare/cloudflared/quic/v3"
1415
"github.com/cloudflare/cloudflared/tunnelrpc/pogs"
1516
)
@@ -25,9 +26,15 @@ func NewDatagramV3Connection(ctx context.Context,
2526
conn quic.Connection,
2627
sessionManager cfdquic.SessionManager,
2728
index uint8,
29+
metrics cfdquic.Metrics,
2830
logger *zerolog.Logger,
2931
) DatagramSessionHandler {
30-
datagramMuxer := cfdquic.NewDatagramConn(conn, sessionManager, index, logger)
32+
log := logger.
33+
With().
34+
Int(management.EventTypeKey, int(management.UDP)).
35+
Uint8(LogFieldConnIndex, index).
36+
Logger()
37+
datagramMuxer := cfdquic.NewDatagramConn(conn, sessionManager, index, metrics, &log)
3138

3239
return &datagramV3Connection{
3340
conn,

quic/v3/manager.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ import (
1313

1414
var (
1515
// ErrSessionNotFound indicates that a session has not been registered yet for the request id.
16-
ErrSessionNotFound = errors.New("session not found")
16+
ErrSessionNotFound = errors.New("flow not found")
1717
// ErrSessionBoundToOtherConn is returned when a registration already exists for a different connection.
18-
ErrSessionBoundToOtherConn = errors.New("session is in use by another connection")
18+
ErrSessionBoundToOtherConn = errors.New("flow is in use by another connection")
1919
// ErrSessionAlreadyRegistered is returned when a registration already exists for this connection.
20-
ErrSessionAlreadyRegistered = errors.New("session is already registered for this connection")
20+
ErrSessionAlreadyRegistered = errors.New("flow is already registered for this connection")
2121
)
2222

2323
type SessionManager interface {
@@ -39,12 +39,14 @@ type DialUDP func(dest netip.AddrPort) (*net.UDPConn, error)
3939
type sessionManager struct {
4040
sessions map[RequestID]Session
4141
mutex sync.RWMutex
42+
metrics Metrics
4243
log *zerolog.Logger
4344
}
4445

45-
func NewSessionManager(log *zerolog.Logger, originDialer DialUDP) SessionManager {
46+
func NewSessionManager(metrics Metrics, log *zerolog.Logger, originDialer DialUDP) SessionManager {
4647
return &sessionManager{
4748
sessions: make(map[RequestID]Session),
49+
metrics: metrics,
4850
log: log,
4951
}
5052
}
@@ -65,7 +67,7 @@ func (s *sessionManager) RegisterSession(request *UDPSessionRegistrationDatagram
6567
return nil, err
6668
}
6769
// Create and insert the new session in the map
68-
session := NewSession(request.RequestID, request.IdleDurationHint, origin, conn, s.log)
70+
session := NewSession(request.RequestID, request.IdleDurationHint, origin, conn, s.metrics, s.log)
6971
s.sessions[request.RequestID] = session
7072
return session, nil
7173
}

quic/v3/manager_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515

1616
func TestRegisterSession(t *testing.T) {
1717
log := zerolog.Nop()
18-
manager := v3.NewSessionManager(&log, ingress.DialUDPAddrPort)
18+
manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort)
1919

2020
request := v3.UDPSessionRegistrationDatagram{
2121
RequestID: testRequestID,
@@ -71,7 +71,7 @@ func TestRegisterSession(t *testing.T) {
7171

7272
func TestGetSession_Empty(t *testing.T) {
7373
log := zerolog.Nop()
74-
manager := v3.NewSessionManager(&log, ingress.DialUDPAddrPort)
74+
manager := v3.NewSessionManager(&noopMetrics{}, &log, ingress.DialUDPAddrPort)
7575

7676
_, err := manager.GetSession(testRequestID)
7777
if !errors.Is(err, v3.ErrSessionNotFound) {

quic/v3/metrics.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package v3
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
)
6+
7+
const (
8+
namespace = "cloudflared"
9+
subsystem = "udp"
10+
)
11+
12+
type Metrics interface {
13+
IncrementFlows()
14+
DecrementFlows()
15+
PayloadTooLarge()
16+
RetryFlowResponse()
17+
MigrateFlow()
18+
}
19+
20+
type metrics struct {
21+
activeUDPFlows prometheus.Gauge
22+
totalUDPFlows prometheus.Counter
23+
payloadTooLarge prometheus.Counter
24+
retryFlowResponses prometheus.Counter
25+
migratedFlows prometheus.Counter
26+
}
27+
28+
func (m *metrics) IncrementFlows() {
29+
m.totalUDPFlows.Inc()
30+
m.activeUDPFlows.Inc()
31+
}
32+
33+
func (m *metrics) DecrementFlows() {
34+
m.activeUDPFlows.Dec()
35+
}
36+
37+
func (m *metrics) PayloadTooLarge() {
38+
m.payloadTooLarge.Inc()
39+
}
40+
41+
func (m *metrics) RetryFlowResponse() {
42+
m.retryFlowResponses.Inc()
43+
}
44+
45+
func (m *metrics) MigrateFlow() {
46+
m.migratedFlows.Inc()
47+
}
48+
49+
func NewMetrics(registerer prometheus.Registerer) Metrics {
50+
m := &metrics{
51+
activeUDPFlows: prometheus.NewGauge(prometheus.GaugeOpts{
52+
Namespace: namespace,
53+
Subsystem: subsystem,
54+
Name: "active_flows",
55+
Help: "Concurrent count of UDP flows that are being proxied to any origin",
56+
}),
57+
totalUDPFlows: prometheus.NewCounter(prometheus.CounterOpts{
58+
Namespace: namespace,
59+
Subsystem: subsystem,
60+
Name: "total_flows",
61+
Help: "Total count of UDP flows that have been proxied to any origin",
62+
}),
63+
payloadTooLarge: prometheus.NewCounter(prometheus.CounterOpts{
64+
Namespace: namespace,
65+
Subsystem: subsystem,
66+
Name: "payload_too_large",
67+
Help: "Total count of UDP flows that have had origin payloads that are too large to proxy",
68+
}),
69+
retryFlowResponses: prometheus.NewCounter(prometheus.CounterOpts{
70+
Namespace: namespace,
71+
Subsystem: subsystem,
72+
Name: "retry_flow_responses",
73+
Help: "Total count of UDP flows that have had to send their registration response more than once",
74+
}),
75+
migratedFlows: prometheus.NewCounter(prometheus.CounterOpts{
76+
Namespace: namespace,
77+
Subsystem: subsystem,
78+
Name: "migrated_flows",
79+
Help: "Total count of UDP flows have been migrated across local connections",
80+
}),
81+
}
82+
registerer.MustRegister(
83+
m.activeUDPFlows,
84+
m.totalUDPFlows,
85+
m.payloadTooLarge,
86+
m.retryFlowResponses,
87+
m.migratedFlows,
88+
)
89+
return m
90+
}

quic/v3/metrics_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package v3_test
2+
3+
type noopMetrics struct{}
4+
5+
func (noopMetrics) IncrementFlows() {}
6+
func (noopMetrics) DecrementFlows() {}
7+
func (noopMetrics) PayloadTooLarge() {}
8+
func (noopMetrics) RetryFlowResponse() {}
9+
func (noopMetrics) MigrateFlow() {}

quic/v3/muxer.go

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,20 @@ type datagramConn struct {
4545
conn QuicConnection
4646
index uint8
4747
sessionManager SessionManager
48+
metrics Metrics
4849
logger *zerolog.Logger
4950

5051
datagrams chan []byte
5152
readErrors chan error
5253
}
5354

54-
func NewDatagramConn(conn QuicConnection, sessionManager SessionManager, index uint8, logger *zerolog.Logger) DatagramConn {
55+
func NewDatagramConn(conn QuicConnection, sessionManager SessionManager, index uint8, metrics Metrics, logger *zerolog.Logger) DatagramConn {
5556
log := logger.With().Uint8("datagramVersion", 3).Logger()
5657
return &datagramConn{
5758
conn: conn,
5859
index: index,
5960
sessionManager: sessionManager,
61+
metrics: metrics,
6062
logger: &log,
6163
datagrams: make(chan []byte, demuxChanCapacity),
6264
readErrors: make(chan error, 2),
@@ -143,19 +145,21 @@ func (c *datagramConn) Serve(ctx context.Context) error {
143145
c.logger.Err(err).Msgf("unable to unmarshal session registration datagram")
144146
return
145147
}
148+
logger := c.logger.With().Str(logFlowID, reg.RequestID.String()).Logger()
146149
// We bind the new session to the quic connection context instead of cloudflared context to allow for the
147150
// quic connection to close and close only the sessions bound to it. Closing of cloudflared will also
148151
// initiate the close of the quic connection, so we don't have to worry about the application context
149152
// in the scope of a session.
150-
c.handleSessionRegistrationDatagram(connCtx, reg)
153+
c.handleSessionRegistrationDatagram(connCtx, reg, &logger)
151154
case UDPSessionPayloadType:
152155
payload := &UDPSessionPayloadDatagram{}
153156
err := payload.UnmarshalBinary(datagram)
154157
if err != nil {
155158
c.logger.Err(err).Msgf("unable to unmarshal session payload datagram")
156159
return
157160
}
158-
c.handleSessionPayloadDatagram(payload)
161+
logger := c.logger.With().Str(logFlowID, payload.RequestID.String()).Logger()
162+
c.handleSessionPayloadDatagram(payload, &logger)
159163
case UDPSessionRegistrationResponseType:
160164
// cloudflared should never expect to receive UDP session responses as it will not initiate new
161165
// sessions towards the edge.
@@ -169,31 +173,33 @@ func (c *datagramConn) Serve(ctx context.Context) error {
169173
}
170174

171175
// This method handles new registrations of a session and the serve loop for the session.
172-
func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, datagram *UDPSessionRegistrationDatagram) {
176+
func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, datagram *UDPSessionRegistrationDatagram, logger *zerolog.Logger) {
173177
session, err := c.sessionManager.RegisterSession(datagram, c)
174178
switch err {
175179
case nil:
176180
// Continue as normal
177181
case ErrSessionAlreadyRegistered:
178182
// Session is already registered and likely the response got lost
179-
c.handleSessionAlreadyRegistered(datagram.RequestID)
183+
c.handleSessionAlreadyRegistered(datagram.RequestID, logger)
180184
return
181185
case ErrSessionBoundToOtherConn:
182186
// Session is already registered but to a different connection
183-
c.handleSessionMigration(datagram.RequestID)
187+
c.handleSessionMigration(datagram.RequestID, logger)
184188
return
185189
default:
186-
c.logger.Err(err).Msgf("session registration failure")
187-
c.handleSessionRegistrationFailure(datagram.RequestID)
190+
logger.Err(err).Msgf("flow registration failure")
191+
c.handleSessionRegistrationFailure(datagram.RequestID, logger)
188192
return
189193
}
194+
c.metrics.IncrementFlows()
190195
// Make sure to eventually remove the session from the session manager when the session is closed
191196
defer c.sessionManager.UnregisterSession(session.ID())
197+
defer c.metrics.DecrementFlows()
192198

193199
// Respond that we are able to process the new session
194200
err = c.SendUDPSessionResponse(datagram.RequestID, ResponseOk)
195201
if err != nil {
196-
c.logger.Err(err).Msgf("session registration failure: unable to send session registration response")
202+
logger.Err(err).Msgf("flow registration failure: unable to send session registration response")
197203
return
198204
}
199205

@@ -203,24 +209,24 @@ func (c *datagramConn) handleSessionRegistrationDatagram(ctx context.Context, da
203209
if err == nil {
204210
// We typically don't expect a session to close without some error response. [SessionIdleErr] is the typical
205211
// expected error response.
206-
c.logger.Warn().Msg("session was closed without explicit close or timeout")
212+
logger.Warn().Msg("flow was closed without explicit close or timeout")
207213
return
208214
}
209215
// SessionIdleErr and SessionCloseErr are valid and successful error responses to end a session.
210216
if errors.Is(err, SessionIdleErr{}) || errors.Is(err, SessionCloseErr) {
211-
c.logger.Debug().Msg(err.Error())
217+
logger.Debug().Msg(err.Error())
212218
return
213219
}
214220

215221
// All other errors should be reported as errors
216-
c.logger.Err(err).Msgf("session was closed with an error")
222+
logger.Err(err).Msgf("flow was closed with an error")
217223
}
218224

219-
func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID) {
225+
func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID, logger *zerolog.Logger) {
220226
// Send another registration response since the session is already active
221227
err := c.SendUDPSessionResponse(requestID, ResponseOk)
222228
if err != nil {
223-
c.logger.Err(err).Msgf("session registration failure: unable to send an additional session registration response")
229+
logger.Err(err).Msgf("flow registration failure: unable to send an additional flow registration response")
224230
return
225231
}
226232

@@ -233,9 +239,10 @@ func (c *datagramConn) handleSessionAlreadyRegistered(requestID RequestID) {
233239
// The session is already running in another routine so we want to restart the idle timeout since no proxied
234240
// packets have come down yet.
235241
session.ResetIdleTimer()
242+
c.metrics.RetryFlowResponse()
236243
}
237244

238-
func (c *datagramConn) handleSessionMigration(requestID RequestID) {
245+
func (c *datagramConn) handleSessionMigration(requestID RequestID, logger *zerolog.Logger) {
239246
// We need to migrate the currently running session to this edge connection.
240247
session, err := c.sessionManager.GetSession(requestID)
241248
if err != nil {
@@ -250,29 +257,29 @@ func (c *datagramConn) handleSessionMigration(requestID RequestID) {
250257
// Send another registration response since the session is already active
251258
err = c.SendUDPSessionResponse(requestID, ResponseOk)
252259
if err != nil {
253-
c.logger.Err(err).Msgf("session registration failure: unable to send an additional session registration response")
260+
logger.Err(err).Msgf("flow registration failure: unable to send an additional flow registration response")
254261
return
255262
}
256263
}
257264

258-
func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID) {
265+
func (c *datagramConn) handleSessionRegistrationFailure(requestID RequestID, logger *zerolog.Logger) {
259266
err := c.SendUDPSessionResponse(requestID, ResponseUnableToBindSocket)
260267
if err != nil {
261-
c.logger.Err(err).Msgf("unable to send session registration error response (%d)", ResponseUnableToBindSocket)
268+
logger.Err(err).Msgf("unable to send flow registration error response (%d)", ResponseUnableToBindSocket)
262269
}
263270
}
264271

265272
// Handles incoming datagrams that need to be sent to a registered session.
266-
func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram) {
273+
func (c *datagramConn) handleSessionPayloadDatagram(datagram *UDPSessionPayloadDatagram, logger *zerolog.Logger) {
267274
s, err := c.sessionManager.GetSession(datagram.RequestID)
268275
if err != nil {
269-
c.logger.Err(err).Msgf("unable to find session")
276+
logger.Err(err).Msgf("unable to find flow")
270277
return
271278
}
272279
// We ignore the bytes written to the socket because any partial write must return an error.
273280
_, err = s.Write(datagram.Payload)
274281
if err != nil {
275-
c.logger.Err(err).Msgf("unable to write payload for unavailable session")
282+
logger.Err(err).Msgf("unable to write payload for the flow")
276283
return
277284
}
278285
}

0 commit comments

Comments
 (0)