diff --git a/.claude/commands/release.md b/.claude/commands/release.md index b91815a..48e0246 100644 --- a/.claude/commands/release.md +++ b/.claude/commands/release.md @@ -56,7 +56,14 @@ If no argument provided, default to `patch`. ``` Note: setcap must be re-applied after each binary rebuild to allow binding to ports 80/443. -12. **Report completion** with the new version and commit hash +12. **Build and deploy monolithic binary** (ARM64): + Build the unified `orly` binary which includes all subcommands (db, acl, sync, launcher, relay) in a single binary: + ```bash + ssh relay.orly.dev 'cd ~/src/next.orly.dev && GOPATH=$HOME CGO_ENABLED=0 ~/go/bin/go build -o ~/.local/bin/orly ./cmd/orly && ~/.local/bin/orly version' + ``` + This provides the unified binary for split-mode deployments where a single binary can run any component via subcommands. + +13. **Report completion** with the new version and commit hash ## Important: - Do NOT push to github remote (only origin and gitea) diff --git a/app/config/config.go b/app/config/config.go index fb84741..20a93bb 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -120,20 +120,20 @@ type C struct { QueryCacheDisabled bool `env:"ORLY_QUERY_CACHE_DISABLED" default:"true" usage:"disable query cache to reduce memory usage (trades memory for query performance)"` // gRPC database client settings (only used when ORLY_DB_TYPE=grpc) - GRPCServerAddress string `env:"ORLY_GRPC_SERVER" usage:"address of remote gRPC database server (only used when ORLY_DB_TYPE=grpc)"` + GRPCServerAddress string `env:"ORLY_GRPC_SERVER" default:"127.0.0.1:50051" usage:"address of remote gRPC database server (only used when ORLY_DB_TYPE=grpc)"` GRPCConnectTimeout time.Duration `env:"ORLY_GRPC_CONNECT_TIMEOUT" default:"10s" usage:"gRPC connection timeout (only used when ORLY_DB_TYPE=grpc)"` // gRPC ACL client settings (only used when ORLY_ACL_TYPE=grpc) ACLType string `env:"ORLY_ACL_TYPE" default:"local" usage:"ACL backend: local (in-process) or grpc (remote ACL server)"` - GRPCACLServerAddress string `env:"ORLY_GRPC_ACL_SERVER" usage:"address of remote gRPC ACL server (only used when ORLY_ACL_TYPE=grpc)"` + GRPCACLServerAddress string `env:"ORLY_GRPC_ACL_SERVER" default:"127.0.0.1:50052" usage:"address of remote gRPC ACL server (only used when ORLY_ACL_TYPE=grpc)"` GRPCACLConnectTimeout time.Duration `env:"ORLY_GRPC_ACL_TIMEOUT" default:"10s" usage:"gRPC ACL connection timeout (only used when ORLY_ACL_TYPE=grpc)"` // gRPC Sync client settings (only used when ORLY_SYNC_TYPE=grpc) SyncType string `env:"ORLY_SYNC_TYPE" default:"local" usage:"sync backend: local (in-process) or grpc (remote sync services)"` - GRPCSyncDistributedAddress string `env:"ORLY_GRPC_SYNC_DISTRIBUTED" usage:"address of gRPC distributed sync server"` - GRPCSyncClusterAddress string `env:"ORLY_GRPC_SYNC_CLUSTER" usage:"address of gRPC cluster sync server"` - GRPCSyncRelayGroupAddress string `env:"ORLY_GRPC_SYNC_RELAYGROUP" usage:"address of gRPC relay group server"` - GRPCSyncNegentropyAddress string `env:"ORLY_GRPC_SYNC_NEGENTROPY" usage:"address of gRPC negentropy server"` + GRPCSyncDistributedAddress string `env:"ORLY_GRPC_SYNC_DISTRIBUTED" default:"127.0.0.1:50053" usage:"address of gRPC distributed sync server"` + GRPCSyncClusterAddress string `env:"ORLY_GRPC_SYNC_CLUSTER" default:"127.0.0.1:50054" usage:"address of gRPC cluster sync server"` + GRPCSyncRelayGroupAddress string `env:"ORLY_GRPC_SYNC_RELAYGROUP" default:"127.0.0.1:50055" usage:"address of gRPC relay group server"` + GRPCSyncNegentropyAddress string `env:"ORLY_GRPC_SYNC_NEGENTROPY" default:"127.0.0.1:50056" usage:"address of gRPC negentropy server"` GRPCSyncConnectTimeout time.Duration `env:"ORLY_GRPC_SYNC_TIMEOUT" default:"10s" usage:"gRPC sync connection timeout"` NegentropyEnabled bool `env:"ORLY_NEGENTROPY_ENABLED" default:"false" usage:"enable NIP-77 negentropy set reconciliation"` diff --git a/cmd/orly/acl/acl.go b/cmd/orly/acl/acl.go index 4bd3385..a8fe134 100644 --- a/cmd/orly/acl/acl.go +++ b/cmd/orly/acl/acl.go @@ -4,14 +4,60 @@ package acl import ( + "context" "fmt" "os" + "path/filepath" "strings" + "time" + "go-simpler.org/env" + "lol.mleku.dev" + "lol.mleku.dev/chk" "lol.mleku.dev/log" + "next.orly.dev/pkg/acl" + "next.orly.dev/pkg/acl/server" + "next.orly.dev/pkg/database" + databasegrpc "next.orly.dev/pkg/database/grpc" ) +// Config holds the ACL server configuration. +type Config struct { + // Listen is the gRPC server listen address + Listen string `env:"ORLY_ACL_LISTEN" default:"127.0.0.1:50052" usage:"gRPC server listen address"` + + // LogLevel is the logging level + LogLevel string `env:"ORLY_ACL_LOG_LEVEL" default:"info" usage:"log level (trace, debug, info, warn, error)"` + + // Database configuration + DBType string `env:"ORLY_ACL_DB_TYPE" default:"grpc" usage:"database type: badger or grpc"` + GRPCDBServer string `env:"ORLY_ACL_GRPC_DB_SERVER" usage:"gRPC database server address (when DB_TYPE=grpc)"` + DataDir string `env:"ORLY_DATA_DIR" usage:"database data directory (when DB_TYPE=badger)"` + + // Badger configuration (when DB_TYPE=badger) + BlockCacheMB int `env:"ORLY_DB_BLOCK_CACHE_MB" default:"256" usage:"block cache size in MB"` + IndexCacheMB int `env:"ORLY_DB_INDEX_CACHE_MB" default:"128" usage:"index cache size in MB"` + ZSTDLevel int `env:"ORLY_DB_ZSTD_LEVEL" default:"3" usage:"ZSTD compression level"` + QueryCacheSizeMB int `env:"ORLY_DB_QUERY_CACHE_SIZE_MB" default:"64" usage:"query cache size in MB"` + QueryCacheMaxAge time.Duration `env:"ORLY_DB_QUERY_CACHE_MAX_AGE" default:"5m" usage:"query cache max age"` + QueryCacheDisabled bool `env:"ORLY_DB_QUERY_CACHE_DISABLED" default:"false" usage:"disable query cache"` + SerialCachePubkeys int `env:"ORLY_SERIAL_CACHE_PUBKEYS" default:"100000" usage:"serial cache pubkeys capacity"` + SerialCacheEventIds int `env:"ORLY_SERIAL_CACHE_EVENT_IDS" default:"500000" usage:"serial cache event IDs capacity"` + + // ACL configuration + Owners string `env:"ORLY_OWNERS" usage:"comma-separated list of owner npubs"` + Admins string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"` + BootstrapRelays string `env:"ORLY_BOOTSTRAP_RELAYS" usage:"comma-separated list of bootstrap relays"` + RelayAddresses string `env:"ORLY_RELAY_ADDRESSES" usage:"comma-separated list of relay addresses (self)"` + + // Follows ACL configuration + FollowListFrequency time.Duration `env:"ORLY_FOLLOW_LIST_FREQUENCY" default:"1h" usage:"follow list sync frequency"` + FollowsThrottleEnabled bool `env:"ORLY_FOLLOWS_THROTTLE_ENABLED" default:"false" usage:"enable progressive throttle for non-followed users"` + FollowsThrottlePerEvent time.Duration `env:"ORLY_FOLLOWS_THROTTLE_PER_EVENT" default:"25ms" usage:"throttle delay increment per event"` + FollowsThrottleMaxDelay time.Duration `env:"ORLY_FOLLOWS_THROTTLE_MAX_DELAY" default:"60s" usage:"maximum throttle delay"` +} + // Run executes the acl subcommand. func Run(args []string) { var driver string @@ -77,13 +123,126 @@ func Run(args []string) { os.Exit(1) } - runACLServer(driver, args) + runACLServer(driver) } -func runACLServer(driver string, args []string) { - log.I.F("ACL server with driver=%s not yet implemented via unified binary", driver) - log.I.F("Use the standalone binary: orly-acl-%s", driver) - os.Exit(1) +func runACLServer(driver string) { + cfg := loadConfig() + + // Set log level + lol.SetLogLevel(cfg.LogLevel) + log.I.F("orly acl --driver=%s starting with log level: %s", driver, cfg.LogLevel) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Initialize database (direct Badger or gRPC client) + var db database.Database + var err error + var ownsDB bool + + if cfg.DBType == "grpc" { + // Use gRPC database client + log.I.F("connecting to gRPC database server at %s", cfg.GRPCDBServer) + db, err = databasegrpc.New(ctx, &databasegrpc.ClientConfig{ + ServerAddress: cfg.GRPCDBServer, + ConnectTimeout: 30 * time.Second, + }) + if chk.E(err) { + log.E.F("failed to connect to gRPC database: %v", err) + os.Exit(1) + } + ownsDB = false // gRPC client doesn't own the database + } else { + // Use direct Badger database + dbCfg := &database.DatabaseConfig{ + DataDir: cfg.DataDir, + LogLevel: cfg.LogLevel, + BlockCacheMB: cfg.BlockCacheMB, + IndexCacheMB: cfg.IndexCacheMB, + QueryCacheSizeMB: cfg.QueryCacheSizeMB, + QueryCacheMaxAge: cfg.QueryCacheMaxAge, + QueryCacheDisabled: cfg.QueryCacheDisabled, + SerialCachePubkeys: cfg.SerialCachePubkeys, + SerialCacheEventIds: cfg.SerialCacheEventIds, + ZSTDLevel: cfg.ZSTDLevel, + } + + log.I.F("initializing Badger database at %s", cfg.DataDir) + db, err = database.NewWithConfig(ctx, cancel, dbCfg) + if chk.E(err) { + log.E.F("failed to initialize database: %v", err) + os.Exit(1) + } + ownsDB = true + } + + // Wait for database to be ready + log.I.F("waiting for database to be ready...") + <-db.Ready() + log.I.F("database ready") + + // Create server config + serverCfg := &server.Config{ + Listen: cfg.Listen, + ACLMode: driver, // Use the driver name as the ACL mode + LogLevel: cfg.LogLevel, + Owners: splitList(cfg.Owners), + Admins: splitList(cfg.Admins), + BootstrapRelays: splitList(cfg.BootstrapRelays), + RelayAddresses: splitList(cfg.RelayAddresses), + FollowListFrequency: cfg.FollowListFrequency, + FollowsThrottleEnabled: cfg.FollowsThrottleEnabled, + FollowsThrottlePerEvent: cfg.FollowsThrottlePerEvent, + FollowsThrottleMaxDelay: cfg.FollowsThrottleMaxDelay, + } + + // Create and configure server + srv := server.New(db, serverCfg, ownsDB) + if err := srv.ConfigureACL(ctx); chk.E(err) { + log.E.F("failed to configure ACL: %v", err) + os.Exit(1) + } + + // Start server + if err := srv.ListenAndServe(ctx, cancel); err != nil { + log.E.F("gRPC server error: %v", err) + } +} + +func loadConfig() *Config { + cfg := &Config{} + if err := env.Load(cfg, nil); chk.E(err) { + log.E.F("failed to load config: %v", err) + os.Exit(1) + } + + // Set default data directory if not specified + if cfg.DataDir == "" { + home, err := os.UserHomeDir() + if chk.E(err) { + log.E.F("failed to get home directory: %v", err) + os.Exit(1) + } + cfg.DataDir = filepath.Join(home, ".local", "share", "ORLY") + } + + // Ensure data directory exists (for badger mode) + if cfg.DBType == "badger" { + if err := os.MkdirAll(cfg.DataDir, 0700); chk.E(err) { + log.E.F("failed to create data directory %s: %v", cfg.DataDir, err) + os.Exit(1) + } + } + + return cfg +} + +func splitList(s string) []string { + if s == "" { + return nil + } + return strings.Split(s, ",") } func printACLHelp() { @@ -93,24 +252,28 @@ Usage: orly acl --driver=NAME [options] Options: - --driver=NAME Select ACL driver (follows, managed, curation) + --driver=NAME Select ACL driver (follows, managed, curating) --list-drivers List available ACL drivers --help, -h Show this help message Drivers: follows Whitelist based on admin follow lists managed NIP-86 fine-grained access control - curation Rate-limited trust tier system + curating Rate-limited trust tier system Environment variables: - ORLY_ACL_LISTEN gRPC server listen address + ORLY_ACL_LISTEN gRPC server listen address (default: 127.0.0.1:50052) ORLY_ACL_LOG_LEVEL Logging level - ORLY_ACL_DB_TYPE Database type (grpc or badger) - ORLY_ACL_GRPC_DB_SERVER gRPC database server address + ORLY_ACL_DB_TYPE Database type: badger or grpc + ORLY_ACL_GRPC_DB_SERVER gRPC database server address (when DB_TYPE=grpc) + ORLY_DATA_DIR Database data directory (when DB_TYPE=badger) ORLY_OWNERS Comma-separated owner npubs ORLY_ADMINS Comma-separated admin npubs + ORLY_BOOTSTRAP_RELAYS Comma-separated bootstrap relays + ORLY_FOLLOW_LIST_FREQUENCY Follow list sync frequency (default: 1h) Examples: orly acl --driver=follows Run follows ACL server + orly acl --driver=managed Run managed ACL server (NIP-86) orly acl --list-drivers List available drivers`) } diff --git a/cmd/orly/relay/relay.go b/cmd/orly/relay/relay.go index f333d6e..b63357c 100644 --- a/cmd/orly/relay/relay.go +++ b/cmd/orly/relay/relay.go @@ -5,9 +5,12 @@ package relay import ( "fmt" - "os" + "lol.mleku.dev/chk" "lol.mleku.dev/log" + "next.orly.dev/app/config" + relaycore "next.orly.dev/pkg/relay" + "next.orly.dev/pkg/version" ) // Run executes the relay subcommand. @@ -25,11 +28,17 @@ func Run(args []string) { return } - // For now, redirect to the main binary - // In the future, this will contain the relay logic directly - log.I.F("Relay not yet implemented via unified binary subcommand") - log.I.F("Use the main binary: orly (in project root)") - os.Exit(1) + // Load configuration + cfg, err := config.New() + if chk.T(err) { + return + } + log.I.F("starting %s %s (unified binary)", cfg.AppName, version.V) + + // Use shared relay startup logic + if err := relaycore.RunWithSignals(cfg); err != nil { + log.F.F("relay error: %v", err) + } } func printRelayHelp() { @@ -51,17 +60,27 @@ The relay is the main Nostr server that: Environment variables: ORLY_PORT Server port (default: 3334) ORLY_LOG_LEVEL Logging level - ORLY_DB_TYPE Database type (badger, neo4j, grpc) - ORLY_ACL_MODE ACL mode (none, follows, managed) + ORLY_DB_TYPE Database type: badger, neo4j, grpc (default: badger) + ORLY_ACL_MODE ACL mode: none, follows, managed, curating (default: none) ORLY_TLS_DOMAINS Let's Encrypt domains ORLY_AUTH_TO_WRITE Require auth for writes -For split-mode deployment: - ORLY_DB_TYPE=grpc Use gRPC database server - ORLY_DB_GRPC_SERVER gRPC database server address - ORLY_ACL_GRPC_SERVER gRPC ACL server address +gRPC Backend Configuration (for split-mode deployment): + ORLY_DB_TYPE=grpc Use remote gRPC database server + ORLY_GRPC_SERVER Database server address (default: 127.0.0.1:50051) + + ORLY_ACL_TYPE=grpc Use remote gRPC ACL server + ORLY_GRPC_ACL_SERVER ACL server address (default: 127.0.0.1:50052) + + ORLY_SYNC_TYPE=grpc Use remote gRPC sync services + ORLY_GRPC_SYNC_NEGENTROPY Negentropy server address (default: 127.0.0.1:50056) + ORLY_GRPC_SYNC_DISTRIBUTED Distributed sync address (default: 127.0.0.1:50053) + ORLY_GRPC_SYNC_CLUSTER Cluster sync address (default: 127.0.0.1:50054) -Example: - orly Start the relay (default mode) - orly relay Start the relay (explicit)`) +Examples: + orly Start relay (monolithic mode) + orly relay Start relay (explicit) + ORLY_DB_TYPE=grpc orly Connect to gRPC database at 127.0.0.1:50051 + ORLY_ACL_TYPE=grpc orly Connect to gRPC ACL at 127.0.0.1:50052 + ORLY_DB_TYPE=grpc ORLY_ACL_TYPE=grpc orly Full split mode`) } diff --git a/main.go b/main.go index 5ad46e5..f9445eb 100644 --- a/main.go +++ b/main.go @@ -3,45 +3,30 @@ package main import ( "context" "fmt" - "net/http" - pp "net/http/pprof" "os" "os/exec" - "os/signal" "path/filepath" "runtime" - "runtime/debug" "strings" "sync" - "syscall" "time" "github.com/adrg/xdg" - "github.com/pkg/profile" "golang.org/x/term" "lol.mleku.dev/chk" "lol.mleku.dev/log" "next.orly.dev/app" "next.orly.dev/app/branding" "next.orly.dev/app/config" - "next.orly.dev/pkg/acl" - aclgrpc "next.orly.dev/pkg/acl/grpc" - negentropygrpc "next.orly.dev/pkg/sync/negentropy/grpc" "git.mleku.dev/mleku/nostr/crypto/keys" "git.mleku.dev/mleku/nostr/encoders/bech32encoding" "next.orly.dev/pkg/database" - _ "next.orly.dev/pkg/database/grpc" // Import for grpc factory registration - neo4jdb "next.orly.dev/pkg/neo4j" // Import for neo4j factory and type "git.mleku.dev/mleku/nostr/encoders/hex" - "next.orly.dev/pkg/ratelimit" - "next.orly.dev/pkg/utils/interrupt" + "next.orly.dev/pkg/relay" "next.orly.dev/pkg/version" ) func main() { - runtime.GOMAXPROCS(128) - debug.SetGCPercent(10) - // Handle 'version' subcommand early, before any other initialization if config.VersionRequested() { fmt.Println(version.V) @@ -348,513 +333,16 @@ func main() { fmt.Println("") } - // Ensure profiling is stopped on interrupts (SIGINT/SIGTERM) as well as on normal exit - var profileStopOnce sync.Once - profileStop := func() {} - switch cfg.Pprof { - case "cpu": - if cfg.PprofPath != "" { - prof := profile.Start( - profile.CPUProfile, profile.ProfilePath(cfg.PprofPath), - ) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("cpu profiling stopped and flushed") - }, - ) - } - defer profileStop() - } else { - prof := profile.Start(profile.CPUProfile) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("cpu profiling stopped and flushed") - }, - ) - } - defer profileStop() - } - case "memory": - if cfg.PprofPath != "" { - prof := profile.Start( - profile.MemProfile, profile.MemProfileRate(32), - profile.ProfilePath(cfg.PprofPath), - ) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("memory profiling stopped and flushed") - }, - ) - } - defer profileStop() - } else { - prof := profile.Start(profile.MemProfile) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("memory profiling stopped and flushed") - }, - ) - } - defer profileStop() - } - case "allocation": - if cfg.PprofPath != "" { - prof := profile.Start( - profile.MemProfileAllocs, profile.MemProfileRate(32), - profile.ProfilePath(cfg.PprofPath), - ) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("allocation profiling stopped and flushed") - }, - ) - } - defer profileStop() - } else { - prof := profile.Start(profile.MemProfileAllocs) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("allocation profiling stopped and flushed") - }, - ) - } - defer profileStop() - } - case "heap": - if cfg.PprofPath != "" { - prof := profile.Start( - profile.MemProfileHeap, profile.ProfilePath(cfg.PprofPath), - ) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("heap profiling stopped and flushed") - }, - ) - } - defer profileStop() - } else { - prof := profile.Start(profile.MemProfileHeap) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("heap profiling stopped and flushed") - }, - ) - } - defer profileStop() - } - case "mutex": - if cfg.PprofPath != "" { - prof := profile.Start( - profile.MutexProfile, profile.ProfilePath(cfg.PprofPath), - ) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("mutex profiling stopped and flushed") - }, - ) - } - defer profileStop() - } else { - prof := profile.Start(profile.MutexProfile) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("mutex profiling stopped and flushed") - }, - ) - } - defer profileStop() - } - case "threadcreate": - if cfg.PprofPath != "" { - prof := profile.Start( - profile.ThreadcreationProfile, - profile.ProfilePath(cfg.PprofPath), - ) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("threadcreate profiling stopped and flushed") - }, - ) - } - defer profileStop() - } else { - prof := profile.Start(profile.ThreadcreationProfile) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("threadcreate profiling stopped and flushed") - }, - ) - } - defer profileStop() - } - case "goroutine": - if cfg.PprofPath != "" { - prof := profile.Start( - profile.GoroutineProfile, profile.ProfilePath(cfg.PprofPath), - ) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("goroutine profiling stopped and flushed") - }, - ) - } - defer profileStop() - } else { - prof := profile.Start(profile.GoroutineProfile) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("goroutine profiling stopped and flushed") - }, - ) - } - defer profileStop() - } - case "block": - if cfg.PprofPath != "" { - prof := profile.Start( - profile.BlockProfile, profile.ProfilePath(cfg.PprofPath), - ) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("block profiling stopped and flushed") - }, - ) - } - defer profileStop() - } else { - prof := profile.Start(profile.BlockProfile) - profileStop = func() { - profileStopOnce.Do( - func() { - prof.Stop() - log.I.F("block profiling stopped and flushed") - }, - ) - } - defer profileStop() - } - - } - - // Register a handler so profiling is stopped when an interrupt is received - interrupt.AddHandler( - func() { - log.I.F("interrupt received: stopping profiling") - profileStop() - }, - ) - ctx, cancel := context.WithCancel(context.Background()) - var db database.Database - log.I.F("initializing %s database at %s", cfg.DBType, cfg.DataDir) - if db, err = database.NewDatabaseWithConfig( - ctx, cancel, cfg.DBType, makeDatabaseConfig(cfg), - ); chk.E(err) { - os.Exit(1) + // Start the relay using shared startup logic + if err := relay.RunWithSignals(cfg); err != nil { + log.F.F("relay error: %v", err) } - log.I.F("%s database initialized successfully", cfg.DBType) - - // Initialize ACL - either remote gRPC or in-process - aclType, aclServerAddr, aclConnTimeout := cfg.GetGRPCACLConfigValues() - if aclType == "grpc" { - // Use remote ACL server via gRPC - log.I.F("connecting to gRPC ACL server at %s", aclServerAddr) - aclClient, aclErr := aclgrpc.New(ctx, &aclgrpc.ClientConfig{ - ServerAddress: aclServerAddr, - ConnectTimeout: aclConnTimeout, - }) - if chk.E(aclErr) { - log.E.F("failed to connect to gRPC ACL server: %v", aclErr) - os.Exit(1) - } - // Wait for ACL server to be ready - select { - case <-aclClient.Ready(): - log.I.F("gRPC ACL client connected, mode: %s", aclClient.Type()) - case <-time.After(30 * time.Second): - log.E.F("timeout waiting for gRPC ACL server") - os.Exit(1) - } - // Register and activate the gRPC client as the ACL backend - acl.Registry.RegisterAndActivate(aclClient) - } else { - // Use in-process ACL (existing behavior) - acl.Registry.SetMode(cfg.ACLMode) - if err = acl.Registry.Configure(cfg, db, ctx); chk.E(err) { - os.Exit(1) - } - acl.Registry.Syncer() - } - - // Initialize negentropy client if enabled (gRPC mode) - syncType, _, _, _, negentropyAddr, syncTimeout, negentropyEnabled := cfg.GetGRPCSyncConfigValues() - if negentropyEnabled && syncType == "grpc" && negentropyAddr != "" { - log.I.F("connecting to gRPC negentropy server at %s", negentropyAddr) - negClient, negErr := negentropygrpc.New(ctx, &negentropygrpc.ClientConfig{ - ServerAddress: negentropyAddr, - ConnectTimeout: syncTimeout, - }) - if negErr != nil { - log.W.F("failed to connect to gRPC negentropy server: %v (NIP-77 disabled)", negErr) - } else { - // Wait for negentropy server to be ready - select { - case <-negClient.Ready(): - log.I.F("gRPC negentropy client connected") - app.SetNegentropyClient(negClient) - case <-time.After(30 * time.Second): - log.W.F("timeout waiting for gRPC negentropy server (NIP-77 disabled)") - } - } - } else if negentropyEnabled { - log.I.F("negentropy enabled but sync type is %q, skipping gRPC client", syncType) - } - - // Create rate limiter if enabled - var limiter *ratelimit.Limiter - rateLimitEnabled, targetMB, - writeKp, writeKi, writeKd, - readKp, readKi, readKd, - maxWriteMs, maxReadMs, - writeTarget, readTarget, - emergencyThreshold, recoveryThreshold, - emergencyMaxMs := cfg.GetRateLimitConfigValues() - - if rateLimitEnabled { - // Auto-detect memory target if set to 0 (default) - if targetMB == 0 { - var memErr error - targetMB, memErr = ratelimit.CalculateTargetMemoryMB(targetMB) - if memErr != nil { - log.F.F("FATAL: %v", memErr) - log.F.F("There is not enough memory to run this relay in this environment.") - log.F.F("Available: %dMB, Required minimum: %dMB", - ratelimit.DetectAvailableMemoryMB(), ratelimit.MinimumMemoryMB) - os.Exit(1) - } - stats := ratelimit.GetMemoryStats(targetMB) - // Calculate what 66% would be to determine if we hit the cap - calculated66 := int(float64(stats.AvailableMB) * ratelimit.AutoDetectMemoryFraction) - if calculated66 > ratelimit.DefaultMaxMemoryMB { - log.I.F("memory auto-detected: total=%dMB, available=%dMB, target=%dMB (capped at default max, 66%% would be %dMB)", - stats.TotalMB, stats.AvailableMB, targetMB, calculated66) - } else { - log.I.F("memory auto-detected: total=%dMB, available=%dMB, target=%dMB (66%% of available)", - stats.TotalMB, stats.AvailableMB, targetMB) - } - } else { - // Validate explicitly configured target - _, memErr := ratelimit.CalculateTargetMemoryMB(targetMB) - if memErr != nil { - log.F.F("FATAL: %v", memErr) - log.F.F("Configured target memory %dMB is below minimum required %dMB.", - targetMB, ratelimit.MinimumMemoryMB) - os.Exit(1) - } - } - - rlConfig := ratelimit.NewConfigFromValues( - rateLimitEnabled, targetMB, - writeKp, writeKi, writeKd, - readKp, readKi, readKd, - maxWriteMs, maxReadMs, - writeTarget, readTarget, - emergencyThreshold, recoveryThreshold, - emergencyMaxMs, - ) - - // Create appropriate monitor based on database type - if badgerDB, ok := db.(*database.D); ok { - limiter = ratelimit.NewBadgerLimiter(rlConfig, badgerDB.DB) - // Set the rate limiter on the database for import operations - badgerDB.SetRateLimiter(limiter) - log.I.F("rate limiter configured for Badger backend (target: %dMB)", targetMB) - } else if n4jDB, ok := db.(*neo4jdb.N); ok { - // Create Neo4j rate limiter with access to driver and querySem - limiter = ratelimit.NewNeo4jLimiter( - rlConfig, - n4jDB.Driver(), - n4jDB.QuerySem(), - n4jDB.MaxConcurrentQueries(), - ) - log.I.F("rate limiter configured for Neo4j backend (target: %dMB)", targetMB) - } else { - // For other backends, create a disabled limiter - limiter = ratelimit.NewDisabledLimiter() - log.I.F("rate limiter disabled for unknown backend") - } - } else { - limiter = ratelimit.NewDisabledLimiter() - } - - // Start HTTP pprof server if enabled - if cfg.PprofHTTP { - pprofAddr := fmt.Sprintf("%s:%d", cfg.Listen, 6060) - pprofMux := http.NewServeMux() - pprofMux.HandleFunc("/debug/pprof/", pp.Index) - pprofMux.HandleFunc("/debug/pprof/cmdline", pp.Cmdline) - pprofMux.HandleFunc("/debug/pprof/profile", pp.Profile) - pprofMux.HandleFunc("/debug/pprof/symbol", pp.Symbol) - pprofMux.HandleFunc("/debug/pprof/trace", pp.Trace) - for _, p := range []string{ - "allocs", "block", "goroutine", "heap", "mutex", "threadcreate", - } { - pprofMux.Handle("/debug/pprof/"+p, pp.Handler(p)) - } - ppSrv := &http.Server{Addr: pprofAddr, Handler: pprofMux} - go func() { - log.I.F("pprof server listening on %s", pprofAddr) - if err := ppSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.E.F("pprof server error: %v", err) - } - }() - go func() { - <-ctx.Done() - shutdownCtx, cancelShutdown := context.WithTimeout( - context.Background(), 2*time.Second, - ) - defer cancelShutdown() - _ = ppSrv.Shutdown(shutdownCtx) - }() - } - - // Start health check HTTP server if configured - var healthSrv *http.Server - if cfg.HealthPort > 0 { - mux := http.NewServeMux() - mux.HandleFunc( - "/healthz", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte("ok")) - log.I.F("health check ok") - }, - ) - // Optional shutdown endpoint to gracefully stop the process so profiling defers run - if cfg.EnableShutdown { - mux.HandleFunc( - "/shutdown", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte("shutting down")) - log.I.F("shutdown requested via /shutdown; sending SIGINT to self") - go func() { - p, _ := os.FindProcess(os.Getpid()) - _ = p.Signal(os.Interrupt) - }() - }, - ) - } - healthSrv = &http.Server{ - Addr: fmt.Sprintf( - "%s:%d", cfg.Listen, cfg.HealthPort, - ), Handler: mux, - } - go func() { - log.I.F("health check server listening on %s", healthSrv.Addr) - if err := healthSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.E.F("health server error: %v", err) - } - }() - go func() { - <-ctx.Done() - shutdownCtx, cancelShutdown := context.WithTimeout( - context.Background(), 2*time.Second, - ) - defer cancelShutdown() - _ = healthSrv.Shutdown(shutdownCtx) - }() - } - - quit := app.Run(ctx, cfg, db, limiter) - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, os.Interrupt, syscall.SIGTERM) - for { - select { - case <-sigs: - fmt.Printf("\r") - log.I.F("received shutdown signal, starting graceful shutdown") - cancel() // This will trigger HTTP server shutdown - <-quit // Wait for HTTP server to shut down - chk.E(db.Close()) - log.I.F("exiting") - return - case <-quit: - log.I.F("application quit signal received") - cancel() - chk.E(db.Close()) - log.I.F("exiting") - return - } - } - // log.I.F("exiting") } // makeDatabaseConfig creates a database.DatabaseConfig from the app config. -// This helper function extracts all database-specific configuration values -// and constructs the appropriate struct for the database package. +// Delegates to the shared relay package implementation. func makeDatabaseConfig(cfg *config.C) *database.DatabaseConfig { - dataDir, logLevel, - blockCacheMB, indexCacheMB, queryCacheSizeMB, - queryCacheMaxAge, - queryCacheDisabled, - serialCachePubkeys, serialCacheEventIds, - zstdLevel, - neo4jURI, neo4jUser, neo4jPassword, - neo4jMaxConnPoolSize, neo4jFetchSize, neo4jMaxTxRetrySeconds, neo4jQueryResultLimit := cfg.GetDatabaseConfigValues() - - // Get gRPC client configuration - grpcServerAddress, grpcConnectTimeout := cfg.GetGRPCConfigValues() - - return &database.DatabaseConfig{ - DataDir: dataDir, - LogLevel: logLevel, - BlockCacheMB: blockCacheMB, - IndexCacheMB: indexCacheMB, - QueryCacheSizeMB: queryCacheSizeMB, - QueryCacheMaxAge: queryCacheMaxAge, - QueryCacheDisabled: queryCacheDisabled, - SerialCachePubkeys: serialCachePubkeys, - SerialCacheEventIds: serialCacheEventIds, - ZSTDLevel: zstdLevel, - Neo4jURI: neo4jURI, - Neo4jUser: neo4jUser, - Neo4jPassword: neo4jPassword, - Neo4jMaxConnPoolSize: neo4jMaxConnPoolSize, - Neo4jFetchSize: neo4jFetchSize, - Neo4jMaxTxRetrySeconds: neo4jMaxTxRetrySeconds, - Neo4jQueryResultLimit: neo4jQueryResultLimit, - // gRPC client settings - GRPCServerAddress: grpcServerAddress, - GRPCConnectTimeout: grpcConnectTimeout, - } + return relay.MakeDatabaseConfig(cfg) } // openBrowser opens the specified URL in the default browser. diff --git a/pkg/relay/startup.go b/pkg/relay/startup.go new file mode 100644 index 0000000..2ccddbd --- /dev/null +++ b/pkg/relay/startup.go @@ -0,0 +1,517 @@ +//go:build !(js && wasm) + +// Package relay provides shared startup logic for running the ORLY relay. +// This allows both the root binary and the unified binary to share +// the same initialization code for monolithic deployments. +package relay + +import ( + "context" + "fmt" + "net/http" + pp "net/http/pprof" + "os" + "os/signal" + "runtime" + "runtime/debug" + "sync" + "syscall" + "time" + + "github.com/pkg/profile" + "lol.mleku.dev/chk" + "lol.mleku.dev/log" + "next.orly.dev/app" + "next.orly.dev/app/config" + "next.orly.dev/pkg/acl" + aclgrpc "next.orly.dev/pkg/acl/grpc" + "next.orly.dev/pkg/database" + _ "next.orly.dev/pkg/database/grpc" // Import for grpc factory registration + neo4jdb "next.orly.dev/pkg/neo4j" + "next.orly.dev/pkg/ratelimit" + negentropygrpc "next.orly.dev/pkg/sync/negentropy/grpc" + "next.orly.dev/pkg/utils/interrupt" +) + +// StartupResult holds the initialized components from Startup. +type StartupResult struct { + Ctx context.Context + Cancel context.CancelFunc + DB database.Database + Limiter *ratelimit.Limiter + Quit chan struct{} +} + +// Startup initializes the database, ACL, rate limiter, and starts the relay server. +// It returns a StartupResult containing all initialized components. +func Startup(cfg *config.C) (*StartupResult, error) { + runtime.GOMAXPROCS(128) + debug.SetGCPercent(10) + + // Setup profiling + profileStop := setupProfiling(cfg) + + ctx, cancel := context.WithCancel(context.Background()) + + // Initialize database + log.I.F("initializing %s database at %s", cfg.DBType, cfg.DataDir) + db, err := database.NewDatabaseWithConfig( + ctx, cancel, cfg.DBType, MakeDatabaseConfig(cfg), + ) + if chk.E(err) { + cancel() + return nil, fmt.Errorf("failed to initialize database: %w", err) + } + log.I.F("%s database initialized successfully", cfg.DBType) + + // Initialize ACL + if err := initializeACL(ctx, cfg, db); err != nil { + db.Close() + cancel() + return nil, fmt.Errorf("failed to initialize ACL: %w", err) + } + + // Initialize negentropy client if enabled + initializeNegentropy(ctx, cfg) + + // Create rate limiter + limiter := createRateLimiter(cfg, db) + + // Start pprof HTTP server if enabled + startPprofServer(ctx, cfg) + + // Start health check server if configured + startHealthServer(ctx, cfg) + + // Start the relay + quit := app.Run(ctx, cfg, db, limiter) + + // Store profileStop for cleanup + result := &StartupResult{ + Ctx: ctx, + Cancel: cancel, + DB: db, + Limiter: limiter, + Quit: quit, + } + + // Register profile stop handler + interrupt.AddHandler(func() { + log.I.F("interrupt received: stopping profiling") + profileStop() + }) + + return result, nil +} + +// RunWithSignals calls Startup and blocks on signals, handling graceful shutdown. +// This is the main entry point for running the relay as a standalone process. +func RunWithSignals(cfg *config.C) error { + result, err := Startup(cfg) + if err != nil { + return err + } + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, os.Interrupt, syscall.SIGTERM) + + for { + select { + case <-sigs: + fmt.Printf("\r") + log.I.F("received shutdown signal, starting graceful shutdown") + result.Cancel() + <-result.Quit + chk.E(result.DB.Close()) + log.I.F("exiting") + return nil + case <-result.Quit: + log.I.F("application quit signal received") + result.Cancel() + chk.E(result.DB.Close()) + log.I.F("exiting") + return nil + } + } +} + +// setupProfiling configures profiling based on config and returns a stop function. +func setupProfiling(cfg *config.C) func() { + var profileStopOnce sync.Once + profileStop := func() {} + + switch cfg.Pprof { + case "cpu": + var prof interface{ Stop() } + if cfg.PprofPath != "" { + prof = profile.Start(profile.CPUProfile, profile.ProfilePath(cfg.PprofPath)) + } else { + prof = profile.Start(profile.CPUProfile) + } + profileStop = func() { + profileStopOnce.Do(func() { + prof.Stop() + log.I.F("cpu profiling stopped and flushed") + }) + } + + case "memory": + var prof interface{ Stop() } + if cfg.PprofPath != "" { + prof = profile.Start(profile.MemProfile, profile.MemProfileRate(32), profile.ProfilePath(cfg.PprofPath)) + } else { + prof = profile.Start(profile.MemProfile) + } + profileStop = func() { + profileStopOnce.Do(func() { + prof.Stop() + log.I.F("memory profiling stopped and flushed") + }) + } + + case "allocation": + var prof interface{ Stop() } + if cfg.PprofPath != "" { + prof = profile.Start(profile.MemProfileAllocs, profile.MemProfileRate(32), profile.ProfilePath(cfg.PprofPath)) + } else { + prof = profile.Start(profile.MemProfileAllocs) + } + profileStop = func() { + profileStopOnce.Do(func() { + prof.Stop() + log.I.F("allocation profiling stopped and flushed") + }) + } + + case "heap": + var prof interface{ Stop() } + if cfg.PprofPath != "" { + prof = profile.Start(profile.MemProfileHeap, profile.ProfilePath(cfg.PprofPath)) + } else { + prof = profile.Start(profile.MemProfileHeap) + } + profileStop = func() { + profileStopOnce.Do(func() { + prof.Stop() + log.I.F("heap profiling stopped and flushed") + }) + } + + case "mutex": + var prof interface{ Stop() } + if cfg.PprofPath != "" { + prof = profile.Start(profile.MutexProfile, profile.ProfilePath(cfg.PprofPath)) + } else { + prof = profile.Start(profile.MutexProfile) + } + profileStop = func() { + profileStopOnce.Do(func() { + prof.Stop() + log.I.F("mutex profiling stopped and flushed") + }) + } + + case "threadcreate": + var prof interface{ Stop() } + if cfg.PprofPath != "" { + prof = profile.Start(profile.ThreadcreationProfile, profile.ProfilePath(cfg.PprofPath)) + } else { + prof = profile.Start(profile.ThreadcreationProfile) + } + profileStop = func() { + profileStopOnce.Do(func() { + prof.Stop() + log.I.F("threadcreate profiling stopped and flushed") + }) + } + + case "goroutine": + var prof interface{ Stop() } + if cfg.PprofPath != "" { + prof = profile.Start(profile.GoroutineProfile, profile.ProfilePath(cfg.PprofPath)) + } else { + prof = profile.Start(profile.GoroutineProfile) + } + profileStop = func() { + profileStopOnce.Do(func() { + prof.Stop() + log.I.F("goroutine profiling stopped and flushed") + }) + } + + case "block": + var prof interface{ Stop() } + if cfg.PprofPath != "" { + prof = profile.Start(profile.BlockProfile, profile.ProfilePath(cfg.PprofPath)) + } else { + prof = profile.Start(profile.BlockProfile) + } + profileStop = func() { + profileStopOnce.Do(func() { + prof.Stop() + log.I.F("block profiling stopped and flushed") + }) + } + } + + return profileStop +} + +// initializeACL sets up ACL - either remote gRPC or in-process. +func initializeACL(ctx context.Context, cfg *config.C, db database.Database) error { + aclType, aclServerAddr, aclConnTimeout := cfg.GetGRPCACLConfigValues() + + if aclType == "grpc" { + // Use remote ACL server via gRPC + log.I.F("connecting to gRPC ACL server at %s", aclServerAddr) + aclClient, err := aclgrpc.New(ctx, &aclgrpc.ClientConfig{ + ServerAddress: aclServerAddr, + ConnectTimeout: aclConnTimeout, + }) + if chk.E(err) { + return fmt.Errorf("failed to connect to gRPC ACL server: %w", err) + } + + // Wait for ACL server to be ready + select { + case <-aclClient.Ready(): + log.I.F("gRPC ACL client connected, mode: %s", aclClient.Type()) + case <-time.After(30 * time.Second): + return fmt.Errorf("timeout waiting for gRPC ACL server") + } + + // Register and activate the gRPC client as the ACL backend + acl.Registry.RegisterAndActivate(aclClient) + } else { + // Use in-process ACL + acl.Registry.SetMode(cfg.ACLMode) + if err := acl.Registry.Configure(cfg, db, ctx); chk.E(err) { + return err + } + acl.Registry.Syncer() + } + + return nil +} + +// initializeNegentropy sets up the negentropy gRPC client if enabled. +func initializeNegentropy(ctx context.Context, cfg *config.C) { + syncType, _, _, _, negentropyAddr, syncTimeout, negentropyEnabled := cfg.GetGRPCSyncConfigValues() + + if negentropyEnabled && syncType == "grpc" && negentropyAddr != "" { + log.I.F("connecting to gRPC negentropy server at %s", negentropyAddr) + negClient, err := negentropygrpc.New(ctx, &negentropygrpc.ClientConfig{ + ServerAddress: negentropyAddr, + ConnectTimeout: syncTimeout, + }) + if err != nil { + log.W.F("failed to connect to gRPC negentropy server: %v (NIP-77 disabled)", err) + return + } + + // Wait for negentropy server to be ready + select { + case <-negClient.Ready(): + log.I.F("gRPC negentropy client connected") + app.SetNegentropyClient(negClient) + case <-time.After(30 * time.Second): + log.W.F("timeout waiting for gRPC negentropy server (NIP-77 disabled)") + } + } else if negentropyEnabled { + log.I.F("negentropy enabled but sync type is %q, skipping gRPC client", syncType) + } +} + +// createRateLimiter creates and configures the rate limiter. +func createRateLimiter(cfg *config.C, db database.Database) *ratelimit.Limiter { + rateLimitEnabled, targetMB, + writeKp, writeKi, writeKd, + readKp, readKi, readKd, + maxWriteMs, maxReadMs, + writeTarget, readTarget, + emergencyThreshold, recoveryThreshold, + emergencyMaxMs := cfg.GetRateLimitConfigValues() + + if !rateLimitEnabled { + return ratelimit.NewDisabledLimiter() + } + + // Auto-detect memory target if set to 0 + if targetMB == 0 { + var err error + targetMB, err = ratelimit.CalculateTargetMemoryMB(targetMB) + if err != nil { + log.F.F("FATAL: %v", err) + log.F.F("There is not enough memory to run this relay in this environment.") + log.F.F("Available: %dMB, Required minimum: %dMB", + ratelimit.DetectAvailableMemoryMB(), ratelimit.MinimumMemoryMB) + os.Exit(1) + } + stats := ratelimit.GetMemoryStats(targetMB) + calculated66 := int(float64(stats.AvailableMB) * ratelimit.AutoDetectMemoryFraction) + if calculated66 > ratelimit.DefaultMaxMemoryMB { + log.I.F("memory auto-detected: total=%dMB, available=%dMB, target=%dMB (capped at default max, 66%% would be %dMB)", + stats.TotalMB, stats.AvailableMB, targetMB, calculated66) + } else { + log.I.F("memory auto-detected: total=%dMB, available=%dMB, target=%dMB (66%% of available)", + stats.TotalMB, stats.AvailableMB, targetMB) + } + } else { + // Validate explicitly configured target + _, err := ratelimit.CalculateTargetMemoryMB(targetMB) + if err != nil { + log.F.F("FATAL: %v", err) + log.F.F("Configured target memory %dMB is below minimum required %dMB.", + targetMB, ratelimit.MinimumMemoryMB) + os.Exit(1) + } + } + + rlConfig := ratelimit.NewConfigFromValues( + rateLimitEnabled, targetMB, + writeKp, writeKi, writeKd, + readKp, readKi, readKd, + maxWriteMs, maxReadMs, + writeTarget, readTarget, + emergencyThreshold, recoveryThreshold, + emergencyMaxMs, + ) + + // Create appropriate monitor based on database type + if badgerDB, ok := db.(*database.D); ok { + limiter := ratelimit.NewBadgerLimiter(rlConfig, badgerDB.DB) + badgerDB.SetRateLimiter(limiter) + log.I.F("rate limiter configured for Badger backend (target: %dMB)", targetMB) + return limiter + } + + if n4jDB, ok := db.(*neo4jdb.N); ok { + limiter := ratelimit.NewNeo4jLimiter( + rlConfig, + n4jDB.Driver(), + n4jDB.QuerySem(), + n4jDB.MaxConcurrentQueries(), + ) + log.I.F("rate limiter configured for Neo4j backend (target: %dMB)", targetMB) + return limiter + } + + // For other backends, create a disabled limiter + log.I.F("rate limiter disabled for unknown backend") + return ratelimit.NewDisabledLimiter() +} + +// startPprofServer starts the HTTP pprof server if enabled. +func startPprofServer(ctx context.Context, cfg *config.C) { + if !cfg.PprofHTTP { + return + } + + pprofAddr := fmt.Sprintf("%s:%d", cfg.Listen, 6060) + pprofMux := http.NewServeMux() + pprofMux.HandleFunc("/debug/pprof/", pp.Index) + pprofMux.HandleFunc("/debug/pprof/cmdline", pp.Cmdline) + pprofMux.HandleFunc("/debug/pprof/profile", pp.Profile) + pprofMux.HandleFunc("/debug/pprof/symbol", pp.Symbol) + pprofMux.HandleFunc("/debug/pprof/trace", pp.Trace) + for _, p := range []string{"allocs", "block", "goroutine", "heap", "mutex", "threadcreate"} { + pprofMux.Handle("/debug/pprof/"+p, pp.Handler(p)) + } + + ppSrv := &http.Server{Addr: pprofAddr, Handler: pprofMux} + go func() { + log.I.F("pprof server listening on %s", pprofAddr) + if err := ppSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.E.F("pprof server error: %v", err) + } + }() + + go func() { + <-ctx.Done() + shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelShutdown() + _ = ppSrv.Shutdown(shutdownCtx) + }() +} + +// startHealthServer starts the health check HTTP server if configured. +func startHealthServer(ctx context.Context, cfg *config.C) { + if cfg.HealthPort <= 0 { + return + } + + mux := http.NewServeMux() + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + log.I.F("health check ok") + }) + + // Optional shutdown endpoint + if cfg.EnableShutdown { + mux.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("shutting down")) + log.I.F("shutdown requested via /shutdown; sending SIGINT to self") + go func() { + p, _ := os.FindProcess(os.Getpid()) + _ = p.Signal(os.Interrupt) + }() + }) + } + + healthSrv := &http.Server{ + Addr: fmt.Sprintf("%s:%d", cfg.Listen, cfg.HealthPort), + Handler: mux, + } + + go func() { + log.I.F("health check server listening on %s", healthSrv.Addr) + if err := healthSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.E.F("health server error: %v", err) + } + }() + + go func() { + <-ctx.Done() + shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelShutdown() + _ = healthSrv.Shutdown(shutdownCtx) + }() +} + +// MakeDatabaseConfig creates a database.DatabaseConfig from the app config. +func MakeDatabaseConfig(cfg *config.C) *database.DatabaseConfig { + dataDir, logLevel, + blockCacheMB, indexCacheMB, queryCacheSizeMB, + queryCacheMaxAge, + queryCacheDisabled, + serialCachePubkeys, serialCacheEventIds, + zstdLevel, + neo4jURI, neo4jUser, neo4jPassword, + neo4jMaxConnPoolSize, neo4jFetchSize, neo4jMaxTxRetrySeconds, neo4jQueryResultLimit := cfg.GetDatabaseConfigValues() + + grpcServerAddress, grpcConnectTimeout := cfg.GetGRPCConfigValues() + + return &database.DatabaseConfig{ + DataDir: dataDir, + LogLevel: logLevel, + BlockCacheMB: blockCacheMB, + IndexCacheMB: indexCacheMB, + QueryCacheSizeMB: queryCacheSizeMB, + QueryCacheMaxAge: queryCacheMaxAge, + QueryCacheDisabled: queryCacheDisabled, + SerialCachePubkeys: serialCachePubkeys, + SerialCacheEventIds: serialCacheEventIds, + ZSTDLevel: zstdLevel, + Neo4jURI: neo4jURI, + Neo4jUser: neo4jUser, + Neo4jPassword: neo4jPassword, + Neo4jMaxConnPoolSize: neo4jMaxConnPoolSize, + Neo4jFetchSize: neo4jFetchSize, + Neo4jMaxTxRetrySeconds: neo4jMaxTxRetrySeconds, + Neo4jQueryResultLimit: neo4jQueryResultLimit, + GRPCServerAddress: grpcServerAddress, + GRPCConnectTimeout: grpcConnectTimeout, + } +} diff --git a/pkg/version/version b/pkg/version/version index b398141..7e16869 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.56.9 +v0.57.0