From 71129a98d2fe554fe5e2642e9629e9b6d62e5965 Mon Sep 17 00:00:00 2001 From: woikos Date: Thu, 22 Jan 2026 16:29:08 +0100 Subject: [PATCH] Fix negentropy sync only seeing 100 events and ACL startup timeout (v0.55.1) - Remove debug 100 iteration limit from GetSerialsByRange The limit was a debug safety check that should have been removed. This was capping all range queries to 100 results, causing negentropy sync to only see 100 events instead of all events. - Fix ACL service startup to listen immediately before Configure The ACL server now starts the gRPC listener immediately after database ready, then runs Configure in the background. This prevents the launcher from timing out while waiting for ACL to load follow lists. - Add SetReady method to ACL service for proper ready signaling Files modified: - pkg/database/get-serials-by-range.go: Remove debug iteration limit - cmd/orly-acl/main.go: Start gRPC server before Configure - cmd/orly-acl/service.go: Add ready field and SetReady method - pkg/version/version: Bump to v0.55.1 Co-Authored-By: Claude Opus 4.5 --- cmd/orly-acl/main.go | 117 ++++++++++++++------------- cmd/orly-acl/service.go | 27 ++++--- pkg/database/get-serials-by-range.go | 14 ---- pkg/version/version | 2 +- 4 files changed, 77 insertions(+), 83 deletions(-) diff --git a/cmd/orly-acl/main.go b/cmd/orly-acl/main.go index 11ee2db..71a9f5f 100644 --- a/cmd/orly-acl/main.go +++ b/cmd/orly-acl/main.go @@ -77,6 +77,34 @@ func main() { <-db.Ready() log.I.F("database ready") + // Create gRPC server EARLY so launcher can detect it's alive + grpcServer := grpc.NewServer( + grpc.MaxRecvMsgSize(16<<20), // 16MB + grpc.MaxSendMsgSize(16<<20), // 16MB + ) + + // Register ACL service (will be configured below) + service := NewACLService(cfg, db) + orlyaclv1.RegisterACLServiceServer(grpcServer, service) + + // Register reflection for debugging with grpcurl + reflection.Register(grpcServer) + + // Start listening immediately so launcher knows we're alive + lis, err := net.Listen("tcp", cfg.Listen) + if chk.E(err) { + log.E.F("failed to listen on %s: %v", cfg.Listen, err) + os.Exit(1) + } + log.I.F("gRPC ACL server listening on %s", cfg.Listen) + + // Start serving in background while we configure + go func() { + if err := grpcServer.Serve(lis); err != nil { + log.E.F("gRPC server error: %v", err) + } + }() + // Create app config for ACL configuration appCfg := &config.C{ Owners: cfg.GetOwners(), @@ -89,79 +117,54 @@ func main() { FollowsThrottleMaxDelay: cfg.FollowsThrottleMaxDelay, } - // Set ACL mode and configure the registry + // Set ACL mode and configure the registry (may take time to load follow lists) acl.Registry.SetMode(cfg.ACLMode) if err := acl.Registry.Configure(appCfg, db, ctx); chk.E(err) { log.E.F("failed to configure ACL: %v", err) os.Exit(1) } + // Mark service as ready now that configuration is complete + service.SetReady(true) + // Start the syncer goroutine for background operations acl.Registry.Syncer() log.I.F("ACL syncer started for mode: %s", cfg.ACLMode) - // Create gRPC server - grpcServer := grpc.NewServer( - grpc.MaxRecvMsgSize(16<<20), // 16MB - grpc.MaxSendMsgSize(16<<20), // 16MB - ) + // Handle graceful shutdown - block until signal received + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) + sig := <-sigs + log.I.F("received signal %v, shutting down...", sig) - // Register ACL service - service := NewACLService(cfg, db) - orlyaclv1.RegisterACLServiceServer(grpcServer, service) + // Cancel context to stop all operations + cancel() - // Register reflection for debugging with grpcurl - reflection.Register(grpcServer) + // Gracefully stop gRPC server with timeout + stopped := make(chan struct{}) + go func() { + grpcServer.GracefulStop() + close(stopped) + }() - // Start listening - lis, err := net.Listen("tcp", cfg.Listen) - if chk.E(err) { - log.E.F("failed to listen on %s: %v", cfg.Listen, err) - os.Exit(1) + select { + case <-stopped: + log.I.F("gRPC server stopped gracefully") + case <-time.After(5 * time.Second): + log.W.F("gRPC graceful stop timed out, forcing stop") + grpcServer.Stop() } - log.I.F("gRPC ACL server listening on %s", cfg.Listen) - // Handle graceful shutdown - go func() { - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) - sig := <-sigs - log.I.F("received signal %v, shutting down...", sig) - - // Cancel context to stop all operations - cancel() - - // Gracefully stop gRPC server with timeout - stopped := make(chan struct{}) - go func() { - grpcServer.GracefulStop() - close(stopped) - }() - - select { - case <-stopped: - log.I.F("gRPC server stopped gracefully") - case <-time.After(5 * time.Second): - log.W.F("gRPC graceful stop timed out, forcing stop") - grpcServer.Stop() + // Sync and close database (only for direct Badger) + if cfg.DBType != "grpc" { + log.I.F("syncing database...") + if err := db.Sync(); chk.E(err) { + log.W.F("failed to sync database: %v", err) } - - // Sync and close database (only for direct Badger) - if cfg.DBType != "grpc" { - log.I.F("syncing database...") - if err := db.Sync(); chk.E(err) { - log.W.F("failed to sync database: %v", err) - } - log.I.F("closing database...") - if err := db.Close(); chk.E(err) { - log.W.F("failed to close database: %v", err) - } + log.I.F("closing database...") + if err := db.Close(); chk.E(err) { + log.W.F("failed to close database: %v", err) } - log.I.F("shutdown complete") - }() - - // Serve gRPC - if err := grpcServer.Serve(lis); err != nil { - log.E.F("gRPC server error: %v", err) } + log.I.F("shutdown complete") } diff --git a/cmd/orly-acl/service.go b/cmd/orly-acl/service.go index 230c9c8..c6f18be 100644 --- a/cmd/orly-acl/service.go +++ b/cmd/orly-acl/service.go @@ -17,15 +17,25 @@ import ( // ACLService implements the orlyaclv1.ACLServiceServer interface. type ACLService struct { orlyaclv1.UnimplementedACLServiceServer - cfg *Config - db database.Database + cfg *Config + db database.Database + ready bool } // NewACLService creates a new ACL service. func NewACLService(cfg *Config, db database.Database) *ACLService { return &ACLService{ - cfg: cfg, - db: db, + cfg: cfg, + db: db, + ready: false, // Not ready until Configure completes + } +} + +// SetReady marks the service as ready (or not ready). +func (s *ACLService) SetReady(ready bool) { + s.ready = ready + if ready { + log.I.F("ACL service is now ready") } } @@ -60,13 +70,8 @@ func (s *ACLService) GetMode(ctx context.Context, req *orlyaclv1.Empty) (*orlyac } func (s *ACLService) Ready(ctx context.Context, req *orlyaclv1.Empty) (*orlyaclv1.ReadyResponse, error) { - // Check if database is ready - select { - case <-s.db.Ready(): - return &orlyaclv1.ReadyResponse{Ready: true}, nil - default: - return &orlyaclv1.ReadyResponse{Ready: false}, nil - } + // Check if service is fully configured and ready + return &orlyaclv1.ReadyResponse{Ready: s.ready}, nil } // === Follows ACL Methods === diff --git a/pkg/database/get-serials-by-range.go b/pkg/database/get-serials-by-range.go index f72cc8f..edf11f2 100644 --- a/pkg/database/get-serials-by-range.go +++ b/pkg/database/get-serials-by-range.go @@ -8,7 +8,6 @@ import ( "github.com/dgraph-io/badger/v4" "lol.mleku.dev/chk" - "lol.mleku.dev/log" "next.orly.dev/pkg/database/indexes/types" ) @@ -33,27 +32,15 @@ func (d *D) GetSerialsByRange(idx Range) ( for i := 0; i < 5; i++ { endBoundary = append(endBoundary, 0xff) } - iterCount := 0 it.Seek(endBoundary) - // log.T.F("GetSerialsByRange: iterator valid=%v, sought to endBoundary", it.Valid()) for it.Valid() { - iterCount++ - if iterCount > 100 { - // Safety limit to prevent infinite loops in debugging - log.T.F("GetSerialsByRange: hit safety limit of 100 iterations") - break - } item := it.Item() var key []byte key = item.Key() keyWithoutSerial := key[:len(key)-5] cmp := bytes.Compare(keyWithoutSerial, idx.Start) - // log.T.F("GetSerialsByRange: iter %d, key prefix matches=%v, cmp=%d", iterCount, bytes.HasPrefix(key, idx.Start[:len(idx.Start)-8]), cmp) if cmp < 0 { // didn't find it within the timestamp range - // log.T.F("GetSerialsByRange: key out of range (cmp=%d), stopping iteration", cmp) - // log.T.F(" keyWithoutSerial len=%d: %x", len(keyWithoutSerial), keyWithoutSerial) - // log.T.F(" idx.Start len=%d: %x", len(idx.Start), idx.Start) return } ser := new(types.Uint40) @@ -64,7 +51,6 @@ func (d *D) GetSerialsByRange(idx Range) ( sers = append(sers, ser) it.Next() } - // log.T.F("GetSerialsByRange: iteration complete, found %d serials", len(sers)) return }, ); chk.E(err) { diff --git a/pkg/version/version b/pkg/version/version index 8e4a6c0..a27f781 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.55.0 +v0.55.1