You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
257 lines
6.8 KiB
257 lines
6.8 KiB
package main |
|
|
|
import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"math"
	"math/rand"
	"os"
	"path/filepath"
	"time"

	"next.orly.dev/pkg/encoders/event"
	"next.orly.dev/pkg/encoders/tag"
	"next.orly.dev/pkg/encoders/timestamp"
	"next.orly.dev/pkg/interfaces/signer/p8k"
)
|
|
|
// EventStream manages disk-based event generation to avoid memory bloat |
|
type EventStream struct { |
|
baseDir string |
|
count int |
|
chunkSize int |
|
rng *rand.Rand |
|
} |
|
|
|
// NewEventStream creates a new event stream that stores events on disk |
|
func NewEventStream(baseDir string, count int) (*EventStream, error) { |
|
// Create events directory |
|
eventsDir := filepath.Join(baseDir, "events") |
|
if err := os.MkdirAll(eventsDir, 0755); err != nil { |
|
return nil, fmt.Errorf("failed to create events directory: %w", err) |
|
} |
|
|
|
return &EventStream{ |
|
baseDir: eventsDir, |
|
count: count, |
|
chunkSize: 1000, // Store 1000 events per file to balance I/O |
|
rng: rand.New(rand.NewSource(time.Now().UnixNano())), |
|
}, nil |
|
} |
|
|
|
// Generate creates all es.count events and writes them to es.baseDir as
// newline-delimited JSON. Each file holds es.chunkSize events, and files are
// named chunk_0000.jsonl, chunk_0001.jsonl, and so on. A file that already
// exists with the same name is truncated and overwritten (os.Create).
//
// A progress line is printed to stdout after every 10th chunk and after the
// final one. With es.count == 0 nothing is written or printed.
//
// Generation stops at the first error. Files already created, including a
// partially written current chunk, are left on disk.
func (es *EventStream) Generate() error {
	numChunks := (es.count + es.chunkSize - 1) / es.chunkSize

	for chunk := 0; chunk < numChunks; chunk++ {
		chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk))
		startIdx := chunk * es.chunkSize
		endIdx := min(startIdx+es.chunkSize, es.count)

		if err := es.writeChunk(chunkFile, startIdx, endIdx); err != nil {
			return err
		}

		if (chunk+1)%10 == 0 || chunk == numChunks-1 {
			fmt.Printf(" Generated %d/%d events (%.1f%%)\n",
				endIdx, es.count, float64(endIdx)/float64(es.count)*100)
		}
	}

	return nil
}

