Skip to content

Commit bfda3e0

Browse files
committed
Fix race in a03526d
Modify packets before adding them to NACK buffer. WARNING: DATA RACE Read at 0x00c00056d456 by goroutine 10741: github.com/pion/interceptor/pkg/nack.(*sendBuffer).get() /home/runner/go/pkg/mod/github.com/pion/[email protected]/pkg/nack/send_buffer.go:95 +0x1e4 github.com/pion/interceptor/pkg/nack.(*ResponderInterceptor).resendPackets.func1() /home/runner/go/pkg/mod/github.com/pion/[email protected]/pkg/nack/responder_interceptor.go:153 +0x70 github.com/pion/rtcp.(*NackPair).Range() /home/runner/go/pkg/mod/github.com/pion/[email protected]/transport_layer_nack.go:65 +0x43 github.com/pion/interceptor/pkg/nack.(*ResponderInterceptor).resendPackets() /home/runner/go/pkg/mod/github.com/pion/[email protected]/pkg/nack/responder_interceptor.go:152 +0x124 github.com/pion/interceptor/pkg/nack.(*ResponderInterceptor).BindRTCPReader.func1.gowrap1() /home/runner/go/pkg/mod/github.com/pion/[email protected]/pkg/nack/responder_interceptor.go:100 +0x44 Previous write at 0x00c00056d456 by goroutine 10735: github.com/pion/interceptor/pkg/nack.(*ResponderInterceptor).resendPackets.func1() /home/runner/go/pkg/mod/github.com/pion/[email protected]/pkg/nack/responder_interceptor.go:186 +0x684 github.com/pion/rtcp.(*NackPair).Range() /home/runner/go/pkg/mod/github.com/pion/[email protected]/transport_layer_nack.go:65 +0x43 github.com/pion/interceptor/pkg/nack.(*ResponderInterceptor).resendPackets() /home/runner/go/pkg/mod/github.com/pion/[email protected]/pkg/nack/responder_interceptor.go:152 +0x124 github.com/pion/interceptor/pkg/nack.(*ResponderInterceptor).BindRTCPReader.func1.gowrap1() /home/runner/go/pkg/mod/github.com/pion/[email protected]/pkg/nack/responder_interceptor.go:100 +0x44
1 parent a03526d commit bfda3e0

File tree

4 files changed

+51
-65
lines changed

4 files changed

+51
-65
lines changed

pkg/nack/responder_interceptor.go

Lines changed: 6 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package nack
55

66
import (
7-
"encoding/binary"
87
"sync"
98

109
"github.com/pion/interceptor"
@@ -19,7 +18,7 @@ type ResponderInterceptorFactory struct {
1918
}
2019

2120
type packetFactory interface {
22-
NewPacket(header *rtp.Header, payload []byte) (*retainablePacket, error)
21+
NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*retainablePacket, error)
2322
}
2423

