Skip to content

Commit 99e6fdf

Browse files
authored
output-ws (#1136)
* add websocket output * add tests * update mod
1 parent 23825d8 commit 99e6fdf

File tree

6 files changed

+237
-2
lines changed

6 files changed

+237
-2
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ require (
77
github.com/aws/aws-sdk-go v1.33.2
88
github.com/coocood/freecache v1.2.0
99
github.com/google/gopacket v1.1.20-0.20210429153827-3eaba0894325
10+
github.com/gorilla/websocket v1.4.2
11+
github.com/klauspost/compress v1.11.13 // indirect
1012
github.com/mattbaird/elastigo v0.0.0-20170123220020-2fe47fd29e4b
1113
github.com/stretchr/testify v1.7.0
1214
github.com/xdg-go/scram v1.1.1
@@ -36,7 +38,6 @@ require (
3638
github.com/jcmturner/gofork v1.0.0 // indirect
3739
github.com/jmespath/go-jmespath v0.3.0 // indirect
3840
github.com/json-iterator/go v1.1.12 // indirect
39-
github.com/klauspost/compress v1.9.8 // indirect
4041
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
4142
github.com/modern-go/reflect2 v1.0.2 // indirect
4243
github.com/pierrec/lz4 v2.4.1+incompatible // indirect

go.sum

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97Dwqy
197197
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
198198
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
199199
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
200+
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
200201
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
201202
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
202203
github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
@@ -220,8 +221,9 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
220221
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
221222
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
222223
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
223-
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
224224
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
225+
github.com/klauspost/compress v1.11.13 h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4=
226+
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
225227
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
226228
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
227229
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=

output_ws.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"encoding/base64"
7+
"fmt"
8+
"github.com/gorilla/websocket"
9+
"hash/fnv"
10+
"log"
11+
"net/http"
12+
"net/url"
13+
"strings"
14+
"time"
15+
)
16+
17+
// WebSocketOutput used for sending raw tcp payloads
18+
// Can be used for transferring binary payloads like protocol buffers
19+
type WebSocketOutput struct {
20+
address string
21+
limit int
22+
buf []chan *Message
23+
bufStats *GorStat
24+
config *WebSocketOutputConfig
25+
workerIndex uint32
26+
headers http.Header
27+
28+
close bool
29+
}
30+
31+
// WebSocketOutputConfig WebSocket output configuration
32+
type WebSocketOutputConfig struct {
33+
Sticky bool `json:"output-ws-sticky"`
34+
SkipVerify bool `json:"output-ws-skip-verify"`
35+
Workers int `json:"output-ws-workers"`
36+
}
37+
38+
// NewWebSocketOutput constructor for WebSocketOutput
39+
// Initialize X workers which hold keep-alive connection
40+
func NewWebSocketOutput(address string, config *WebSocketOutputConfig) PluginWriter {
41+
o := new(WebSocketOutput)
42+
43+
u, err := url.Parse(address)
44+
if err != nil {
45+
log.Fatal(fmt.Sprintf("[OUTPUT-WS] parse WS output URL error[%q]", err))
46+
}
47+
48+
o.config = config
49+
o.headers = http.Header{
50+
"Authorization": []string{"Basic " + base64.StdEncoding.EncodeToString([]byte(u.User.String()))},
51+
}
52+
53+
u.User = nil // must be after creating the headers
54+
o.address = u.String()
55+
56+
if Settings.OutputWebSocketStats {
57+
o.bufStats = NewGorStat("output_ws", 5000)
58+
}
59+
60+
// create X buffers and send the buffer index to the worker
61+
o.buf = make([]chan *Message, o.config.Workers)
62+
for i := 0; i < o.config.Workers; i++ {
63+
o.buf[i] = make(chan *Message, 100)
64+
go o.worker(i)
65+
}
66+
67+
return o
68+
}
69+
70+
func (o *WebSocketOutput) worker(bufferIndex int) {
71+
retries := 0
72+
conn, err := o.connect(o.address)
73+
for {
74+
if o.close {
75+
return
76+
}
77+
78+
if err == nil {
79+
break
80+
}
81+
82+
Debug(1, fmt.Sprintf("Can't connect to aggregator instance, reconnecting in 1 second. Retries:%d", retries))
83+
time.Sleep(1 * time.Second)
84+
85+
conn, err = o.connect(o.address)
86+
retries++
87+
}
88+
89+
if retries > 0 {
90+
Debug(2, fmt.Sprintf("Connected to aggregator instance after %d retries", retries))
91+
}
92+
93+
defer conn.Close()
94+
95+
for {
96+
msg := <-o.buf[bufferIndex]
97+
err = conn.WriteMessage(websocket.BinaryMessage, append(msg.Meta, msg.Data...))
98+
if err != nil {
99+
Debug(2, "INFO: WebSocket output connection closed, reconnecting "+err.Error())
100+
o.buf[bufferIndex] <- msg
101+
go o.worker(bufferIndex)
102+
break
103+
}
104+
}
105+
}
106+
107+
func (o *WebSocketOutput) getBufferIndex(msg *Message) int {
108+
if !o.config.Sticky {
109+
o.workerIndex++
110+
return int(o.workerIndex) % o.config.Workers
111+
}
112+
113+
hasher := fnv.New32a()
114+
hasher.Write(payloadID(msg.Meta))
115+
return int(hasher.Sum32()) % o.config.Workers
116+
}
117+
118+
// PluginWrite writes message to this plugin
119+
func (o *WebSocketOutput) PluginWrite(msg *Message) (n int, err error) {
120+
if !isOriginPayload(msg.Meta) {
121+
return len(msg.Data), nil
122+
}
123+
124+
bufferIndex := o.getBufferIndex(msg)
125+
o.buf[bufferIndex] <- msg
126+
127+
if Settings.OutputTCPStats {
128+
o.bufStats.Write(len(o.buf[bufferIndex]))
129+
}
130+
131+
return len(msg.Data) + len(msg.Meta), nil
132+
}
133+
134+
func (o *WebSocketOutput) connect(address string) (conn *websocket.Conn, err error) {
135+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
136+
defer cancel()
137+
138+
d := websocket.DefaultDialer
139+
if strings.HasPrefix(address, "wss://") {
140+
d.TLSClientConfig = &tls.Config{InsecureSkipVerify: o.config.SkipVerify}
141+
}
142+
143+
conn, _, err = d.DialContext(ctx, address, o.headers)
144+
return
145+
}
146+
147+
func (o *WebSocketOutput) String() string {
148+
return fmt.Sprintf("WebSocket output %s, limit: %d", o.address, o.limit)
149+
}
150+
151+
// Close closes the output
152+
func (o *WebSocketOutput) Close() {
153+
o.close = true
154+
}

output_ws_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package main
2+
3+
import (
4+
"github.com/gorilla/websocket"
5+
"log"
6+
"net/http"
7+
"sync"
8+
"testing"
9+
)
10+
11+
func TestWebSocketOutput(t *testing.T) {
12+
wg := new(sync.WaitGroup)
13+
14+
wsAddr := startWebsocket(func(data []byte) {
15+
wg.Done()
16+
})
17+
input := NewTestInput()
18+
output := NewWebSocketOutput(wsAddr, &WebSocketOutputConfig{Workers: 1})
19+
20+
plugins := &InOutPlugins{
21+
Inputs: []PluginReader{input},
22+
Outputs: []PluginWriter{output},
23+
}
24+
25+
emitter := NewEmitter()
26+
go emitter.Start(plugins, Settings.Middleware)
27+
28+
for i := 0; i < 10; i++ {
29+
wg.Add(1)
30+
input.EmitGET()
31+
}
32+
33+
wg.Wait()
34+
emitter.Close()
35+
}
36+
37+
func startWebsocket(cb func([]byte)) string {
38+
upgrader := websocket.Upgrader{}
39+
40+
http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
41+
c, err := upgrader.Upgrade(w, r, nil)
42+
if err != nil {
43+
log.Print("upgrade:", err)
44+
return
45+
}
46+
47+
go func(conn *websocket.Conn) {
48+
defer conn.Close()
49+
for {
50+
_, msg, _ := conn.ReadMessage()
51+
cb(msg)
52+
}
53+
}(c)
54+
})
55+
56+
go func() {
57+
err := http.ListenAndServe("localhost:8081", nil)
58+
if err != nil {
59+
log.Fatal("Can't start:", err)
60+
}
61+
}()
62+
63+
return "ws://localhost:8081/test"
64+
}

