Skip to content

Commit e3a9a9b

Browse files
committed
Add kafka output
1 parent 47db326 commit e3a9a9b

File tree

4 files changed

+76
-2
lines changed

4 files changed

+76
-2
lines changed

emitter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func Start(stop chan int) {
1515
middleware.ReadFrom(in)
1616
}
1717

18-
// We going only to read responses, so using same ReadFrom method
18+
// We are going only to read responses, so using same ReadFrom method
1919
for _, out := range Plugins.Outputs {
2020
if r, ok := out.(io.Reader); ok {
2121
middleware.ReadFrom(r)

output_kafka.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package main
2+
3+
import (
4+
"github.com/Shopify/sarama"
5+
"log"
6+
"strings"
7+
"time"
8+
)
9+
10+
// KafkaConfig should contains required information to
11+
// build producers.
12+
type KafkaConfig struct {
13+
zookeeper string
14+
topic string
15+
}
16+
17+
// KafkaOutput should make producer client.
18+
type KafkaOutput struct {
19+
address string
20+
config *KafkaConfig
21+
producer sarama.AsyncProducer
22+
}
23+
24+
// NewKafkaOutput creates instance of kafka producer client.
25+
func NewKafkaOutput(address string, config *KafkaConfig) *KafkaOutput {
26+
c := sarama.NewConfig()
27+
c.Producer.RequiredAcks = sarama.WaitForLocal
28+
c.Producer.Compression = sarama.CompressionSnappy
29+
c.Producer.Flush.Frequency = 500 * time.Millisecond
30+
31+
brokerList := strings.Split(config.zookeeper, ",")
32+
33+
producer, err := sarama.NewAsyncProducer(brokerList, c)
34+
if err != nil {
35+
log.Fatalln("Failed to start Sarama(Kafka) producer:", err)
36+
}
37+
38+
o := &KafkaOutput{
39+
address: address,
40+
config: config,
41+
producer: producer,
42+
}
43+
44+
// Start infinite loop for tracking errors for kafka producer.
45+
go o.ErrorHandler()
46+
47+
return o
48+
}
49+
50+
// ErrorHandler should receive errors
51+
func (o *KafkaOutput) ErrorHandler() {
52+
for err := range o.producer.Errors() {
53+
log.Println("Failed to write access log entry:", err)
54+
}
55+
}
56+
57+
func (o *KafkaOutput) Write(data []byte) (n int, err error) {
58+
buf := make(sarama.ByteEncoder, len(data))
59+
copy(buf, data)
60+
61+
o.producer.Input() <- &sarama.ProducerMessage{
62+
Topic: o.config.topic,
63+
Key: sarama.StringEncoder(o.address),
64+
Value: buf,
65+
}
66+
67+
return len(data), nil
68+
}

plugins.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,4 +142,6 @@ func InitPlugins() {
142142
for _, options := range Settings.outputHTTP {
143143
registerPlugin(NewHTTPOutput, options, &Settings.outputHTTPConfig)
144144
}
145+
146+
registerPlugin(NewKafkaOutput, &Settings.outputKafkaConfig)
145147
}

settings.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ type AppSettings struct {
5858

5959
outputHTTPConfig HTTPOutputConfig
6060
modifierConfig HTTPModifierConfig
61+
62+
outputKafkaConfig KafkaConfig
6163
}
6264

6365
// Settings holds Gor configuration
@@ -118,13 +120,15 @@ func init() {
118120
flag.IntVar(&Settings.outputHTTPConfig.BufferSize, "output-http-response-buffer", 0, "HTTP response buffer size, all data after this size will be discarded.")
119121
flag.IntVar(&Settings.outputHTTPConfig.workers, "output-http-workers", 0, "Gor uses dynamic worker scaling by default. Enter a number to run a set number of workers.")
120122
flag.IntVar(&Settings.outputHTTPConfig.redirectLimit, "output-http-redirects", 0, "Enable how often redirects should be followed.")
121-
flag.DurationVar(&Settings.outputHTTPConfig.Timeout, "output-http-timeout", 5 * time.Second, "Specify HTTP request/response timeout. By default 5s. Example: --output-http-timeout 30s")
123+
flag.DurationVar(&Settings.outputHTTPConfig.Timeout, "output-http-timeout", 5*time.Second, "Specify HTTP request/response timeout. By default 5s. Example: --output-http-timeout 30s")
122124

123125
flag.BoolVar(&Settings.outputHTTPConfig.stats, "output-http-stats", false, "Report http output queue stats to console every 5 seconds.")
124126
flag.BoolVar(&Settings.outputHTTPConfig.OriginalHost, "http-original-host", false, "Normally gor replaces the Host http header with the host supplied with --output-http. This option disables that behavior, preserving the original Host header.")
125127
flag.BoolVar(&Settings.outputHTTPConfig.Debug, "output-http-debug", false, "Enables http debug output.")
126128

127129
flag.StringVar(&Settings.outputHTTPConfig.elasticSearch, "output-http-elasticsearch", "", "Send request and response stats to ElasticSearch:\n\tgor --input-raw :8080 --output-http staging.com --output-http-elasticsearch 'es_host:api_port/index_name'")
130+
flag.StringVar(&Settings.outputKafkaConfig.zookeeper, "output-kafka-zookeeper", "", "Send request and response stats to Kafka:\n\tgor --input-raw :8080 --output-kafka-zookeeper '192.168.0.1:2181,192.168.0.2:2181'")
131+
flag.StringVar(&Settings.outputKafkaConfig.topic, "output-kafka-topic", "", "Send request and response stats to Kafka:\n\tgor --input-raw :8080 --output-kafka-topic 'kafka-log'")
128132

129133
flag.Var(&Settings.modifierConfig.headers, "http-set-header", "Inject additional headers to http reqest:\n\tgor --input-raw :8080 --output-http staging.com --http-set-header 'User-Agent: Gor'")
130134
flag.Var(&Settings.modifierConfig.headers, "output-http-header", "WARNING: `--output-http-header` DEPRECATED, use `--http-set-header` instead")

0 commit comments

Comments
 (0)