6 changed files with 614 additions and 90 deletions
@ -0,0 +1,257 @@
@@ -0,0 +1,257 @@
|
||||
package main |
||||
|
||||
import ( |
||||
"bufio" |
||||
"encoding/json" |
||||
"fmt" |
||||
"math" |
||||
"math/rand" |
||||
"os" |
||||
"path/filepath" |
||||
"time" |
||||
|
||||
"next.orly.dev/pkg/encoders/event" |
||||
"next.orly.dev/pkg/encoders/tag" |
||||
"next.orly.dev/pkg/encoders/timestamp" |
||||
"next.orly.dev/pkg/interfaces/signer/p8k" |
||||
) |
||||
|
||||
// EventStream manages disk-based event generation to avoid memory bloat.
// Events are written to chunked JSONL files under baseDir and streamed
// back on demand, so the full event set never has to reside in memory.
type EventStream struct {
	baseDir   string     // directory holding the chunk_NNNN.jsonl files
	count     int        // total number of events to generate
	chunkSize int        // number of events stored per chunk file
	rng       *rand.Rand // randomness source for tags and content sizes
}
||||
|
||||
// NewEventStream creates a new event stream that stores events on disk
|
||||
func NewEventStream(baseDir string, count int) (*EventStream, error) { |
||||
// Create events directory
|
||||
eventsDir := filepath.Join(baseDir, "events") |
||||
if err := os.MkdirAll(eventsDir, 0755); err != nil { |
||||
return nil, fmt.Errorf("failed to create events directory: %w", err) |
||||
} |
||||
|
||||
return &EventStream{ |
||||
baseDir: eventsDir, |
||||
count: count, |
||||
chunkSize: 1000, // Store 1000 events per file to balance I/O
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())), |
||||
}, nil |
||||
} |
||||
|
||||
// Generate creates all events and stores them in chunk files
|
||||
func (es *EventStream) Generate() error { |
||||
numChunks := (es.count + es.chunkSize - 1) / es.chunkSize |
||||
|
||||
for chunk := 0; chunk < numChunks; chunk++ { |
||||
chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk)) |
||||
f, err := os.Create(chunkFile) |
||||
if err != nil { |
||||
return fmt.Errorf("failed to create chunk file %s: %w", chunkFile, err) |
||||
} |
||||
|
||||
writer := bufio.NewWriter(f) |
||||
startIdx := chunk * es.chunkSize |
||||
endIdx := min(startIdx+es.chunkSize, es.count) |
||||
|
||||
for i := startIdx; i < endIdx; i++ { |
||||
ev, err := es.generateEvent(i) |
||||
if err != nil { |
||||
f.Close() |
||||
return fmt.Errorf("failed to generate event %d: %w", i, err) |
||||
} |
||||
|
||||
// Marshal event to JSON
|
||||
eventJSON, err := json.Marshal(ev) |
||||
if err != nil { |
||||
f.Close() |
||||
return fmt.Errorf("failed to marshal event %d: %w", i, err) |
||||
} |
||||
|
||||
// Write JSON line
|
||||
if _, err := writer.Write(eventJSON); err != nil { |
||||
f.Close() |
||||
return fmt.Errorf("failed to write event %d: %w", i, err) |
||||
} |
||||
if _, err := writer.WriteString("\n"); err != nil { |
||||
f.Close() |
||||
return fmt.Errorf("failed to write newline after event %d: %w", i, err) |
||||
} |
||||
} |
||||
|
||||
if err := writer.Flush(); err != nil { |
||||
f.Close() |
||||
return fmt.Errorf("failed to flush chunk file %s: %w", chunkFile, err) |
||||
} |
||||
|
||||
if err := f.Close(); err != nil { |
||||
return fmt.Errorf("failed to close chunk file %s: %w", chunkFile, err) |
||||
} |
||||
|
||||
if (chunk+1)%10 == 0 || chunk == numChunks-1 { |
||||
fmt.Printf(" Generated %d/%d events (%.1f%%)\n", |
||||
endIdx, es.count, float64(endIdx)/float64(es.count)*100) |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// generateEvent creates a single event with realistic size distribution
|
||||
func (es *EventStream) generateEvent(index int) (*event.E, error) { |
||||
// Create signer for this event
|
||||
keys, err := p8k.New() |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to create signer: %w", err) |
||||
} |
||||
if err := keys.Generate(); err != nil { |
||||
return nil, fmt.Errorf("failed to generate keys: %w", err) |
||||
} |
||||
|
||||
ev := event.New() |
||||
ev.Kind = 1 // Text note
|
||||
ev.CreatedAt = timestamp.Now().I64() |
||||
|
||||
// Add some tags for realism
|
||||
numTags := es.rng.Intn(5) |
||||
tags := make([]*tag.T, 0, numTags) |
||||
for i := 0; i < numTags; i++ { |
||||
tags = append(tags, tag.NewFromBytesSlice( |
||||
[]byte("t"), |
||||
[]byte(fmt.Sprintf("tag%d", es.rng.Intn(100))), |
||||
)) |
||||
} |
||||
ev.Tags = tag.NewS(tags...) |
||||
|
||||
// Generate content with log-distributed size
|
||||
contentSize := es.generateLogDistributedSize() |
||||
ev.Content = []byte(es.generateRandomContent(contentSize)) |
||||
|
||||
// Sign the event
|
||||
if err := ev.Sign(keys); err != nil { |
||||
return nil, fmt.Errorf("failed to sign event: %w", err) |
||||
} |
||||
|
||||
return ev, nil |
||||
} |
||||
|
||||
// generateLogDistributedSize generates sizes following a power law distribution
|
||||
// This creates realistic size distribution:
|
||||
// - Most events are small (< 1KB)
|
||||
// - Some events are medium (1-10KB)
|
||||
// - Few events are large (10-100KB)
|
||||
func (es *EventStream) generateLogDistributedSize() int { |
||||
// Use power law with exponent 4.0 for strong skew toward small sizes
|
||||
const powerExponent = 4.0 |
||||
uniform := es.rng.Float64() |
||||
skewed := math.Pow(uniform, powerExponent) |
||||
|
||||
// Scale to max size of 100KB
|
||||
const maxSize = 100 * 1024 |
||||
size := int(skewed * maxSize) |
||||
|
||||
// Ensure minimum size of 10 bytes
|
||||
if size < 10 { |
||||
size = 10 |
||||
} |
||||
|
||||
return size |
||||
} |
||||
|
||||
// generateRandomContent creates random text content of specified size
|
||||
func (es *EventStream) generateRandomContent(size int) string { |
||||
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 \n" |
||||
content := make([]byte, size) |
||||
for i := range content { |
||||
content[i] = charset[es.rng.Intn(len(charset))] |
||||
} |
||||
return string(content) |
||||
} |
||||
|
||||
// GetEventChannel returns a channel that streams events from disk
|
||||
// bufferSize controls memory usage - larger buffers improve throughput but use more memory
|
||||
func (es *EventStream) GetEventChannel(bufferSize int) (<-chan *event.E, <-chan error) { |
||||
eventChan := make(chan *event.E, bufferSize) |
||||
errChan := make(chan error, 1) |
||||
|
||||
go func() { |
||||
defer close(eventChan) |
||||
defer close(errChan) |
||||
|
||||
numChunks := (es.count + es.chunkSize - 1) / es.chunkSize |
||||
|
||||
for chunk := 0; chunk < numChunks; chunk++ { |
||||
chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk)) |
||||
f, err := os.Open(chunkFile) |
||||
if err != nil { |
||||
errChan <- fmt.Errorf("failed to open chunk file %s: %w", chunkFile, err) |
||||
return |
||||
} |
||||
|
||||
scanner := bufio.NewScanner(f) |
||||
// Increase buffer size for large events
|
||||
buf := make([]byte, 0, 64*1024) |
||||
scanner.Buffer(buf, 1024*1024) // Max 1MB per line
|
||||
|
||||
for scanner.Scan() { |
||||
var ev event.E |
||||
if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil { |
||||
f.Close() |
||||
errChan <- fmt.Errorf("failed to unmarshal event: %w", err) |
||||
return |
||||
} |
||||
eventChan <- &ev |
||||
} |
||||
|
||||
if err := scanner.Err(); err != nil { |
||||
f.Close() |
||||
errChan <- fmt.Errorf("error reading chunk file %s: %w", chunkFile, err) |
||||
return |
||||
} |
||||
|
||||
f.Close() |
||||
} |
||||
}() |
||||
|
||||
return eventChan, errChan |
||||
} |
||||
|
||||
// ForEach iterates over all events without loading them all into memory
|
||||
func (es *EventStream) ForEach(fn func(*event.E) error) error { |
||||
numChunks := (es.count + es.chunkSize - 1) / es.chunkSize |
||||
|
||||
for chunk := 0; chunk < numChunks; chunk++ { |
||||
chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk)) |
||||
f, err := os.Open(chunkFile) |
||||
if err != nil { |
||||
return fmt.Errorf("failed to open chunk file %s: %w", chunkFile, err) |
||||
} |
||||
|
||||
scanner := bufio.NewScanner(f) |
||||
buf := make([]byte, 0, 64*1024) |
||||
scanner.Buffer(buf, 1024*1024) |
||||
|
||||
for scanner.Scan() { |
||||
var ev event.E |
||||
if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil { |
||||
f.Close() |
||||
return fmt.Errorf("failed to unmarshal event: %w", err) |
||||
} |
||||
|
||||
if err := fn(&ev); err != nil { |
||||
f.Close() |
||||
return err |
||||
} |
||||
} |
||||
|
||||
if err := scanner.Err(); err != nil { |
||||
f.Close() |
||||
return fmt.Errorf("error reading chunk file %s: %w", chunkFile, err) |
||||
} |
||||
|
||||
f.Close() |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
@ -0,0 +1,173 @@
@@ -0,0 +1,173 @@
|
||||
package main |
||||
|
||||
import (
	"bufio"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"
)
||||
|
||||
// LatencyRecorder writes latency measurements to disk to avoid memory
// bloat: each measurement is appended to a binary file as 8 bytes rather
// than accumulated in a slice. Safe for concurrent use via mu.
type LatencyRecorder struct {
	file   *os.File      // backing file holding raw int64 nanosecond values
	writer *bufio.Writer // buffers writes to file; flushed in Close
	mu     sync.Mutex    // guards writer, file, and count
	count  int64         // number of measurements recorded so far
}
||||
|
||||
// LatencyStats contains calculated latency statistics derived from the
// recorded measurements.
type LatencyStats struct {
	Avg      time.Duration // arithmetic mean of all measurements
	P90      time.Duration // 90th percentile
	P95      time.Duration // 95th percentile
	P99      time.Duration // 99th percentile
	Bottom10 time.Duration // average of the fastest 10% of measurements
	Count    int64         // number of measurements included
}
||||
|
||||
// NewLatencyRecorder creates a new latency recorder that writes to disk
|
||||
func NewLatencyRecorder(baseDir string, testName string) (*LatencyRecorder, error) { |
||||
latencyFile := filepath.Join(baseDir, fmt.Sprintf("latency_%s.bin", testName)) |
||||
f, err := os.Create(latencyFile) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to create latency file: %w", err) |
||||
} |
||||
|
||||
return &LatencyRecorder{ |
||||
file: f, |
||||
writer: bufio.NewWriter(f), |
||||
count: 0, |
||||
}, nil |
||||
} |
||||
|
||||
// Record writes a latency measurement to disk (8 bytes per measurement)
|
||||
func (lr *LatencyRecorder) Record(latency time.Duration) error { |
||||
lr.mu.Lock() |
||||
defer lr.mu.Unlock() |
||||
|
||||
// Write latency as 8-byte value (int64 nanoseconds)
|
||||
buf := make([]byte, 8) |
||||
binary.LittleEndian.PutUint64(buf, uint64(latency.Nanoseconds())) |
||||
|
||||
if _, err := lr.writer.Write(buf); err != nil { |
||||
return fmt.Errorf("failed to write latency: %w", err) |
||||
} |
||||
|
||||
lr.count++ |
||||
return nil |
||||
} |
||||
|
||||
// Close flushes and closes the latency file
|
||||
func (lr *LatencyRecorder) Close() error { |
||||
lr.mu.Lock() |
||||
defer lr.mu.Unlock() |
||||
|
||||
if err := lr.writer.Flush(); err != nil { |
||||
return fmt.Errorf("failed to flush latency file: %w", err) |
||||
} |
||||
|
||||
if err := lr.file.Close(); err != nil { |
||||
return fmt.Errorf("failed to close latency file: %w", err) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// CalculateStats reads all latencies from disk, sorts them, and calculates statistics
|
||||
// This is done on-demand to avoid keeping all latencies in memory during the test
|
||||
func (lr *LatencyRecorder) CalculateStats() (*LatencyStats, error) { |
||||
lr.mu.Lock() |
||||
filePath := lr.file.Name() |
||||
count := lr.count |
||||
lr.mu.Unlock() |
||||
|
||||
// If no measurements, return zeros
|
||||
if count == 0 { |
||||
return &LatencyStats{ |
||||
Avg: 0, |
||||
P90: 0, |
||||
P95: 0, |
||||
P99: 0, |
||||
Bottom10: 0, |
||||
Count: 0, |
||||
}, nil |
||||
} |
||||
|
||||
// Open file for reading
|
||||
f, err := os.Open(filePath) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to open latency file for reading: %w", err) |
||||
} |
||||
defer f.Close() |
||||
|
||||
// Read all latencies into memory temporarily for sorting
|
||||
latencies := make([]time.Duration, 0, count) |
||||
buf := make([]byte, 8) |
||||
reader := bufio.NewReader(f) |
||||
|
||||
for { |
||||
n, err := reader.Read(buf) |
||||
if err != nil { |
||||
if err.Error() == "EOF" { |
||||
break |
||||
} |
||||
return nil, fmt.Errorf("failed to read latency data: %w", err) |
||||
} |
||||
if n != 8 { |
||||
break |
||||
} |
||||
|
||||
nanos := binary.LittleEndian.Uint64(buf) |
||||
latencies = append(latencies, time.Duration(nanos)) |
||||
} |
||||
|
||||
// Check if we actually got any latencies
|
||||
if len(latencies) == 0 { |
||||
return &LatencyStats{ |
||||
Avg: 0, |
||||
P90: 0, |
||||
P95: 0, |
||||
P99: 0, |
||||
Bottom10: 0, |
||||
Count: 0, |
||||
}, nil |
||||
} |
||||
|
||||
// Sort for percentile calculation
|
||||
sort.Slice(latencies, func(i, j int) bool { |
||||
return latencies[i] < latencies[j] |
||||
}) |
||||
|
||||
// Calculate statistics
|
||||
stats := &LatencyStats{ |
||||
Count: int64(len(latencies)), |
||||
} |
||||
|
||||
// Average
|
||||
var sum time.Duration |
||||
for _, lat := range latencies { |
||||
sum += lat |
||||
} |
||||
stats.Avg = sum / time.Duration(len(latencies)) |
||||
|
||||
// Percentiles
|
||||
stats.P90 = latencies[int(float64(len(latencies))*0.90)] |
||||
stats.P95 = latencies[int(float64(len(latencies))*0.95)] |
||||
stats.P99 = latencies[int(float64(len(latencies))*0.99)] |
||||
|
||||
// Bottom 10% average
|
||||
bottom10Count := int(float64(len(latencies)) * 0.10) |
||||
if bottom10Count > 0 { |
||||
var bottom10Sum time.Duration |
||||
for i := 0; i < bottom10Count; i++ { |
||||
bottom10Sum += latencies[i] |
||||
} |
||||
stats.Bottom10 = bottom10Sum / time.Duration(bottom10Count) |
||||
} |
||||
|
||||
return stats, nil |
||||
} |
||||
Loading…
Reference in new issue