plugins.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ func NewPlugins() *InOutPlugins {
117117
plugins.registerPlugin(NewTCPOutput, options, &Settings.OutputTCPConfig)
118118
}
119119

120+
for _, options := range Settings.OutputWebSocket {
121+
plugins.registerPlugin(NewWebSocketOutput, options, &Settings.OutputWebSocketConfig)
122+
}
123+
120124
for _, options := range Settings.InputFile {
121125
plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth, Settings.InputFileMaxWait, Settings.InputFileDryRun)
122126
}

settings.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ type AppSettings struct {
8383
OutputTCPConfig TCPOutputConfig
8484
OutputTCPStats bool `json:"output-tcp-stats"`
8585

86+
OutputWebSocket []string `json:"output-ws"`
87+
OutputWebSocketConfig WebSocketOutputConfig
88+
OutputWebSocketStats bool `json:"output-ws-stats"`
89+
8690
InputFile []string `json:"input-file"`
8791
InputFileLoop bool `json:"input-file-loop"`
8892
InputFileReadDepth int `json:"input-file-read-depth"`
@@ -152,6 +156,12 @@ func init() {
152156
flag.IntVar(&Settings.OutputTCPConfig.Workers, "output-tcp-workers", 10, "Number of parallel tcp connections, default is 10")
153157
flag.BoolVar(&Settings.OutputTCPStats, "output-tcp-stats", false, "Report TCP output queue stats to console every 5 seconds.")
154158

159+
flag.Var(&MultiOption{&Settings.OutputWebSocket}, "output-ws", "Just like output tcp, just with WebSocket. Example: \n\t# Listen for requests on 80 port and forward them to other Gor instance on 28020 port\n\tgor --input-raw :80 --output-ws wss://replay.local:28020/endpoint")
160+
flag.BoolVar(&Settings.OutputWebSocketConfig.SkipVerify, "output-ws-skip-verify", false, "Don't verify hostname on TLS secure connection.")
161+
flag.BoolVar(&Settings.OutputWebSocketConfig.Sticky, "output-ws-sticky", false, "Use Sticky connection. Request/Response with same ID will be sent to the same connection.")
162+
flag.IntVar(&Settings.OutputWebSocketConfig.Workers, "output-ws-workers", 10, "Number of parallel ws connections, default is 10")
163+
flag.BoolVar(&Settings.OutputWebSocketStats, "output-ws-stats", false, "Report WebSocket output queue stats to console every 5 seconds.")
164+
155165
flag.Var(&MultiOption{&Settings.InputFile}, "input-file", "Read requests from file: \n\tgor --input-file ./requests.gor --output-http staging.com")
156166
flag.BoolVar(&Settings.InputFileLoop, "input-file-loop", false, "Loop input files, useful for performance testing.")
157167
flag.IntVar(&Settings.InputFileReadDepth, "input-file-read-depth", 100, "GoReplay tries to read and cache multiple records, in advance. In parallel it also perform sorting of requests, if they came out of order. Since it needs hold this buffer in memory, bigger values can cause worse performance")

0 commit comments

Comments
 (0)