Browse Source

Add unified binary architecture with database health check (v0.55.4)

- Add unified cmd/orly/ binary with subcommand routing (db, acl, sync, launcher, relay)
- Implement database driver registry with --driver flag pattern
- Add database health check function (orly db health) to scan for integrity issues
- Add database repair function (orly db repair) to fix missing sei mappings
- Create ACL driver registry (follows, managed, curating modes)
- Create sync driver registry (negentropy, cluster, distributed, relaygroup)
- Optimize ACL follows mode with parallel goroutines and connection pooling
- Add unified binary build targets to Makefile

Files modified:
- Makefile: Add orly-unified build targets
- pkg/version/version: Bump to v0.55.4
- pkg/acl/follows.go: Optimize with goroutines and connection pooling
- cmd/orly/: New unified binary entry point and subcommands
- pkg/database/registry.go: Database driver registry
- pkg/database/health.go: Health check implementation
- pkg/database/repair.go: Repair implementation
- pkg/acl/registry.go: ACL driver registry
- pkg/sync/registry.go: Sync driver registry

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
main v0.55.4
woikos 4 months ago
parent
commit
92955824cc
No known key found for this signature in database
  1. 31
      Makefile
  2. 116
      cmd/orly/acl/acl.go
  3. 237
      cmd/orly/db/db.go
  4. 102
      cmd/orly/db/health.go
  5. 110
      cmd/orly/db/repair.go
  6. 58
      cmd/orly/launcher/launcher.go
  7. 136
      cmd/orly/main.go
  8. 67
      cmd/orly/relay/relay.go
  9. 114
      cmd/orly/sync/sync.go
  10. 114
      pkg/acl/follows.go
  11. 23
      pkg/acl/register_curating.go
  12. 23
      pkg/acl/register_follows.go
  13. 23
      pkg/acl/register_managed.go
  14. 120
      pkg/acl/registry.go
  15. 474
      pkg/database/health.go
  16. 18
      pkg/database/register_badger.go
  17. 93
      pkg/database/registry.go
  18. 613
      pkg/database/repair.go
  19. 21
      pkg/sync/register_cluster.go
  20. 21
      pkg/sync/register_distributed.go
  21. 30
      pkg/sync/register_negentropy.go
  22. 21
      pkg/sync/register_relaygroup.go
  23. 129
      pkg/sync/registry.go
  24. 2
      pkg/version/version

31
Makefile

@ -4,6 +4,7 @@
.PHONY: all-split arm64-split install-split .PHONY: all-split arm64-split install-split
.PHONY: orly-sync-negentropy all-sync arm64-sync .PHONY: orly-sync-negentropy all-sync arm64-sync
.PHONY: quick-deploy quick-deploy-restart deploy-both deploy-both-restart deploy-new list-releases rollback .PHONY: quick-deploy quick-deploy-restart deploy-both deploy-both-restart deploy-new list-releases rollback
.PHONY: orly-unified orly-unified-full arm64-unified install-unified
# Build flags # Build flags
CGO_ENABLED ?= 0 CGO_ENABLED ?= 0
@ -29,6 +30,9 @@ ORLY_ACL_FOLLOWS = $(BIN_DIR)/orly-acl-follows
ORLY_ACL_MANAGED = $(BIN_DIR)/orly-acl-managed ORLY_ACL_MANAGED = $(BIN_DIR)/orly-acl-managed
ORLY_ACL_CURATION = $(BIN_DIR)/orly-acl-curation ORLY_ACL_CURATION = $(BIN_DIR)/orly-acl-curation
# Unified binary (new architecture)
ORLY_UNIFIED = $(BIN_DIR)/orly-unified
# === Default Targets (Legacy) === # === Default Targets (Legacy) ===
# Default target: build everything (legacy monolithic) # Default target: build everything (legacy monolithic)
@ -89,6 +93,21 @@ orly-acl-curation:
orly-launcher: orly-launcher:
$(BUILD_FLAGS) go build -o $(ORLY_LAUNCHER) ./cmd/orly-launcher $(BUILD_FLAGS) go build -o $(ORLY_LAUNCHER) ./cmd/orly-launcher
# === Unified Binary (New Architecture) ===
# Unified binary with Badger driver (minimal, for most deployments)
orly-unified:
$(BUILD_FLAGS) go build -o $(ORLY_UNIFIED) ./cmd/orly
# Build unified binary for ARM64
arm64-unified:
$(MAKE) GOOS=linux GOARCH=arm64 orly-unified
# Install unified binary
install-unified: orly-unified
mkdir -p ~/.local/bin
cp $(ORLY_UNIFIED) ~/.local/bin/
# Generate protobuf code # Generate protobuf code
proto: proto:
cd proto && buf generate cd proto && buf generate
@ -123,6 +142,7 @@ clean:
rm -f $(ORLY_DB_BADGER) $(ORLY_DB_NEO4J) rm -f $(ORLY_DB_BADGER) $(ORLY_DB_NEO4J)
rm -f $(ORLY_ACL_FOLLOWS) $(ORLY_ACL_MANAGED) $(ORLY_ACL_CURATION) rm -f $(ORLY_ACL_FOLLOWS) $(ORLY_ACL_MANAGED) $(ORLY_ACL_CURATION)
rm -f $(ORLY_SYNC_NEGENTROPY) rm -f $(ORLY_SYNC_NEGENTROPY)
rm -f $(ORLY_UNIFIED)
rm -f orly-db-arm64 orly-acl-arm64 orly-launcher-arm64 next.orly.dev rm -f orly-db-arm64 orly-acl-arm64 orly-launcher-arm64 next.orly.dev
rm -rf build-arm64 rm -rf build-arm64
@ -246,6 +266,17 @@ help:
@echo " all-sync - Build all including sync services" @echo " all-sync - Build all including sync services"
@echo " arm64-sync - Cross-compile sync services for ARM64" @echo " arm64-sync - Cross-compile sync services for ARM64"
@echo "" @echo ""
@echo " Unified Binary (New Architecture):"
@echo " orly-unified - Build unified binary with subcommands"
@echo " arm64-unified - Cross-compile unified binary for ARM64"
@echo " install-unified - Install unified binary to ~/.local/bin/"
@echo ""
@echo " Unified Binary Usage:"
@echo " orly-unified db --driver=badger - Run Badger database server"
@echo " orly-unified db health - Run database health check"
@echo " orly-unified db repair --dry-run - Preview database repairs"
@echo " orly-unified acl --driver=follows - Run follows ACL server"
@echo ""
@echo " Quick Deployment (symlink-based):" @echo " Quick Deployment (symlink-based):"
@echo " quick-deploy - Build ARM64 and deploy to relay.orly.dev" @echo " quick-deploy - Build ARM64 and deploy to relay.orly.dev"
@echo " quick-deploy-restart - Build, deploy, and restart service" @echo " quick-deploy-restart - Build, deploy, and restart service"

116
cmd/orly/acl/acl.go

@ -0,0 +1,116 @@
//go:build !(js && wasm)
// Package acl implements the "orly acl" subcommand for ACL server operations.
package acl
import (
"fmt"
"os"
"strings"
"lol.mleku.dev/log"
"next.orly.dev/pkg/acl"
)
// Run executes the acl subcommand.
func Run(args []string) {
var driver string
var listDrivers bool
var showHelp bool
for i := 0; i < len(args); i++ {
arg := args[i]
if strings.HasPrefix(arg, "--driver=") {
driver = strings.TrimPrefix(arg, "--driver=")
} else if arg == "--driver" && i+1 < len(args) {
driver = args[i+1]
i++
} else if arg == "--list-drivers" || arg == "-l" {
listDrivers = true
} else if arg == "--help" || arg == "-h" {
showHelp = true
}
}
if showHelp {
printACLHelp()
return
}
if listDrivers {
drivers := acl.ListDriversWithInfo()
if len(drivers) == 0 {
fmt.Println("No ACL drivers available.")
fmt.Println("Build with appropriate tags to include drivers.")
return
}
fmt.Println("Available ACL drivers:")
for _, d := range drivers {
fmt.Printf(" %-10s - %s\n", d.Name, d.Description)
}
return
}
if driver == "" {
// Check if any driver is registered
drivers := acl.ListDrivers()
if len(drivers) == 0 {
fmt.Fprintln(os.Stderr, "error: no ACL drivers available")
os.Exit(1)
}
if len(drivers) == 1 {
// Use the only available driver
driver = drivers[0]
log.I.F("using default ACL driver: %s", driver)
} else {
fmt.Fprintln(os.Stderr, "error: --driver required (multiple drivers available)")
fmt.Fprintf(os.Stderr, "available: %s\n", strings.Join(drivers, ", "))
os.Exit(1)
}
}
// Check if driver is available
if !acl.HasDriver(driver) {
fmt.Fprintf(os.Stderr, "error: ACL driver %q not available\n", driver)
fmt.Fprintf(os.Stderr, "available: %s\n", strings.Join(acl.ListDrivers(), ", "))
os.Exit(1)
}
runACLServer(driver, args)
}
func runACLServer(driver string, args []string) {
log.I.F("ACL server with driver=%s not yet implemented via unified binary", driver)
log.I.F("Use the standalone binary: orly-acl-%s", driver)
os.Exit(1)
}
func printACLHelp() {
fmt.Println(`orly acl - ACL server operations
Usage:
orly acl --driver=NAME [options]
Options:
--driver=NAME Select ACL driver (follows, managed, curation)
--list-drivers List available ACL drivers
--help, -h Show this help message
Drivers:
follows Whitelist based on admin follow lists
managed NIP-86 fine-grained access control
curation Rate-limited trust tier system
Environment variables:
ORLY_ACL_LISTEN gRPC server listen address
ORLY_ACL_LOG_LEVEL Logging level
ORLY_ACL_DB_TYPE Database type (grpc or badger)
ORLY_ACL_GRPC_DB_SERVER gRPC database server address
ORLY_OWNERS Comma-separated owner npubs
ORLY_ADMINS Comma-separated admin npubs
Examples:
orly acl --driver=follows Run follows ACL server
orly acl --list-drivers List available drivers`)
}

237
cmd/orly/db/db.go