// writeChunk generates events with indices [startIdx, endIdx) and writes them
// to path, one JSON object per line. The file is closed on every return path.
// A Close failure is reported only when no earlier error occurred.
func (es *EventStream) writeChunk(path string, startIdx, endIdx int) (err error) {
	f, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("failed to create chunk file %s: %w", path, err)
	}
	defer func() {
		// On the success path a failed Close on a written file can mean the
		// data never reached disk, so it must not be dropped.
		if cerr := f.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("failed to close chunk file %s: %w", path, cerr)
		}
	}()

	writer := bufio.NewWriter(f)
	for i := startIdx; i < endIdx; i++ {
		ev, err := es.generateEvent(i)
		if err != nil {
			return fmt.Errorf("failed to generate event %d: %w", i, err)
		}

		eventJSON, err := json.Marshal(ev)
		if err != nil {
			return fmt.Errorf("failed to marshal event %d: %w", i, err)
		}

		if _, err := writer.Write(eventJSON); err != nil {
			return fmt.Errorf("failed to write event %d: %w", i, err)
		}
		if err := writer.WriteByte('\n'); err != nil {
			return fmt.Errorf("failed to write newline after event %d: %w", i, err)
		}
	}

	// Flush before the deferred Close so buffered lines are not lost.
	if err := writer.Flush(); err != nil {
		return fmt.Errorf("failed to flush chunk file %s: %w", path, err)
	}
	return nil
}
|
|
|
// generateEvent builds and signs one kind-1 (text note) event stamped with the
// current time. Each call creates and generates a new p8k key pair, so every
// event is signed with its own key.
//
// The event carries 0-4 "t" tags whose values look like "tagN" with N in
// [0,100). Its content is random text whose length comes from
// generateLogDistributedSize. The index parameter is not currently used.
func (es *EventStream) generateEvent(index int) (*event.E, error) {
	signer, err := p8k.New()
	if err != nil {
		return nil, fmt.Errorf("failed to create signer: %w", err)
	}
	if err = signer.Generate(); err != nil {
		return nil, fmt.Errorf("failed to generate keys: %w", err)
	}

	ev := event.New()
	ev.Kind = 1 // text note
	ev.CreatedAt = timestamp.Now().I64()

	// Random draws happen in a fixed order: tag count, each tag value, then
	// content size, then content bytes.
	tagList := make([]*tag.T, es.rng.Intn(5))
	for k := range tagList {
		key := []byte("t")
		value := []byte(fmt.Sprintf("tag%d", es.rng.Intn(100)))
		tagList[k] = tag.NewFromBytesSlice(key, value)
	}
	ev.Tags = tag.NewS(tagList...)

	body := es.generateRandomContent(es.generateLogDistributedSize())
	ev.Content = []byte(body)

	if err = ev.Sign(signer); err != nil {
		return nil, fmt.Errorf("failed to sign event: %w", err)
	}
	return ev, nil
}
|
|
|
// generateLogDistributedSize returns a content size in bytes, drawn as
//
//	size = max(10, int(u^4 * 102400)),  u ~ Uniform[0, 1)
//
// This is a power transform of a uniform variate. Despite the name, it is not
// a log-normal or log-uniform distribution. Results lie in [10, 102399], and
// the mix works out to roughly:
//   - ~10% clamped to the 10-byte minimum
//   - ~32% under 1 KiB (including the clamped ones)
//   - ~25% between 1 KiB and 10 KiB
//   - ~44% between 10 KiB and 100 KiB
//
// The median is about 6.4 KB (6400 bytes) and the mean about 20 KiB. The
// exponent skews sizes below a plain uniform draw, but large events are
// common, not rare.
//
// NOTE(review): if the goal is for most events to be under 1 KiB with only a
// few in the 10-100 KiB range, this exponent does not achieve it. Changing it
// alters the benchmark workload, so confirm the intent before tuning.
func (es *EventStream) generateLogDistributedSize() int {
	const (
		powerExponent = 4.0        // larger values push more mass toward zero
		maxSize       = 100 * 1024 // exclusive upper bound before truncation
		minSize       = 10         // floor so no event has (near-)empty content
	)

	skewed := math.Pow(es.rng.Float64(), powerExponent)
	return max(int(skewed*maxSize), minSize)
}
|
|
|
// generateRandomContent creates random text content of specified size |
|
func (es *EventStream) generateRandomContent(size int) string { |
|
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 \n" |
|
content := make([]byte, size) |
|
for i := range content { |
|
content[i] = charset[es.rng.Intn(len(charset))] |
|
} |
|
return string(content) |
|
} |
|
|
|
// GetEventChannel returns a channel that streams events from disk |
|
// bufferSize controls memory usage - larger buffers improve throughput but use more memory |
|
func (es *EventStream) GetEventChannel(bufferSize int) (<-chan *event.E, <-chan error) { |
|
eventChan := make(chan *event.E, bufferSize) |
|
errChan := make(chan error, 1) |
|
|
|
go func() { |
|
defer close(eventChan) |
|
defer close(errChan) |
|
|
|
numChunks := (es.count + es.chunkSize - 1) / es.chunkSize |
|
|
|
for chunk := 0; chunk < numChunks; chunk++ { |
|
chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk)) |
|
f, err := os.Open(chunkFile) |
|
if err != nil { |
|
errChan <- fmt.Errorf("failed to open chunk file %s: %w", chunkFile, err) |
|
return |
|
} |
|
|
|
scanner := bufio.NewScanner(f) |
|
// Increase buffer size for large events |
|
buf := make([]byte, 0, 64*1024) |
|
scanner.Buffer(buf, 1024*1024) // Max 1MB per line |
|
|
|
for scanner.Scan() { |
|
var ev event.E |
|
if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil { |
|
f.Close() |
|
errChan <- fmt.Errorf("failed to unmarshal event: %w", err) |
|
return |
|
} |
|
eventChan <- &ev |
|
} |
|
|
|
if err := scanner.Err(); err != nil { |
|
f.Close() |
|
errChan <- fmt.Errorf("error reading chunk file %s: %w", chunkFile, err) |
|
return |
|
} |
|
|
|
f.Close() |
|
} |
|
}() |
|
|
|
return eventChan, errChan |
|
} |
|
|
|
// ForEach decodes the stored events one line at a time, in chunk order, and
// passes each to fn. Events are never all loaded into memory at once. It
// expects the chunk files written by Generate to exist; a missing file is
// reported as an open error.
//
// Iteration stops at the first error. Open, read and decode failures are
// returned wrapped with context. An error returned by fn is returned
// unchanged, so callers can match it with errors.Is.
//
// Each event is a fresh value, so fn may retain the pointer. Lines longer than
// 1 MiB exceed the scanner limit and surface as a read error.
func (es *EventStream) ForEach(fn func(*event.E) error) error {
	numChunks := (es.count + es.chunkSize - 1) / es.chunkSize

	for chunk := 0; chunk < numChunks; chunk++ {
		chunkFile := filepath.Join(es.baseDir, fmt.Sprintf("chunk_%04d.jsonl", chunk))
		if err := forEachInChunk(chunkFile, fn); err != nil {
			return err
		}
	}

	return nil
}

// forEachInChunk decodes every JSON line of one chunk file and passes the
// event to fn. The file is closed on every return path.
func forEachInChunk(chunkFile string, fn func(*event.E) error) error {
	f, err := os.Open(chunkFile)
	if err != nil {
		return fmt.Errorf("failed to open chunk file %s: %w", chunkFile, err)
	}
	defer f.Close() // read-only handle: a Close error cannot lose data

	scanner := bufio.NewScanner(f)
	// Start with 64 KiB and allow lines up to 1 MiB; content alone can reach
	// ~100 KiB before JSON escaping.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	for scanner.Scan() {
		var ev event.E
		if err := json.Unmarshal(scanner.Bytes(), &ev); err != nil {
			return fmt.Errorf("failed to unmarshal event: %w", err)
		}
		if err := fn(&ev); err != nil {
			return err
		}
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("error reading chunk file %s: %w", chunkFile, err)
	}
	return nil
}
|
|
|