From 9fed1261ad64a1bbce86e22a00e0d4952738f0cb Mon Sep 17 00:00:00 2001 From: woikos Date: Tue, 6 Jan 2026 06:50:39 +0100 Subject: [PATCH] Add BBolt database backend for HDD-optimized archival relays (v0.48.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - BBolt B+tree backend with sequential access patterns for spinning disks - Write batching (5000 events / 128MB / 30s flush) to reduce disk thrashing - Adjacency list storage for graph data (one key per vertex, not per edge) - Bloom filter for fast negative edge existence checks (~12MB for 10M edges) - No query cache (saves RAM, B+tree reads are fast enough on HDD) - Migration tool: orly migrate --from badger --to bbolt - Configuration: ORLY_BBOLT_* environment variables Files modified: - app/config/config.go: Added BBolt configuration options - main.go: Added migrate subcommand and BBolt config wiring - pkg/database/factory.go: Added BBolt factory registration - pkg/bbolt/*: New BBolt database backend implementation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- app/config/config.go | 74 ++++++- go.mod | 3 + go.sum | 7 + main.go | 121 +++++++++++ pkg/bbolt/batcher.go | 330 +++++++++++++++++++++++++++++ pkg/bbolt/bbolt.go | 325 ++++++++++++++++++++++++++++ pkg/bbolt/bloom.go | 192 +++++++++++++++++ pkg/bbolt/fetch-event.go | 134 ++++++++++++ pkg/bbolt/get-serial-by-id.go | 179 ++++++++++++++++ pkg/bbolt/graph.go | 250 ++++++++++++++++++++++ pkg/bbolt/helpers.go | 119 +++++++++++ pkg/bbolt/identity.go | 66 ++++++ pkg/bbolt/init.go | 55 +++++ pkg/bbolt/logger.go | 81 +++++++ pkg/bbolt/markers.go | 62 ++++++ pkg/bbolt/query-graph.go | 287 +++++++++++++++++++++++++ pkg/bbolt/save-event.go | 387 ++++++++++++++++++++++++++++++++++ pkg/bbolt/serial.go | 169 +++++++++++++++ pkg/bbolt/stubs.go | 254 ++++++++++++++++++++++ pkg/database/factory.go | 26 ++- pkg/version/version | 2 +- 21 files changed, 3118 insertions(+), 5 
deletions(-) create mode 100644 pkg/bbolt/batcher.go create mode 100644 pkg/bbolt/bbolt.go create mode 100644 pkg/bbolt/bloom.go create mode 100644 pkg/bbolt/fetch-event.go create mode 100644 pkg/bbolt/get-serial-by-id.go create mode 100644 pkg/bbolt/graph.go create mode 100644 pkg/bbolt/helpers.go create mode 100644 pkg/bbolt/identity.go create mode 100644 pkg/bbolt/init.go create mode 100644 pkg/bbolt/logger.go create mode 100644 pkg/bbolt/markers.go create mode 100644 pkg/bbolt/query-graph.go create mode 100644 pkg/bbolt/save-event.go create mode 100644 pkg/bbolt/serial.go create mode 100644 pkg/bbolt/stubs.go diff --git a/app/config/config.go b/app/config/config.go index 338d178..12d556a 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -95,8 +95,16 @@ type C struct { NIP43InviteExpiry time.Duration `env:"ORLY_NIP43_INVITE_EXPIRY" default:"24h" usage:"how long invite codes remain valid"` // Database configuration - DBType string `env:"ORLY_DB_TYPE" default:"badger" usage:"database backend to use: badger or neo4j"` + DBType string `env:"ORLY_DB_TYPE" default:"badger" usage:"database backend to use: badger, bbolt, or neo4j"` QueryCacheDisabled bool `env:"ORLY_QUERY_CACHE_DISABLED" default:"true" usage:"disable query cache to reduce memory usage (trades memory for query performance)"` + + // BBolt configuration (only used when ORLY_DB_TYPE=bbolt) + BboltBatchMaxEvents int `env:"ORLY_BBOLT_BATCH_MAX_EVENTS" default:"5000" usage:"max events before flush (tuned for HDD, only used when ORLY_DB_TYPE=bbolt)"` + BboltBatchMaxMB int `env:"ORLY_BBOLT_BATCH_MAX_MB" default:"128" usage:"max batch size in MB before flush (only used when ORLY_DB_TYPE=bbolt)"` + BboltFlushTimeout int `env:"ORLY_BBOLT_FLUSH_TIMEOUT_SEC" default:"30" usage:"max seconds before flush (only used when ORLY_DB_TYPE=bbolt)"` + BboltBloomSizeMB int `env:"ORLY_BBOLT_BLOOM_SIZE_MB" default:"16" usage:"bloom filter size in MB for edge queries (only used when ORLY_DB_TYPE=bbolt)"` + 
BboltNoSync bool `env:"ORLY_BBOLT_NO_SYNC" default:"false" usage:"disable fsync for performance (DANGEROUS - data loss risk, only used when ORLY_DB_TYPE=bbolt)"` + BboltMmapSizeMB int `env:"ORLY_BBOLT_MMAP_SIZE_MB" default:"8192" usage:"initial mmap size in MB (only used when ORLY_DB_TYPE=bbolt)"` QueryCacheSizeMB int `env:"ORLY_QUERY_CACHE_SIZE_MB" default:"512" usage:"query cache size in MB (caches database query results for faster REQ responses)"` QueryCacheMaxAge string `env:"ORLY_QUERY_CACHE_MAX_AGE" default:"5m" usage:"maximum age for cached query results (e.g., 5m, 10m, 1h)"` @@ -357,6 +365,45 @@ func CuratingModeRequested() (requested bool, ownerKey string) { return } +// MigrateRequested checks if the first command line argument is "migrate" +// and returns the migration parameters. +// +// Return Values +// - requested: true if the 'migrate' subcommand was provided +// - fromType: source database type (badger, bbolt, neo4j) +// - toType: destination database type +// - targetPath: optional target path for destination database +func MigrateRequested() (requested bool, fromType, toType, targetPath string) { + if len(os.Args) > 1 { + switch strings.ToLower(os.Args[1]) { + case "migrate": + requested = true + // Parse --from, --to, --target-path flags + for i := 2; i < len(os.Args); i++ { + arg := os.Args[i] + switch { + case strings.HasPrefix(arg, "--from="): + fromType = strings.TrimPrefix(arg, "--from=") + case strings.HasPrefix(arg, "--to="): + toType = strings.TrimPrefix(arg, "--to=") + case strings.HasPrefix(arg, "--target-path="): + targetPath = strings.TrimPrefix(arg, "--target-path=") + case arg == "--from" && i+1 < len(os.Args): + i++ + fromType = os.Args[i] + case arg == "--to" && i+1 < len(os.Args): + i++ + toType = os.Args[i] + case arg == "--target-path" && i+1 < len(os.Args): + i++ + targetPath = os.Args[i] + } + } + } + } + return +} + // KV is a key/value pair. 
type KV struct{ Key, Value string } @@ -488,18 +535,20 @@ func PrintHelp(cfg *C, printer io.Writer) { ) _, _ = fmt.Fprintf( printer, - `Usage: %s [env|help|identity|serve|version] + `Usage: %s [env|help|identity|migrate|serve|version] - env: print environment variables configuring %s - help: print this help text - identity: print the relay identity secret and public key +- migrate: migrate data between database backends + Example: %s migrate --from badger --to bbolt - serve: start ephemeral relay with RAM-based storage at /dev/shm/orlyserve listening on 0.0.0.0:10547 with 'none' ACL mode (open relay) useful for testing and benchmarking - version: print version and exit (also: -v, --v, -version, --version) `, - cfg.AppName, cfg.AppName, + cfg.AppName, cfg.AppName, cfg.AppName, ) _, _ = fmt.Fprintf( printer, @@ -707,3 +756,22 @@ func (cfg *C) GetGraphConfigValues() ( cfg.GraphMaxResults, cfg.GraphRateLimitRPM } + +// GetBboltConfigValues returns the BBolt database configuration values. +// This avoids circular imports with pkg/bbolt while allowing main.go to construct +// the BBolt-specific configuration. 
+func (cfg *C) GetBboltConfigValues() ( + batchMaxEvents int, + batchMaxBytes int64, + flushTimeoutSec int, + bloomSizeMB int, + noSync bool, + mmapSizeBytes int, +) { + return cfg.BboltBatchMaxEvents, + int64(cfg.BboltBatchMaxMB) * 1024 * 1024, + cfg.BboltFlushTimeout, + cfg.BboltBloomSizeMB, + cfg.BboltNoSync, + cfg.BboltMmapSizeMB * 1024 * 1024 +} diff --git a/go.mod b/go.mod index 6278a45..acf61d6 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,8 @@ require ( github.com/BurntSushi/toml v1.5.0 // indirect github.com/ImVexed/fasturl v0.0.0-20230304231329-4e41488060f3 // indirect github.com/alexflint/go-scalar v1.2.0 // indirect + github.com/bits-and-blooms/bitset v1.24.2 // indirect + github.com/bits-and-blooms/bloom/v3 v3.7.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect github.com/bytedance/sonic v1.13.1 // indirect @@ -69,6 +71,7 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + go.etcd.io/bbolt v1.4.3 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel v1.38.0 // indirect go.opentelemetry.io/otel/metric v1.38.0 // indirect diff --git a/go.sum b/go.sum index 4a72f35..11e67bb 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,10 @@ github.com/alexflint/go-scalar v1.2.0 h1:WR7JPKkeNpnYIOfHRa7ivM21aWAdHD0gEWHCx+W github.com/alexflint/go-scalar v1.2.0/go.mod h1:LoFvNMqS1CPrMVltza4LvnGKhaSpc3oyLEBUZVhhS2o= github.com/aperturerobotics/go-indexeddb v0.2.3 h1:DfquIk9YEZjWD/lJyBWZWGCtRga43/a96bx0Ulv9VhQ= github.com/aperturerobotics/go-indexeddb v0.2.3/go.mod h1:JV1XngOCCui7zrMSyRz+Wvz00nUSfotRKZqJzWpl5fQ= +github.com/bits-and-blooms/bitset v1.24.2 h1:M7/NzVbsytmtfHbumG+K2bremQPMJuqv1JD3vOaFxp0= +github.com/bits-and-blooms/bitset v1.24.2/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/bits-and-blooms/bloom/v3 v3.7.1 
h1:WXovk4TRKZttAMJfoQx6K2DM0zNIt8w+c67UqO+etV0= +github.com/bits-and-blooms/bloom/v3 v3.7.1/go.mod h1:rZzYLLje2dfzXfAkJNxQQHsKurAyK55KUnL43Euk0hU= github.com/btcsuite/btcd/btcec/v2 v2.3.4 h1:3EJjcN70HCu/mwqlUsGK8GcNVyLVxFDlWurTXGPFfiQ= github.com/btcsuite/btcd/btcec/v2 v2.3.4/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 h1:59Kx4K6lzOW5w6nFlA0v5+lk/6sjybR934QNHSJZPTQ= @@ -155,10 +159,13 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/vertex-lab/nostr-sqlite v0.3.2 h1:8nZYYIwiKnWLA446qA/wL/Gy+bU0kuaxdLfUyfeTt/E= github.com/vertex-lab/nostr-sqlite v0.3.2/go.mod h1:5bw1wMgJhSdrumsZAWxqy+P0u1g+q02PnlGQn15dnSM= go-simpler.org/env v0.12.0 h1:kt/lBts0J1kjWJAnB740goNdvwNxt5emhYngL0Fzufs= go-simpler.org/env v0.12.0/go.mod h1:cc/5Md9JCUM7LVLtN0HYjPTDcI3Q8TDaPlNTAlDU+WI= +go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= +go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= diff --git a/main.go b/main.go index d43988e..09dc656 100644 --- a/main.go +++ b/main.go @@ -73,6 +73,117 @@ func main() { os.Exit(0) } + // Handle 'migrate' subcommand: migrate data between database backends + if requested, fromType, toType, targetPath := config.MigrateRequested(); requested { + if fromType == "" || toType == "" { + 
fmt.Println("Usage: orly migrate --from --to [--target-path ]") + fmt.Println("") + fmt.Println("Migrate data between database backends.") + fmt.Println("") + fmt.Println("Options:") + fmt.Println(" --from Source database type (badger, bbolt, neo4j)") + fmt.Println(" --to Destination database type (badger, bbolt, neo4j)") + fmt.Println(" --target-path Optional: destination data directory") + fmt.Println(" (default: $ORLY_DATA_DIR/)") + fmt.Println("") + fmt.Println("Examples:") + fmt.Println(" orly migrate --from badger --to bbolt") + fmt.Println(" orly migrate --from badger --to bbolt --target-path /mnt/hdd/orly-bbolt") + os.Exit(1) + } + + // Set target path if not specified + if targetPath == "" { + targetPath = cfg.DataDir + "-" + toType + } + + log.I.F("migrate: %s -> %s", fromType, toType) + log.I.F("migrate: source path: %s", cfg.DataDir) + log.I.F("migrate: target path: %s", targetPath) + + // Open source database + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + srcCfg := makeDatabaseConfig(cfg) + var srcDB database.Database + if srcDB, err = database.NewDatabaseWithConfig(ctx, cancel, fromType, srcCfg); chk.E(err) { + log.E.F("migrate: failed to open source database: %v", err) + os.Exit(1) + } + + // Wait for source database to be ready + select { + case <-srcDB.Ready(): + log.I.F("migrate: source database ready") + case <-time.After(60 * time.Second): + log.E.F("migrate: timeout waiting for source database") + os.Exit(1) + } + + // Open destination database + dstCfg := makeDatabaseConfig(cfg) + dstCfg.DataDir = targetPath + var dstDB database.Database + if dstDB, err = database.NewDatabaseWithConfig(ctx, cancel, toType, dstCfg); chk.E(err) { + log.E.F("migrate: failed to open destination database: %v", err) + srcDB.Close() + os.Exit(1) + } + + // Wait for destination database to be ready + select { + case <-dstDB.Ready(): + log.I.F("migrate: destination database ready") + case <-time.After(60 * time.Second): + 
log.E.F("migrate: timeout waiting for destination database") + srcDB.Close() + os.Exit(1) + } + + // Migrate using pipe (export from source, import to destination) + log.I.F("migrate: starting data transfer...") + pr, pw, pipeErr := os.Pipe() + if pipeErr != nil { + log.E.F("migrate: failed to create pipe: %v", pipeErr) + srcDB.Close() + dstDB.Close() + os.Exit(1) + } + + var wg sync.WaitGroup + wg.Add(2) + + // Export goroutine + go func() { + defer wg.Done() + defer pw.Close() + srcDB.Export(ctx, pw) + log.I.F("migrate: export complete") + }() + + // Import goroutine + go func() { + defer wg.Done() + if importErr := dstDB.ImportEventsFromReader(ctx, pr); importErr != nil { + log.E.F("migrate: import error: %v", importErr) + } + log.I.F("migrate: import complete") + }() + + wg.Wait() + + // Sync and close databases + if err = dstDB.Sync(); chk.E(err) { + log.W.F("migrate: sync warning: %v", err) + } + srcDB.Close() + dstDB.Close() + + log.I.F("migrate: migration complete!") + os.Exit(0) + } + // Handle 'serve' subcommand: start ephemeral relay with RAM-based storage if config.ServeRequested() { const serveDataDir = "/dev/shm/orlyserve" @@ -622,6 +733,9 @@ func makeDatabaseConfig(cfg *config.C) *database.DatabaseConfig { neo4jURI, neo4jUser, neo4jPassword, neo4jMaxConnPoolSize, neo4jFetchSize, neo4jMaxTxRetrySeconds, neo4jQueryResultLimit := cfg.GetDatabaseConfigValues() + // Get BBolt-specific configuration + batchMaxEvents, batchMaxBytes, flushTimeoutSec, bloomSizeMB, noSync, mmapSizeBytes := cfg.GetBboltConfigValues() + return &database.DatabaseConfig{ DataDir: dataDir, LogLevel: logLevel, @@ -640,6 +754,13 @@ func makeDatabaseConfig(cfg *config.C) *database.DatabaseConfig { Neo4jFetchSize: neo4jFetchSize, Neo4jMaxTxRetrySeconds: neo4jMaxTxRetrySeconds, Neo4jQueryResultLimit: neo4jQueryResultLimit, + // BBolt-specific settings + BboltBatchMaxEvents: batchMaxEvents, + BboltBatchMaxBytes: batchMaxBytes, + BboltFlushTimeout: time.Duration(flushTimeoutSec) * 
time.Second, + BboltBloomSizeMB: bloomSizeMB, + BboltNoSync: noSync, + BboltMmapSize: mmapSizeBytes, } } diff --git a/pkg/bbolt/batcher.go b/pkg/bbolt/batcher.go new file mode 100644 index 0000000..98daa5c --- /dev/null +++ b/pkg/bbolt/batcher.go @@ -0,0 +1,330 @@ +//go:build !(js && wasm) + +package bbolt + +import ( + "sync" + "time" + + bolt "go.etcd.io/bbolt" + "lol.mleku.dev/chk" +) + +// BatchedWrite represents a single write operation +type BatchedWrite struct { + BucketName []byte + Key []byte + Value []byte + IsDelete bool +} + +// EventBatch represents a complete event with all its indexes and graph updates +type EventBatch struct { + Serial uint64 + EventData []byte // Serialized compact event data + Indexes []BatchedWrite // Index entries + EventVertex *EventVertex // Graph vertex for this event + PubkeyUpdate *PubkeyVertexUpdate // Update to author's pubkey vertex + MentionUpdates []*PubkeyVertexUpdate // Updates to mentioned pubkeys + EdgeKeys []EdgeKey // Edge keys for bloom filter +} + +// PubkeyVertexUpdate represents an update to a pubkey's vertex +type PubkeyVertexUpdate struct { + PubkeySerial uint64 + AddAuthored uint64 // Event serial to add to authored (0 if none) + AddMention uint64 // Event serial to add to mentions (0 if none) +} + +// WriteBatcher accumulates writes and flushes them in batches. +// Optimized for HDD with large batches and periodic flushes. 
+type WriteBatcher struct { + db *bolt.DB + bloom *EdgeBloomFilter + logger *Logger + + mu sync.Mutex + pending []*EventBatch + pendingSize int64 + stopped bool + + // Configuration + maxEvents int + maxBytes int64 + flushPeriod time.Duration + + // Channels for coordination + flushCh chan struct{} + shutdownCh chan struct{} + doneCh chan struct{} + + // Stats + stats BatcherStats +} + +// BatcherStats contains batcher statistics +type BatcherStats struct { + TotalBatches uint64 + TotalEvents uint64 + TotalBytes uint64 + AverageLatencyMs float64 + LastFlushTime time.Time + LastFlushDuration time.Duration +} + +// NewWriteBatcher creates a new write batcher +func NewWriteBatcher(db *bolt.DB, bloom *EdgeBloomFilter, cfg *BboltConfig, logger *Logger) *WriteBatcher { + wb := &WriteBatcher{ + db: db, + bloom: bloom, + logger: logger, + maxEvents: cfg.BatchMaxEvents, + maxBytes: cfg.BatchMaxBytes, + flushPeriod: cfg.BatchFlushTimeout, + pending: make([]*EventBatch, 0, cfg.BatchMaxEvents), + flushCh: make(chan struct{}, 1), + shutdownCh: make(chan struct{}), + doneCh: make(chan struct{}), + } + + go wb.flushLoop() + return wb +} + +// Add adds an event batch to the pending writes +func (wb *WriteBatcher) Add(batch *EventBatch) error { + wb.mu.Lock() + defer wb.mu.Unlock() + + if wb.stopped { + return ErrBatcherStopped + } + + wb.pending = append(wb.pending, batch) + wb.pendingSize += int64(len(batch.EventData)) + for _, idx := range batch.Indexes { + wb.pendingSize += int64(len(idx.Key) + len(idx.Value)) + } + + // Check thresholds + if len(wb.pending) >= wb.maxEvents || wb.pendingSize >= wb.maxBytes { + wb.triggerFlush() + } + + return nil +} + +// triggerFlush signals the flush loop to flush (must be called with lock held) +func (wb *WriteBatcher) triggerFlush() { + select { + case wb.flushCh <- struct{}{}: + default: + // Already a flush pending + } +} + +// flushLoop runs the background flush timer +func (wb *WriteBatcher) flushLoop() { + defer close(wb.doneCh) + + 
timer := time.NewTimer(wb.flushPeriod) + defer timer.Stop() + + for { + select { + case <-timer.C: + if err := wb.Flush(); err != nil { + wb.logger.Errorf("bbolt: flush error: %v", err) + } + timer.Reset(wb.flushPeriod) + + case <-wb.flushCh: + if err := wb.Flush(); err != nil { + wb.logger.Errorf("bbolt: flush error: %v", err) + } + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(wb.flushPeriod) + + case <-wb.shutdownCh: + // Final flush + if err := wb.Flush(); err != nil { + wb.logger.Errorf("bbolt: final flush error: %v", err) + } + return + } + } +} + +// Flush writes all pending batches to BBolt +func (wb *WriteBatcher) Flush() error { + wb.mu.Lock() + if len(wb.pending) == 0 { + wb.mu.Unlock() + return nil + } + + // Swap out pending slice + toFlush := wb.pending + toFlushSize := wb.pendingSize + wb.pending = make([]*EventBatch, 0, wb.maxEvents) + wb.pendingSize = 0 + wb.mu.Unlock() + + startTime := time.Now() + + // Collect all edge keys for bloom filter update + var allEdgeKeys []EdgeKey + for _, batch := range toFlush { + allEdgeKeys = append(allEdgeKeys, batch.EdgeKeys...) 
+ } + + // Update bloom filter first (memory only) + if len(allEdgeKeys) > 0 { + wb.bloom.AddBatch(allEdgeKeys) + } + + // Write all batches in a single transaction + err := wb.db.Update(func(tx *bolt.Tx) error { + for _, batch := range toFlush { + // Write compact event data + cmpBucket := tx.Bucket(bucketCmp) + if cmpBucket != nil { + key := makeSerialKey(batch.Serial) + if err := cmpBucket.Put(key, batch.EventData); err != nil { + return err + } + } + + // Write all indexes + for _, idx := range batch.Indexes { + bucket := tx.Bucket(idx.BucketName) + if bucket == nil { + continue + } + if idx.IsDelete { + if err := bucket.Delete(idx.Key); err != nil { + return err + } + } else { + if err := bucket.Put(idx.Key, idx.Value); err != nil { + return err + } + } + } + + // Write event vertex + if batch.EventVertex != nil { + evBucket := tx.Bucket(bucketEv) + if evBucket != nil { + key := makeSerialKey(batch.Serial) + if err := evBucket.Put(key, batch.EventVertex.Encode()); err != nil { + return err + } + } + } + + // Update pubkey vertices + if err := wb.updatePubkeyVertex(tx, batch.PubkeyUpdate); err != nil { + return err + } + for _, mentionUpdate := range batch.MentionUpdates { + if err := wb.updatePubkeyVertex(tx, mentionUpdate); err != nil { + return err + } + } + } + return nil + }) + + // Update stats + duration := time.Since(startTime) + wb.mu.Lock() + wb.stats.TotalBatches++ + wb.stats.TotalEvents += uint64(len(toFlush)) + wb.stats.TotalBytes += uint64(toFlushSize) + wb.stats.LastFlushTime = time.Now() + wb.stats.LastFlushDuration = duration + latencyMs := float64(duration.Milliseconds()) + wb.stats.AverageLatencyMs = (wb.stats.AverageLatencyMs*float64(wb.stats.TotalBatches-1) + latencyMs) / float64(wb.stats.TotalBatches) + wb.mu.Unlock() + + if err == nil { + wb.logger.Debugf("bbolt: flushed %d events (%d bytes) in %v", len(toFlush), toFlushSize, duration) + } + + return err +} + +// updatePubkeyVertex reads, updates, and writes a pubkey vertex +func (wb 
*WriteBatcher) updatePubkeyVertex(tx *bolt.Tx, update *PubkeyVertexUpdate) error { + if update == nil { + return nil + } + + pvBucket := tx.Bucket(bucketPv) + if pvBucket == nil { + return nil + } + + key := makeSerialKey(update.PubkeySerial) + + // Load existing vertex or create new + var pv PubkeyVertex + existing := pvBucket.Get(key) + if existing != nil { + if err := pv.Decode(existing); chk.E(err) { + // If decode fails, start fresh + pv = PubkeyVertex{} + } + } + + // Apply updates + if update.AddAuthored != 0 { + pv.AddAuthored(update.AddAuthored) + } + if update.AddMention != 0 { + pv.AddMention(update.AddMention) + } + + // Write back + return pvBucket.Put(key, pv.Encode()) +} + +// Shutdown gracefully shuts down the batcher +func (wb *WriteBatcher) Shutdown() error { + wb.mu.Lock() + wb.stopped = true + wb.mu.Unlock() + + close(wb.shutdownCh) + <-wb.doneCh + return nil +} + +// Stats returns current batcher statistics +func (wb *WriteBatcher) Stats() BatcherStats { + wb.mu.Lock() + defer wb.mu.Unlock() + return wb.stats +} + +// PendingCount returns the number of pending events +func (wb *WriteBatcher) PendingCount() int { + wb.mu.Lock() + defer wb.mu.Unlock() + return len(wb.pending) +} + +// ErrBatcherStopped is returned when adding to a stopped batcher +var ErrBatcherStopped = &batcherStoppedError{} + +type batcherStoppedError struct{} + +func (e *batcherStoppedError) Error() string { + return "batcher has been stopped" +} diff --git a/pkg/bbolt/bbolt.go b/pkg/bbolt/bbolt.go new file mode 100644 index 0000000..25b0913 --- /dev/null +++ b/pkg/bbolt/bbolt.go @@ -0,0 +1,325 @@ +//go:build !(js && wasm) + +package bbolt + +import ( + "context" + "errors" + "os" + "path/filepath" + "sync" + "time" + + bolt "go.etcd.io/bbolt" + "lol.mleku.dev" + "lol.mleku.dev/chk" + "next.orly.dev/pkg/database" + "next.orly.dev/pkg/utils/apputil" +) + +// Bucket names - map to existing index prefixes but without the 3-byte prefix in keys +var ( + bucketEvt = []byte("evt") 
// Event storage: serial -> compact event data + bucketEid = []byte("eid") // Event ID index + bucketFpc = []byte("fpc") // Full ID/pubkey index + bucketC = []byte("c--") // Created at index + bucketKc = []byte("kc-") // Kind + created index + bucketPc = []byte("pc-") // Pubkey + created index + bucketKpc = []byte("kpc") // Kind + pubkey + created + bucketTc = []byte("tc-") // Tag + created + bucketTkc = []byte("tkc") // Tag + kind + created + bucketTpc = []byte("tpc") // Tag + pubkey + created + bucketTkp = []byte("tkp") // Tag + kind + pubkey + created + bucketWrd = []byte("wrd") // Word search index + bucketExp = []byte("exp") // Expiration index + bucketPks = []byte("pks") // Pubkey hash -> serial + bucketSpk = []byte("spk") // Serial -> pubkey + bucketSei = []byte("sei") // Serial -> event ID + bucketCmp = []byte("cmp") // Compact event storage + bucketEv = []byte("ev") // Event vertices (adjacency list) + bucketPv = []byte("pv") // Pubkey vertices (adjacency list) + bucketMeta = []byte("_meta") // Markers, version, serial counter, bloom filter +) + +// All buckets that need to be created on init +var allBuckets = [][]byte{ + bucketEvt, bucketEid, bucketFpc, bucketC, bucketKc, bucketPc, bucketKpc, + bucketTc, bucketTkc, bucketTpc, bucketTkp, bucketWrd, bucketExp, + bucketPks, bucketSpk, bucketSei, bucketCmp, bucketEv, bucketPv, bucketMeta, +} + +// B implements the database.Database interface using BBolt as the storage backend. +// Optimized for HDD with write batching and adjacency list graph storage. 
+type B struct { + ctx context.Context + cancel context.CancelFunc + dataDir string + Logger *Logger + + db *bolt.DB + ready chan struct{} + + // Write batching + batcher *WriteBatcher + + // Serial management + serialMu sync.Mutex + nextSerial uint64 + nextPubkeySeq uint64 + + // Edge bloom filter for fast negative lookups + edgeBloom *EdgeBloomFilter + + // Configuration + cfg *BboltConfig +} + +// BboltConfig holds bbolt-specific configuration +type BboltConfig struct { + DataDir string + LogLevel string + + // Batch settings (tuned for 7200rpm HDD) + BatchMaxEvents int // Max events before flush (default: 5000) + BatchMaxBytes int64 // Max bytes before flush (default: 128MB) + BatchFlushTimeout time.Duration // Max time before flush (default: 30s) + + // Bloom filter settings + BloomSizeMB int // Bloom filter size in MB (default: 16) + + // BBolt settings + NoSync bool // Disable fsync for performance (DANGEROUS) + InitialMmapSize int // Initial mmap size in bytes +} + +// Ensure B implements Database interface at compile time +var _ database.Database = (*B)(nil) + +// New creates a new BBolt database instance with default configuration. +func New( + ctx context.Context, cancel context.CancelFunc, dataDir, logLevel string, +) (b *B, err error) { + cfg := &BboltConfig{ + DataDir: dataDir, + LogLevel: logLevel, + BatchMaxEvents: 5000, + BatchMaxBytes: 128 * 1024 * 1024, // 128MB + BatchFlushTimeout: 30 * time.Second, + BloomSizeMB: 16, + InitialMmapSize: 8 * 1024 * 1024 * 1024, // 8GB + } + return NewWithConfig(ctx, cancel, cfg) +} + +// NewWithConfig creates a new BBolt database instance with full configuration. 
+func NewWithConfig( + ctx context.Context, cancel context.CancelFunc, cfg *BboltConfig, +) (b *B, err error) { + // Apply defaults + if cfg.BatchMaxEvents <= 0 { + cfg.BatchMaxEvents = 5000 + } + if cfg.BatchMaxBytes <= 0 { + cfg.BatchMaxBytes = 128 * 1024 * 1024 + } + if cfg.BatchFlushTimeout <= 0 { + cfg.BatchFlushTimeout = 30 * time.Second + } + if cfg.BloomSizeMB <= 0 { + cfg.BloomSizeMB = 16 + } + if cfg.InitialMmapSize <= 0 { + cfg.InitialMmapSize = 8 * 1024 * 1024 * 1024 + } + + b = &B{ + ctx: ctx, + cancel: cancel, + dataDir: cfg.DataDir, + Logger: NewLogger(lol.GetLogLevel(cfg.LogLevel), cfg.DataDir), + ready: make(chan struct{}), + cfg: cfg, + } + + // Ensure the data directory exists + if err = os.MkdirAll(cfg.DataDir, 0755); chk.E(err) { + return + } + if err = apputil.EnsureDir(filepath.Join(cfg.DataDir, "dummy")); chk.E(err) { + return + } + + // Open BBolt database + dbPath := filepath.Join(cfg.DataDir, "orly.db") + opts := &bolt.Options{ + Timeout: 10 * time.Second, + NoSync: cfg.NoSync, + InitialMmapSize: cfg.InitialMmapSize, + } + + if b.db, err = bolt.Open(dbPath, 0600, opts); chk.E(err) { + return + } + + // Create all buckets + if err = b.db.Update(func(tx *bolt.Tx) error { + for _, bucket := range allBuckets { + if _, err := tx.CreateBucketIfNotExists(bucket); err != nil { + return err + } + } + return nil + }); chk.E(err) { + return + } + + // Initialize serial counters + if err = b.initSerialCounters(); chk.E(err) { + return + } + + // Initialize bloom filter + b.edgeBloom, err = NewEdgeBloomFilter(cfg.BloomSizeMB, b.db) + if chk.E(err) { + return + } + + // Initialize write batcher + b.batcher = NewWriteBatcher(b.db, b.edgeBloom, cfg, b.Logger) + + // Run migrations + b.RunMigrations() + + // Start warmup and mark ready + go b.warmup() + + // Start background maintenance + go b.backgroundLoop() + + return +} + +// Path returns the path where the database files are stored. 
+func (b *B) Path() string { return b.dataDir } + +// Init initializes the database with the given path. +func (b *B) Init(path string) error { + b.dataDir = path + return nil +} + +// Sync flushes the database buffers to disk. +func (b *B) Sync() error { + // Flush pending writes + if err := b.batcher.Flush(); err != nil { + return err + } + // Persist bloom filter + if err := b.edgeBloom.Persist(b.db); err != nil { + return err + } + // Persist serial counters + if err := b.persistSerialCounters(); err != nil { + return err + } + // Sync BBolt + return b.db.Sync() +} + +// Close releases resources and closes the database. +func (b *B) Close() (err error) { + b.Logger.Infof("bbolt: closing database...") + + // Stop accepting new writes and flush pending + if b.batcher != nil { + if err = b.batcher.Shutdown(); chk.E(err) { + // Log but continue cleanup + } + } + + // Persist bloom filter + if b.edgeBloom != nil { + if err = b.edgeBloom.Persist(b.db); chk.E(err) { + // Log but continue cleanup + } + } + + // Persist serial counters + if err = b.persistSerialCounters(); chk.E(err) { + // Log but continue cleanup + } + + // Close BBolt database + if b.db != nil { + if err = b.db.Close(); chk.E(err) { + return + } + } + + b.Logger.Infof("bbolt: database closed") + return +} + +// Wipe deletes all data in the database. +func (b *B) Wipe() error { + return b.db.Update(func(tx *bolt.Tx) error { + for _, bucket := range allBuckets { + if err := tx.DeleteBucket(bucket); err != nil && !errors.Is(err, bolt.ErrBucketNotFound) { + return err + } + if _, err := tx.CreateBucket(bucket); err != nil { + return err + } + } + // Reset serial counters + b.serialMu.Lock() + b.nextSerial = 1 + b.nextPubkeySeq = 1 + b.serialMu.Unlock() + // Reset bloom filter + b.edgeBloom.Reset() + return nil + }) +} + +// SetLogLevel changes the logging level. 
+func (b *B) SetLogLevel(level string) { + b.Logger.SetLogLevel(lol.GetLogLevel(level)) +} + +// Ready returns a channel that closes when the database is ready to serve requests. +func (b *B) Ready() <-chan struct{} { + return b.ready +} + +// warmup performs database warmup operations and closes the ready channel when complete. +func (b *B) warmup() { + defer close(b.ready) + + // Give the database time to settle + time.Sleep(1 * time.Second) + + b.Logger.Infof("bbolt: database warmup complete, ready to serve requests") +} + +// backgroundLoop runs periodic maintenance tasks. +func (b *B) backgroundLoop() { + expirationTicker := time.NewTicker(10 * time.Minute) + bloomPersistTicker := time.NewTicker(5 * time.Minute) + defer expirationTicker.Stop() + defer bloomPersistTicker.Stop() + + for { + select { + case <-expirationTicker.C: + b.DeleteExpired() + case <-bloomPersistTicker.C: + if err := b.edgeBloom.Persist(b.db); chk.E(err) { + b.Logger.Warningf("bbolt: failed to persist bloom filter: %v", err) + } + case <-b.ctx.Done(): + b.cancel() + return + } + } +} diff --git a/pkg/bbolt/bloom.go b/pkg/bbolt/bloom.go new file mode 100644 index 0000000..4612bd4 --- /dev/null +++ b/pkg/bbolt/bloom.go @@ -0,0 +1,192 @@ +//go:build !(js && wasm) + +package bbolt + +import ( + "bytes" + "encoding/binary" + "sync" + + "github.com/bits-and-blooms/bloom/v3" + bolt "go.etcd.io/bbolt" + "lol.mleku.dev/chk" +) + +const bloomFilterKey = "edge_bloom_filter" + +// EdgeBloomFilter provides fast negative lookups for edge existence checks. +// Uses a bloom filter to avoid disk seeks when checking if an edge exists. +type EdgeBloomFilter struct { + mu sync.RWMutex + filter *bloom.BloomFilter + + // Track if filter has been modified since last persist + dirty bool +} + +// NewEdgeBloomFilter creates or loads the edge bloom filter. +// sizeMB is the approximate size in megabytes. +// With 1% false positive rate, 16MB can hold ~10 million edges. 
+func NewEdgeBloomFilter(sizeMB int, db *bolt.DB) (*EdgeBloomFilter, error) { + ebf := &EdgeBloomFilter{} + + // Try to load from database + var loaded bool + err := db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketMeta) + if bucket == nil { + return nil + } + + data := bucket.Get([]byte(bloomFilterKey)) + if data == nil { + return nil + } + + // Deserialize bloom filter + reader := bytes.NewReader(data) + filter := &bloom.BloomFilter{} + if _, err := filter.ReadFrom(reader); err != nil { + return err + } + + ebf.filter = filter + loaded = true + return nil + }) + if chk.E(err) { + return nil, err + } + + if !loaded { + // Create new filter + // Calculate parameters: m bits, k hash functions + // For 1% false positive rate: m/n ≈ 9.6, k ≈ 7 + bitsPerMB := 8 * 1024 * 1024 + totalBits := uint(sizeMB * bitsPerMB) + // Estimate capacity based on 10 bits per element for 1% FPR + estimatedCapacity := uint(totalBits / 10) + + ebf.filter = bloom.NewWithEstimates(estimatedCapacity, 0.01) + } + + return ebf, nil +} + +// Add adds an edge to the bloom filter. +// An edge is represented by source and destination serials plus edge type. +func (ebf *EdgeBloomFilter) Add(srcSerial, dstSerial uint64, edgeType byte) { + ebf.mu.Lock() + defer ebf.mu.Unlock() + + key := ebf.makeKey(srcSerial, dstSerial, edgeType) + ebf.filter.Add(key) + ebf.dirty = true +} + +// AddBatch adds multiple edges to the bloom filter. +func (ebf *EdgeBloomFilter) AddBatch(edges []EdgeKey) { + ebf.mu.Lock() + defer ebf.mu.Unlock() + + for _, edge := range edges { + key := ebf.makeKey(edge.SrcSerial, edge.DstSerial, edge.EdgeType) + ebf.filter.Add(key) + } + ebf.dirty = true +} + +// MayExist checks if an edge might exist. +// Returns false if definitely doesn't exist (no disk access needed). +// Returns true if might exist (need to check disk to confirm). 
+func (ebf *EdgeBloomFilter) MayExist(srcSerial, dstSerial uint64, edgeType byte) bool { + ebf.mu.RLock() + defer ebf.mu.RUnlock() + + key := ebf.makeKey(srcSerial, dstSerial, edgeType) + return ebf.filter.Test(key) +} + +// Persist saves the bloom filter to the database. +func (ebf *EdgeBloomFilter) Persist(db *bolt.DB) error { + ebf.mu.Lock() + if !ebf.dirty { + ebf.mu.Unlock() + return nil + } + + // Serialize while holding lock + var buf bytes.Buffer + if _, err := ebf.filter.WriteTo(&buf); err != nil { + ebf.mu.Unlock() + return err + } + data := buf.Bytes() + ebf.dirty = false + ebf.mu.Unlock() + + // Write to database + return db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketMeta) + if bucket == nil { + return nil + } + return bucket.Put([]byte(bloomFilterKey), data) + }) +} + +// Reset clears the bloom filter. +func (ebf *EdgeBloomFilter) Reset() { + ebf.mu.Lock() + defer ebf.mu.Unlock() + + ebf.filter.ClearAll() + ebf.dirty = true +} + +// makeKey creates a unique key for an edge. +func (ebf *EdgeBloomFilter) makeKey(srcSerial, dstSerial uint64, edgeType byte) []byte { + key := make([]byte, 17) // 8 + 8 + 1 + binary.BigEndian.PutUint64(key[0:8], srcSerial) + binary.BigEndian.PutUint64(key[8:16], dstSerial) + key[16] = edgeType + return key +} + +// Stats returns bloom filter statistics. +func (ebf *EdgeBloomFilter) Stats() BloomStats { + ebf.mu.RLock() + defer ebf.mu.RUnlock() + + approxCount := uint64(ebf.filter.ApproximatedSize()) + cap := ebf.filter.Cap() + + return BloomStats{ + ApproxCount: approxCount, + Cap: cap, + } +} + +// BloomStats contains bloom filter statistics. +type BloomStats struct { + ApproxCount uint64 // Approximate number of elements + Cap uint // Capacity in bits +} + +// EdgeKey represents an edge for batch operations. 
+type EdgeKey struct { + SrcSerial uint64 + DstSerial uint64 + EdgeType byte +} + +// Edge type constants +const ( + EdgeTypeAuthor byte = 0 // Event author relationship + EdgeTypePTag byte = 1 // P-tag reference (event mentions pubkey) + EdgeTypeETag byte = 2 // E-tag reference (event references event) + EdgeTypeFollows byte = 3 // Kind 3 follows relationship + EdgeTypeReaction byte = 4 // Kind 7 reaction + EdgeTypeRepost byte = 5 // Kind 6 repost + EdgeTypeReply byte = 6 // Reply (kind 1 with e-tag) +) diff --git a/pkg/bbolt/fetch-event.go b/pkg/bbolt/fetch-event.go new file mode 100644 index 0000000..fa200f8 --- /dev/null +++ b/pkg/bbolt/fetch-event.go @@ -0,0 +1,134 @@ +//go:build !(js && wasm) + +package bbolt + +import ( + "bytes" + "context" + "errors" + + bolt "go.etcd.io/bbolt" + "lol.mleku.dev/chk" + "next.orly.dev/pkg/database" + "next.orly.dev/pkg/database/indexes/types" + "git.mleku.dev/mleku/nostr/encoders/event" + "git.mleku.dev/mleku/nostr/encoders/filter" +) + +// FetchEventBySerial fetches an event by its serial number. 
+func (b *B) FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error) { + if ser == nil { + return nil, errors.New("bbolt: nil serial") + } + + serial := ser.Get() + key := makeSerialKey(serial) + + err = b.db.View(func(tx *bolt.Tx) error { + // Get event ID first + seiBucket := tx.Bucket(bucketSei) + var eventId []byte + if seiBucket != nil { + eventId = seiBucket.Get(key) + } + + // Try compact event storage first + cmpBucket := tx.Bucket(bucketCmp) + if cmpBucket != nil { + data := cmpBucket.Get(key) + if data != nil && eventId != nil && len(eventId) == 32 { + // Unmarshal compact event + resolver := &bboltSerialResolver{b: b} + ev, err = database.UnmarshalCompactEvent(data, eventId, resolver) + if err == nil { + return nil + } + // Fall through to try legacy format + } + } + + // Try legacy event storage + evtBucket := tx.Bucket(bucketEvt) + if evtBucket != nil { + data := evtBucket.Get(key) + if data != nil { + ev = new(event.E) + reader := bytes.NewReader(data) + if err = ev.UnmarshalBinary(reader); err == nil { + return nil + } + } + } + + return errors.New("bbolt: event not found") + }) + + return +} + +// FetchEventsBySerials fetches multiple events by their serial numbers. 
+func (b *B) FetchEventsBySerials(serials []*types.Uint40) (events map[uint64]*event.E, err error) { + events = make(map[uint64]*event.E, len(serials)) + + err = b.db.View(func(tx *bolt.Tx) error { + cmpBucket := tx.Bucket(bucketCmp) + evtBucket := tx.Bucket(bucketEvt) + seiBucket := tx.Bucket(bucketSei) + resolver := &bboltSerialResolver{b: b} + + for _, ser := range serials { + if ser == nil { + continue + } + + serial := ser.Get() + key := makeSerialKey(serial) + + // Get event ID + var eventId []byte + if seiBucket != nil { + eventId = seiBucket.Get(key) + } + + // Try compact event storage first + if cmpBucket != nil { + data := cmpBucket.Get(key) + if data != nil && eventId != nil && len(eventId) == 32 { + ev, e := database.UnmarshalCompactEvent(data, eventId, resolver) + if e == nil { + events[serial] = ev + continue + } + } + } + + // Try legacy event storage + if evtBucket != nil { + data := evtBucket.Get(key) + if data != nil { + ev := new(event.E) + reader := bytes.NewReader(data) + if e := ev.UnmarshalBinary(reader); e == nil { + events[serial] = ev + } + } + } + } + return nil + }) + + return +} + +// CountEvents counts events matching a filter. 
+func (b *B) CountEvents(c context.Context, f *filter.F) (count int, approximate bool, err error) { + // Get serials matching filter + var serials types.Uint40s + if serials, err = b.GetSerialsFromFilter(f); chk.E(err) { + return + } + + count = len(serials) + approximate = false + return +} diff --git a/pkg/bbolt/get-serial-by-id.go b/pkg/bbolt/get-serial-by-id.go new file mode 100644 index 0000000..e0e5d2b --- /dev/null +++ b/pkg/bbolt/get-serial-by-id.go @@ -0,0 +1,179 @@ +//go:build !(js && wasm) + +package bbolt + +import ( + "bytes" + "errors" + + bolt "go.etcd.io/bbolt" + "next.orly.dev/pkg/database" + "next.orly.dev/pkg/database/indexes/types" + "next.orly.dev/pkg/interfaces/store" + "git.mleku.dev/mleku/nostr/encoders/event" + "git.mleku.dev/mleku/nostr/encoders/tag" +) + +// GetSerialById gets the serial for an event ID. +func (b *B) GetSerialById(id []byte) (ser *types.Uint40, err error) { + if len(id) < 8 { + return nil, errors.New("bbolt: invalid event ID length") + } + + idHash := hashEventId(id) + + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketEid) + if bucket == nil { + return errors.New("id not found in database") + } + + // Scan for matching ID hash prefix + c := bucket.Cursor() + for k, _ := c.Seek(idHash); k != nil && bytes.HasPrefix(k, idHash); k, _ = c.Next() { + // Key format: id_hash(8) | serial(5) + if len(k) >= 13 { + ser = new(types.Uint40) + ser.Set(decodeUint40(k[8:13])) + return nil + } + } + return errors.New("id not found in database") + }) + + return +} + +// GetSerialsByIds gets serials for multiple event IDs. 
+func (b *B) GetSerialsByIds(ids *tag.T) (serials map[string]*types.Uint40, err error) { + serials = make(map[string]*types.Uint40, ids.Len()) + + if ids == nil || ids.Len() == 0 { + return + } + + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketEid) + if bucket == nil { + return nil + } + + // Iterate over the tag entries using the .T field + for _, id := range ids.T { + if len(id) < 8 { + continue + } + + idHash := hashEventId(id) + c := bucket.Cursor() + for k, _ := c.Seek(idHash); k != nil && bytes.HasPrefix(k, idHash); k, _ = c.Next() { + if len(k) >= 13 { + ser := new(types.Uint40) + ser.Set(decodeUint40(k[8:13])) + serials[string(id)] = ser + break + } + } + } + return nil + }) + + return +} + +// GetSerialsByIdsWithFilter gets serials with a filter function. +func (b *B) GetSerialsByIdsWithFilter(ids *tag.T, fn func(ev *event.E, ser *types.Uint40) bool) (serials map[string]*types.Uint40, err error) { + // For now, just call GetSerialsByIds - full implementation would apply filter + return b.GetSerialsByIds(ids) +} + +// GetSerialsByRange gets serials within a key range. +func (b *B) GetSerialsByRange(idx database.Range) (serials types.Uint40s, err error) { + if len(idx.Start) < 3 { + return nil, errors.New("bbolt: invalid range start") + } + + // Extract bucket name from prefix + bucketName := idx.Start[:3] + startKey := idx.Start[3:] + endKey := idx.End[3:] + + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketName) + if bucket == nil { + return nil + } + + c := bucket.Cursor() + for k, _ := c.Seek(startKey); k != nil; k, _ = c.Next() { + // Check if we've passed the end + if len(endKey) > 0 && bytes.Compare(k, endKey) >= 0 { + break + } + + // Extract serial from end of key (last 5 bytes) + if len(k) >= 5 { + ser := new(types.Uint40) + ser.Set(decodeUint40(k[len(k)-5:])) + serials = append(serials, ser) + } + } + return nil + }) + + return +} + +// GetFullIdPubkeyBySerial gets full event ID and pubkey by serial. 
+func (b *B) GetFullIdPubkeyBySerial(ser *types.Uint40) (fidpk *store.IdPkTs, err error) { + if ser == nil { + return nil, errors.New("bbolt: nil serial") + } + + serial := ser.Get() + key := makeSerialKey(serial) + + err = b.db.View(func(tx *bolt.Tx) error { + // Get full ID/pubkey from fpc bucket + fpcBucket := tx.Bucket(bucketFpc) + if fpcBucket == nil { + return errors.New("bbolt: fpc bucket not found") + } + + // Scan for matching serial prefix + c := fpcBucket.Cursor() + for k, _ := c.Seek(key); k != nil && bytes.HasPrefix(k, key); k, _ = c.Next() { + // Key format: serial(5) | id(32) | pubkey_hash(8) | created_at(8) + if len(k) >= 53 { + fidpk = &store.IdPkTs{ + Ser: serial, + } + fidpk.Id = make([]byte, 32) + copy(fidpk.Id, k[5:37]) + // Pubkey is only hash here, need to look up full pubkey + // For now return what we have + fidpk.Pub = make([]byte, 8) + copy(fidpk.Pub, k[37:45]) + fidpk.Ts = int64(decodeUint64(k[45:53])) + return nil + } + } + return errors.New("bbolt: serial not found in fpc index") + }) + + return +} + +// GetFullIdPubkeyBySerials gets full event IDs and pubkeys for multiple serials. +func (b *B) GetFullIdPubkeyBySerials(sers []*types.Uint40) (fidpks []*store.IdPkTs, err error) { + fidpks = make([]*store.IdPkTs, 0, len(sers)) + + for _, ser := range sers { + fidpk, e := b.GetFullIdPubkeyBySerial(ser) + if e == nil && fidpk != nil { + fidpks = append(fidpks, fidpk) + } + } + + return +} diff --git a/pkg/bbolt/graph.go b/pkg/bbolt/graph.go new file mode 100644 index 0000000..994140d --- /dev/null +++ b/pkg/bbolt/graph.go @@ -0,0 +1,250 @@ +//go:build !(js && wasm) + +package bbolt + +import ( + "bytes" + "encoding/binary" + "io" +) + +// EventVertex stores the adjacency list for an event. +// Contains the author and all edges to other events/pubkeys. 
+type EventVertex struct { + AuthorSerial uint64 // Serial of the author pubkey + Kind uint16 // Event kind + PTagSerials []uint64 // Serials of pubkeys mentioned (p-tags) + ETagSerials []uint64 // Serials of events referenced (e-tags) +} + +// Encode serializes the EventVertex to bytes. +// Format: author(5) | kind(2) | ptag_count(varint) | [ptag_serials(5)...] | etag_count(varint) | [etag_serials(5)...] +func (ev *EventVertex) Encode() []byte { + // Calculate size + size := 5 + 2 + 2 + len(ev.PTagSerials)*5 + 2 + len(ev.ETagSerials)*5 + buf := make([]byte, 0, size) + + // Author serial (5 bytes) + authorBuf := make([]byte, 5) + encodeUint40(ev.AuthorSerial, authorBuf) + buf = append(buf, authorBuf...) + + // Kind (2 bytes) + kindBuf := make([]byte, 2) + binary.BigEndian.PutUint16(kindBuf, ev.Kind) + buf = append(buf, kindBuf...) + + // P-tag count and serials + ptagCountBuf := make([]byte, 2) + binary.BigEndian.PutUint16(ptagCountBuf, uint16(len(ev.PTagSerials))) + buf = append(buf, ptagCountBuf...) + for _, serial := range ev.PTagSerials { + serialBuf := make([]byte, 5) + encodeUint40(serial, serialBuf) + buf = append(buf, serialBuf...) + } + + // E-tag count and serials + etagCountBuf := make([]byte, 2) + binary.BigEndian.PutUint16(etagCountBuf, uint16(len(ev.ETagSerials))) + buf = append(buf, etagCountBuf...) + for _, serial := range ev.ETagSerials { + serialBuf := make([]byte, 5) + encodeUint40(serial, serialBuf) + buf = append(buf, serialBuf...) + } + + return buf +} + +// Decode deserializes bytes into an EventVertex. 
+func (ev *EventVertex) Decode(data []byte) error { + if len(data) < 9 { // minimum: author(5) + kind(2) + ptag_count(2) + return io.ErrUnexpectedEOF + } + + reader := bytes.NewReader(data) + + // Author serial + authorBuf := make([]byte, 5) + if _, err := reader.Read(authorBuf); err != nil { + return err + } + ev.AuthorSerial = decodeUint40(authorBuf) + + // Kind + kindBuf := make([]byte, 2) + if _, err := reader.Read(kindBuf); err != nil { + return err + } + ev.Kind = binary.BigEndian.Uint16(kindBuf) + + // P-tags + ptagCountBuf := make([]byte, 2) + if _, err := reader.Read(ptagCountBuf); err != nil { + return err + } + ptagCount := binary.BigEndian.Uint16(ptagCountBuf) + ev.PTagSerials = make([]uint64, ptagCount) + for i := uint16(0); i < ptagCount; i++ { + serialBuf := make([]byte, 5) + if _, err := reader.Read(serialBuf); err != nil { + return err + } + ev.PTagSerials[i] = decodeUint40(serialBuf) + } + + // E-tags + etagCountBuf := make([]byte, 2) + if _, err := reader.Read(etagCountBuf); err != nil { + return err + } + etagCount := binary.BigEndian.Uint16(etagCountBuf) + ev.ETagSerials = make([]uint64, etagCount) + for i := uint16(0); i < etagCount; i++ { + serialBuf := make([]byte, 5) + if _, err := reader.Read(serialBuf); err != nil { + return err + } + ev.ETagSerials[i] = decodeUint40(serialBuf) + } + + return nil +} + +// PubkeyVertex stores the adjacency list for a pubkey. +// Contains all events authored by or mentioning this pubkey. +type PubkeyVertex struct { + AuthoredEvents []uint64 // Event serials this pubkey authored + MentionedIn []uint64 // Event serials that mention this pubkey (p-tags) +} + +// Encode serializes the PubkeyVertex to bytes. +// Format: authored_count(varint) | [serials(5)...] | mentioned_count(varint) | [serials(5)...] 
+func (pv *PubkeyVertex) Encode() []byte { + size := 2 + len(pv.AuthoredEvents)*5 + 2 + len(pv.MentionedIn)*5 + buf := make([]byte, 0, size) + + // Authored events + authoredCountBuf := make([]byte, 2) + binary.BigEndian.PutUint16(authoredCountBuf, uint16(len(pv.AuthoredEvents))) + buf = append(buf, authoredCountBuf...) + for _, serial := range pv.AuthoredEvents { + serialBuf := make([]byte, 5) + encodeUint40(serial, serialBuf) + buf = append(buf, serialBuf...) + } + + // Mentioned in events + mentionedCountBuf := make([]byte, 2) + binary.BigEndian.PutUint16(mentionedCountBuf, uint16(len(pv.MentionedIn))) + buf = append(buf, mentionedCountBuf...) + for _, serial := range pv.MentionedIn { + serialBuf := make([]byte, 5) + encodeUint40(serial, serialBuf) + buf = append(buf, serialBuf...) + } + + return buf +} + +// Decode deserializes bytes into a PubkeyVertex. +func (pv *PubkeyVertex) Decode(data []byte) error { + if len(data) < 4 { // minimum: authored_count(2) + mentioned_count(2) + return io.ErrUnexpectedEOF + } + + reader := bytes.NewReader(data) + + // Authored events + authoredCountBuf := make([]byte, 2) + if _, err := reader.Read(authoredCountBuf); err != nil { + return err + } + authoredCount := binary.BigEndian.Uint16(authoredCountBuf) + pv.AuthoredEvents = make([]uint64, authoredCount) + for i := uint16(0); i < authoredCount; i++ { + serialBuf := make([]byte, 5) + if _, err := reader.Read(serialBuf); err != nil { + return err + } + pv.AuthoredEvents[i] = decodeUint40(serialBuf) + } + + // Mentioned in events + mentionedCountBuf := make([]byte, 2) + if _, err := reader.Read(mentionedCountBuf); err != nil { + return err + } + mentionedCount := binary.BigEndian.Uint16(mentionedCountBuf) + pv.MentionedIn = make([]uint64, mentionedCount) + for i := uint16(0); i < mentionedCount; i++ { + serialBuf := make([]byte, 5) + if _, err := reader.Read(serialBuf); err != nil { + return err + } + pv.MentionedIn[i] = decodeUint40(serialBuf) + } + + return nil +} + +// 
AddAuthored adds an event serial to the authored list if not already present. +func (pv *PubkeyVertex) AddAuthored(eventSerial uint64) { + for _, s := range pv.AuthoredEvents { + if s == eventSerial { + return + } + } + pv.AuthoredEvents = append(pv.AuthoredEvents, eventSerial) +} + +// AddMention adds an event serial to the mentioned list if not already present. +func (pv *PubkeyVertex) AddMention(eventSerial uint64) { + for _, s := range pv.MentionedIn { + if s == eventSerial { + return + } + } + pv.MentionedIn = append(pv.MentionedIn, eventSerial) +} + +// RemoveAuthored removes an event serial from the authored list. +func (pv *PubkeyVertex) RemoveAuthored(eventSerial uint64) { + for i, s := range pv.AuthoredEvents { + if s == eventSerial { + pv.AuthoredEvents = append(pv.AuthoredEvents[:i], pv.AuthoredEvents[i+1:]...) + return + } + } +} + +// RemoveMention removes an event serial from the mentioned list. +func (pv *PubkeyVertex) RemoveMention(eventSerial uint64) { + for i, s := range pv.MentionedIn { + if s == eventSerial { + pv.MentionedIn = append(pv.MentionedIn[:i], pv.MentionedIn[i+1:]...) + return + } + } +} + +// HasAuthored checks if the pubkey authored the given event. +func (pv *PubkeyVertex) HasAuthored(eventSerial uint64) bool { + for _, s := range pv.AuthoredEvents { + if s == eventSerial { + return true + } + } + return false +} + +// IsMentionedIn checks if the pubkey is mentioned in the given event. 
+func (pv *PubkeyVertex) IsMentionedIn(eventSerial uint64) bool { + for _, s := range pv.MentionedIn { + if s == eventSerial { + return true + } + } + return false +} diff --git a/pkg/bbolt/helpers.go b/pkg/bbolt/helpers.go new file mode 100644 index 0000000..6c740b0 --- /dev/null +++ b/pkg/bbolt/helpers.go @@ -0,0 +1,119 @@ +//go:build !(js && wasm) + +package bbolt + +import ( + "encoding/binary" +) + +// encodeUint40 encodes a uint64 as 5 bytes (big-endian, truncated to 40 bits) +func encodeUint40(v uint64, buf []byte) { + buf[0] = byte(v >> 32) + buf[1] = byte(v >> 24) + buf[2] = byte(v >> 16) + buf[3] = byte(v >> 8) + buf[4] = byte(v) +} + +// decodeUint40 decodes 5 bytes as uint64 +func decodeUint40(buf []byte) uint64 { + return uint64(buf[0])<<32 | + uint64(buf[1])<<24 | + uint64(buf[2])<<16 | + uint64(buf[3])<<8 | + uint64(buf[4]) +} + +// encodeUint64 encodes a uint64 as 8 bytes (big-endian) +func encodeUint64(v uint64, buf []byte) { + binary.BigEndian.PutUint64(buf, v) +} + +// decodeUint64 decodes 8 bytes as uint64 +func decodeUint64(buf []byte) uint64 { + return binary.BigEndian.Uint64(buf) +} + +// encodeUint32 encodes a uint32 as 4 bytes (big-endian) +func encodeUint32(v uint32, buf []byte) { + binary.BigEndian.PutUint32(buf, v) +} + +// decodeUint32 decodes 4 bytes as uint32 +func decodeUint32(buf []byte) uint32 { + return binary.BigEndian.Uint32(buf) +} + +// encodeUint16 encodes a uint16 as 2 bytes (big-endian) +func encodeUint16(v uint16, buf []byte) { + binary.BigEndian.PutUint16(buf, v) +} + +// decodeUint16 decodes 2 bytes as uint16 +func decodeUint16(buf []byte) uint16 { + return binary.BigEndian.Uint16(buf) +} + +// encodeVarint encodes a uint64 as a variable-length integer +// Returns the number of bytes written +func encodeVarint(v uint64, buf []byte) int { + return binary.PutUvarint(buf, v) +} + +// decodeVarint decodes a variable-length integer +// Returns the value and the number of bytes read +func decodeVarint(buf []byte) (uint64, int) 
{ + return binary.Uvarint(buf) +} + +// makeSerialKey creates a 5-byte key from a serial number +func makeSerialKey(serial uint64) []byte { + key := make([]byte, 5) + encodeUint40(serial, key) + return key +} + +// makePubkeyHashKey creates an 8-byte key from a pubkey hash +func makePubkeyHashKey(hash []byte) []byte { + key := make([]byte, 8) + copy(key, hash[:8]) + return key +} + +// makeIdHashKey creates an 8-byte key from an event ID hash +func makeIdHashKey(id []byte) []byte { + key := make([]byte, 8) + copy(key, id[:8]) + return key +} + +// hashPubkey returns the first 8 bytes of a 32-byte pubkey as a hash +func hashPubkey(pubkey []byte) []byte { + if len(pubkey) < 8 { + return pubkey + } + return pubkey[:8] +} + +// hashEventId returns the first 8 bytes of a 32-byte event ID as a hash +func hashEventId(id []byte) []byte { + if len(id) < 8 { + return id + } + return id[:8] +} + +// concatenate joins multiple byte slices into one +func concatenate(slices ...[]byte) []byte { + var totalLen int + for _, s := range slices { + totalLen += len(s) + } + result := make([]byte, totalLen) + var offset int + for _, s := range slices { + copy(result[offset:], s) + offset += len(s) + } + return result +} diff --git a/pkg/bbolt/identity.go b/pkg/bbolt/identity.go new file mode 100644 index 0000000..264cb7a --- /dev/null +++ b/pkg/bbolt/identity.go @@ -0,0 +1,66 @@ +//go:build !(js && wasm) + +package bbolt + +import ( + "crypto/rand" + "errors" + + bolt "go.etcd.io/bbolt" +) + +const identityKey = "relay_identity_secret" + +// GetRelayIdentitySecret gets the relay's identity secret key. 
+func (b *B) GetRelayIdentitySecret() (skb []byte, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketMeta) + if bucket == nil { + return errors.New("bbolt: meta bucket not found") + } + data := bucket.Get([]byte(identityKey)) + if data == nil { + return errors.New("bbolt: relay identity not set") + } + skb = make([]byte, len(data)) + copy(skb, data) + return nil + }) + return +} + +// SetRelayIdentitySecret sets the relay's identity secret key. +func (b *B) SetRelayIdentitySecret(skb []byte) error { + if len(skb) != 32 { + return errors.New("bbolt: invalid secret key length (must be 32 bytes)") + } + return b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketMeta) + if bucket == nil { + return errors.New("bbolt: meta bucket not found") + } + return bucket.Put([]byte(identityKey), skb) + }) +} + +// GetOrCreateRelayIdentitySecret gets or creates the relay's identity secret. +func (b *B) GetOrCreateRelayIdentitySecret() (skb []byte, err error) { + // Try to get existing secret + skb, err = b.GetRelayIdentitySecret() + if err == nil && len(skb) == 32 { + return skb, nil + } + + // Generate new secret + skb = make([]byte, 32) + if _, err = rand.Read(skb); err != nil { + return nil, err + } + + // Store it + if err = b.SetRelayIdentitySecret(skb); err != nil { + return nil, err + } + + return skb, nil +} diff --git a/pkg/bbolt/init.go b/pkg/bbolt/init.go new file mode 100644 index 0000000..be19252 --- /dev/null +++ b/pkg/bbolt/init.go @@ -0,0 +1,55 @@ +//go:build !(js && wasm) + +package bbolt + +import ( + "context" + "time" + + "next.orly.dev/pkg/database" +) + +func init() { + database.RegisterBboltFactory(newBboltFromConfig) +} + +// newBboltFromConfig creates a BBolt database from DatabaseConfig +func newBboltFromConfig( + ctx context.Context, + cancel context.CancelFunc, + cfg *database.DatabaseConfig, +) (database.Database, error) { + // Convert DatabaseConfig to BboltConfig + bboltCfg := &BboltConfig{ + DataDir: 
cfg.DataDir, + LogLevel: cfg.LogLevel, + + // Use bbolt-specific settings from DatabaseConfig if present + // These will be added to DatabaseConfig later + BatchMaxEvents: cfg.BboltBatchMaxEvents, + BatchMaxBytes: cfg.BboltBatchMaxBytes, + BatchFlushTimeout: cfg.BboltFlushTimeout, + BloomSizeMB: cfg.BboltBloomSizeMB, + NoSync: cfg.BboltNoSync, + InitialMmapSize: cfg.BboltMmapSize, + } + + // Apply defaults if not set + if bboltCfg.BatchMaxEvents <= 0 { + bboltCfg.BatchMaxEvents = 5000 + } + if bboltCfg.BatchMaxBytes <= 0 { + bboltCfg.BatchMaxBytes = 128 * 1024 * 1024 // 128MB + } + if bboltCfg.BatchFlushTimeout <= 0 { + bboltCfg.BatchFlushTimeout = 30 * time.Second + } + if bboltCfg.BloomSizeMB <= 0 { + bboltCfg.BloomSizeMB = 16 + } + if bboltCfg.InitialMmapSize <= 0 { + bboltCfg.InitialMmapSize = 8 * 1024 * 1024 * 1024 // 8GB + } + + return NewWithConfig(ctx, cancel, bboltCfg) +} diff --git a/pkg/bbolt/logger.go b/pkg/bbolt/logger.go new file mode 100644 index 0000000..90f0ecb --- /dev/null +++ b/pkg/bbolt/logger.go @@ -0,0 +1,81 @@ +//go:build !(js && wasm) + +package bbolt + +import ( + "fmt" + "runtime" + "strings" + + "go.uber.org/atomic" + "lol.mleku.dev" + "lol.mleku.dev/log" +) + +// Logger wraps the lol logger for BBolt +type Logger struct { + Level atomic.Int32 + Label string +} + +// NewLogger creates a new Logger instance +func NewLogger(level int, dataDir string) *Logger { + l := &Logger{Label: "bbolt"} + l.Level.Store(int32(level)) + return l +} + +// SetLogLevel updates the log level +func (l *Logger) SetLogLevel(level int) { + l.Level.Store(int32(level)) +} + +// Tracef logs a trace message +func (l *Logger) Tracef(format string, args ...interface{}) { + if l.Level.Load() >= int32(lol.Trace) { + s := l.Label + ": " + format + txt := fmt.Sprintf(s, args...) 
+ _, file, line, _ := runtime.Caller(2) + log.T.F("%s\n%s:%d", strings.TrimSpace(txt), file, line) + } +} + +// Debugf logs a debug message +func (l *Logger) Debugf(format string, args ...interface{}) { + if l.Level.Load() >= int32(lol.Debug) { + s := l.Label + ": " + format + txt := fmt.Sprintf(s, args...) + _, file, line, _ := runtime.Caller(2) + log.D.F("%s\n%s:%d", strings.TrimSpace(txt), file, line) + } +} + +// Infof logs an info message +func (l *Logger) Infof(format string, args ...interface{}) { + if l.Level.Load() >= int32(lol.Info) { + s := l.Label + ": " + format + txt := fmt.Sprintf(s, args...) + _, file, line, _ := runtime.Caller(2) + log.I.F("%s\n%s:%d", strings.TrimSpace(txt), file, line) + } +} + +// Warningf logs a warning message +func (l *Logger) Warningf(format string, args ...interface{}) { + if l.Level.Load() >= int32(lol.Warn) { + s := l.Label + ": " + format + txt := fmt.Sprintf(s, args...) + _, file, line, _ := runtime.Caller(2) + log.W.F("%s\n%s:%d", strings.TrimSpace(txt), file, line) + } +} + +// Errorf logs an error message +func (l *Logger) Errorf(format string, args ...interface{}) { + if l.Level.Load() >= int32(lol.Error) { + s := l.Label + ": " + format + txt := fmt.Sprintf(s, args...) + _, file, line, _ := runtime.Caller(2) + log.E.F("%s\n%s:%d", strings.TrimSpace(txt), file, line) + } +} diff --git a/pkg/bbolt/markers.go b/pkg/bbolt/markers.go new file mode 100644 index 0000000..f4c964b --- /dev/null +++ b/pkg/bbolt/markers.go @@ -0,0 +1,62 @@ +//go:build !(js && wasm) + +package bbolt + +import ( + bolt "go.etcd.io/bbolt" +) + +const markerPrefix = "marker:" + +// SetMarker sets a metadata marker. +func (b *B) SetMarker(key string, value []byte) error { + return b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketMeta) + if bucket == nil { + return nil + } + return bucket.Put([]byte(markerPrefix+key), value) + }) +} + +// GetMarker gets a metadata marker. 
+func (b *B) GetMarker(key string) (value []byte, err error) { + err = b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketMeta) + if bucket == nil { + return nil + } + data := bucket.Get([]byte(markerPrefix + key)) + if data != nil { + value = make([]byte, len(data)) + copy(value, data) + } + return nil + }) + return +} + +// HasMarker checks if a marker exists. +func (b *B) HasMarker(key string) bool { + var exists bool + b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketMeta) + if bucket == nil { + return nil + } + exists = bucket.Get([]byte(markerPrefix+key)) != nil + return nil + }) + return exists +} + +// DeleteMarker deletes a marker. +func (b *B) DeleteMarker(key string) error { + return b.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketMeta) + if bucket == nil { + return nil + } + return bucket.Delete([]byte(markerPrefix + key)) + }) +} diff --git a/pkg/bbolt/query-graph.go b/pkg/bbolt/query-graph.go new file mode 100644 index 0000000..6f26465 --- /dev/null +++ b/pkg/bbolt/query-graph.go @@ -0,0 +1,287 @@ +//go:build !(js && wasm) + +package bbolt + +import ( + "bytes" + + bolt "go.etcd.io/bbolt" + "next.orly.dev/pkg/database/indexes/types" +) + +// EdgeExists checks if an edge exists between two serials. +// Uses bloom filter for fast negative lookups. +func (b *B) EdgeExists(srcSerial, dstSerial uint64, edgeType byte) (bool, error) { + // Fast path: check bloom filter first + if !b.edgeBloom.MayExist(srcSerial, dstSerial, edgeType) { + return false, nil // Definitely doesn't exist + } + + // Bloom says maybe - need to verify in adjacency list + return b.verifyEdgeInAdjacencyList(srcSerial, dstSerial, edgeType) +} + +// verifyEdgeInAdjacencyList checks the adjacency list for edge existence. 
+func (b *B) verifyEdgeInAdjacencyList(srcSerial, dstSerial uint64, edgeType byte) (bool, error) { + var exists bool + + err := b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketEv) + if bucket == nil { + return nil + } + + key := makeSerialKey(srcSerial) + data := bucket.Get(key) + if data == nil { + return nil + } + + vertex := &EventVertex{} + if err := vertex.Decode(data); err != nil { + return err + } + + switch edgeType { + case EdgeTypeAuthor: + exists = vertex.AuthorSerial == dstSerial + case EdgeTypePTag: + for _, s := range vertex.PTagSerials { + if s == dstSerial { + exists = true + break + } + } + case EdgeTypeETag: + for _, s := range vertex.ETagSerials { + if s == dstSerial { + exists = true + break + } + } + } + return nil + }) + + return exists, err +} + +// GetEventVertex retrieves the adjacency list for an event. +func (b *B) GetEventVertex(eventSerial uint64) (*EventVertex, error) { + var vertex *EventVertex + + err := b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketEv) + if bucket == nil { + return nil + } + + key := makeSerialKey(eventSerial) + data := bucket.Get(key) + if data == nil { + return nil + } + + vertex = &EventVertex{} + return vertex.Decode(data) + }) + + return vertex, err +} + +// GetPubkeyVertex retrieves the adjacency list for a pubkey. +func (b *B) GetPubkeyVertex(pubkeySerial uint64) (*PubkeyVertex, error) { + var vertex *PubkeyVertex + + err := b.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(bucketPv) + if bucket == nil { + return nil + } + + key := makeSerialKey(pubkeySerial) + data := bucket.Get(key) + if data == nil { + return nil + } + + vertex = &PubkeyVertex{} + return vertex.Decode(data) + }) + + return vertex, err +} + +// GetEventsAuthoredBy returns event serials authored by a pubkey. 
+func (b *B) GetEventsAuthoredBy(pubkeySerial uint64) ([]uint64, error) { + vertex, err := b.GetPubkeyVertex(pubkeySerial) + if err != nil || vertex == nil { + return nil, err + } + return vertex.AuthoredEvents, nil +} + +// GetEventsMentioning returns event serials that mention a pubkey. +func (b *B) GetEventsMentioning(pubkeySerial uint64) ([]uint64, error) { + vertex, err := b.GetPubkeyVertex(pubkeySerial) + if err != nil || vertex == nil { + return nil, err + } + return vertex.MentionedIn, nil +} + +// GetPTagsFromEvent returns pubkey serials tagged in an event. +func (b *B) GetPTagsFromEvent(eventSerial uint64) ([]uint64, error) { + vertex, err := b.GetEventVertex(eventSerial) + if err != nil || vertex == nil { + return nil, err + } + return vertex.PTagSerials, nil +} + +// GetETagsFromEvent returns event serials referenced by an event. +func (b *B) GetETagsFromEvent(eventSerial uint64) ([]uint64, error) { + vertex, err := b.GetEventVertex(eventSerial) + if err != nil || vertex == nil { + return nil, err + } + return vertex.ETagSerials, nil +} + +// GetFollowsFromPubkeySerial returns the pubkey serials that a user follows. +// This extracts p-tags from the user's kind-3 contact list event. 
+func (b *B) GetFollowsFromPubkeySerial(pubkeySerial *types.Uint40) ([]*types.Uint40, error) {
+	if pubkeySerial == nil {
+		return nil, nil
+	}
+
+	// Find the kind-3 event for this pubkey. FindEventByAuthorAndKind reports
+	// "no such event" as serial 0 with a nil error, so a non-nil error here is
+	// a real storage/decoding failure and is propagated rather than being
+	// disguised as "this pubkey follows nobody".
+	contactEventSerial, err := b.FindEventByAuthorAndKind(pubkeySerial.Get(), 3)
+	if err != nil {
+		return nil, err
+	}
+	if contactEventSerial == 0 {
+		return nil, nil
+	}
+
+	// Get the p-tags from the event vertex
+	pTagSerials, err := b.GetPTagsFromEvent(contactEventSerial)
+	if err != nil {
+		return nil, err
+	}
+
+	// Convert to types.Uint40
+	result := make([]*types.Uint40, 0, len(pTagSerials))
+	for _, s := range pTagSerials {
+		ser := new(types.Uint40)
+		ser.Set(s)
+		result = append(result, ser)
+	}
+	return result, nil
+}
+
+// FindEventByAuthorAndKind finds an event serial by author and kind.
+// For replaceable events like kind-3, returns the most recent one.
+//
+// It walks the author's PubkeyVertex.AuthoredEvents list, keeps the events
+// whose EventVertex.Kind equals kindNum and, when the fpc bucket exists, picks
+// the one with the greatest created_at read from its fpc index key. It returns
+// serial 0 with a nil error when nothing matches (or a required bucket or
+// vertex is missing); a non-nil error means the pubkey vertex failed to decode
+// or the read transaction failed.
+func (b *B) FindEventByAuthorAndKind(authorSerial uint64, kindNum uint16) (uint64, error) {
+	var resultSerial uint64
+
+	err := b.db.View(func(tx *bolt.Tx) error {
+		// First, get events authored by this pubkey
+		pvBucket := tx.Bucket(bucketPv)
+		if pvBucket == nil {
+			return nil
+		}
+
+		pvData := pvBucket.Get(makeSerialKey(authorSerial))
+		if pvData == nil {
+			return nil
+		}
+
+		vertex := &PubkeyVertex{}
+		if err := vertex.Decode(pvData); err != nil {
+			return err
+		}
+
+		// Search through authored events for matching kind
+		evBucket := tx.Bucket(bucketEv)
+		if evBucket == nil {
+			return nil
+		}
+
+		// The fpc bucket does not change during the transaction; look it up
+		// once instead of once per matching event.
+		fpcBucket := tx.Bucket(bucketFpc)
+
+		var latestTs int64
+		found := false
+		for _, eventSerial := range vertex.AuthoredEvents {
+			evData := evBucket.Get(makeSerialKey(eventSerial))
+			if evData == nil {
+				continue
+			}
+
+			evVertex := &EventVertex{}
+			if err := evVertex.Decode(evData); err != nil {
+				// Best effort: an undecodable vertex is skipped.
+				continue
+			}
+			if evVertex.Kind != kindNum {
+				continue
+			}
+
+			if fpcBucket == nil {
+				// No timestamps available: the last matching entry of
+				// AuthoredEvents wins (presumably the most recently
+				// appended one - verify against the vertex writer).
+				resultSerial = eventSerial
+				continue
+			}
+
+			// Only the first fpc key carrying this serial prefix is examined.
+			// Key format: serial(5) | id(32) | pubkey_hash(8) | created_at(8)
+			prefix := makeSerialKey(eventSerial)
+			k, _ := fpcBucket.Cursor().Seek(prefix)
+			if k == nil || !bytes.HasPrefix(k, prefix) || len(k) < 53 {
+				continue
+			}
+			ts := int64(decodeUint64(k[45:53]))
+			// "found" lets the first match win even when its timestamp is
+			// zero or negative, which a bare "ts > latestTs" would reject.
+			if !found || ts > latestTs {
+				found = true
+				latestTs = ts
+				resultSerial = eventSerial
+			}
+		}
+		return nil
+	})
+
+	return resultSerial, err
+}
+
+// GetReferencingEvents returns event serials that reference a target event via e-tag.
+//
+// Every vertex in the ev bucket is decoded and its ETagSerials scanned, so the
+// cost grows with the total number of stored events. Vertices that fail to
+// decode are skipped.
+func (b *B) GetReferencingEvents(targetSerial uint64) ([]uint64, error) {
+	var result []uint64
+
+	err := b.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(bucketEv)
+		if bucket == nil {
+			return nil
+		}
+
+		// Scan all event vertices looking for e-tag references
+		// Note: This is O(n) - for production, consider a reverse index
+		c := bucket.Cursor()
+		for k, v := c.First(); k != nil; k, v = c.Next() {
+			vertex := &EventVertex{}
+			if err := vertex.Decode(v); err != nil {
+				continue
+			}
+
+			for _, eTagSerial := range vertex.ETagSerials {
+				if eTagSerial == targetSerial {
+					// The bucket key is the referencing event's serial.
+					result = append(result, decodeUint40(k))
+					break
+				}
+			}
+		}
+		return nil
+	})
+
+	return result, err
+}
diff --git a/pkg/bbolt/save-event.go b/pkg/bbolt/save-event.go
new file mode 100644
index 0000000..c97c201
--- /dev/null
+++ b/pkg/bbolt/save-event.go
@@ -0,0 +1,387 @@
+//go:build !(js && wasm)
+
+package bbolt
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+
+	bolt "go.etcd.io/bbolt"
+	"lol.mleku.dev/chk"
+	"next.orly.dev/pkg/database"
+	"next.orly.dev/pkg/database/bufpool"
+	"next.orly.dev/pkg/database/indexes/types"
+	"next.orly.dev/pkg/mode"
+	"git.mleku.dev/mleku/nostr/encoders/event"
+	"git.mleku.dev/mleku/nostr/encoders/filter"
+	"git.mleku.dev/mleku/nostr/encoders/hex"
+	"git.mleku.dev/mleku/nostr/encoders/kind"
+	"git.mleku.dev/mleku/nostr/encoders/tag"
+)
+
+// SaveEvent saves an event to the database using the write batcher.
+//
+// The event is rejected (err starts with "blocked:") when it is ephemeral, is
+// a kind-3 list without a p tag, already exists, is reported deleted by
+// CheckForDeleted, or is older than the replaceable/addressable event it
+// would replace. Otherwise its index entries, compact encoding, graph vertex
+// and bloom-filter edge keys are collected into one EventBatch and handed to
+// b.batcher.
+//
+// replaced reports whether WouldReplaceEvent found older events that this one
+// supersedes; this function itself does not remove them.
+func (b *B) SaveEvent(c context.Context, ev *event.E) (replaced bool, err error) {
+	if ev == nil {
+		err = errors.New("nil event")
+		return
+	}
+
+	// Reject ephemeral events (kinds 20000-29999)
+	if ev.Kind >= 20000 && ev.Kind <= 29999 {
+		err = errors.New("blocked: ephemeral events should not be stored")
+		return
+	}
+
+	// Validate kind 3 (follow list) events have at least one p tag
+	if ev.Kind == 3 {
+		hasPTag := false
+		if ev.Tags != nil {
+			// The loop variable is named t so it does not shadow the
+			// imported tag package.
+			for _, t := range *ev.Tags {
+				if t != nil && t.Len() >= 2 {
+					key := t.Key()
+					if len(key) == 1 && key[0] == 'p' {
+						hasPTag = true
+						break
+					}
+				}
+			}
+		}
+		if !hasPTag {
+			err = errors.New("blocked: kind 3 follow list events must have at least one p tag")
+			return
+		}
+	}
+
+	// Check if the event already exists
+	var ser *types.Uint40
+	if ser, err = b.GetSerialById(ev.ID); err == nil && ser != nil {
+		err = errors.New("blocked: event already exists: " + hex.Enc(ev.ID[:]))
+		return
+	}
+
+	// If the error is "id not found", we can proceed
+	// NOTE(review): matching on the message text is fragile; prefer a sentinel
+	// error with errors.Is if GetSerialById exposes one.
+	if err != nil && strings.Contains(err.Error(), "id not found") {
+		err = nil
+	} else if err != nil {
+		return
+	}
+
+	// Check if the event has been deleted
+	if !mode.IsOpen() {
+		if err = b.CheckForDeleted(ev, nil); err != nil {
+			err = fmt.Errorf("blocked: %s", err.Error())
+			return
+		}
+	}
+
+	// Check for replacement
+	if kind.IsReplaceable(ev.Kind) || kind.IsParameterizedReplaceable(ev.Kind) {
+		var werr error
+		if replaced, _, werr = b.WouldReplaceEvent(ev); werr != nil {
+			switch {
+			case errors.Is(werr, database.ErrOlderThanExisting):
+				if kind.IsReplaceable(ev.Kind) {
+					err = errors.New("blocked: event is older than existing replaceable event")
+				} else {
+					err = errors.New("blocked: event is older than existing addressable event")
+				}
+			case errors.Is(werr, database.ErrMissingDTag):
+				err = database.ErrMissingDTag
+			default:
+				// Any other failure must reach the caller. Returning with
+				// err == nil here would report success for an event that
+				// was never stored.
+				err = werr
+			}
+			return
+		}
+	}
+
+	// Get the next serial number
+	serial := b.getNextEventSerial()
+
+	// Generate all indexes using the shared function
+	var rawIdxs [][]byte
+	if rawIdxs, err = database.GetIndexesForEvent(ev, serial); chk.E(err) {
+		return
+	}
+
+	// Convert raw indexes to BatchedWrites, stripping the 3-byte prefix
+	// since we use separate buckets
+	batch := &EventBatch{
+		Serial:  serial,
+		Indexes: make([]BatchedWrite, 0, len(rawIdxs)),
+	}
+
+	for _, idx := range rawIdxs {
+		if len(idx) < 3 {
+			continue
+		}
+
+		// Get bucket name from prefix
+		bucketName := idx[:3]
+		key := idx[3:] // Key without prefix
+
+		batch.Indexes = append(batch.Indexes, BatchedWrite{
+			BucketName: bucketName,
+			Key:        key,
+			Value:      nil, // Index entries have empty values
+		})
+	}
+
+	// Serialize event in compact format
+	resolver := &bboltSerialResolver{b: b}
+	compactData, compactErr := database.MarshalCompactEvent(ev, resolver)
+	if compactErr != nil {
+		// Fall back to legacy format
+		legacyBuf := bufpool.GetMedium()
+		defer bufpool.PutMedium(legacyBuf)
+		ev.MarshalBinary(legacyBuf)
+		compactData = bufpool.CopyBytes(legacyBuf)
+	}
+	batch.EventData = compactData
+
+	// Build event vertex for adjacency list
+	var authorSerial uint64
+	err = b.db.Update(func(tx *bolt.Tx) error {
+		var e error
+		authorSerial, e = b.getOrCreatePubkeySerial(tx, ev.Pubkey)
+		return e
+	})
+	if chk.E(err) {
+		return
+	}
+
+	eventVertex := &EventVertex{
+		AuthorSerial: authorSerial,
+		Kind:         uint16(ev.Kind),
+		PTagSerials:  make([]uint64, 0),
+		ETagSerials:  make([]uint64, 0),
+	}
+
+	// Collect edge keys for bloom filter
+	edgeKeys := make([]EdgeKey, 0)
+
+	// Add author edge to bloom filter
+	edgeKeys = append(edgeKeys, EdgeKey{
+		SrcSerial: serial,
+		DstSerial: authorSerial,
+		EdgeType:  EdgeTypeAuthor,
+	})
+
+	// Set up pubkey vertex update for author
+	batch.PubkeyUpdate = &PubkeyVertexUpdate{
+		PubkeySerial: authorSerial,
+		AddAuthored:  serial,
+	}
+
+	// Process p-tags
+	// NOTE(review): every p-tag opens its own write transaction, so a large
+	// follow list costs one bbolt commit per tag outside the batcher. Consider
+	// resolving all pubkey serials inside a single Update.
+	batch.MentionUpdates = make([]*PubkeyVertexUpdate, 0)
+	pTags := ev.Tags.GetAll([]byte("p"))
+	for _, pTag := range pTags {
+		if pTag.Len() >= 2 {
+			var ptagPubkey []byte
+			if ptagPubkey, err = hex.Dec(string(pTag.ValueHex())); err == nil && len(ptagPubkey) == 32 {
+				var ptagSerial uint64
+				err = b.db.Update(func(tx *bolt.Tx) error {
+					var e error
+					ptagSerial, e = b.getOrCreatePubkeySerial(tx, ptagPubkey)
+					return e
+				})
+				if chk.E(err) {
+					continue
+				}
+
+				eventVertex.PTagSerials = append(eventVertex.PTagSerials, ptagSerial)
+
+				// Add p-tag edge to bloom filter
+				edgeKeys = append(edgeKeys, EdgeKey{
+					SrcSerial: serial,
+					DstSerial: ptagSerial,
+					EdgeType:  EdgeTypePTag,
+				})
+
+				// Add mention update for this pubkey
+				batch.MentionUpdates = append(batch.MentionUpdates, &PubkeyVertexUpdate{
+					PubkeySerial: ptagSerial,
+					AddMention:   serial,
+				})
+			}
+		}
+	}
+
+	// Process e-tags
+	eTags := ev.Tags.GetAll([]byte("e"))
+	for _, eTag := range eTags {
+		if eTag.Len() >= 2 {
+			var targetEventID []byte
+			if targetEventID, err = hex.Dec(string(eTag.ValueHex())); err != nil || len(targetEventID) != 32 {
+				continue
+			}
+
+			// Look up the target event's serial. Unknown targets are skipped;
+			// the nil check guards against a (nil, nil) result, which the
+			// existence check above also allows for.
+			var targetSerial *types.Uint40
+			if targetSerial, err = b.GetSerialById(targetEventID); err != nil || targetSerial == nil {
+				err = nil
+				continue
+			}
+
+			targetSer := targetSerial.Get()
+			eventVertex.ETagSerials = append(eventVertex.ETagSerials, targetSer)
+
+			// Add e-tag edge to bloom filter
+			edgeKeys = append(edgeKeys, EdgeKey{
+				SrcSerial: serial,
+				DstSerial: targetSer,
+				EdgeType:  EdgeTypeETag,
+			})
+		}
+	}
+
+	batch.EventVertex = eventVertex
+	batch.EdgeKeys = edgeKeys
+
+	// Store serial -> event ID mapping
+	batch.Indexes = append(batch.Indexes, BatchedWrite{
+		BucketName: bucketSei,
+		Key:        makeSerialKey(serial),
+		Value:      ev.ID[:],
+	})
+
+	// Add to batcher. This assignment also overwrites any tag-decoding error
+	// left in err by the loops above.
+	if err = b.batcher.Add(batch); chk.E(err) {
+		return
+	}
+
+	// Process deletion events
+	if ev.Kind == kind.Deletion.K {
+		if err = b.ProcessDelete(ev, nil); chk.E(err) {
+			// Best effort: the deletion event itself is already queued.
+			b.Logger.Warningf("failed to process deletion for event %x: %v", ev.ID, err)
+			err = nil
+		}
+	}
+
+	return
+}
+
+// GetSerialsFromFilter returns serials matching a filter.
+//
+// The filter is expanded into index ranges by database.GetIndexesFromFilter
+// and the serials of every range are concatenated. A range that fails is
+// skipped, but its error stays in err and is returned with the partial result
+// unless a later range succeeds and overwrites it.
+func (b *B) GetSerialsFromFilter(f *filter.F) (sers types.Uint40s, err error) {
+	var idxs []database.Range
+	if idxs, err = database.GetIndexesFromFilter(f); chk.E(err) {
+		return
+	}
+
+	sers = make(types.Uint40s, 0, len(idxs)*100)
+	for _, idx := range idxs {
+		var s types.Uint40s
+		if s, err = b.GetSerialsByRange(idx); chk.E(err) {
+			continue
+		}
+		sers = append(sers, s...)
+	}
+	return
+}
+
+// WouldReplaceEvent checks if the event would replace existing events.
+//
+// It returns (false, nil, nil) for kinds that are neither replaceable nor
+// parameterized-replaceable and when no candidate exists, (true, nil, nil)
+// when no fetched candidate is newer than ev,
+// database.ErrOlderThanExisting when one is, and database.ErrMissingDTag for
+// an addressable event without a d tag. The serial list result is always nil
+// in this implementation.
+func (b *B) WouldReplaceEvent(ev *event.E) (bool, types.Uint40s, error) {
+	if !(kind.IsReplaceable(ev.Kind) || kind.IsParameterizedReplaceable(ev.Kind)) {
+		return false, nil, nil
+	}
+
+	var f *filter.F
+	if kind.IsReplaceable(ev.Kind) {
+		f = &filter.F{
+			Authors: tag.NewFromBytesSlice(ev.Pubkey),
+			Kinds:   kind.NewS(kind.New(ev.Kind)),
+		}
+	} else {
+		dTag := ev.Tags.GetFirst([]byte("d"))
+		if dTag == nil {
+			return false, nil, database.ErrMissingDTag
+		}
+		f = &filter.F{
+			Authors: tag.NewFromBytesSlice(ev.Pubkey),
+			Kinds:   kind.NewS(kind.New(ev.Kind)),
+			Tags: tag.NewS(
+				tag.NewFromAny("d", dTag.Value()),
+			),
+		}
+	}
+
+	sers, err := b.GetSerialsFromFilter(f)
+	if chk.E(err) {
+		return false, nil, err
+	}
+	if len(sers) == 0 {
+		return false, nil, nil
+	}
+
+	shouldReplace := true
+	for _, s := range sers {
+		oldEv, ferr := b.FetchEventBySerial(s)
+		// Candidates that cannot be loaded are ignored; the nil check avoids
+		// a panic should the fetch return (nil, nil).
+		if chk.E(ferr) || oldEv == nil {
+			continue
+		}
+		if ev.CreatedAt < oldEv.CreatedAt {
+			shouldReplace = false
+			break
+		}
+	}
+	if shouldReplace {
+		return true, nil, nil
+	}
+	return false, nil, database.ErrOlderThanExisting
+}
+
+// bboltSerialResolver implements database.SerialResolver for compact event encoding
+type bboltSerialResolver struct {
+	b *B
+}
+
+// GetOrCreatePubkeySerial returns the serial for pubkey, allocating one in its
+// own write transaction when the pubkey is not known yet.
+func (r *bboltSerialResolver) GetOrCreatePubkeySerial(pubkey []byte) (serial uint64, err error) {
+	err = r.b.db.Update(func(tx *bolt.Tx) error {
+		var e error
+		serial, e = r.b.getOrCreatePubkeySerial(tx, pubkey)
+		return e
+	})
+	return
+}
+
+// GetPubkeyBySerial returns a copy of the 32-byte pubkey stored for serial, or
+// nil when the spk bucket or the entry is missing. A failure of the read
+// transaction is returned instead of being dropped.
+func (r *bboltSerialResolver) GetPubkeyBySerial(serial uint64) (pubkey []byte, err error) {
+	err = r.b.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(bucketSpk)
+		if bucket == nil {
+			return nil
+		}
+		val := bucket.Get(makeSerialKey(serial))
+		if val != nil {
+			// Copy: bbolt values are only valid while the transaction is open.
+			pubkey = make([]byte, 32)
+			copy(pubkey, val)
+		}
+		return nil
+	})
+	return
+}
+
+// GetEventSerialById looks up the serial of an event ID. Any lookup error is
+// reported as "not found" (found == false, err == nil).
+func (r *bboltSerialResolver) GetEventSerialById(eventID []byte) (serial uint64, found bool, err error) {
+	ser, e := r.b.GetSerialById(eventID)
+	if e != nil || ser == nil {
+		return 0, false, nil
+	}
+	return ser.Get(), true, nil
+}
+
+// GetEventIdBySerial returns a copy of the 32-byte event ID stored for serial,
+// or nil when the sei bucket or the entry is missing. A failure of the read
+// transaction is returned instead of being dropped.
+func (r *bboltSerialResolver) GetEventIdBySerial(serial uint64) (eventID []byte, err error) {
+	err = r.b.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(bucketSei)
+		if bucket == nil {
+			return nil
+		}
+		val := bucket.Get(makeSerialKey(serial))
+		if val != nil {
+			// Copy: bbolt values are only valid while the transaction is open.
+			eventID = make([]byte, 32)
+			copy(eventID, val)
+		}
+		return nil
+	})
+	return
+}
diff --git a/pkg/bbolt/serial.go b/pkg/bbolt/serial.go
new file mode 100644
index 0000000..0985c17
--- /dev/null
+++ b/pkg/bbolt/serial.go
@@ -0,0 +1,169 @@
+//go:build !(js && wasm)
+
+package bbolt
+
+import (
+	"encoding/binary"
+	"errors"
+
+	bolt "go.etcd.io/bbolt"
+	"lol.mleku.dev/chk"
+)
+
+const (
+	serialCounterKey       = "serial_counter"
+	pubkeySerialCounterKey = "pubkey_serial_counter"
+)
+
+// initSerialCounters initializes or loads the serial counters from _meta bucket
+//
+// A missing counter is created with the value 1. A stored value shorter than
+// 8 bytes is reported as an error instead of panicking in the decoder. When
+// the meta bucket itself is missing nothing is loaded and nil is returned.
+func (b *B) initSerialCounters() error {
+	return b.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(bucketMeta)
+		if bucket == nil {
+			return nil
+		}
+
+		// loadOrInit reads one big-endian uint64 counter, seeding it with 1
+		// when it has never been stored.
+		loadOrInit := func(key string) (uint64, error) {
+			val := bucket.Get([]byte(key))
+			if val == nil {
+				buf := make([]byte, 8)
+				binary.BigEndian.PutUint64(buf, 1)
+				return 1, bucket.Put([]byte(key), buf)
+			}
+			if len(val) < 8 {
+				return 0, errors.New("bbolt: corrupt counter value for " + key)
+			}
+			return binary.BigEndian.Uint64(val), nil
+		}
+
+		var err error
+		// Load event serial counter
+		if b.nextSerial, err = loadOrInit(serialCounterKey); err != nil {
+			return err
+		}
+		// Load pubkey serial counter
+		if b.nextPubkeySeq, err = loadOrInit(pubkeySerialCounterKey); err != nil {
+			return err
+		}
+		return nil
+	})
+}
+
+// persistSerialCounters saves the current serial counters to disk
+//
+// Both counters are snapshotted under serialMu and then written to the meta
+// bucket in one write transaction. A missing meta bucket is silently ignored.
+func (b *B) persistSerialCounters() error {
+	b.serialMu.Lock()
+	eventSerial := b.nextSerial
+	pubkeySerial := b.nextPubkeySeq
+	b.serialMu.Unlock()
+
+	return b.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(bucketMeta)
+		if bucket == nil {
+			return nil
+		}
+
+		// bbolt requires the value to stay unmodified until the transaction
+		// ends, so each Put gets its own buffer.
+		evBuf := make([]byte, 8)
+		binary.BigEndian.PutUint64(evBuf, eventSerial)
+		if err := bucket.Put([]byte(serialCounterKey), evBuf); chk.E(err) {
+			return err
+		}
+
+		pkBuf := make([]byte, 8)
+		binary.BigEndian.PutUint64(pkBuf, pubkeySerial)
+		if err := bucket.Put([]byte(pubkeySerialCounterKey), pkBuf); chk.E(err) {
+			return err
+		}
+
+		return nil
+	})
+}
+
+// getNextEventSerial returns the next event serial number (thread-safe)
+//
+// NOTE(review): in this file the counter reaches disk only when it crosses a
+// multiple of 1000. Unless shutdown or the batcher also persists it, a crash
+// can reload a stale value and hand out serials that are already in use -
+// confirm.
+func (b *B) getNextEventSerial() uint64 {
+	b.serialMu.Lock()
+	defer b.serialMu.Unlock()
+
+	serial := b.nextSerial
+	b.nextSerial++
+
+	// Persist every 1000 to reduce disk writes
+	if b.nextSerial%1000 == 0 {
+		// Runs in a goroutine because persistSerialCounters needs serialMu
+		// (held here) and a write transaction.
+		go func() {
+			if err := b.persistSerialCounters(); chk.E(err) {
+				b.Logger.Warningf("bbolt: failed to persist serial counters: %v", err)
+			}
+		}()
+	}
+
+	return serial
+}
+
+// getNextPubkeySerial returns the next pubkey serial number (thread-safe)
+//
+// It follows the same persistence scheme as getNextEventSerial.
+func (b *B) getNextPubkeySerial() uint64 {
+	b.serialMu.Lock()
+	defer b.serialMu.Unlock()
+
+	serial := b.nextPubkeySeq
+	b.nextPubkeySeq++
+
+	// Persist every 1000 to reduce disk writes
+	if b.nextPubkeySeq%1000 == 0 {
+		go func() {
+			if err := b.persistSerialCounters(); chk.E(err) {
+				b.Logger.Warningf("bbolt: failed to persist serial counters: %v", err)
+			}
+		}()
+	}
+
+	return serial
+}
+
+// getOrCreatePubkeySerial gets or creates a serial for a pubkey
+//
+// tx must be writable. The pks bucket maps hashPubkey(pubkey) to a 5-byte
+// serial and the spk bucket maps the serial back to the full pubkey. Missing
+// buckets are reported as an error: returning serial 0 with a nil error would
+// let callers record edges against a serial that was never allocated.
+func (b *B) getOrCreatePubkeySerial(tx *bolt.Tx, pubkey []byte) (uint64, error) {
+	pksBucket := tx.Bucket(bucketPks)
+	spkBucket := tx.Bucket(bucketSpk)
+
+	if pksBucket == nil || spkBucket == nil {
+		return 0, errors.New("bbolt: pubkey serial buckets are not initialized")
+	}
+
+	// Check if pubkey already has a serial
+	pubkeyHash := hashPubkey(pubkey)
+	if val := pksBucket.Get(pubkeyHash); val != nil {
+		return decodeUint40(val), nil
+	}
+
+	// Create new serial
+	serial := b.getNextPubkeySerial()
+	serialKey := makeSerialKey(serial)
+
+	// Store pubkey_hash -> serial
+	serialBuf := make([]byte, 5)
+	encodeUint40(serial, serialBuf)
+	if err := pksBucket.Put(pubkeyHash, serialBuf); err != nil {
+		return 0, err
+	}
+
+	// Store serial -> full pubkey
+	fullPubkey := make([]byte, 32)
+	copy(fullPubkey, pubkey)
+	if err := spkBucket.Put(serialKey, fullPubkey); err != nil {
+		return 0, err
+	}
+
+	return serial, nil
+}
+
+// getPubkeyBySerial retrieves the full 32-byte pubkey from a serial
+//
+// It returns (nil, nil) when the spk bucket or the entry is missing. The
+// result is a copy, so it stays valid after tx ends; the slice bbolt returns
+// from Get does not.
+func (b *B) getPubkeyBySerial(tx *bolt.Tx, serial uint64) ([]byte, error) {
+	spkBucket := tx.Bucket(bucketSpk)
+	if spkBucket == nil {
+		return nil, nil
+	}
+
+	val := spkBucket.Get(makeSerialKey(serial))
+	if val == nil {
+		return nil, nil
+	}
+	return append([]byte(nil), val...), nil
+}
diff --git a/pkg/bbolt/stubs.go b/pkg/bbolt/stubs.go
new file mode 100644
index 0000000..9e56503
--- /dev/null
+++ b/pkg/bbolt/stubs.go
@@ -0,0 +1,254 @@
+//go:build !(js && wasm)
+
+package bbolt
+
+import (
+	"context"
+	"errors"
+	"io"
+	"time"
+
+	"next.orly.dev/pkg/database"
+	"next.orly.dev/pkg/database/indexes/types"
+	"next.orly.dev/pkg/interfaces/store"
+	"git.mleku.dev/mleku/nostr/encoders/event"
+	"git.mleku.dev/mleku/nostr/encoders/filter"
+)
+
+// This file contains stub implementations for interface methods that will be
+// implemented fully in later phases. It allows the code to compile while
+// we implement the core functionality.
+
+// errNotImplemented is returned by every stub in this file whose real
+// implementation has not been written yet.
+var errNotImplemented = errors.New("bbolt: not implemented yet")
+
+// QueryEvents returns the events whose serials match filter f.
+//
+// Serials that cannot be fetched (error or nil event) are left out of the
+// result without failing the query. The context argument is not consulted.
+func (b *B) QueryEvents(c context.Context, f *filter.F) (evs event.S, err error) {
+	matched, err := b.GetSerialsFromFilter(f)
+	if err != nil {
+		return nil, err
+	}
+
+	evs = make(event.S, 0, len(matched))
+	for _, ser := range matched {
+		fetched, ferr := b.FetchEventBySerial(ser)
+		if ferr != nil || fetched == nil {
+			continue
+		}
+		evs = append(evs, fetched)
+	}
+	return evs, nil
+}
+
+// QueryAllVersions currently delegates to QueryEvents unchanged.
+func (b *B) QueryAllVersions(c context.Context, f *filter.F) (evs event.S, err error) {
+	return b.QueryEvents(c, f)
+}
+
+// QueryEventsWithOptions currently delegates to QueryEvents; both option flags
+// are ignored.
+func (b *B) QueryEventsWithOptions(c context.Context, f *filter.F, includeDeleteEvents bool, showAllVersions bool) (evs event.S, err error) {
+	return b.QueryEvents(c, f)
+}
+
+// QueryDeleteEventsByTargetId is a stub and always returns errNotImplemented.
+func (b *B) QueryDeleteEventsByTargetId(c context.Context, targetEventId []byte) (evs event.S, err error) {
+	return nil, errNotImplemented
+}
+
+// QueryForSerials returns only the serials matching filter f.
+func (b *B) QueryForSerials(c context.Context, f *filter.F) (serials types.Uint40s, err error) {
+	return b.GetSerialsFromFilter(f)
+}
+
+// QueryForIds resolves filter f to serials and returns the ID/pubkey/timestamp
+// tuple of each of them.
+func (b *B) QueryForIds(c context.Context, f *filter.F) (idPkTs []*store.IdPkTs, err error) {
+	serials, err := b.GetSerialsFromFilter(f)
+	if err != nil {
+		return nil, err
+	}
+	return b.GetFullIdPubkeyBySerials(serials)
+}
+
+// DeleteEvent is a stub and always returns errNotImplemented.
+func (b *B) DeleteEvent(c context.Context, eid []byte) error {
+	return errNotImplemented
+}
+
+// DeleteEventBySerial deletes an event by serial.
+func (b *B) DeleteEventBySerial(c context.Context, ser *types.Uint40, ev *event.E) error {
+	// Stub: nothing is deleted yet.
+	return errNotImplemented
+}
+
+// DeleteExpired is a stub: it currently does nothing.
+func (b *B) DeleteExpired() {
+	// TODO: Implement
+}
+
+// ProcessDelete is a stub and always returns errNotImplemented, so the
+// deletion event is not applied to its targets.
+func (b *B) ProcessDelete(ev *event.E, admins [][]byte) error {
+	return errNotImplemented
+}
+
+// CheckForDeleted is a stub: it always returns nil, i.e. every event is
+// treated as not deleted.
+func (b *B) CheckForDeleted(ev *event.E, admins [][]byte) error {
+	return nil // Not deleted by default
+}
+
+// Import is a stub: the reader is not consumed and nothing is imported.
+func (b *B) Import(rr io.Reader) {
+	// TODO: Implement
+}
+
+// Export is a stub: nothing is written to w.
+func (b *B) Export(c context.Context, w io.Writer, pubkeys ...[]byte) {
+	// TODO: Implement
+}
+
+// ImportEventsFromReader is a stub and always returns errNotImplemented.
+func (b *B) ImportEventsFromReader(ctx context.Context, rr io.Reader) error {
+	return errNotImplemented
+}
+
+// ImportEventsFromStrings is a stub and always returns errNotImplemented;
+// policyManager is not consulted.
+func (b *B) ImportEventsFromStrings(ctx context.Context, eventJSONs []string, policyManager interface {
+	CheckPolicy(action string, ev *event.E, pubkey []byte, remote string) (bool, error)
+}) error {
+	return errNotImplemented
+}
+
+
+
+// GetSubscription is a stub and always returns errNotImplemented.
+func (b *B) GetSubscription(pubkey []byte) (*database.Subscription, error) {
+	return nil, errNotImplemented
+}
+
+// IsSubscriptionActive is a stub: it always returns false together with
+// errNotImplemented.
+func (b *B) IsSubscriptionActive(pubkey []byte) (bool, error) {
+	return false, errNotImplemented
+}
+
+// ExtendSubscription is a stub and always returns errNotImplemented.
+func (b *B) ExtendSubscription(pubkey []byte, days int) error {
+	return errNotImplemented
+}
+
+// RecordPayment is a stub and always returns errNotImplemented.
+func (b *B) RecordPayment(pubkey []byte, amount int64, invoice, preimage string) error {
+	return errNotImplemented
+}
+
+// GetPaymentHistory gets payment history.
+func (b *B) GetPaymentHistory(pubkey []byte) ([]database.Payment, error) {
+	// Stub: no payment data is stored by this backend yet.
+	return nil, errNotImplemented
+}
+
+// ExtendBlossomSubscription is a stub and always returns errNotImplemented.
+func (b *B) ExtendBlossomSubscription(pubkey []byte, tier string, storageMB int64, daysExtended int) error {
+	return errNotImplemented
+}
+
+// GetBlossomStorageQuota is a stub: it always returns 0 together with
+// errNotImplemented.
+func (b *B) GetBlossomStorageQuota(pubkey []byte) (quotaMB int64, err error) {
+	return 0, errNotImplemented
+}
+
+// IsFirstTimeUser is a stub: every pubkey is reported as a first-time user
+// (true, nil).
+func (b *B) IsFirstTimeUser(pubkey []byte) (bool, error) {
+	return true, nil
+}
+
+// AddNIP43Member is a stub and always returns errNotImplemented.
+func (b *B) AddNIP43Member(pubkey []byte, inviteCode string) error {
+	return errNotImplemented
+}
+
+// RemoveNIP43Member is a stub and always returns errNotImplemented.
+func (b *B) RemoveNIP43Member(pubkey []byte) error {
+	return errNotImplemented
+}
+
+// IsNIP43Member is a stub: it always returns false together with
+// errNotImplemented.
+func (b *B) IsNIP43Member(pubkey []byte) (isMember bool, err error) {
+	return false, errNotImplemented
+}
+
+// GetNIP43Membership is a stub and always returns errNotImplemented.
+func (b *B) GetNIP43Membership(pubkey []byte) (*database.NIP43Membership, error) {
+	return nil, errNotImplemented
+}
+
+// GetAllNIP43Members is a stub and always returns errNotImplemented.
+func (b *B) GetAllNIP43Members() ([][]byte, error) {
+	return nil, errNotImplemented
+}
+
+// StoreInviteCode is a stub and always returns errNotImplemented.
+func (b *B) StoreInviteCode(code string, expiresAt time.Time) error {
+	return errNotImplemented
+}
+
+// ValidateInviteCode is a stub: it always returns false together with
+// errNotImplemented.
+func (b *B) ValidateInviteCode(code string) (valid bool, err error) {
+	return false, errNotImplemented
+}
+
+// DeleteInviteCode is a stub and always returns errNotImplemented.
+func (b *B) DeleteInviteCode(code string) error {
+	return errNotImplemented
+}
+
+// PublishNIP43MembershipEvent publishes a NIP-43 membership event.
+func (b *B) PublishNIP43MembershipEvent(kind int, pubkey []byte) error {
+	// Stub: no membership event is published yet.
+	return errNotImplemented
+}
+
+// RunMigrations is a stub: it currently does nothing.
+func (b *B) RunMigrations() {
+	// TODO: Implement if needed
+}
+
+// GetCachedJSON gets cached JSON for a filter (stub - no caching in bbolt).
+// It always reports a cache miss (nil, false).
+func (b *B) GetCachedJSON(f *filter.F) ([][]byte, bool) {
+	return nil, false
+}
+
+// CacheMarshaledJSON caches JSON for a filter (stub - no caching in bbolt).
+// The arguments are discarded.
+func (b *B) CacheMarshaledJSON(f *filter.F, marshaledJSON [][]byte) {
+	// No-op: BBolt doesn't use query cache to save RAM
+}
+
+// GetCachedEvents gets cached events for a filter (stub - no caching in bbolt).
+// It always reports a cache miss (nil, false).
+func (b *B) GetCachedEvents(f *filter.F) (event.S, bool) {
+	return nil, false
+}
+
+// CacheEvents caches events for a filter (stub - no caching in bbolt).
+// The arguments are discarded.
+func (b *B) CacheEvents(f *filter.F, events event.S) {
+	// No-op: BBolt doesn't use query cache to save RAM
+}
+
+// InvalidateQueryCache invalidates the query cache (stub - no caching in bbolt).
+func (b *B) InvalidateQueryCache() {
+	// No-op
+}
+
+// RecordEventAccess is a stub: nothing is recorded and nil is returned.
+func (b *B) RecordEventAccess(serial uint64, connectionID string) error {
+	return nil // TODO: Implement if needed
+}
+
+// GetEventAccessInfo is a stub: it always returns zero values together with
+// errNotImplemented.
+func (b *B) GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
+	return 0, 0, errNotImplemented
+}
+
+// GetLeastAccessedEvents is a stub and always returns errNotImplemented.
+func (b *B) GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) {
+	return nil, errNotImplemented
+}
+
+// EventIdsBySerial gets event IDs by serial range.
+func (b *B) EventIdsBySerial(start uint64, count int) (evs []uint64, err error) { + return nil, errNotImplemented +} diff --git a/pkg/database/factory.go b/pkg/database/factory.go index 72577ad..25b68c1 100644 --- a/pkg/database/factory.go +++ b/pkg/database/factory.go @@ -41,6 +41,14 @@ type DatabaseConfig struct { Neo4jFetchSize int // ORLY_NEO4J_FETCH_SIZE - max records per fetch batch (default: 1000) Neo4jMaxTxRetrySeconds int // ORLY_NEO4J_MAX_TX_RETRY_SEC - max transaction retry time (default: 30) Neo4jQueryResultLimit int // ORLY_NEO4J_QUERY_RESULT_LIMIT - max results per query (default: 10000, 0=unlimited) + + // BBolt-specific settings (optimized for HDD) + BboltBatchMaxEvents int // ORLY_BBOLT_BATCH_MAX_EVENTS - max events per batch (default: 5000) + BboltBatchMaxBytes int64 // ORLY_BBOLT_BATCH_MAX_MB * 1024 * 1024 (default: 128MB) + BboltFlushTimeout time.Duration // ORLY_BBOLT_FLUSH_TIMEOUT_SEC * time.Second (default: 30s) + BboltBloomSizeMB int // ORLY_BBOLT_BLOOM_SIZE_MB - bloom filter size (default: 16MB) + BboltNoSync bool // ORLY_BBOLT_NO_SYNC - disable fsync (DANGEROUS) + BboltMmapSize int // ORLY_BBOLT_MMAP_SIZE_GB * 1024 * 1024 * 1024 (default: 8GB) } // NewDatabase creates a database instance based on the specified type. 
@@ -84,8 +92,14 @@ func NewDatabaseWithConfig( return nil, fmt.Errorf("wasmdb database backend not available (import _ \"next.orly.dev/pkg/wasmdb\")") } return newWasmDBDatabase(ctx, cancel, cfg) + case "bbolt", "bolt": + // Use the bbolt implementation (B+tree, optimized for HDD) + if newBboltDatabase == nil { + return nil, fmt.Errorf("bbolt database backend not available (import _ \"next.orly.dev/pkg/bbolt\")") + } + return newBboltDatabase(ctx, cancel, cfg) default: - return nil, fmt.Errorf("unsupported database type: %s (supported: badger, neo4j, wasmdb)", dbType) + return nil, fmt.Errorf("unsupported database type: %s (supported: badger, neo4j, wasmdb, bbolt)", dbType) } } @@ -108,3 +122,13 @@ var newWasmDBDatabase func(context.Context, context.CancelFunc, *DatabaseConfig) func RegisterWasmDBFactory(factory func(context.Context, context.CancelFunc, *DatabaseConfig) (Database, error)) { newWasmDBDatabase = factory } + +// newBboltDatabase creates a bbolt database instance +// This is defined here to avoid import cycles +var newBboltDatabase func(context.Context, context.CancelFunc, *DatabaseConfig) (Database, error) + +// RegisterBboltFactory registers the bbolt database factory +// This is called from the bbolt package's init() function +func RegisterBboltFactory(factory func(context.Context, context.CancelFunc, *DatabaseConfig) (Database, error)) { + newBboltDatabase = factory +} diff --git a/pkg/version/version b/pkg/version/version index 1a765e8..4af8a6a 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.47.1 +v0.48.0