//go:build !(js && wasm)

package bbolt

import (
	"sync"
	"time"

	bolt "go.etcd.io/bbolt"
	"lol.mleku.dev/chk"
)

// BatchedWrite represents a single write operation
type BatchedWrite struct {
	BucketName []byte
	Key        []byte
	Value      []byte
	IsDelete   bool
}

// EventBatch represents a complete event with all its indexes and graph updates
type EventBatch struct {
	Serial         uint64
	EventData      []byte                // Serialized compact event data
	Indexes        []BatchedWrite        // Index entries
	EventVertex    *EventVertex          // Graph vertex for this event
	PubkeyUpdate   *PubkeyVertexUpdate   // Update to author's pubkey vertex
	MentionUpdates []*PubkeyVertexUpdate // Updates to mentioned pubkeys
	EdgeKeys       []EdgeKey             // Edge keys for bloom filter
}

// PubkeyVertexUpdate represents an update to a pubkey's vertex
type PubkeyVertexUpdate struct {
	PubkeySerial uint64
	AddAuthored  uint64 // Event serial to add to authored (0 if none)
	AddMention   uint64 // Event serial to add to mentions (0 if none)
}

// WriteBatcher accumulates writes and flushes them in batches.
// Optimized for HDD with large batches and periodic flushes.
type WriteBatcher struct {
	db     *bolt.DB
	bloom  *EdgeBloomFilter
	logger *Logger

	mu          sync.Mutex
	pending     []*EventBatch
	pendingSize int64
	stopped     bool

	// Configuration
	maxEvents   int
	maxBytes    int64
	flushPeriod time.Duration

	// Channels for coordination
	flushCh    chan struct{}
	shutdownCh chan struct{}
	doneCh     chan struct{}

	// Stats
	stats BatcherStats
}

// BatcherStats contains batcher statistics
type BatcherStats struct {
	TotalBatches      uint64
	TotalEvents       uint64
	TotalBytes        uint64
	AverageLatencyMs  float64
	LastFlushTime     time.Time
	LastFlushDuration time.Duration
}

// NewWriteBatcher creates a new write batcher
func NewWriteBatcher(db *bolt.DB, bloom *EdgeBloomFilter, cfg *BboltConfig, logger *Logger) *WriteBatcher {
	wb := &WriteBatcher{
		db:          db,
		bloom:       bloom,
		logger:      logger,
		maxEvents:   cfg.BatchMaxEvents,
		maxBytes:    cfg.BatchMaxBytes,
		flushPeriod: cfg.BatchFlushTimeout,
		pending:     make([]*EventBatch, 0, cfg.BatchMaxEvents),
		flushCh:     make(chan struct{}, 1),
		shutdownCh:  make(chan struct{}),
		doneCh:      make(chan struct{}),
	}

	go wb.flushLoop()
	return wb
}

// Add adds an event batch to the pending writes
func (wb *WriteBatcher) Add(batch *EventBatch) error {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	if wb.stopped {
		return ErrBatcherStopped
	}

	wb.pending = append(wb.pending, batch)
	wb.pendingSize += int64(len(batch.EventData))
	for _, idx := range batch.Indexes {
		wb.pendingSize += int64(len(idx.Key) + len(idx.Value))
	}

	// Check thresholds
	if len(wb.pending) >= wb.maxEvents || wb.pendingSize >= wb.maxBytes {
		wb.triggerFlush()
	}

	return nil
}

// triggerFlush signals the flush loop to flush (must be called with lock held)
func (wb *WriteBatcher) triggerFlush() {
	select {
	case wb.flushCh <- struct{}{}:
	default:
		// Already a flush pending
	}
}

// flushLoop runs the background flush timer
func (wb *WriteBatcher) flushLoop() {
	defer close(wb.doneCh)

	timer := time.NewTimer(wb.flushPeriod)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			if err := wb.Flush(); err != nil {
				wb.logger.Errorf("bbolt: flush error: %v", err)
			}
			timer.Reset(wb.flushPeriod)

		case <-wb.flushCh:
			if err := wb.Flush(); err != nil {
				wb.logger.Errorf("bbolt: flush error: %v", err)
			}
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(wb.flushPeriod)

		case <-wb.shutdownCh:
			// Final flush
			if err := wb.Flush(); err != nil {
				wb.logger.Errorf("bbolt: final flush error: %v", err)
			}
			return
		}
	}
}

// Flush writes all pending batches to BBolt
func (wb *WriteBatcher) Flush() error {
	wb.mu.Lock()
	if len(wb.pending) == 0 {
		wb.mu.Unlock()
		return nil
	}

	// Swap out pending slice
	toFlush := wb.pending
	toFlushSize := wb.pendingSize
	wb.pending = make([]*EventBatch, 0, wb.maxEvents)
	wb.pendingSize = 0
	wb.mu.Unlock()

	startTime := time.Now()

	// Collect all edge keys for bloom filter update
	var allEdgeKeys []EdgeKey
	for _, batch := range toFlush {
		allEdgeKeys = append(allEdgeKeys, batch.EdgeKeys...)
	}

	// Update bloom filter first (memory only)
	if len(allEdgeKeys) > 0 {
		wb.bloom.AddBatch(allEdgeKeys)
	}

	// Write all batches in a single transaction
	err := wb.db.Update(func(tx *bolt.Tx) error {
		for _, batch := range toFlush {
			// Write compact event data
			cmpBucket := tx.Bucket(bucketCmp)
			if cmpBucket != nil {
				key := makeSerialKey(batch.Serial)
				if err := cmpBucket.Put(key, batch.EventData); err != nil {
					return err
				}
			}

			// Write all indexes
			for _, idx := range batch.Indexes {
				bucket := tx.Bucket(idx.BucketName)
				if bucket == nil {
					continue
				}
				if idx.IsDelete {
					if err := bucket.Delete(idx.Key); err != nil {
						return err
					}
				} else {
					if err := bucket.Put(idx.Key, idx.Value); err != nil {
						return err
					}
				}
			}

			// Write event vertex
			if batch.EventVertex != nil {
				evBucket := tx.Bucket(bucketEv)
				if evBucket != nil {
					key := makeSerialKey(batch.Serial)
					if err := evBucket.Put(key, batch.EventVertex.Encode()); err != nil {
						return err
					}
				}
			}

			// Update pubkey vertices
			if err := wb.updatePubkeyVertex(tx, batch.PubkeyUpdate); err != nil {
				return err
			}
			for _, mentionUpdate := range batch.MentionUpdates {
				if err := wb.updatePubkeyVertex(tx, mentionUpdate); err != nil {
					return err
				}
			}
		}
		return nil
	})

	// Update stats
	duration := time.Since(startTime)
	wb.mu.Lock()
	wb.stats.TotalBatches++
	wb.stats.TotalEvents += uint64(len(toFlush))
	wb.stats.TotalBytes += uint64(toFlushSize)
	wb.stats.LastFlushTime = time.Now()
	wb.stats.LastFlushDuration = duration
	latencyMs := float64(duration.Milliseconds())
	wb.stats.AverageLatencyMs = (wb.stats.AverageLatencyMs*float64(wb.stats.TotalBatches-1) + latencyMs) / float64(wb.stats.TotalBatches)
	wb.mu.Unlock()

	if err == nil {
		wb.logger.Debugf("bbolt: flushed %d events (%d bytes) in %v", len(toFlush), toFlushSize, duration)
	}

	return err
}

// updatePubkeyVertex reads, updates, and writes a pubkey vertex
func (wb *WriteBatcher) updatePubkeyVertex(tx *bolt.Tx, update *PubkeyVertexUpdate) error {
	if update == nil {
		return nil
	}

	pvBucket := tx.Bucket(bucketPv)
	if pvBucket == nil {
		return nil
	}

	key := makeSerialKey(update.PubkeySerial)

	// Load existing vertex or create new
	var pv PubkeyVertex
	existing := pvBucket.Get(key)
	if existing != nil {
		if err := pv.Decode(existing); chk.E(err) {
			// If decode fails, start fresh
			pv = PubkeyVertex{}
		}
	}

	// Apply updates
	if update.AddAuthored != 0 {
		pv.AddAuthored(update.AddAuthored)
	}
	if update.AddMention != 0 {
		pv.AddMention(update.AddMention)
	}

	// Write back
	return pvBucket.Put(key, pv.Encode())
}

// Shutdown gracefully shuts down the batcher
func (wb *WriteBatcher) Shutdown() error {
	wb.mu.Lock()
	wb.stopped = true
	wb.mu.Unlock()

	close(wb.shutdownCh)
	<-wb.doneCh
	return nil
}

// Stats returns current batcher statistics
func (wb *WriteBatcher) Stats() BatcherStats {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	return wb.stats
}

// PendingCount returns the number of pending events
func (wb *WriteBatcher) PendingCount() int {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	return len(wb.pending)
}

// ErrBatcherStopped is returned when adding to a stopped batcher
var ErrBatcherStopped = &batcherStoppedError{}

type batcherStoppedError struct{}

func (e *batcherStoppedError) Error() string {
	return "batcher has been stopped"
}
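
// Usage sketch: how a caller might wire the batcher up, assuming a populated
// BboltConfig and an EventBatch assembled elsewhere in the ingestion path;
// the serial, compactBytes, and indexWrites names below are hypothetical
// placeholders, not identifiers defined in this package.
//
//	wb := NewWriteBatcher(db, bloom, cfg, logger)
//	defer wb.Shutdown()
//
//	batch := &EventBatch{
//		Serial:    serial,
//		EventData: compactBytes,
//		Indexes:   indexWrites,
//	}
//	if err := wb.Add(batch); err != nil {
//		// Add only fails with ErrBatcherStopped after Shutdown; otherwise
//		// writes are buffered until a count, size, or timeout threshold
//		// triggers Flush.
//	}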