Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit f77a957

Browse files
committed May 28, 2025 ·
Refactor random number generation in tests for improved security and clarity
1 parent 251e45a commit f77a957

File tree

3 files changed

+221
-29
lines changed

3 files changed

+221
-29
lines changed
 

‎input_file.go

Lines changed: 145 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -202,42 +202,156 @@ func newFileInputReader(path string, readDepth int, dryRun bool) *fileInputReade
202202

203203
// FileInput can read requests generated by FileOutput
204204
type FileInput struct {
205-
mu sync.Mutex
206-
data chan []byte
207-
exit chan bool
208-
path string
209-
readers []*fileInputReader
210-
speedFactor float64
211-
loop bool
212-
readDepth int
213-
dryRun bool
214-
maxWait time.Duration
205+
mu sync.Mutex
206+
data chan []byte
207+
exit chan bool
208+
path string
209+
readers []*fileInputReader
210+
processedFiles map[string]bool
211+
speedFactor float64
212+
loop bool
213+
readDepth int
214+
dryRun bool
215+
maxWait time.Duration
216+
watchInterval time.Duration
217+
watching bool
215218

216219
stats *expvar.Map
217220
}
218221

222+
// FileInputConfig configuration for the FileInput.
type FileInputConfig struct {
	// Loop is copied to the input's loop setting.
	Loop bool

	// ReadDepth is passed to every file reader the input creates.
	ReadDepth int

	// MaxWait is copied to the input's maxWait setting.
	MaxWait time.Duration

	// DryRun is passed to every file reader the input creates.
	DryRun bool

	// WatchNewFiles controls whether to watch for new files matching the pattern.
	WatchNewFiles bool

	// WatchInterval is the interval to check for new files. Values <= 0 make
	// NewFileInputWithConfig fall back to 5 seconds.
	WatchInterval time.Duration
}
231+
219232
// NewFileInput constructor for FileInput. Accepts file path as argument.
220233
func NewFileInput(path string, loop bool, readDepth int, maxWait time.Duration, dryRun bool) (i *FileInput) {
234+
config := &FileInputConfig{
235+
Loop: loop,
236+
ReadDepth: readDepth,
237+
MaxWait: maxWait,
238+
DryRun: dryRun,
239+
WatchNewFiles: Settings.InputFileWatch,
240+
WatchInterval: Settings.InputFileWatchInterval,
241+
}
242+
return NewFileInputWithConfig(path, config)
243+
}
244+
245+
// NewFileInputWithConfig constructor for FileInput with detailed configuration.
246+
func NewFileInputWithConfig(path string, config *FileInputConfig) (i *FileInput) {
221247
i = new(FileInput)
222248
i.data = make(chan []byte, 1000)
223249
i.exit = make(chan bool)
224250
i.path = path
225251
i.speedFactor = 1
226-
i.loop = loop
227-
i.readDepth = readDepth
252+
i.loop = config.Loop
253+
i.readDepth = config.ReadDepth
228254
i.stats = expvar.NewMap("file-" + path)
229-
i.dryRun = dryRun
230-
i.maxWait = maxWait
255+
i.dryRun = config.DryRun
256+
i.maxWait = config.MaxWait
257+
i.processedFiles = make(map[string]bool)
258+
i.watching = config.WatchNewFiles
259+
260+
if config.WatchInterval > 0 {
261+
i.watchInterval = config.WatchInterval
262+
} else {
263+
i.watchInterval = 5 * time.Second
264+
}
231265

232266
if err := i.init(); err != nil {
233267
return
234268
}
235269

236270
go i.emit()
271+
272+
if i.watching {
273+
go i.watchForNewFiles()
274+
}
237275

238276
return
239277
}
240278