2524
// NewInterceptor constructs a new ResponderInterceptor
@@ -63,11 +62,6 @@ type ResponderInterceptor struct {
6362
type localStream struct {
6463
sendBuffer *sendBuffer
6564
rtpWriter interceptor.RTPWriter
66-
67-
// Non-zero if Retransmissions should be sent on a distinct stream
68-
rtxSsrc uint32
69-
rtxPayloadType uint8
70-
rtxSequencer rtp.Sequencer
7165
}
7266

7367
// NewResponderInterceptor returns a new ResponderInterceptorFactor
@@ -115,16 +109,13 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri
115109
sendBuffer, _ := newSendBuffer(n.size)
116110
n.streamsMu.Lock()
117111
n.streams[info.SSRC] = &localStream{
118-
sendBuffer: sendBuffer,
119-
rtpWriter: writer,
120-
rtxSsrc: info.SSRCRetransmission,
121-
rtxPayloadType: info.PayloadTypeRetransmission,
122-
rtxSequencer: rtp.NewRandomSequencer(),
112+
sendBuffer: sendBuffer,
113+
rtpWriter: writer,
123114
}
124115
n.streamsMu.Unlock()
125116

126117
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
127-
pkt, err := n.packetFactory.NewPacket(header, payload)
118+
pkt, err := n.packetFactory.NewPacket(header, payload, info.SSRCRetransmission, info.PayloadTypeRetransmission)
128119
if err != nil {
129120
return 0, err
130121
}
@@ -151,43 +142,8 @@ func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) {
151142
for i := range nack.Nacks {
152143
nack.Nacks[i].Range(func(seq uint16) bool {
153144
if p := stream.sendBuffer.get(seq); p != nil {
154-
if stream.rtxSsrc != 0 {
155-
// Store the original sequence number and rewrite the sequence number.
156-
originalSequenceNumber := p.Header().SequenceNumber
157-
p.Header().SequenceNumber = stream.rtxSequencer.NextSequenceNumber()
158-
159-
// Rewrite the SSRC.
160-
p.Header().SSRC = stream.rtxSsrc
161-
// Rewrite the payload type.
162-
p.Header().PayloadType = stream.rtxPayloadType
163-
164-
// Remove padding if present.
165-
paddingLength := 0
166-
originPayload := p.Payload()
167-
if p.Header().Padding {
168-
paddingLength = int(originPayload[len(originPayload)-1])
169-
p.Header().Padding = false
170-
}
171-
172-
// Write the original sequence number at the beginning of the payload.
173-
payload := make([]byte, 2)
174-
binary.BigEndian.PutUint16(payload, originalSequenceNumber)
175-
payload = append(payload, originPayload[:len(originPayload)-paddingLength]...)
176-
177-
// Send RTX packet.
178-
if _, err := stream.rtpWriter.Write(p.Header(), payload, interceptor.Attributes{}); err != nil {
179-
n.log.Warnf("failed sending rtx packet: %+v", err)
180-
}
181-
182-
// Resore the Padding and SSRC.
183-
if paddingLength > 0 {
184-
p.Header().Padding = true
185-
}
186-
p.Header().SequenceNumber = originalSequenceNumber
187-
} else {
188-
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
189-
n.log.Warnf("failed resending nacked packet: %+v", err)
190-
}
145+
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
146+
n.log.Warnf("failed resending nacked packet: %+v", err)
191147
}
192148
p.Release()
193149
}

pkg/nack/retainable_packet.go

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package nack
55

66
import (
7+
"encoding/binary"
78
"io"
89
"sync"
910

@@ -13,8 +14,9 @@ import (
1314
const maxPayloadLen = 1460
1415

1516
type packetManager struct {
16-
headerPool *sync.Pool
17-
payloadPool *sync.Pool
17+
headerPool *sync.Pool
18+
payloadPool *sync.Pool
19+
rtxSequencer rtp.Sequencer
1820
}
1921

2022
func newPacketManager() *packetManager {
@@ -30,16 +32,18 @@ func newPacketManager() *packetManager {
3032
return &buf
3133
},
3234
},
35+
rtxSequencer: rtp.NewRandomSequencer(),
3336
}
3437
}
3538

36-
func (m *packetManager) NewPacket(header *rtp.Header, payload []byte) (*retainablePacket, error) {
39+
func (m *packetManager) NewPacket(header *rtp.Header, payload []byte, rtxSsrc uint32, rtxPayloadType uint8) (*retainablePacket, error) {
3740
if len(payload) > maxPayloadLen {
3841
return nil, io.ErrShortBuffer
3942
}
4043

4144
p := &retainablePacket{
42-
onRelease: m.releasePacket,
45+
onRelease: m.releasePacket,
46+
sequenceNumber: header.SequenceNumber,
4347
// new packets have retain count of 1
4448
count: 1,
4549
}
@@ -62,6 +66,29 @@ func (m *packetManager) NewPacket(header *rtp.Header, payload []byte) (*retainab
6266
p.payload = (*p.buffer)[:size]
6367
}
6468

69+
if rtxSsrc != 0 && rtxPayloadType != 0 {
70+
// Store the original sequence number and rewrite the sequence number.
71+
originalSequenceNumber := p.header.SequenceNumber
72+
p.header.SequenceNumber = m.rtxSequencer.NextSequenceNumber()
73+
74+
// Rewrite the SSRC.
75+
p.header.SSRC = rtxSsrc
76+
// Rewrite the payload type.
77+
p.header.PayloadType = rtxPayloadType
78+
79+
// Remove padding if present.
80+
paddingLength := 0
81+
if p.header.Padding {
82+
paddingLength = int(p.payload[len(p.payload)-1])
83+
p.header.Padding = false
84+
}
85+
86+
// Write the original sequence number at the beginning of the payload.
87+
payload := make([]byte, 2)
88+
binary.BigEndian.PutUint16(payload, originalSequenceNumber)
89+
p.payload = append(payload, p.payload[:len(p.payload)-paddingLength]...)
90+
}
91+
6592
return p, nil
6693
}
6794

@@ -74,12 +101,13 @@ func (m *packetManager) releasePacket(header *rtp.Header, payload *[]byte) {
74101

75102
type noOpPacketFactory struct{}
76103

77-
func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte) (*retainablePacket, error) {
104+
func (f *noOpPacketFactory) NewPacket(header *rtp.Header, payload []byte, _ uint32, _ uint8) (*retainablePacket, error) {
78105
return &retainablePacket{
79-
onRelease: f.releasePacket,
80-
count: 1,
81-
header: header,
82-
payload: payload,
106+
onRelease: f.releasePacket,
107+
count: 1,
108+
header: header,
109+
payload: payload,
110+
sequenceNumber: header.SequenceNumber,
83111
}, nil
84112
}
85113

@@ -96,6 +124,8 @@ type retainablePacket struct {
96124
header *rtp.Header
97125
buffer *[]byte
98126
payload []byte
127+
128+
sequenceNumber uint16
99129
}
100130

101131
func (p *retainablePacket) Header() *rtp.Header {

pkg/nack/send_buffer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (s *sendBuffer) add(packet *retainablePacket) {
4646
s.m.Lock()
4747
defer s.m.Unlock()
4848

49-
seq := packet.Header().SequenceNumber
49+
seq := packet.sequenceNumber
5050
if !s.started {
5151
s.packets[seq%s.size] = packet
5252
s.lastAdded = seq
@@ -92,7 +92,7 @@ func (s *sendBuffer) get(seq uint16) *retainablePacket {
9292

9393
pkt := s.packets[seq%s.size]
9494
if pkt != nil {
95-
if pkt.Header().SequenceNumber != seq {
95+
if pkt.sequenceNumber != seq {
9696
return nil
9797
}
9898
// already released

pkg/nack/send_buffer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestSendBuffer(t *testing.T) {
2121
add := func(nums ...uint16) {
2222
for _, n := range nums {
2323
seq := start + n
24-
pkt, err := pm.NewPacket(&rtp.Header{SequenceNumber: seq}, nil)
24+
pkt, err := pm.NewPacket(&rtp.Header{SequenceNumber: seq}, nil, 0, 0)
2525
require.NoError(t, err)
2626
sb.add(pkt)
2727
}
@@ -78,7 +78,7 @@ func TestSendBuffer_Overridden(t *testing.T) {
7878
require.Equal(t, uint16(1), sb.size)
7979

8080
originalBytes := []byte("originalContent")
81-
pkt, err := pm.NewPacket(&rtp.Header{SequenceNumber: 1}, originalBytes)
81+
pkt, err := pm.NewPacket(&rtp.Header{SequenceNumber: 1}, originalBytes, 0, 0)
8282
require.NoError(t, err)
8383
sb.add(pkt)
8484

@@ -91,7 +91,7 @@ func TestSendBuffer_Overridden(t *testing.T) {
9191
require.Equal(t, 1, retrieved.count)
9292

9393
// ensure original packet is released
94-
pkt, err = pm.NewPacket(&rtp.Header{SequenceNumber: 2}, originalBytes)
94+
pkt, err = pm.NewPacket(&rtp.Header{SequenceNumber: 2}, originalBytes, 0, 0)
9595
require.NoError(t, err)
9696
sb.add(pkt)
9797
require.Equal(t, 0, retrieved.count)
@@ -113,7 +113,7 @@ func TestSendBuffer_Race(t *testing.T) {
113113
add := func(nums ...uint16) {
114114
for _, n := range nums {
115115
seq := start + n
116-
pkt, err := pm.NewPacket(&rtp.Header{SequenceNumber: seq}, nil)
116+
pkt, err := pm.NewPacket(&rtp.Header{SequenceNumber: seq}, nil, 0, 0)
117117
require.NoError(t, err)
118118
sb.add(pkt)
119119
}

0 commit comments

Comments
 (0)