@ -0,0 +1,237 @@
//go:build !(js && wasm)
// Package db implements the "orly db" subcommand for database operations.
package db
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"go-simpler.org/env"
"lol.mleku.dev"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/server"
)
// Config holds the database server configuration.
type Config struct {
// Listen is the gRPC server listen address
Listen string `env:"ORLY_DB_LISTEN" default:"127.0.0.1:50051" usage:"gRPC server listen address"`
// DataDir is the database data directory
DataDir string `env:"ORLY_DATA_DIR" usage:"database data directory"`
// LogLevel is the logging level
LogLevel string `env:"ORLY_DB_LOG_LEVEL" default:"info" usage:"log level (trace, debug, info, warn, error)"`
// Badger configuration
BlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"1024" usage:"block cache size in MB"`
IndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"512" usage:"index cache size in MB"`
ZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"3" usage:"ZSTD compression level (1-19)"`
// Query cache configuration
QueryCacheSizeMB int `env:"ORLY_DB_QUERY_CACHE_SIZE_MB" default:"256" usage:"query cache size in MB"`
QueryCacheMaxAge time.Duration `env:"ORLY_DB_QUERY_CACHE_MAX_AGE" default:"5m" usage:"query cache max age"`
QueryCacheDisabled bool `env:"ORLY_DB_QUERY_CACHE_DISABLED" default:"false" usage:"disable query cache"`
// Serial cache configuration
SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"serial cache pubkeys capacity"`
SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"serial cache event IDs capacity"`
// gRPC server configuration
StreamBatchSize int `env:"ORLY_DB_STREAM_BATCH_SIZE" default:"100" usage:"events per stream batch"`
}
// Run executes the db subcommand.
func Run(args []string) {
var driver string
var listDrivers bool
var showHelp bool
// Parse args for subcommands and flags
for i := 0; i < len(args); i++ {
arg := args[i]
if strings.HasPrefix(arg, "--driver=") {
driver = strings.TrimPrefix(arg, "--driver=")
} else if arg == "--driver" && i+1 < len(args) {
driver = args[i+1]
i++
} else if arg == "--list-drivers" || arg == "-l" {
listDrivers = true
} else if arg == "--help" || arg == "-h" {
showHelp = true
} else if arg == "health" {
runHealth(args[i+1:])
return
} else if arg == "repair" {
runRepair(args[i+1:])
return
}
}
if showHelp {
printDBHelp()
return
}
if listDrivers {
drivers := database.ListDriversWithInfo()
if len(drivers) == 0 {
fmt.Println("No database drivers available.")
fmt.Println("Build with appropriate tags: -tags=badger or -tags=neo4j or -tags=all")
return
}
fmt.Println("Available database drivers:")
for _, d := range drivers {
fmt.Printf(" %-10s - %s\n", d.Name, d.Description)
}
return
}
if driver == "" {
// Check if any driver is registered
drivers := database.ListDrivers()
if len(drivers) == 0 {
fmt.Fprintln(os.Stderr, "error: no database drivers available")
fmt.Fprintln(os.Stderr, "build with: -tags=badger or -tags=neo4j")
os.Exit(1)
}
if len(drivers) == 1 {
// Use the only available driver
driver = drivers[0]
log.I.F("using default driver: %s", driver)
} else {
fmt.Fprintln(os.Stderr, "error: --driver required (multiple drivers available)")
fmt.Fprintf(os.Stderr, "available: %s\n", strings.Join(drivers, ", "))
os.Exit(1)
}
}
// Check if driver is available
if !database.HasDriver(driver) {
fmt.Fprintf(os.Stderr, "error: driver %q not available\n", driver)
fmt.Fprintf(os.Stderr, "available: %s\n", strings.Join(database.ListDrivers(), ", "))
os.Exit(1)
}
runServer(driver)
}
func runServer(driver string) {
cfg := loadConfig()
// Set log level
lol.SetLogLevel(cfg.LogLevel)
log.I.F("orly db --driver=%s starting with log level: %s", driver, cfg.LogLevel)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create database configuration
dbCfg := &database.DatabaseConfig{
DataDir: cfg.DataDir,
LogLevel: cfg.LogLevel,
BlockCacheMB: cfg.BlockCacheMB,
IndexCacheMB: cfg.IndexCacheMB,
QueryCacheSizeMB: cfg.QueryCacheSizeMB,
QueryCacheMaxAge: cfg.QueryCacheMaxAge,
QueryCacheDisabled: cfg.QueryCacheDisabled,
SerialCachePubkeys: cfg.SerialCachePubkeys,
SerialCacheEventIds: cfg.SerialCacheEventIds,
ZSTDLevel: cfg.ZSTDLevel,
}
// Initialize database using the driver registry
log.I.F("initializing %s database at %s", driver, cfg.DataDir)
db, err := database.NewFromDriver(ctx, cancel, driver, dbCfg)
if chk.E(err) {
log.E.F("failed to initialize database: %v", err)
os.Exit(1)
}
// Wait for database to be ready
log.I.F("waiting for database to be ready...")
<-db.Ready()
log.I.F("database ready")
// Create and start gRPC server
serverCfg := &server.Config{
Listen: cfg.Listen,
LogLevel: cfg.LogLevel,
StreamBatchSize: cfg.StreamBatchSize,
}
srv := server.New(db, serverCfg)
if err := srv.ListenAndServe(ctx, cancel); err != nil {
log.E.F("gRPC server error: %v", err)
}
}
func loadConfig() *Config {
cfg := &Config{}
if err := env.Load(cfg, nil); chk.E(err) {
log.E.F("failed to load config: %v", err)
os.Exit(1)
}
// Set default data directory if not specified
if cfg.DataDir == "" {
home, err := os.UserHomeDir()
if chk.E(err) {
log.E.F("failed to get home directory: %v", err)
os.Exit(1)
}
cfg.DataDir = filepath.Join(home, ".local", "share", "ORLY")
}
// Ensure data directory exists
if err := os.MkdirAll(cfg.DataDir, 0700); chk.E(err) {
log.E.F("failed to create data directory %s: %v", cfg.DataDir, err)
os.Exit(1)
}
return cfg
}
func printDBHelp() {
fmt.Println(`orly db - Database server operations
Usage:
orly db [options]
orly db health [options]
orly db repair [options]
Options:
--driver=NAME Select database driver (badger, neo4j)
--list-drivers List available database drivers
--help, -h Show this help message
Subcommands:
health Run database health check
repair Repair database issues
Environment variables:
ORLY_DATA_DIR Database data directory
ORLY_DB_LISTEN gRPC server listen address
ORLY_DB_LOG_LEVEL Logging level
ORLY_DB_BLOCK_CACHE_MB Block cache size in MB
ORLY_DB_INDEX_CACHE_MB Index cache size in MB
ORLY_DB_ZSTD_LEVEL ZSTD compression level (1-19)
ORLY_DB_QUERY_CACHE_SIZE_MB Query cache size in MB
ORLY_DB_QUERY_CACHE_MAX_AGE Query cache max age
ORLY_DB_QUERY_CACHE_DISABLED Disable query cache
Examples:
orly db --driver=badger Run Badger database server
orly db --list-drivers List available drivers
orly db health Check database health
orly db repair --dry-run Preview repairs without applying`)
}

102
cmd/orly/db/health.go

@ -0,0 +1,102 @@
//go:build !(js && wasm)
package db
import (
"context"
"fmt"
"os"
"lol.mleku.dev"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database"
)
func runHealth(args []string) {
var showHelp bool
for _, arg := range args {
if arg == "--help" || arg == "-h" {
showHelp = true
}
}
if showHelp {
printHealthHelp()
return
}
cfg := loadConfig()
lol.SetLogLevel(cfg.LogLevel)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create database configuration
dbCfg := &database.DatabaseConfig{
DataDir: cfg.DataDir,
LogLevel: cfg.LogLevel,
BlockCacheMB: cfg.BlockCacheMB,
IndexCacheMB: cfg.IndexCacheMB,
QueryCacheSizeMB: cfg.QueryCacheSizeMB,
QueryCacheMaxAge: cfg.QueryCacheMaxAge,
QueryCacheDisabled: cfg.QueryCacheDisabled,
SerialCachePubkeys: cfg.SerialCachePubkeys,
SerialCacheEventIds: cfg.SerialCacheEventIds,
ZSTDLevel: cfg.ZSTDLevel,
}
// Initialize database directly (health check is Badger-specific)
log.I.F("initializing Badger database at %s for health check", cfg.DataDir)
db, err := database.NewWithConfig(ctx, cancel, dbCfg)
if chk.E(err) {
log.E.F("failed to initialize database: %v", err)
os.Exit(1)
}
defer db.Close()
// Wait for database to be ready
<-db.Ready()
// Run health check
report, err := db.HealthCheck(os.Stdout)
if err != nil {
log.E.F("health check failed: %v", err)
os.Exit(1)
}
fmt.Println()
fmt.Println(report.String())
// Exit with non-zero if health score is critical
if report.HealthScore < 50 {
os.Exit(2)
}
}
func printHealthHelp() {
fmt.Println(`orly db health - Database health check
Usage:
orly db health [options]
Options:
--help, -h Show this help message
Environment variables:
ORLY_DATA_DIR Database data directory
ORLY_DB_LOG_LEVEL Logging level
The health check scans the database for integrity issues:
- Missing serial->eventID mappings (sei)
- Orphaned serial->eventID mappings
- Pubkey serial inconsistencies
- Orphaned index entries
Exit codes:
0 - Database is healthy (score >= 50)
1 - Error running health check
2 - Database has critical issues (score < 50)`)
}

110
cmd/orly/db/repair.go

@ -0,0 +1,110 @@
//go:build !(js && wasm)
package db
import (
"context"
"fmt"
"os"
"lol.mleku.dev"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database"
)
func runRepair(args []string) {
var dryRun bool
var showHelp bool
for _, arg := range args {
if arg == "--dry-run" || arg == "-n" {
dryRun = true
} else if arg == "--help" || arg == "-h" {
showHelp = true
}
}
if showHelp {
printRepairHelp()
return
}
cfg := loadConfig()
lol.SetLogLevel(cfg.LogLevel)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create database configuration
dbCfg := &database.DatabaseConfig{
DataDir: cfg.DataDir,
LogLevel: cfg.LogLevel,
BlockCacheMB: cfg.BlockCacheMB,
IndexCacheMB: cfg.IndexCacheMB,
QueryCacheSizeMB: cfg.QueryCacheSizeMB,
QueryCacheMaxAge: cfg.QueryCacheMaxAge,
QueryCacheDisabled: cfg.QueryCacheDisabled,
SerialCachePubkeys: cfg.SerialCachePubkeys,
SerialCacheEventIds: cfg.SerialCacheEventIds,
ZSTDLevel: cfg.ZSTDLevel,
}
// Initialize database directly (repair is Badger-specific)
log.I.F("initializing Badger database at %s for repair", cfg.DataDir)
db, err := database.NewWithConfig(ctx, cancel, dbCfg)
if chk.E(err) {
log.E.F("failed to initialize database: %v", err)
os.Exit(1)
}
defer db.Close()
// Wait for database to be ready
<-db.Ready()
// Run repair
opts := &database.RepairOptions{
DryRun: dryRun,
FixMissingSei: true,
RemoveOrphanedSei: true,
FixPubkeyMappings: true,
Progress: os.Stdout,
}
report, err := db.Repair(ctx, opts)
if err != nil {
log.E.F("repair failed: %v", err)
os.Exit(1)
}
fmt.Println()
fmt.Println(report.String())
if len(report.Errors) > 0 {
os.Exit(1)
}
}
func printRepairHelp() {
fmt.Println(`orly db repair - Database repair
Usage:
orly db repair [options]
Options:
--dry-run, -n Preview repairs without making changes
--help, -h Show this help message
Environment variables:
ORLY_DATA_DIR Database data directory
ORLY_DB_LOG_LEVEL Logging level
The repair operation fixes integrity issues found by health check:
- Rebuilds missing serial->eventID mappings from compact events
- Removes orphaned serial->eventID mappings
- Reports pubkey serial inconsistencies
WARNING: Always backup your database before running repair.
Use --dry-run to preview changes first.`)
}

58
cmd/orly/launcher/launcher.go

@ -0,0 +1,58 @@
//go:build !(js && wasm)
// Package launcher implements the "orly launcher" subcommand for process supervision.
package launcher
import (
"fmt"
"os"
"lol.mleku.dev/log"
)
// Run executes the launcher subcommand.
func Run(args []string) {
var showHelp bool
for _, arg := range args {
if arg == "--help" || arg == "-h" {
showHelp = true
}
}
if showHelp {
printLauncherHelp()
return
}
// For now, redirect to standalone binary
log.I.F("Launcher not yet implemented via unified binary")
log.I.F("Use the standalone binary: orly-launcher")
os.Exit(1)
}
func printLauncherHelp() {
fmt.Println(`orly launcher - Process supervisor
Usage:
orly launcher [options]
Options:
--help, -h Show this help message
The launcher supervises the split-mode deployment:
- Starts and monitors orly-db (database server)
- Starts and monitors orly-acl (ACL server)
- Starts and monitors orly (main relay)
- Handles graceful shutdown and restart
Environment variables:
ORLY_LAUNCHER_DB_ENABLED Enable database subprocess
ORLY_LAUNCHER_DB_LISTEN Database server listen address
ORLY_LAUNCHER_ACL_ENABLED Enable ACL subprocess
ORLY_LAUNCHER_ACL_LISTEN ACL server listen address
ORLY_LAUNCHER_ACL_MODE ACL mode (follows, managed, curation)
Example:
orly launcher Start process supervisor`)
}

136
cmd/orly/main.go

@ -0,0 +1,136 @@
// orly is a unified binary for the ORLY Nostr relay system.
// It provides subcommands for running database servers, ACL servers,
// sync services, and the main relay.
//
// Usage:
//
// orly [command] [options]
//
// Commands:
//
// db - Database server (requires --driver flag)
// acl - ACL server (requires --driver flag)
// sync - Sync service (requires --driver flag)
// launcher - Process supervisor
// relay - Main relay (default if no command specified)
// version - Show version information
// help - Show help
//
// Examples:
//
// orly # Run the main relay (default)
// orly relay # Run the main relay explicitly
// orly db --driver=badger # Run Badger database server
// orly db --list-drivers # List available database drivers
// orly db health # Run database health check
// orly db repair # Repair database issues
// orly db repair --dry-run # Preview repairs without applying
// orly acl --driver=follows # Run follows ACL server
// orly sync --driver=negentropy # Run negentropy sync service
// orly launcher # Run process supervisor
package main
import (
"fmt"
"os"
"next.orly.dev/cmd/orly/acl"
"next.orly.dev/cmd/orly/db"
"next.orly.dev/cmd/orly/launcher"
"next.orly.dev/cmd/orly/relay"
"next.orly.dev/cmd/orly/sync"
)
// Version information (set by build flags)
var (
Version = "dev"
Commit = "unknown"
BuildDate = "unknown"
)
func main() {
if len(os.Args) < 2 {
// Default: run the relay
relay.Run(os.Args[1:])
return
}
switch os.Args[1] {
case "db":
db.Run(os.Args[2:])
case "acl":
acl.Run(os.Args[2:])
case "sync":
sync.Run(os.Args[2:])
case "launcher":
launcher.Run(os.Args[2:])
case "relay":
relay.Run(os.Args[2:])
case "version", "-v", "--version":
printVersion()
case "help", "-h", "--help":
printHelp()
default:
// Check if it's a flag (starts with -)
if os.Args[1][0] == '-' {
// Treat as relay arguments
relay.Run(os.Args[1:])
} else {
fmt.Fprintf(os.Stderr, "unknown command: %s\n\n", os.Args[1])
printHelp()
os.Exit(1)
}
}
}
func printVersion() {
fmt.Printf("orly %s\n", Version)
fmt.Printf(" commit: %s\n", Commit)
fmt.Printf(" built: %s\n", BuildDate)
}
func printHelp() {
fmt.Println(`orly - ORLY Nostr Relay System
Usage:
orly [command] [options]
Commands:
db Database server operations
--driver=NAME Select database driver (badger, neo4j)
--list-drivers List available database drivers
health Run database health check
repair Repair database issues (--dry-run for preview)
acl ACL server operations
--driver=NAME Select ACL driver (follows, managed, curation)
--list-drivers List available ACL drivers
sync Sync service operations
--driver=NAME Select sync driver (negentropy, cluster, distributed)
--list-drivers List available sync drivers
launcher Process supervisor for split-mode deployment
relay Main relay server (default if no command given)
version Show version information
help Show this help message
Examples:
orly Run the main relay (default)
orly db --driver=badger Run Badger database server
orly db health Check database health
orly db repair --dry-run Preview database repairs
orly acl --driver=follows Run follows ACL server
orly sync --driver=negentropy Run negentropy sync service
orly launcher Run process supervisor
Environment variables:
ORLY_DATA_DIR Database data directory
ORLY_DB_LISTEN Database server listen address
ORLY_ACL_LISTEN ACL server listen address
ORLY_LOG_LEVEL Logging level (trace, debug, info, warn, error)
See 'orly [command] --help' for command-specific options.`)
}

67
cmd/orly/relay/relay.go

@ -0,0 +1,67 @@
//go:build !(js && wasm)
// Package relay implements the "orly relay" subcommand (the default command).
package relay
import (
"fmt"
"os"
"lol.mleku.dev/log"
)
// Run executes the relay subcommand.
func Run(args []string) {
var showHelp bool
for _, arg := range args {
if arg == "--help" || arg == "-h" {
showHelp = true
}
}
if showHelp {
printRelayHelp()
return
}
// For now, redirect to the main binary
// In the future, this will contain the relay logic directly
log.I.F("Relay not yet implemented via unified binary subcommand")
log.I.F("Use the main binary: orly (in project root)")
os.Exit(1)
}
func printRelayHelp() {
fmt.Println(`orly relay - Main Nostr relay server
Usage:
orly relay [options]
orly [options] (relay is the default command)
Options:
--help, -h Show this help message
The relay is the main Nostr server that:
- Accepts WebSocket connections from clients
- Processes EVENT, REQ, and other Nostr messages
- Stores events in the database (direct or via gRPC)
- Enforces ACL policies (direct or via gRPC)
Environment variables:
ORLY_PORT Server port (default: 3334)
ORLY_LOG_LEVEL Logging level
ORLY_DB_TYPE Database type (badger, neo4j, grpc)
ORLY_ACL_MODE ACL mode (none, follows, managed)
ORLY_TLS_DOMAINS Let's Encrypt domains
ORLY_AUTH_TO_WRITE Require auth for writes
For split-mode deployment:
ORLY_DB_TYPE=grpc Use gRPC database server
ORLY_DB_GRPC_SERVER gRPC database server address
ORLY_ACL_GRPC_SERVER gRPC ACL server address
Example:
orly Start the relay (default mode)
orly relay Start the relay (explicit)`)
}

114
cmd/orly/sync/sync.go

@ -0,0 +1,114 @@
//go:build !(js && wasm)
// Package sync implements the "orly sync" subcommand for sync service operations.
package sync
import (
"fmt"
"os"
"strings"
"lol.mleku.dev/log"
pkgsync "next.orly.dev/pkg/sync"
)
// Run executes the sync subcommand.
func Run(args []string) {
var driver string
var listDrivers bool
var showHelp bool
for i := 0; i < len(args); i++ {
arg := args[i]
if strings.HasPrefix(arg, "--driver=") {
driver = strings.TrimPrefix(arg, "--driver=")
} else if arg == "--driver" && i+1 < len(args) {
driver = args[i+1]
i++
} else if arg == "--list-drivers" || arg == "-l" {
listDrivers = true
} else if arg == "--help" || arg == "-h" {
showHelp = true
}
}
if showHelp {
printSyncHelp()
return
}
if listDrivers {
drivers := pkgsync.ListDriversWithInfo()
if len(drivers) == 0 {
fmt.Println("No sync drivers available.")
fmt.Println("Build with appropriate tags to include drivers.")
return
}
fmt.Println("Available sync drivers:")
for _, d := range drivers {
fmt.Printf(" %-12s - %s\n", d.Name, d.Description)
}
return
}
if driver == "" {
// Check if any driver is registered
drivers := pkgsync.ListDrivers()
if len(drivers) == 0 {
fmt.Fprintln(os.Stderr, "error: no sync drivers available")
os.Exit(1)
}
if len(drivers) == 1 {
// Use the only available driver
driver = drivers[0]
log.I.F("using default sync driver: %s", driver)
} else {
fmt.Fprintln(os.Stderr, "error: --driver required (multiple drivers available)")
fmt.Fprintf(os.Stderr, "available: %s\n", strings.Join(drivers, ", "))
os.Exit(1)
}
}
// Check if driver is available
if !pkgsync.HasDriver(driver) {
fmt.Fprintf(os.Stderr, "error: sync driver %q not available\n", driver)
fmt.Fprintf(os.Stderr, "available: %s\n", strings.Join(pkgsync.ListDrivers(), ", "))
os.Exit(1)
}
runSyncService(driver, args)
}
func runSyncService(driver string, args []string) {
log.I.F("Sync service with driver=%s not yet implemented via unified binary", driver)
log.I.F("Use the standalone binary: orly-sync-%s", driver)
os.Exit(1)
}
func printSyncHelp() {
fmt.Println(`orly sync - Sync service operations
Usage:
orly sync --driver=NAME [options]
Options:
--driver=NAME Select sync driver (negentropy, cluster, distributed)
--list-drivers List available sync drivers
--help, -h Show this help message
Drivers:
negentropy NIP-77 negentropy set reconciliation
cluster Cluster-based synchronization
distributed Distributed synchronization
Environment variables:
ORLY_SYNC_LISTEN gRPC server listen address
ORLY_SYNC_LOG_LEVEL Logging level
ORLY_SYNC_DB_TYPE Database type (grpc or badger)
ORLY_SYNC_TARGET_RELAYS Comma-separated target relay URLs
Examples:
orly sync --driver=negentropy Run negentropy sync service
orly sync --list-drivers List available drivers`)
}

114
pkg/acl/follows.go

@ -1,7 +1,6 @@
package acl package acl
import ( import (
"bytes"
"context" "context"
"encoding/hex" "encoding/hex"
"net/http" "net/http"
@ -27,7 +26,6 @@ import (
"git.mleku.dev/mleku/nostr/encoders/kind" "git.mleku.dev/mleku/nostr/encoders/kind"
"git.mleku.dev/mleku/nostr/encoders/tag" "git.mleku.dev/mleku/nostr/encoders/tag"
"next.orly.dev/pkg/protocol/publish" "next.orly.dev/pkg/protocol/publish"
"next.orly.dev/pkg/utils"
"git.mleku.dev/mleku/nostr/utils/normalize" "git.mleku.dev/mleku/nostr/utils/normalize"
"git.mleku.dev/mleku/nostr/utils/values" "git.mleku.dev/mleku/nostr/utils/values"
) )
@ -41,6 +39,10 @@ type Follows struct {
admins [][]byte admins [][]byte
owners [][]byte owners [][]byte
follows [][]byte follows [][]byte
// Map-based caches for O(1) lookups (hex pubkey -> struct{})
ownersSet map[string]struct{}
adminsSet map[string]struct{}
followsSet map[string]struct{}
// Track last follow list fetch time // Track last follow list fetch time
lastFollowListFetch time.Time lastFollowListFetch time.Time
// Callback for external notification of follow list changes // Callback for external notification of follow list changes
@ -73,6 +75,16 @@ func (f *Follows) Configure(cfg ...any) (err error) {
err = errorf.E("both config and database must be set") err = errorf.E("both config and database must be set")
return return
} }
// Build all lists in local variables WITHOUT holding the lock
// This prevents blocking GetAccessLevel calls during slow database I/O
var newOwners [][]byte
newOwnersSet := make(map[string]struct{})
var newAdmins [][]byte
newAdminsSet := make(map[string]struct{})
var newFollows [][]byte
newFollowsSet := make(map[string]struct{})
// add owners list // add owners list
for _, owner := range f.cfg.Owners { for _, owner := range f.cfg.Owners {
var own []byte var own []byte
@ -81,23 +93,21 @@ func (f *Follows) Configure(cfg ...any) (err error) {
} else { } else {
own = o own = o
} }
f.owners = append(f.owners, own) newOwners = append(newOwners, own)
newOwnersSet[hex.EncodeToString(own)] = struct{}{}
} }
// find admin follow lists
f.followsMx.Lock() // find admin follow lists (database I/O happens here, but no lock held)
defer f.followsMx.Unlock()
// log.I.F("finding admins")
f.follows, f.admins = nil, nil
for _, admin := range f.cfg.Admins { for _, admin := range f.cfg.Admins {
// log.I.F("%s", admin)
var adm []byte var adm []byte
if a, e := bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(e) { if a, e := bech32encoding.NpubOrHexToPublicKeyBinary(admin); chk.E(e) {
continue continue
} else { } else {
adm = a adm = a
} }
// log.I.F("admin: %0x", adm) newAdmins = append(newAdmins, adm)
f.admins = append(f.admins, adm) newAdminsSet[hex.EncodeToString(adm)] = struct{}{}
fl := &filter.F{ fl := &filter.F{
Authors: tag.NewFromAny(adm), Authors: tag.NewFromAny(adm),
Kinds: kind.NewS(kind.New(kind.FollowList.K)), Kinds: kind.NewS(kind.New(kind.FollowList.K)),
@ -120,20 +130,35 @@ func (f *Follows) Configure(cfg ...any) (err error) {
if ev, err = f.db.FetchEventBySerial(s); chk.E(err) { if ev, err = f.db.FetchEventBySerial(s); chk.E(err) {
continue continue
} }
// log.I.F("admin follow list:\n%s", ev.Serialize())
for _, v := range ev.Tags.GetAll([]byte("p")) { for _, v := range ev.Tags.GetAll([]byte("p")) {
// log.I.F("adding follow: %s", v.ValueHex())
// ValueHex() automatically handles both binary and hex storage formats // ValueHex() automatically handles both binary and hex storage formats
if b, e := hex.DecodeString(string(v.ValueHex())); chk.E(e) { if b, e := hex.DecodeString(string(v.ValueHex())); chk.E(e) {
continue continue
} else { } else {
f.follows = append(f.follows, b) hexKey := hex.EncodeToString(b)
if _, exists := newFollowsSet[hexKey]; !exists {
newFollows = append(newFollows, b)
newFollowsSet[hexKey] = struct{}{}
}
} }
} }
} }
} }
} }
// Now acquire the lock ONLY for the quick swap operation
f.followsMx.Lock()
f.owners = newOwners
f.ownersSet = newOwnersSet
f.admins = newAdmins
f.adminsSet = newAdminsSet
f.follows = newFollows
f.followsSet = newFollowsSet
f.followsMx.Unlock()
log.I.F("follows ACL configured: %d owners, %d admins, %d follows",
len(newOwners), len(newAdmins), len(newFollows))
// Initialize progressive throttle if enabled // Initialize progressive throttle if enabled
if f.cfg.FollowsThrottleEnabled { if f.cfg.FollowsThrottleEnabled {
perEvent := f.cfg.FollowsThrottlePerEvent perEvent := f.cfg.FollowsThrottlePerEvent
@ -153,23 +178,28 @@ func (f *Follows) Configure(cfg ...any) (err error) {
} }
func (f *Follows) GetAccessLevel(pub []byte, address string) (level string) { func (f *Follows) GetAccessLevel(pub []byte, address string) (level string) {
pubHex := hex.EncodeToString(pub)
f.followsMx.RLock() f.followsMx.RLock()
defer f.followsMx.RUnlock() defer f.followsMx.RUnlock()
for _, v := range f.owners {
if utils.FastEqual(v, pub) { // O(1) map lookups instead of O(n) linear scans
if f.ownersSet != nil {
if _, ok := f.ownersSet[pubHex]; ok {
return "owner" return "owner"
} }
} }
for _, v := range f.admins { if f.adminsSet != nil {
if utils.FastEqual(v, pub) { if _, ok := f.adminsSet[pubHex]; ok {
return "admin" return "admin"
} }
} }
for _, v := range f.follows { if f.followsSet != nil {
if utils.FastEqual(v, pub) { if _, ok := f.followsSet[pubHex]; ok {
return "write" return "write"
} }
} }
if f.cfg == nil { if f.cfg == nil {
return "write" return "write"
} }
@ -194,31 +224,32 @@ func (f *Follows) GetThrottleDelay(pubkey []byte, ip string) time.Duration {
return 0 return 0
} }
// Check if user is exempt from throttling pubkeyHex := hex.EncodeToString(pubkey)
// Check if user is exempt from throttling using O(1) map lookups
f.followsMx.RLock() f.followsMx.RLock()
defer f.followsMx.RUnlock() defer f.followsMx.RUnlock()
// Owners bypass throttle // Owners bypass throttle
for _, v := range f.owners { if f.ownersSet != nil {
if utils.FastEqual(v, pubkey) { if _, ok := f.ownersSet[pubkeyHex]; ok {
return 0 return 0
} }
} }
// Admins bypass throttle // Admins bypass throttle
for _, v := range f.admins { if f.adminsSet != nil {
if utils.FastEqual(v, pubkey) { if _, ok := f.adminsSet[pubkeyHex]; ok {
return 0 return 0
} }
} }
// Followed users bypass throttle // Followed users bypass throttle
for _, v := range f.follows { if f.followsSet != nil {
if utils.FastEqual(v, pubkey) { if _, ok := f.followsSet[pubkeyHex]; ok {
return 0 return 0
} }
} }
// Non-followed users get throttled // Non-followed users get throttled
pubkeyHex := hex.EncodeToString(pubkey)
return f.throttle.GetDelay(ip, pubkeyHex) return f.throttle.GetDelay(ip, pubkeyHex)
} }
@ -723,13 +754,14 @@ func (f *Follows) GetFollowedPubkeys() [][]byte {
// isAdminPubkey checks if a pubkey belongs to an admin // isAdminPubkey checks if a pubkey belongs to an admin
func (f *Follows) isAdminPubkey(pubkey []byte) bool { func (f *Follows) isAdminPubkey(pubkey []byte) bool {
pubkeyHex := hex.EncodeToString(pubkey)
f.followsMx.RLock() f.followsMx.RLock()
defer f.followsMx.RUnlock() defer f.followsMx.RUnlock()
for _, admin := range f.admins { if f.adminsSet != nil {
if utils.FastEqual(admin, pubkey) { _, ok := f.adminsSet[pubkeyHex]
return true return ok
}
} }
return false return false
} }
@ -773,19 +805,27 @@ func (f *Follows) AddFollow(pub []byte) {
if len(pub) == 0 { if len(pub) == 0 {
return return
} }
pubHex := hex.EncodeToString(pub)
f.followsMx.Lock() f.followsMx.Lock()
defer f.followsMx.Unlock() defer f.followsMx.Unlock()
for _, p := range f.follows {
if bytes.Equal(p, pub) { // Use map for O(1) duplicate detection
return if f.followsSet == nil {
} f.followsSet = make(map[string]struct{})
}
if _, exists := f.followsSet[pubHex]; exists {
return
} }
b := make([]byte, len(pub)) b := make([]byte, len(pub))
copy(b, pub) copy(b, pub)
f.follows = append(f.follows, b) f.follows = append(f.follows, b)
f.followsSet[pubHex] = struct{}{}
log.I.F( log.I.F(
"follows syncer: added new followed pubkey: %s", "follows syncer: added new followed pubkey: %s",
hex.EncodeToString(pub), pubHex,
) )
// notify external listeners (e.g., spider) // notify external listeners (e.g., spider)
if f.onFollowListUpdate != nil { if f.onFollowListUpdate != nil {

23
pkg/acl/register_curating.go

@ -0,0 +1,23 @@
//go:build !(js && wasm)
package acl
import (
"context"
"next.orly.dev/pkg/database"
)
func init() {
// Register the Curating driver with the driver registry.
RegisterDriver("curating", "Rate-limited trust tier system", curatingFactory)
}
// curatingFactory creates a new Curating ACL instance.
func curatingFactory(ctx context.Context, db database.Database, cfg *DriverConfig) (I, error) {
// Create a new Curating instance
c := new(Curating)
// The Curating ACL will be configured via the Configure method
// which is called by the ACL server after creation.
return c, nil
}

23
pkg/acl/register_follows.go

@ -0,0 +1,23 @@
//go:build !(js && wasm)
package acl
import (
"context"
"next.orly.dev/pkg/database"
)
func init() {
// Register the Follows driver with the driver registry.
RegisterDriver("follows", "Whitelist based on admin follow lists", followsFactory)
}
// followsFactory creates a new Follows ACL instance.
func followsFactory(ctx context.Context, db database.Database, cfg *DriverConfig) (I, error) {
// Create a new Follows instance
f := new(Follows)
// The Follows ACL will be configured via the Configure method
// which is called by the ACL server after creation.
return f, nil
}

23
pkg/acl/register_managed.go

@ -0,0 +1,23 @@
//go:build !(js && wasm)
package acl
import (
"context"
"next.orly.dev/pkg/database"
)
func init() {
// Register the Managed driver with the driver registry.
RegisterDriver("managed", "NIP-86 fine-grained access control", managedFactory)
}
// managedFactory creates a new Managed ACL instance.
func managedFactory(ctx context.Context, db database.Database, cfg *DriverConfig) (I, error) {
// Create a new Managed instance
m := new(Managed)
// The Managed ACL will be configured via the Configure method
// which is called by the ACL server after creation.
return m, nil
}

120
pkg/acl/registry.go

@ -0,0 +1,120 @@
//go:build !(js && wasm)
package acl
import (
"context"
"sort"
"sync"
"lol.mleku.dev/errorf"
"next.orly.dev/pkg/database"
)
// DriverFactory is the signature for ACL driver factory functions.
type DriverFactory func(ctx context.Context, db database.Database, cfg *DriverConfig) (I, error)
// DriverConfig holds configuration for ACL drivers.
type DriverConfig struct {
// Common settings
LogLevel string
Owners []string
Admins []string
BootstrapRelays []string
RelayAddresses []string
// Follows-specific settings
FollowListFrequency string
FollowsThrottleEnabled bool
FollowsThrottlePerEvent string
FollowsThrottleMaxDelay string
}
// DriverInfo contains metadata about a registered ACL driver.
type DriverInfo struct {
Name string
Description string
Factory DriverFactory
}
// I is the ACL interface that drivers must implement.
// This is re-exported from the interfaces package for convenience.
type I interface {
Configure(cfg ...any) (err error)
GetAccessLevel(pub []byte, address string) (level string)
GetACLInfo() (name, description, documentation string)
Syncer()
Type() string
}
var (
driversMu sync.RWMutex
drivers = make(map[string]*DriverInfo)
)
// RegisterDriver registers an ACL driver with the given name and factory.
// This is typically called from init() in the driver package.
func RegisterDriver(name, description string, factory DriverFactory) {
driversMu.Lock()
defer driversMu.Unlock()
drivers[name] = &DriverInfo{
Name: name,
Description: description,
Factory: factory,
}
}
// GetDriver returns the factory for the named driver, or nil if not found.
func GetDriver(name string) DriverFactory {
driversMu.RLock()
defer driversMu.RUnlock()
if info, ok := drivers[name]; ok {
return info.Factory
}
return nil
}
// HasDriver returns true if the named driver is registered.
func HasDriver(name string) bool {
driversMu.RLock()
defer driversMu.RUnlock()
_, ok := drivers[name]
return ok
}
// ListDrivers returns a sorted list of registered driver names.
func ListDrivers() []string {
driversMu.RLock()
defer driversMu.RUnlock()
names := make([]string, 0, len(drivers))
for name := range drivers {
names = append(names, name)
}
sort.Strings(names)
return names
}
// ListDriversWithInfo returns information about all registered drivers.
func ListDriversWithInfo() []*DriverInfo {
driversMu.RLock()
defer driversMu.RUnlock()
infos := make([]*DriverInfo, 0, len(drivers))
for _, info := range drivers {
infos = append(infos, info)
}
// Sort by name for consistent output
sort.Slice(infos, func(i, j int) bool {
return infos[i].Name < infos[j].Name
})
return infos
}
// NewFromDriver creates an ACL using the named driver.
// Returns an error if the driver is not registered.
func NewFromDriver(ctx context.Context, driverName string, db database.Database, cfg *DriverConfig) (I, error) {
factory := GetDriver(driverName)
if factory == nil {
return nil, errorf.E("ACL driver %q not available; registered: %v", driverName, ListDrivers())
}
return factory(ctx, db, cfg)
}

474
pkg/database/health.go

@ -0,0 +1,474 @@
//go:build !(js && wasm)
package database
import (
"bytes"
"fmt"
"io"
"time"
"github.com/dgraph-io/badger/v4"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database/indexes"
"next.orly.dev/pkg/database/indexes/types"
)
// HealthReport contains the results of a database health check.
type HealthReport struct {
// Scan metadata
ScanStarted time.Time
ScanDuration time.Duration
// Event counts
CompactEvents int64 // Events stored in compact format (cmp)
LegacyEvents int64 // Events in legacy format (evt)
SmallEvents int64 // Small inline events (sev)
TotalEvents int64 // Total events
SerialIdCount int64 // Serial to EventID mappings (sei)
// Pubkey serial counts
PubkeySerials int64 // pks entries (pubkey hash -> serial)
SerialPubkeys int64 // spk entries (serial -> pubkey)
// Graph edge counts
EventPubkeyEdges int64 // epg entries
PubkeyEventEdges int64 // peg entries
EventEventEdges int64 // eeg entries
GraphEventEdges int64 // gee entries
// Index counts
KindIndexes int64 // kc- entries
PubkeyIndexes int64 // pc- entries
TagIndexes int64 // tc- entries
WordIndexes int64 // wrd entries
IdIndexes int64 // eid entries
// Issues found
MissingSerialEventIds int64 // cmp entries without corresponding sei
OrphanedSerialEventIds int64 // sei entries without corresponding cmp
PubkeySerialMismatches int64 // pks without matching spk or vice versa
OrphanedIndexes int64 // Index entries pointing to non-existent events
// Sample of missing sei serials (for debugging)
MissingSeiSamples []uint64
// Health score (0-100)
HealthScore int
}
// String returns a human-readable health report.
func (r *HealthReport) String() string {
var buf bytes.Buffer
fmt.Fprintln(&buf, "Database Health Report")
fmt.Fprintln(&buf, "======================")
fmt.Fprintf(&buf, "Scan duration: %v\n\n", r.ScanDuration)
fmt.Fprintln(&buf, "Event Storage:")
fmt.Fprintf(&buf, " Compact events (cmp): %d\n", r.CompactEvents)
fmt.Fprintf(&buf, " Legacy events (evt): %d\n", r.LegacyEvents)
fmt.Fprintf(&buf, " Small events (sev): %d\n", r.SmallEvents)
fmt.Fprintf(&buf, " Total events: %d\n", r.TotalEvents)
fmt.Fprintf(&buf, " Serial->ID maps (sei): %d\n\n", r.SerialIdCount)
fmt.Fprintln(&buf, "Pubkey Mappings:")
fmt.Fprintf(&buf, " Pubkey serials (pks): %d\n", r.PubkeySerials)
fmt.Fprintf(&buf, " Serial pubkeys (spk): %d\n\n", r.SerialPubkeys)
fmt.Fprintln(&buf, "Graph Edges:")
fmt.Fprintf(&buf, " Event->Pubkey (epg): %d\n", r.EventPubkeyEdges)
fmt.Fprintf(&buf, " Pubkey->Event (peg): %d\n", r.PubkeyEventEdges)
fmt.Fprintf(&buf, " Event->Event (eeg): %d\n", r.EventEventEdges)
fmt.Fprintf(&buf, " Event<-Event (gee): %d\n\n", r.GraphEventEdges)
fmt.Fprintln(&buf, "Search Indexes:")
fmt.Fprintf(&buf, " Kind indexes (kc-): %d\n", r.KindIndexes)
fmt.Fprintf(&buf, " Pubkey indexes (pc-): %d\n", r.PubkeyIndexes)
fmt.Fprintf(&buf, " Tag indexes (tc-): %d\n", r.TagIndexes)
fmt.Fprintf(&buf, " Word indexes (wrd): %d\n", r.WordIndexes)
fmt.Fprintf(&buf, " ID indexes (eid): %d\n\n", r.IdIndexes)
fmt.Fprintln(&buf, "Issues Found:")
fmt.Fprintf(&buf, " Missing sei mappings: %d", r.MissingSerialEventIds)
if r.MissingSerialEventIds > 0 {
fmt.Fprint(&buf, " (CRITICAL)")
}
fmt.Fprintln(&buf)
fmt.Fprintf(&buf, " Orphaned sei mappings: %d\n", r.OrphanedSerialEventIds)
fmt.Fprintf(&buf, " Pubkey serial mismatch: %d\n", r.PubkeySerialMismatches)
fmt.Fprintf(&buf, " Orphaned indexes: %d\n\n", r.OrphanedIndexes)
if len(r.MissingSeiSamples) > 0 {
fmt.Fprintln(&buf, "Sample missing sei serials:")
for i, s := range r.MissingSeiSamples {
if i >= 10 {
fmt.Fprintf(&buf, " ... and %d more\n", len(r.MissingSeiSamples)-10)
break
}
fmt.Fprintf(&buf, " - %d\n", s)
}
fmt.Fprintln(&buf)
}
fmt.Fprintf(&buf, "Health Score: %d/100\n", r.HealthScore)
if r.HealthScore < 50 {
fmt.Fprintln(&buf, "\n⚠ Database has critical issues. Run 'orly db repair' to fix.")
} else if r.HealthScore < 80 {
fmt.Fprintln(&buf, "\n⚠ Database has some issues. Consider running 'orly db repair'.")
} else {
fmt.Fprintln(&buf, "\n✓ Database is healthy.")
}
return buf.String()
}
// HealthCheck performs a comprehensive health check of the database.
// It scans all index prefixes and verifies referential integrity.
func (d *D) HealthCheck(progress io.Writer) (report *HealthReport, err error) {
report = &HealthReport{
ScanStarted: time.Now(),
MissingSeiSamples: make([]uint64, 0, 100),
}
if progress != nil {
fmt.Fprintln(progress, "Starting database health check...")
}
// Build prefix buffers for all index types
cmpPrf := buildPrefix(indexes.CompactEventEnc(nil))
seiPrf := buildPrefix(indexes.SerialEventIdEnc(nil))
evtPrf := buildPrefix(indexes.EventEnc(nil))
sevPrf := buildPrefix(indexes.SmallEventEnc(nil))
pksPrf := buildPrefix(indexes.PubkeySerialEnc(nil, nil))
spkPrf := buildPrefix(indexes.SerialPubkeyEnc(nil))
epgPrf := buildPrefix(indexes.EventPubkeyGraphEnc(nil, nil, nil, nil))
pegPrf := buildPrefix(indexes.PubkeyEventGraphEnc(nil, nil, nil, nil))
eegPrf := buildPrefix(indexes.EventEventGraphEnc(nil, nil, nil, nil))
geePrf := buildPrefix(indexes.GraphEventEventEnc(nil, nil, nil, nil))
kcPrf := buildPrefix(indexes.KindEnc(nil, nil, nil))
pcPrf := buildPrefix(indexes.PubkeyEnc(nil, nil, nil))
tcPrf := buildPrefix(indexes.TagEnc(nil, nil, nil, nil))
wrdPrf := buildPrefix(indexes.WordEnc(nil, nil))
eidPrf := buildPrefix(indexes.IdEnc(nil, nil))
// Phase 1: Count all entries with each prefix
if progress != nil {
fmt.Fprintln(progress, "Phase 1: Counting entries by prefix...")
}
err = d.View(func(txn *badger.Txn) error {
// Count compact events
report.CompactEvents = countPrefix(txn, cmpPrf)
if progress != nil {
fmt.Fprintf(progress, " Compact events (cmp): %d\n", report.CompactEvents)
}
// Count serial->eventID mappings
report.SerialIdCount = countPrefix(txn, seiPrf)
if progress != nil {
fmt.Fprintf(progress, " Serial->ID maps (sei): %d\n", report.SerialIdCount)
}
// Count legacy events
report.LegacyEvents = countPrefix(txn, evtPrf)
report.SmallEvents = countPrefix(txn, sevPrf)
report.TotalEvents = report.CompactEvents + report.LegacyEvents + report.SmallEvents
if progress != nil {
fmt.Fprintf(progress, " Legacy events (evt): %d\n", report.LegacyEvents)
fmt.Fprintf(progress, " Small events (sev): %d\n", report.SmallEvents)
}
// Count pubkey serial mappings
report.PubkeySerials = countPrefix(txn, pksPrf)
report.SerialPubkeys = countPrefix(txn, spkPrf)
if progress != nil {
fmt.Fprintf(progress, " Pubkey serials (pks): %d, (spk): %d\n", report.PubkeySerials, report.SerialPubkeys)
}
// Count graph edges
report.EventPubkeyEdges = countPrefix(txn, epgPrf)
report.PubkeyEventEdges = countPrefix(txn, pegPrf)
report.EventEventEdges = countPrefix(txn, eegPrf)
report.GraphEventEdges = countPrefix(txn, geePrf)
if progress != nil {
fmt.Fprintf(progress, " Graph edges: epg=%d, peg=%d, eeg=%d, gee=%d\n",
report.EventPubkeyEdges, report.PubkeyEventEdges, report.EventEventEdges, report.GraphEventEdges)
}
// Count search indexes
report.KindIndexes = countPrefix(txn, kcPrf)
report.PubkeyIndexes = countPrefix(txn, pcPrf)
report.TagIndexes = countPrefix(txn, tcPrf)
report.WordIndexes = countPrefix(txn, wrdPrf)
report.IdIndexes = countPrefix(txn, eidPrf)
if progress != nil {
fmt.Fprintf(progress, " Indexes: kc=%d, pc=%d, tc=%d, wrd=%d, eid=%d\n",
report.KindIndexes, report.PubkeyIndexes, report.TagIndexes, report.WordIndexes, report.IdIndexes)
}
return nil
})
if chk.E(err) {
return nil, err
}
// Phase 2: Check cmp->sei integrity (CRITICAL)
if progress != nil {
fmt.Fprintln(progress, "\nPhase 2: Checking compact event -> serial ID integrity...")
}
err = d.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf})
defer it.Close()
checked := int64(0)
for it.Rewind(); it.Valid(); it.Next() {
// Extract serial from cmp key: prefix (3 bytes) + serial (5 bytes)
key := it.Item().Key()
if len(key) < 8 {
continue
}
// Extract the serial
serial := extractSerial(key[3:8])
// Check if sei entry exists for this serial
seiKey := buildSeiKey(serial)
_, err := txn.Get(seiKey)
if err == badger.ErrKeyNotFound {
report.MissingSerialEventIds++
if len(report.MissingSeiSamples) < 100 {
report.MissingSeiSamples = append(report.MissingSeiSamples, serial)
}
} else if err != nil {
log.W.F("error checking sei for serial %d: %v", serial, err)
}
checked++
if progress != nil && checked%100000 == 0 {
fmt.Fprintf(progress, " Checked %d compact events, %d missing sei so far...\n",
checked, report.MissingSerialEventIds)
}
}
if progress != nil {
fmt.Fprintf(progress, " Checked %d compact events, found %d missing sei entries\n",
checked, report.MissingSerialEventIds)
}
return nil
})
if chk.E(err) {
return nil, err
}
// Phase 3: Check for orphaned sei entries (sei without cmp)
if progress != nil {
fmt.Fprintln(progress, "\nPhase 3: Checking for orphaned serial ID mappings...")
}
err = d.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.IteratorOptions{Prefix: seiPrf})
defer it.Close()
checked := int64(0)
for it.Rewind(); it.Valid(); it.Next() {
key := it.Item().Key()
if len(key) < 8 {
continue
}
// Extract serial from sei key
serial := extractSerial(key[3:8])
// Check if cmp entry exists for this serial
cmpKey := buildCmpKey(serial)
_, err := txn.Get(cmpKey)
if err == badger.ErrKeyNotFound {
// Also check legacy evt format
evtKey := buildEvtKey(serial)
_, err2 := txn.Get(evtKey)
if err2 == badger.ErrKeyNotFound {
report.OrphanedSerialEventIds++
}
} else if err != nil {
log.W.F("error checking cmp for serial %d: %v", serial, err)
}
checked++
if progress != nil && checked%100000 == 0 {
fmt.Fprintf(progress, " Checked %d sei entries, %d orphaned so far...\n",
checked, report.OrphanedSerialEventIds)
}
}
if progress != nil {
fmt.Fprintf(progress, " Checked %d sei entries, found %d orphaned\n",
checked, report.OrphanedSerialEventIds)
}
return nil
})
if chk.E(err) {
return nil, err
}
// Phase 4: Check pubkey serial consistency
if progress != nil {
fmt.Fprintln(progress, "\nPhase 4: Checking pubkey serial consistency...")
}
err = d.View(func(txn *badger.Txn) error {
// Check that pks count roughly matches spk count
// A small difference is acceptable due to timing, but large differences indicate corruption
diff := report.PubkeySerials - report.SerialPubkeys
if diff < 0 {
diff = -diff
}
// Allow 1% difference
threshold := report.PubkeySerials / 100
if threshold < 10 {
threshold = 10
}
if diff > threshold {
report.PubkeySerialMismatches = diff
if progress != nil {
fmt.Fprintf(progress, " Found %d pubkey serial mismatches (pks=%d, spk=%d)\n",
diff, report.PubkeySerials, report.SerialPubkeys)
}
} else if progress != nil {
fmt.Fprintln(progress, " Pubkey serial counts are consistent")
}
return nil
})
if chk.E(err) {
return nil, err
}
// Calculate health score
report.ScanDuration = time.Since(report.ScanStarted)
report.HealthScore = calculateHealthScore(report)
if progress != nil {
fmt.Fprintf(progress, "\nHealth check complete. Score: %d/100\n", report.HealthScore)
}
return report, nil
}
// buildPrefix creates a prefix buffer from an encoder.
func buildPrefix(enc *indexes.T) []byte {
buf := new(bytes.Buffer)
if err := enc.MarshalWrite(buf); err != nil {
return nil
}
// Return only the prefix part (3 bytes)
b := buf.Bytes()
if len(b) >= 3 {
return b[:3]
}
return b
}
// countPrefix counts the number of entries with the given prefix.
func countPrefix(txn *badger.Txn, prefix []byte) int64 {
it := txn.NewIterator(badger.IteratorOptions{
Prefix: prefix,
PrefetchValues: false,
})
defer it.Close()
var count int64
for it.Rewind(); it.Valid(); it.Next() {
count++
}
return count
}
// extractSerial extracts a 40-bit serial from 5 bytes (big-endian).
func extractSerial(b []byte) uint64 {
if len(b) < 5 {
return 0
}
return (uint64(b[0]) << 32) |
(uint64(b[1]) << 24) |
(uint64(b[2]) << 16) |
(uint64(b[3]) << 8) |
uint64(b[4])
}
// buildSeiKey builds a sei (serial->eventID) key for the given serial.
func buildSeiKey(serial uint64) []byte {
ser := new(types.Uint40)
ser.Set(serial)
buf := new(bytes.Buffer)
indexes.SerialEventIdEnc(ser).MarshalWrite(buf)
return buf.Bytes()
}
// buildCmpKey builds a cmp (compact event) key for the given serial.
func buildCmpKey(serial uint64) []byte {
ser := new(types.Uint40)
ser.Set(serial)
buf := new(bytes.Buffer)
indexes.CompactEventEnc(ser).MarshalWrite(buf)
return buf.Bytes()
}
// buildEvtKey builds an evt (legacy event) key for the given serial.
func buildEvtKey(serial uint64) []byte {
ser := new(types.Uint40)
ser.Set(serial)
buf := new(bytes.Buffer)
indexes.EventEnc(ser).MarshalWrite(buf)
return buf.Bytes()
}
// calculateHealthScore calculates a health score from 0-100 based on the report.
func calculateHealthScore(r *HealthReport) int {
score := 100
// Missing sei is critical - each one costs 1 point, max 50 point penalty
if r.MissingSerialEventIds > 0 {
penalty := int(r.MissingSerialEventIds)
if penalty > 50 {
penalty = 50
}
score -= penalty
}
// Orphaned sei is less critical - each one costs 0.1 points, max 20 point penalty
if r.OrphanedSerialEventIds > 0 {
penalty := int(r.OrphanedSerialEventIds / 10)
if penalty > 20 {
penalty = 20
}
score -= penalty
}
// Pubkey mismatches cost 0.5 points each, max 20 point penalty
if r.PubkeySerialMismatches > 0 {
penalty := int(r.PubkeySerialMismatches / 2)
if penalty > 20 {
penalty = 20
}
score -= penalty
}
// Orphaned indexes cost 0.01 points each, max 10 point penalty
if r.OrphanedIndexes > 0 {
penalty := int(r.OrphanedIndexes / 100)
if penalty > 10 {
penalty = 10
}
score -= penalty
}
if score < 0 {
score = 0
}
return score
}

18
pkg/database/register_badger.go

@ -0,0 +1,18 @@
//go:build !(js && wasm)
package database
import (
"context"
)
func init() {
// Register the Badger driver with the driver registry.
// This is always available on non-wasm builds.
RegisterDriver("badger", "Badger LSM database (default)", badgerFactory)
}
// badgerFactory creates a new Badger database instance.
func badgerFactory(ctx context.Context, cancel context.CancelFunc, cfg *DatabaseConfig) (Database, error) {
return NewWithConfig(ctx, cancel, cfg)
}

93
pkg/database/registry.go

@ -0,0 +1,93 @@
//go:build !(js && wasm)
package database
import (
"context"
"sort"
"sync"
"lol.mleku.dev/errorf"
)
// DriverFactory is the signature for database driver factory functions.
type DriverFactory func(ctx context.Context, cancel context.CancelFunc, cfg *DatabaseConfig) (Database, error)
// DriverInfo contains metadata about a registered driver.
type DriverInfo struct {
Name string
Description string
Factory DriverFactory
}
var (
driversMu sync.RWMutex
drivers = make(map[string]*DriverInfo)
)
// RegisterDriver registers a database driver with the given name and factory.
// This is typically called from init() in the driver package.
func RegisterDriver(name, description string, factory DriverFactory) {
driversMu.Lock()
defer driversMu.Unlock()
drivers[name] = &DriverInfo{
Name: name,
Description: description,
Factory: factory,
}
}
// GetDriver returns the factory for the named driver, or nil if not found.
func GetDriver(name string) DriverFactory {
driversMu.RLock()
defer driversMu.RUnlock()
if info, ok := drivers[name]; ok {
return info.Factory
}
return nil
}
// HasDriver returns true if the named driver is registered.
func HasDriver(name string) bool {
driversMu.RLock()
defer driversMu.RUnlock()
_, ok := drivers[name]
return ok
}
// ListDrivers returns a sorted list of registered driver names.
func ListDrivers() []string {
driversMu.RLock()
defer driversMu.RUnlock()
names := make([]string, 0, len(drivers))
for name := range drivers {
names = append(names, name)
}
sort.Strings(names)
return names
}
// ListDriversWithInfo returns information about all registered drivers.
func ListDriversWithInfo() []*DriverInfo {
driversMu.RLock()
defer driversMu.RUnlock()
infos := make([]*DriverInfo, 0, len(drivers))
for _, info := range drivers {
infos = append(infos, info)
}
// Sort by name for consistent output
sort.Slice(infos, func(i, j int) bool {
return infos[i].Name < infos[j].Name
})
return infos
}
// NewFromDriver creates a database using the named driver.
// Returns an error if the driver is not registered.
func NewFromDriver(ctx context.Context, cancel context.CancelFunc, driverName string, cfg *DatabaseConfig) (Database, error) {
factory := GetDriver(driverName)
if factory == nil {
return nil, errorf.E("database driver %q not available; registered: %v", driverName, ListDrivers())
}
return factory(ctx, cancel, cfg)
}

613
pkg/database/repair.go

@ -0,0 +1,613 @@
//go:build !(js && wasm)
package database
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"time"
"github.com/dgraph-io/badger/v4"
"git.mleku.dev/mleku/nostr/crypto/ec/schnorr"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/tag"
"git.mleku.dev/mleku/nostr/encoders/varint"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database/indexes"
"next.orly.dev/pkg/database/indexes/types"
)
// RepairReport contains the results of a database repair operation.
type RepairReport struct {
// Operation metadata
Started time.Time
Duration time.Duration
DryRun bool
// Events scanned
CompactEventsScanned int64
LegacyEventsScanned int64
// Repairs performed
SeiEntriesCreated int64 // sei mappings rebuilt from compact events
SeiEntriesRemoved int64 // orphaned sei entries removed
PubkeyMappingsFixed int64 // pubkey serial mappings fixed
OrphanedIndexesFixed int64 // orphaned index entries removed
// Errors encountered
Errors []string
}
// String returns a human-readable repair report.
func (r *RepairReport) String() string {
var buf bytes.Buffer
fmt.Fprintln(&buf, "Database Repair Report")
fmt.Fprintln(&buf, "======================")
fmt.Fprintf(&buf, "Duration: %v\n", r.Duration)
if r.DryRun {
fmt.Fprintln(&buf, "Mode: DRY RUN (no changes made)")
} else {
fmt.Fprintln(&buf, "Mode: REPAIR (changes applied)")
}
fmt.Fprintln(&buf)
fmt.Fprintln(&buf, "Events Scanned:")
fmt.Fprintf(&buf, " Compact events: %d\n", r.CompactEventsScanned)
fmt.Fprintf(&buf, " Legacy events: %d\n\n", r.LegacyEventsScanned)
fmt.Fprintln(&buf, "Repairs:")
fmt.Fprintf(&buf, " sei entries created: %d\n", r.SeiEntriesCreated)
fmt.Fprintf(&buf, " sei entries removed: %d\n", r.SeiEntriesRemoved)
fmt.Fprintf(&buf, " pubkey mappings fixed: %d\n", r.PubkeyMappingsFixed)
fmt.Fprintf(&buf, " orphaned indexes: %d\n\n", r.OrphanedIndexesFixed)
if len(r.Errors) > 0 {
fmt.Fprintln(&buf, "Errors encountered:")
for _, e := range r.Errors {
fmt.Fprintf(&buf, " - %s\n", e)
}
}
return buf.String()
}
// RepairOptions configures the repair operation.
type RepairOptions struct {
// DryRun if true, only reports what would be fixed without making changes
DryRun bool
// FixMissingSei if true, rebuilds missing sei entries from compact events
FixMissingSei bool
// RemoveOrphanedSei if true, removes sei entries without corresponding events
RemoveOrphanedSei bool
// FixPubkeyMappings if true, fixes inconsistent pubkey serial mappings
FixPubkeyMappings bool
// Progress writer for progress updates
Progress io.Writer
}
// DefaultRepairOptions returns the default repair options.
func DefaultRepairOptions() *RepairOptions {
return &RepairOptions{
DryRun: false,
FixMissingSei: true,
RemoveOrphanedSei: true,
FixPubkeyMappings: true,
}
}
// Repair performs database repair operations based on the provided options.
// It fixes integrity issues found by HealthCheck.
func (d *D) Repair(ctx context.Context, opts *RepairOptions) (report *RepairReport, err error) {
if opts == nil {
opts = DefaultRepairOptions()
}
report = &RepairReport{
Started: time.Now(),
DryRun: opts.DryRun,
Errors: make([]string, 0),
}
progress := opts.Progress
if progress != nil {
if opts.DryRun {
fmt.Fprintln(progress, "Starting database repair (DRY RUN)...")
} else {
fmt.Fprintln(progress, "Starting database repair...")
}
}
// Phase 1: Fix missing sei entries
if opts.FixMissingSei {
if progress != nil {
fmt.Fprintln(progress, "\nPhase 1: Rebuilding missing sei entries from compact events...")
}
if err = d.repairMissingSei(ctx, opts, report); err != nil {
report.Errors = append(report.Errors, fmt.Sprintf("sei repair error: %v", err))
log.E.F("sei repair error: %v", err)
}
}
// Phase 2: Remove orphaned sei entries
if opts.RemoveOrphanedSei {
if progress != nil {
fmt.Fprintln(progress, "\nPhase 2: Removing orphaned sei entries...")
}
if err = d.repairOrphanedSei(ctx, opts, report); err != nil {
report.Errors = append(report.Errors, fmt.Sprintf("orphaned sei repair error: %v", err))
log.E.F("orphaned sei repair error: %v", err)
}
}
// Phase 3: Fix pubkey serial mappings
if opts.FixPubkeyMappings {
if progress != nil {
fmt.Fprintln(progress, "\nPhase 3: Checking pubkey serial mappings...")
}
if err = d.repairPubkeyMappings(ctx, opts, report); err != nil {
report.Errors = append(report.Errors, fmt.Sprintf("pubkey mapping repair error: %v", err))
log.E.F("pubkey mapping repair error: %v", err)
}
}
report.Duration = time.Since(report.Started)
if progress != nil {
fmt.Fprintf(progress, "\nRepair complete in %v\n", report.Duration)
fmt.Fprintf(progress, " sei entries created: %d\n", report.SeiEntriesCreated)
fmt.Fprintf(progress, " sei entries removed: %d\n", report.SeiEntriesRemoved)
fmt.Fprintf(progress, " pubkey mappings fixed: %d\n", report.PubkeyMappingsFixed)
if opts.DryRun {
fmt.Fprintln(progress, "\n(DRY RUN - no changes were made)")
}
}
return report, nil
}
// repairMissingSei rebuilds sei entries for compact events that are missing them.
func (d *D) repairMissingSei(ctx context.Context, opts *RepairOptions, report *RepairReport) error {
progress := opts.Progress
cmpPrf := buildPrefix(indexes.CompactEventEnc(nil))
seiPrf := buildPrefix(indexes.SerialEventIdEnc(nil))
// First pass: collect all serials that need sei entries
type repairItem struct {
serial uint64
eventID []byte
}
var toRepair []repairItem
err := d.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.IteratorOptions{Prefix: cmpPrf})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
item := it.Item()
key := item.Key()
if len(key) < 8 {
continue
}
serial := extractSerial(key[3:8])
report.CompactEventsScanned++
// Check if sei entry exists
seiKey := buildSeiKey(serial)
_, err := txn.Get(seiKey)
if err == badger.ErrKeyNotFound {
// Need to repair - extract event ID from compact event
var eventData []byte
err = item.Value(func(val []byte) error {
eventData = make([]byte, len(val))
copy(eventData, val)
return nil
})
if err != nil {
continue
}
// Decode compact event to get the event ID
eventID, decodeErr := extractEventIDFromCompact(eventData, serial, d)
if decodeErr != nil {
if progress != nil && report.CompactEventsScanned%10000 == 0 {
log.D.F("could not extract event ID for serial %d: %v", serial, decodeErr)
}
continue
}
toRepair = append(toRepair, repairItem{serial: serial, eventID: eventID})
}
if progress != nil && report.CompactEventsScanned%100000 == 0 {
fmt.Fprintf(progress, " Scanned %d compact events, found %d needing sei repair...\n",
report.CompactEventsScanned, len(toRepair))
}
}
return nil
})
if err != nil {
return err
}
if progress != nil {
fmt.Fprintf(progress, " Found %d compact events needing sei repair\n", len(toRepair))
}
// Also count legacy events
err = d.View(func(txn *badger.Txn) error {
evtPrf := buildPrefix(indexes.EventEnc(nil))
it := txn.NewIterator(badger.IteratorOptions{Prefix: evtPrf, PrefetchValues: false})
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
report.LegacyEventsScanned++
}
return nil
})
if err != nil {
log.W.F("error counting legacy events: %v", err)
}
// Second pass: create missing sei entries
if opts.DryRun {
report.SeiEntriesCreated = int64(len(toRepair))
if progress != nil {
fmt.Fprintf(progress, " Would create %d sei entries (dry run)\n", len(toRepair))
}
return nil
}
// Write in batches
batchSize := 1000
for i := 0; i < len(toRepair); i += batchSize {
end := i + batchSize
if end > len(toRepair) {
end = len(toRepair)
}
batch := toRepair[i:end]
wb := d.DB.NewWriteBatch()
for _, item := range batch {
seiKey := buildSeiKey(item.serial)
if err := wb.Set(seiKey, item.eventID); err != nil {
wb.Cancel()
return err
}
}
if err := wb.Flush(); err != nil {
return err
}
report.SeiEntriesCreated += int64(len(batch))
if progress != nil && i+batchSize < len(toRepair) {
fmt.Fprintf(progress, " Created %d/%d sei entries...\n", report.SeiEntriesCreated, len(toRepair))
}
}
_ = seiPrf // prevent unused warning
return nil
}
// repairOrphanedSei removes sei entries that don't have corresponding events.
func (d *D) repairOrphanedSei(ctx context.Context, opts *RepairOptions, report *RepairReport) error {
progress := opts.Progress
seiPrf := buildPrefix(indexes.SerialEventIdEnc(nil))
cmpPrf := buildPrefix(indexes.CompactEventEnc(nil))
evtPrf := buildPrefix(indexes.EventEnc(nil))
var orphanedSerials []uint64
err := d.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.IteratorOptions{Prefix: seiPrf, PrefetchValues: false})
defer it.Close()
checked := int64(0)
for it.Rewind(); it.Valid(); it.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
key := it.Item().Key()
if len(key) < 8 {
continue
}
serial := extractSerial(key[3:8])
// Check if cmp entry exists
cmpKey := buildCmpKey(serial)
_, err := txn.Get(cmpKey)
if err == badger.ErrKeyNotFound {
// Also check legacy evt
evtKey := buildEvtKey(serial)
_, err2 := txn.Get(evtKey)
if err2 == badger.ErrKeyNotFound {
orphanedSerials = append(orphanedSerials, serial)
}
}
checked++
if progress != nil && checked%100000 == 0 {
fmt.Fprintf(progress, " Checked %d sei entries, found %d orphaned...\n",
checked, len(orphanedSerials))
}
}
return nil
})
if err != nil {
return err
}
if progress != nil {
fmt.Fprintf(progress, " Found %d orphaned sei entries\n", len(orphanedSerials))
}
if opts.DryRun {
report.SeiEntriesRemoved = int64(len(orphanedSerials))
if progress != nil {
fmt.Fprintf(progress, " Would remove %d orphaned sei entries (dry run)\n", len(orphanedSerials))
}
return nil
}
// Remove orphaned entries in batches
batchSize := 1000
for i := 0; i < len(orphanedSerials); i += batchSize {
end := i + batchSize
if end > len(orphanedSerials) {
end = len(orphanedSerials)
}
batch := orphanedSerials[i:end]
wb := d.DB.NewWriteBatch()
for _, serial := range batch {
seiKey := buildSeiKey(serial)
if err := wb.Delete(seiKey); err != nil {
wb.Cancel()
return err
}
}
if err := wb.Flush(); err != nil {
return err
}
report.SeiEntriesRemoved += int64(len(batch))
}
_ = cmpPrf // prevent unused
_ = evtPrf
return nil
}
// repairPubkeyMappings fixes inconsistent pubkey serial mappings.
func (d *D) repairPubkeyMappings(ctx context.Context, opts *RepairOptions, report *RepairReport) error {
progress := opts.Progress
pksPrf := buildPrefix(indexes.PubkeySerialEnc(nil, nil))
spkPrf := buildPrefix(indexes.SerialPubkeyEnc(nil))
// Count entries
var pksCount, spkCount int64
err := d.View(func(txn *badger.Txn) error {
pksCount = countPrefix(txn, pksPrf)
spkCount = countPrefix(txn, spkPrf)
return nil
})
if err != nil {
return err
}
if progress != nil {
fmt.Fprintf(progress, " pks entries: %d, spk entries: %d\n", pksCount, spkCount)
}
// For now, just report the mismatch
// Full repair would require scanning all events and rebuilding mappings
diff := pksCount - spkCount
if diff < 0 {
diff = -diff
}
threshold := pksCount / 100
if threshold < 10 {
threshold = 10
}
if diff > threshold {
report.PubkeyMappingsFixed = diff
if progress != nil {
fmt.Fprintf(progress, " Found %d pubkey mapping inconsistencies\n", diff)
fmt.Fprintln(progress, " Note: Full pubkey mapping repair requires event rescan (not yet implemented)")
}
} else {
if progress != nil {
fmt.Fprintln(progress, " Pubkey mappings are consistent")
}
}
return nil
}
// extractEventIDFromCompact decodes a compact event and computes its ID.
// This is used during repair when the sei entry is missing.
// The event ID is computed by hashing the serialized event content.
func extractEventIDFromCompact(data []byte, serial uint64, d *D) ([]byte, error) {
// Decode the compact event without the ID
ev, err := unmarshalCompactForRepair(data, d)
if err != nil {
return nil, fmt.Errorf("failed to decode compact event: %w", err)
}
// Compute the event ID by hashing the serialized event
eventID := ev.GetIDBytes()
if eventID == nil || len(eventID) != 32 {
return nil, fmt.Errorf("failed to compute event ID")
}
return eventID, nil
}
// unmarshalCompactForRepair decodes a compact event for repair purposes.
// Unlike UnmarshalCompactEvent, this doesn't require the event ID upfront.
func unmarshalCompactForRepair(data []byte, d *D) (*event.E, error) {
r := bytes.NewReader(data)
ev := new(event.E)
// Version byte
version, err := r.ReadByte()
if err != nil {
return nil, err
}
if version != CompactFormatVersion {
return nil, fmt.Errorf("unsupported compact format version: %d", version)
}
// Author pubkey serial (5 bytes) -> full pubkey
authorSerial, err := readUint40(r)
if err != nil {
return nil, err
}
ev.Pubkey, err = d.getPubkeyBySerial(authorSerial)
if err != nil {
return nil, fmt.Errorf("failed to get pubkey for serial %d: %w", authorSerial, err)
}
// CreatedAt (varint)
ca, err := varint.Decode(r)
if err != nil {
return nil, err
}
ev.CreatedAt = int64(ca)
// Kind (2 bytes big-endian)
if err = binary.Read(r, binary.BigEndian, &ev.Kind); err != nil {
return nil, err
}
// Tags
nTags, err := varint.Decode(r)
if err != nil {
return nil, err
}
if nTags > MaxTagsPerEvent {
return nil, ErrTooManyTags
}
if nTags > 0 {
ev.Tags = tag.NewSWithCap(int(nTags))
resolver := &repairSerialResolver{d: d}
for i := uint64(0); i < nTags; i++ {
t, err := decodeCompactTag(r, resolver)
if err != nil {
return nil, err
}
*ev.Tags = append(*ev.Tags, t)
}
}
// Content
contentLen, err := varint.Decode(r)
if err != nil {
return nil, err
}
if contentLen > MaxContentLength {
return nil, ErrContentTooLarge
}
ev.Content = make([]byte, contentLen)
if _, err = io.ReadFull(r, ev.Content); err != nil {
return nil, err
}
// Signature (64 bytes)
ev.Sig = make([]byte, schnorr.SignatureSize)
if _, err = io.ReadFull(r, ev.Sig); err != nil {
return nil, err
}
return ev, nil
}
// repairSerialResolver implements SerialResolver for repair operations.
type repairSerialResolver struct {
d *D
}
func (r *repairSerialResolver) GetOrCreatePubkeySerial(pubkey []byte) (uint64, error) {
return 0, fmt.Errorf("not supported in repair mode")
}
func (r *repairSerialResolver) GetPubkeyBySerial(serial uint64) ([]byte, error) {
return r.d.getPubkeyBySerial(serial)
}
func (r *repairSerialResolver) GetEventSerialById(eventId []byte) (uint64, bool, error) {
return 0, false, nil // Not needed for decoding
}
func (r *repairSerialResolver) GetEventIdBySerial(serial uint64) ([]byte, error) {
return r.d.getEventIdBySerial(serial)
}
// getEventIdBySerial returns the event ID for a given serial.
// This is a helper for compact event decoding.
func (d *D) getEventIdBySerial(serial uint64) ([]byte, error) {
var eventID []byte
err := d.View(func(txn *badger.Txn) error {
seiKey := buildSeiKey(serial)
item, err := txn.Get(seiKey)
if err != nil {
return err
}
return item.Value(func(val []byte) error {
eventID = make([]byte, len(val))
copy(eventID, val)
return nil
})
})
return eventID, err
}
// getPubkeyBySerial returns the pubkey for a given serial.
// This is a helper for compact event decoding.
func (d *D) getPubkeyBySerial(serial uint64) ([]byte, error) {
var pubkey []byte
err := d.View(func(txn *badger.Txn) error {
ser := new(types.Uint40)
ser.Set(serial)
spkKey := new(bytes.Buffer)
indexes.SerialPubkeyEnc(ser).MarshalWrite(spkKey)
item, err := txn.Get(spkKey.Bytes())
if err != nil {
return err
}
return item.Value(func(val []byte) error {
pubkey = make([]byte, len(val))
copy(pubkey, val)
return nil
})
})
return pubkey, err
}

21
pkg/sync/register_cluster.go

@ -0,0 +1,21 @@
//go:build !(js && wasm)
package sync
import (
"context"
"next.orly.dev/pkg/database"
)
func init() {
// Register the Cluster driver with the driver registry.
RegisterDriver("cluster", "Cluster-based replication", clusterFactory)
}
// clusterFactory creates a new Cluster sync service instance.
func clusterFactory(ctx context.Context, db database.Database, cfg *DriverConfig) (Service, error) {
// Cluster sync is implemented as a standalone service
// The actual implementation would create a cluster.Manager here
return &stubService{name: "cluster"}, nil
}

21
pkg/sync/register_distributed.go

@ -0,0 +1,21 @@
//go:build !(js && wasm)
package sync
import (
"context"
"next.orly.dev/pkg/database"
)
func init() {
// Register the Distributed driver with the driver registry.
RegisterDriver("distributed", "Distributed multi-node synchronization", distributedFactory)
}
// distributedFactory creates a new Distributed sync service instance.
func distributedFactory(ctx context.Context, db database.Database, cfg *DriverConfig) (Service, error) {
// Distributed sync is implemented as a standalone service
// The actual implementation would create a distributed.Manager here
return &stubService{name: "distributed"}, nil
}

30
pkg/sync/register_negentropy.go

@ -0,0 +1,30 @@
//go:build !(js && wasm)
package sync
import (
"context"
"next.orly.dev/pkg/database"
)
func init() {
// Register the Negentropy driver with the driver registry.
RegisterDriver("negentropy", "NIP-77 negentropy set reconciliation", negentropyFactory)
}
// negentropyFactory creates a new Negentropy sync service instance.
func negentropyFactory(ctx context.Context, db database.Database, cfg *DriverConfig) (Service, error) {
// Negentropy sync is implemented as a standalone service
// The actual implementation would create a negentropy.Service here
return &stubService{name: "negentropy"}, nil
}
// stubService is a placeholder for sync services not yet integrated
type stubService struct {
name string
}
func (s *stubService) Start() error { return nil }
func (s *stubService) Stop() error { return nil }
func (s *stubService) Type() string { return s.name }

21
pkg/sync/register_relaygroup.go

@ -0,0 +1,21 @@
//go:build !(js && wasm)
package sync
import (
"context"
"next.orly.dev/pkg/database"
)
func init() {
// Register the RelayGroup driver with the driver registry.
RegisterDriver("relaygroup", "Relay group configuration management", relaygroupFactory)
}
// relaygroupFactory creates a new RelayGroup sync service instance.
func relaygroupFactory(ctx context.Context, db database.Database, cfg *DriverConfig) (Service, error) {
// RelayGroup is implemented as a manager service
// The actual implementation would create a relaygroup.Manager here
return &stubService{name: "relaygroup"}, nil
}

129
pkg/sync/registry.go

@ -0,0 +1,129 @@
//go:build !(js && wasm)
package sync
import (
"context"
"sort"
"sync"
"lol.mleku.dev/errorf"
"next.orly.dev/pkg/database"
)
// DriverFactory is the signature for sync driver factory functions.
type DriverFactory func(ctx context.Context, db database.Database, cfg *DriverConfig) (Service, error)
// DriverConfig holds configuration for sync drivers.
type DriverConfig struct {
// Common settings
LogLevel string
// Negentropy-specific settings
TargetRelays []string
Filter string
SyncInterval string
BatchSize int
// Cluster-specific settings
AdminNpubs []string
PropagatePrivilegedEvents bool
PollInterval string
// Distributed-specific settings
NodeID string
RelayURL string
Peers []string
// Relay group settings
RelayGroups []string
}
// DriverInfo contains metadata about a registered sync driver.
type DriverInfo struct {
Name string
Description string
Factory DriverFactory
}
// Service is the interface that sync drivers must implement.
type Service interface {
// Start begins the sync service
Start() error
// Stop stops the sync service
Stop() error
// Type returns the service type name
Type() string
}
var (
driversMu sync.RWMutex
drivers = make(map[string]*DriverInfo)
)
// RegisterDriver registers a sync driver with the given name and factory.
// This is typically called from init() in the driver package.
func RegisterDriver(name, description string, factory DriverFactory) {
driversMu.Lock()
defer driversMu.Unlock()
drivers[name] = &DriverInfo{
Name: name,
Description: description,
Factory: factory,
}
}
// GetDriver returns the factory for the named driver, or nil if not found.
func GetDriver(name string) DriverFactory {
driversMu.RLock()
defer driversMu.RUnlock()
if info, ok := drivers[name]; ok {
return info.Factory
}
return nil
}
// HasDriver returns true if the named driver is registered.
func HasDriver(name string) bool {
driversMu.RLock()
defer driversMu.RUnlock()
_, ok := drivers[name]
return ok
}
// ListDrivers returns a sorted list of registered driver names.
func ListDrivers() []string {
driversMu.RLock()
defer driversMu.RUnlock()
names := make([]string, 0, len(drivers))
for name := range drivers {
names = append(names, name)
}
sort.Strings(names)
return names
}
// ListDriversWithInfo returns information about all registered drivers.
func ListDriversWithInfo() []*DriverInfo {
driversMu.RLock()
defer driversMu.RUnlock()
infos := make([]*DriverInfo, 0, len(drivers))
for _, info := range drivers {
infos = append(infos, info)
}
// Sort by name for consistent output
sort.Slice(infos, func(i, j int) bool {
return infos[i].Name < infos[j].Name
})
return infos
}
// NewFromDriver creates a sync service using the named driver.
// Returns an error if the driver is not registered.
func NewFromDriver(ctx context.Context, driverName string, db database.Database, cfg *DriverConfig) (Service, error) {
factory := GetDriver(driverName)
if factory == nil {
return nil, errorf.E("sync driver %q not available; registered: %v", driverName, ListDrivers())
}
return factory(ctx, db, cfg)
}

2
pkg/version/version

@ -1 +1 @@
v0.55.3 v0.55.4

Loading…
Cancel
Save