279+
// watchForNewFiles periodically checks for new files matching the path pattern
280+
func (i *FileInput) watchForNewFiles() {
281+
ticker := time.NewTicker(i.watchInterval)
282+
defer ticker.Stop()
283+
284+
for {
285+
select {
286+
case <-i.exit:
287+
return
288+
case <-ticker.C:
289+
i.checkForNewFiles()
290+
}
291+
}
292+
}
293+
294+
// checkForNewFiles looks for new files that match the pattern and adds them to readers
295+
func (i *FileInput) checkForNewFiles() {
296+
defer i.mu.Unlock()
297+
i.mu.Lock()
298+
299+
var matches []string
300+
var err error
301+
302+
if strings.HasPrefix(i.path, "s3://") {
303+
sess := session.Must(session.NewSession(awsConfig()))
304+
svc := s3.New(sess)
305+
306+
bucket, key := parseS3Url(i.path)
307+
308+
params := &s3.ListObjectsInput{
309+
Bucket: aws.String(bucket),
310+
Prefix: aws.String(key),
311+
}
312+
313+
resp, err := svc.ListObjects(params)
314+
if err != nil {
315+
Debug(2, "[INPUT-FILE] Error while retrieving list of files from S3", i.path, err)
316+
return
317+
}
318+
319+
for _, c := range resp.Contents {
320+
path := "s3://" + bucket + "/" + (*c.Key)
321+
matches = append(matches, path)
322+
}
323+
} else if matches, err = filepath.Glob(i.path); err != nil {
324+
Debug(2, "[INPUT-FILE] Wrong file pattern", i.path, err)
325+
return
326+
}
327+
328+
if len(matches) == 0 {
329+
return
330+
}
331+
332+
newFilesFound := false
333+
334+
// Check for new files that haven't been processed yet
335+
for _, path := range matches {
336+
if i.processedFiles[path] {
337+
continue
338+
}
339+
340+
Debug(2, fmt.Sprintf("[INPUT-FILE] Found new file: %s", path))
341+
reader := newFileInputReader(path, i.readDepth, i.dryRun)
342+
if reader != nil {
343+
i.readers = append(i.readers, reader)
344+
i.processedFiles[path] = true
345+
newFilesFound = true
346+
}
347+
}
348+
349+
if newFilesFound {
350+
i.stats.Add("reader_count", int64(len(i.readers)))
351+
i.stats.Add("new_files_found", 1)
352+
}
353+
}
354+
241355
func parseS3Url(path string) (bucket, key string) {
242356
path = path[5:] // stripping `s3://`
243357
sep := strings.IndexByte(path, '/')
@@ -272,7 +386,9 @@ func (i *FileInput) init() (err error) {
272386
}
273387

274388
for _, c := range resp.Contents {
275-
matches = append(matches, "s3://"+bucket+"/"+(*c.Key))
389+
path := "s3://" + bucket + "/" + (*c.Key)
390+
matches = append(matches, path)
391+
i.processedFiles[path] = true
276392
}
277393
} else if matches, err = filepath.Glob(i.path); err != nil {
278394
Debug(2, "[INPUT-FILE] Wrong file pattern", i.path, err)
@@ -288,6 +404,7 @@ func (i *FileInput) init() (err error) {
288404

289405
for idx, p := range matches {
290406
i.readers[idx] = newFileInputReader(p, i.readDepth, i.dryRun)
407+
i.processedFiles[p] = true
291408
}
292409

293410
i.stats.Add("reader_count", int64(len(matches)))
@@ -341,6 +458,7 @@ func (i *FileInput) emit() {
341458
minWait = math.MaxInt64
342459

343460
i.stats.Add("negative_wait", 0)
461+
i.stats.Add("watch_pauses", 0)
344462

345463
for {
346464
select {
@@ -356,7 +474,14 @@ func (i *FileInput) emit() {
356474
i.init()
357475
lastTime = -1
358476
continue
477+
} else if i.watching {
478+
// When watching for new files, we just wait and continue
479+
i.stats.Add("watch_pauses", 1)
480+
Debug(2, fmt.Sprintf("[INPUT-FILE] No active readers, waiting for new files matching pattern '%s'", i.path))
481+
time.Sleep(i.watchInterval)
482+
continue
359483
} else {
484+
// If not watching, we break out and exit
360485
break
361486
}
362487
}
@@ -420,7 +545,11 @@ func (i *FileInput) emit() {
420545
i.stats.Set("max_wait", time.Duration(maxWait))
421546
i.stats.Set("min_wait", time.Duration(minWait))
422547

423-
Debug(2, fmt.Sprintf("[INPUT-FILE] FileInput: end of file '%s'\n", i.path))
548+
if i.watching {
549+
Debug(2, fmt.Sprintf("[INPUT-FILE] No more active readers. Will continue watching for new files matching '%s'\n", i.path))
550+
} else {
551+
Debug(2, fmt.Sprintf("[INPUT-FILE] FileInput: end of file '%s'\n", i.path))
552+
}
424553

425554
if i.dryRun {
426555
fmt.Printf("Records found: %v\nFiles processed: %v\nBytes processed: %v\nMax wait: %v\nMin wait: %v\nFirst wait: %v\nIt will take `%v` to replay at current speed.\nFound %v records with out of order timestamp\n",

‎input_file_test.go

Lines changed: 65 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -2,16 +2,26 @@ package goreplay
22

33
import (
44
"bytes"
5+
cryptoRand "crypto/rand"
56
"errors"
67
"fmt"
78
"io/ioutil"
8-
"math/rand"
9+
"math/big"
910
"os"
1011
"sync"
1112
"testing"
1213
"time"
1314
)
1415

16+
// generateSecureRandomID generates a cryptographically secure random ID for use in tests.
//
// The result is drawn uniformly from [0, 1<<62) using crypto/rand. If the
// random source fails, the calling test is aborted with t.Fatalf.
func generateSecureRandomID(t *testing.T) int64 {
	// Report a failure at the caller's line rather than inside this helper.
	t.Helper()

	randomBigInt, err := cryptoRand.Int(cryptoRand.Reader, big.NewInt(1<<62))
	if err != nil {
		t.Fatalf("Failed to generate secure random number: %v", err)
	}
	return randomBigInt.Int64()
}
24+
1525
func TestInputFileWithGET(t *testing.T) {
1626
input := NewTestInput()
1727
rg := NewRequestGenerator([]PluginReader{input}, func() { input.EmitGET() }, 1)
@@ -88,7 +98,7 @@ func TestInputFileWithGETAndPOST(t *testing.T) {
8898
}
8999

90100
func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) {
91-
rnd := rand.Int63()
101+
rnd := generateSecureRandomID(t)
92102

93103
file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
94104
file1.Write([]byte("1 1 1\ntest1"))
@@ -118,7 +128,7 @@ func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) {
118128
}
119129

120130
func TestInputFileRequestsWithLatency(t *testing.T) {
121-
rnd := rand.Int63()
131+
rnd := generateSecureRandomID(t)
122132

123133
file, _ := os.OpenFile(fmt.Sprintf("/tmp/%d", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
124134
defer file.Close()
@@ -146,7 +156,7 @@ func TestInputFileRequestsWithLatency(t *testing.T) {
146156
}
147157

148158
func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) {
149-
rnd := rand.Int63()
159+
rnd := generateSecureRandomID(t)
150160

151161
file1, _ := os.OpenFile(fmt.Sprintf("/tmp/%d_0", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
152162
file1.Write([]byte("1 1 1\nrequest1"))
@@ -189,7 +199,7 @@ func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) {
189199
}
190200

191201
func TestInputFileLoop(t *testing.T) {
192-
rnd := rand.Int63()
202+
rnd := generateSecureRandomID(t)
193203

194204
file, _ := os.OpenFile(fmt.Sprintf("/tmp/%d", rnd), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
195205
file.Write([]byte("1 1 1\ntest1"))
@@ -210,7 +220,7 @@ func TestInputFileLoop(t *testing.T) {
210220
}
211221

212222
func TestInputFileCompressed(t *testing.T) {
213-
rnd := rand.Int63()
223+
rnd := generateSecureRandomID(t)
214224

215225
output := NewFileOutput(fmt.Sprintf("/tmp/%d_0.gz", rnd), &FileOutputConfig{FlushInterval: time.Minute, Append: true})
216226
for i := 0; i < 1000; i++ {
@@ -235,6 +245,55 @@ func TestInputFileCompressed(t *testing.T) {
235245
os.Remove(name2)
236246
}
237247

248+
func TestInputFileWatchForNewFiles(t *testing.T) {
249+
rnd := generateSecureRandomID(t)
250+
basePath := fmt.Sprintf("/tmp/%d", rnd)
251+
252+
// Create first file
253+
file1, _ := os.OpenFile(fmt.Sprintf("%s_1", basePath), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
254+
file1.Write([]byte("1 1 1\ntest1"))
255+
file1.Write([]byte(payloadSeparator))
256+
file1.Close()
257+
258+
// Initialize input with watching enabled and short watch interval
259+
config := &FileInputConfig{
260+
Loop: false,
261+
ReadDepth: 100,
262+
MaxWait: 0,
263+
DryRun: false,
264+
WatchNewFiles: true,
265+
WatchInterval: 300 * time.Millisecond, // Faster interval for testing
266+
}
267+
268+
input := NewFileInputWithConfig(fmt.Sprintf("%s_*", basePath), config)
269+
270+
// Read the first message
271+
msg1, err := input.PluginRead()
272+
if err != nil || string(msg1.Data) != "test1" {
273+
t.Error("Should read first file correctly:", err)
274+
}
275+
276+
// Add a second file while input is running
277+
file2, _ := os.OpenFile(fmt.Sprintf("%s_2", basePath), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
278+
file2.Write([]byte("1 1 2\ntest2"))
279+
file2.Write([]byte(payloadSeparator))
280+
file2.Close()
281+
282+
// Wait for file discovery and processing (at least 2 watch intervals)
283+
time.Sleep(700 * time.Millisecond)
284+
285+
// Should be able to read from the newly added file
286+
msg2, err := input.PluginRead()
287+
if err != nil || string(msg2.Data) != "test2" {
288+
t.Error("Should read newly added file correctly:", err)
289+
}
290+
291+
// Clean up
292+
input.Close()
293+
os.Remove(file1.Name())
294+
os.Remove(file2.Name())
295+
}
296+
238297
type CaptureFile struct {
239298
msgs []*Message
240299
file *os.File

‎settings.go

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -87,13 +87,15 @@ type AppSettings struct {
8787
OutputWebSocketConfig WebSocketOutputConfig
8888
OutputWebSocketStats bool `json:"output-ws-stats"`
8989

90-
InputFile []string `json:"input-file"`
91-
InputFileLoop bool `json:"input-file-loop"`
92-
InputFileReadDepth int `json:"input-file-read-depth"`
93-
InputFileDryRun bool `json:"input-file-dry-run"`
94-
InputFileMaxWait time.Duration `json:"input-file-max-wait"`
95-
OutputFile []string `json:"output-file"`
96-
OutputFileConfig FileOutputConfig
90+
InputFile []string `json:"input-file"`
91+
InputFileLoop bool `json:"input-file-loop"`
92+
InputFileReadDepth int `json:"input-file-read-depth"`
93+
InputFileDryRun bool `json:"input-file-dry-run"`
94+
InputFileMaxWait time.Duration `json:"input-file-max-wait"`
95+
InputFileWatch bool `json:"input-file-watch"`
96+
InputFileWatchInterval time.Duration `json:"input-file-watch-interval"`
97+
OutputFile []string `json:"output-file"`
98+
OutputFileConfig FileOutputConfig
9799

98100
InputRAW []string `json:"input_raw"`
99101
InputRAWConfig RAWInputConfig
@@ -167,6 +169,8 @@ func init() {
167169
flag.IntVar(&Settings.InputFileReadDepth, "input-file-read-depth", 100, "GoReplay tries to read and cache multiple records, in advance. In parallel it also perform sorting of requests, if they came out of order. Since it needs hold this buffer in memory, bigger values can cause worse performance")
168170
flag.BoolVar(&Settings.InputFileDryRun, "input-file-dry-run", false, "Simulate reading from the data source without replaying it. You will get information about expected replay time, number of found records etc.")
169171
flag.DurationVar(&Settings.InputFileMaxWait, "input-file-max-wait", 0, "Set the maximum time between requests. Can help in situations when you have too long periods between request, and you want to skip them. Example: --input-raw-max-wait 1s")
172+
flag.BoolVar(&Settings.InputFileWatch, "input-file-watch", true, "Watch for new files matching pattern. When turned on, Gor will continue running after processing all existing files, watching for new ones.")
173+
flag.DurationVar(&Settings.InputFileWatchInterval, "input-file-watch-interval", 5*time.Second, "Interval for checking for new files. Example: --input-file-watch-interval 10s")
170174

171175
flag.Var(&MultiOption{&Settings.OutputFile}, "output-file", "Write incoming requests to file: \n\tgor --input-raw :80 --output-file ./requests.gor")
172176
flag.DurationVar(&Settings.OutputFileConfig.FlushInterval, "output-file-flush-interval", time.Second, "Interval for forcing buffer flush to the file, default: 1s.")

0 commit comments

Comments
 (0)
Please sign in to comment.