//go:build !(js && wasm)

package bbolt

import (
	"sync"
	"time"

	bolt "go.etcd.io/bbolt"
	"lol.mleku.dev/chk"
)

// BatchedWrite represents a single write operation.
type BatchedWrite struct {
	BucketName []byte
	Key        []byte
	Value      []byte
	IsDelete   bool
}

// EventBatch represents a complete event with all its indexes and graph updates.
type EventBatch struct {
	Serial         uint64
	EventData      []byte                // Serialized compact event data
	Indexes        []BatchedWrite        // Index entries
	EventVertex    *EventVertex          // Graph vertex for this event
	PubkeyUpdate   *PubkeyVertexUpdate   // Update to author's pubkey vertex
	MentionUpdates []*PubkeyVertexUpdate // Updates to mentioned pubkeys
	EdgeKeys       []EdgeKey             // Edge keys for bloom filter
}
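
// A caller builds one EventBatch per stored event and hands it to
// WriteBatcher.Add. A minimal sketch; the serial, compact, bucketKindIdx,
// and idxKey values are illustrative and not defined in this package:
//
//	batch := &EventBatch{
//		Serial:    serial,
//		EventData: compact,
//		Indexes: []BatchedWrite{
//			{BucketName: bucketKindIdx, Key: idxKey},
//		},
//	}
//	err := wb.Add(batch)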

// PubkeyVertexUpdate represents an update to a pubkey's vertex.
type PubkeyVertexUpdate struct {
	PubkeySerial uint64
	AddAuthored  uint64 // Event serial to add to authored (0 if none)
	AddMention   uint64 // Event serial to add to mentions (0 if none)
}

// WriteBatcher accumulates writes and flushes them in batches.
// Optimized for HDDs with large batches and periodic flushes.
type WriteBatcher struct {
	db     *bolt.DB
	bloom  *EdgeBloomFilter
	logger *Logger

	mu          sync.Mutex
	pending     []*EventBatch
	pendingSize int64
	stopped     bool

	// Configuration
	maxEvents   int
	maxBytes    int64
	flushPeriod time.Duration

	// Channels for coordination
	flushCh    chan struct{}
	shutdownCh chan struct{}
	doneCh     chan struct{}

	// Stats
	stats BatcherStats
}

// BatcherStats contains batcher statistics.
type BatcherStats struct {
	TotalBatches      uint64
	TotalEvents       uint64
	TotalBytes        uint64
	AverageLatencyMs  float64
	LastFlushTime     time.Time
	LastFlushDuration time.Duration
}

// NewWriteBatcher creates a new write batcher and starts its background
// flush loop.
func NewWriteBatcher(db *bolt.DB, bloom *EdgeBloomFilter, cfg *BboltConfig, logger *Logger) *WriteBatcher {
	wb := &WriteBatcher{
		db:          db,
		bloom:       bloom,
		logger:      logger,
		maxEvents:   cfg.BatchMaxEvents,
		maxBytes:    cfg.BatchMaxBytes,
		flushPeriod: cfg.BatchFlushTimeout,
		pending:     make([]*EventBatch, 0, cfg.BatchMaxEvents),
		flushCh:     make(chan struct{}, 1),
		shutdownCh:  make(chan struct{}),
		doneCh:      make(chan struct{}),
	}
	go wb.flushLoop()
	return wb
}
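
// Typical lifecycle, as a sketch. The config field names match what the
// constructor reads above; the values shown are illustrative, not defaults
// defined by this package:
//
//	cfg := &BboltConfig{
//		BatchMaxEvents:    4096,
//		BatchMaxBytes:     64 << 20, // 64 MiB
//		BatchFlushTimeout: 2 * time.Second,
//	}
//	wb := NewWriteBatcher(db, bloom, cfg, logger)
//	defer wb.Shutdown()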

// Add queues an event batch in the pending writes. It never blocks on I/O:
// crossing the event-count or byte thresholds only signals the background
// loop to flush.
func (wb *WriteBatcher) Add(batch *EventBatch) error {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	if wb.stopped {
		return ErrBatcherStopped
	}
	wb.pending = append(wb.pending, batch)
	wb.pendingSize += int64(len(batch.EventData))
	for _, idx := range batch.Indexes {
		wb.pendingSize += int64(len(idx.Key) + len(idx.Value))
	}
	// Check thresholds
	if len(wb.pending) >= wb.maxEvents || wb.pendingSize >= wb.maxBytes {
		wb.triggerFlush()
	}
	return nil
}

// triggerFlush signals the flush loop to flush (must be called with the lock held).
func (wb *WriteBatcher) triggerFlush() {
	select {
	case wb.flushCh <- struct{}{}:
	default:
		// A flush signal is already pending; coalesce.
	}
}

// flushLoop runs the background flush timer.
func (wb *WriteBatcher) flushLoop() {
	defer close(wb.doneCh)
	timer := time.NewTimer(wb.flushPeriod)
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			if err := wb.Flush(); err != nil {
				wb.logger.Errorf("bbolt: flush error: %v", err)
			}
			timer.Reset(wb.flushPeriod)
		case <-wb.flushCh:
			if err := wb.Flush(); err != nil {
				wb.logger.Errorf("bbolt: flush error: %v", err)
			}
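			// Standard timer-drain idiom: if Stop returns false the timer
			// already fired, so consume the stale tick before Reset to
			// avoid an immediate spurious flush.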
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(wb.flushPeriod)
		case <-wb.shutdownCh:
			// Final flush
			if err := wb.Flush(); err != nil {
				wb.logger.Errorf("bbolt: final flush error: %v", err)
			}
			return
		}
	}
}

// Flush writes all pending batches to BBolt in a single transaction.
func (wb *WriteBatcher) Flush() error {
	wb.mu.Lock()
	if len(wb.pending) == 0 {
		wb.mu.Unlock()
		return nil
	}
	// Swap out pending slice
	toFlush := wb.pending
	toFlushSize := wb.pendingSize
	wb.pending = make([]*EventBatch, 0, wb.maxEvents)
	wb.pendingSize = 0
	wb.mu.Unlock()
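	// From here on no lock is held: Add can keep queueing new events while
	// the swapped-out batches are written below.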
	startTime := time.Now()
	// Collect all edge keys for bloom filter update
	var allEdgeKeys []EdgeKey
	for _, batch := range toFlush {
		allEdgeKeys = append(allEdgeKeys, batch.EdgeKeys...)
	}
	// Update bloom filter first (memory only)
	if len(allEdgeKeys) > 0 {
		wb.bloom.AddBatch(allEdgeKeys)
	}
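	// The bloom update above is memory-only; if the transaction below
	// fails, the added keys become harmless false positives, which a bloom
	// filter tolerates by design.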
	// Write all batches in a single transaction
	err := wb.db.Update(func(tx *bolt.Tx) error {
		for _, batch := range toFlush {
			// Write compact event data
			cmpBucket := tx.Bucket(bucketCmp)
			if cmpBucket != nil {
				key := makeSerialKey(batch.Serial)
				if err := cmpBucket.Put(key, batch.EventData); err != nil {
					return err
				}
			}
			// Write all indexes
			for _, idx := range batch.Indexes {
				bucket := tx.Bucket(idx.BucketName)
				if bucket == nil {
					continue
				}
				if idx.IsDelete {
					if err := bucket.Delete(idx.Key); err != nil {
						return err
					}
				} else {
					if err := bucket.Put(idx.Key, idx.Value); err != nil {
						return err
					}
				}
			}
			// Write event vertex
			if batch.EventVertex != nil {
				evBucket := tx.Bucket(bucketEv)
				if evBucket != nil {
					key := makeSerialKey(batch.Serial)
					if err := evBucket.Put(key, batch.EventVertex.Encode()); err != nil {
						return err
					}
				}
			}
			// Update pubkey vertices
			if err := wb.updatePubkeyVertex(tx, batch.PubkeyUpdate); err != nil {
				return err
			}
			for _, mentionUpdate := range batch.MentionUpdates {
				if err := wb.updatePubkeyVertex(tx, mentionUpdate); err != nil {
					return err
				}
			}
		}
		return nil
	})
	// Update stats
	duration := time.Since(startTime)
	wb.mu.Lock()
	wb.stats.TotalBatches++
	wb.stats.TotalEvents += uint64(len(toFlush))
	wb.stats.TotalBytes += uint64(toFlushSize)
	wb.stats.LastFlushTime = time.Now()
	wb.stats.LastFlushDuration = duration
	latencyMs := float64(duration.Milliseconds())
	wb.stats.AverageLatencyMs = (wb.stats.AverageLatencyMs*float64(wb.stats.TotalBatches-1) + latencyMs) / float64(wb.stats.TotalBatches)
	wb.mu.Unlock()
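	// On error the swapped-out batches are dropped rather than re-queued;
	// the flush loop logs the failure on behalf of its callers.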
	if err == nil {
		wb.logger.Debugf("bbolt: flushed %d events (%d bytes) in %v", len(toFlush), toFlushSize, duration)
	}
	return err
}

// updatePubkeyVertex reads, updates, and writes back a pubkey vertex. The
// read-modify-write is safe because BBolt serializes writers: all writes go
// through the single Update transaction in Flush.
func (wb *WriteBatcher) updatePubkeyVertex(tx *bolt.Tx, update *PubkeyVertexUpdate) error {
	if update == nil {
		return nil
	}
	pvBucket := tx.Bucket(bucketPv)
	if pvBucket == nil {
		return nil
	}
	key := makeSerialKey(update.PubkeySerial)
	// Load existing vertex or create new
	var pv PubkeyVertex
	existing := pvBucket.Get(key)
	if existing != nil {
		if err := pv.Decode(existing); chk.E(err) {
			// If decode fails, start fresh
			pv = PubkeyVertex{}
		}
	}
	// Apply updates
	if update.AddAuthored != 0 {
		pv.AddAuthored(update.AddAuthored)
	}
	if update.AddMention != 0 {
		pv.AddMention(update.AddMention)
	}
	// Write back
	return pvBucket.Put(key, pv.Encode())
}

// Shutdown gracefully shuts down the batcher after a final flush. It is
// safe to call more than once; repeat calls wait for the first to finish.
func (wb *WriteBatcher) Shutdown() error {
	wb.mu.Lock()
	if wb.stopped {
		// Already shutting down; don't close shutdownCh twice.
		wb.mu.Unlock()
		<-wb.doneCh
		return nil
	}
	wb.stopped = true
	wb.mu.Unlock()
	close(wb.shutdownCh)
	<-wb.doneCh
	return nil
}

// Stats returns current batcher statistics.
func (wb *WriteBatcher) Stats() BatcherStats {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	return wb.stats
}

// PendingCount returns the number of pending events.
func (wb *WriteBatcher) PendingCount() int {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	return len(wb.pending)
}

// ErrBatcherStopped is returned when adding to a stopped batcher.
var ErrBatcherStopped = &batcherStoppedError{}

type batcherStoppedError struct{}

func (e *batcherStoppedError) Error() string {
	return "batcher has been stopped"
}
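
// Because ErrBatcherStopped is a package-level sentinel, callers can compare
// it directly or via errors.Is. A minimal sketch:
//
//	if err := wb.Add(batch); err == ErrBatcherStopped {
//		// shutting down: drop the event or hand it to another writer
//	}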