Browse Source

Fix negentropy sync only seeing 100 events and ACL startup timeout (v0.55.1)

- Remove the debug 100-iteration limit from GetSerialsByRange
  The limit was a debug-only safety check that should have been removed
  earlier. It capped every range query at 100 results, so negentropy
  sync saw only 100 events instead of all events.

- Fix ACL service startup to start listening before Configure runs
  The ACL server now starts the gRPC listener as soon as the database is
  ready and serves in a background goroutine while Configure runs. This
  prevents the launcher from timing out while the ACL loads follow lists.

- Add SetReady method to ACL service so Ready reports that configuration has completed, not just that the database is ready

Files modified:
- pkg/database/get-serials-by-range.go: Remove debug iteration limit
- cmd/orly-acl/main.go: Start gRPC server before Configure
- cmd/orly-acl/service.go: Add ready field and SetReady method
- pkg/version/version: Bump to v0.55.1

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
main
woikos 4 months ago
parent
commit
71129a98d2
No known key found for this signature in database
  1. 117
      cmd/orly-acl/main.go
  2. 27
      cmd/orly-acl/service.go
  3. 14
      pkg/database/get-serials-by-range.go
  4. 2
      pkg/version/version

117
cmd/orly-acl/main.go

@ -77,6 +77,34 @@ func main() {
<-db.Ready() <-db.Ready()
log.I.F("database ready") log.I.F("database ready")
// Create gRPC server EARLY so launcher can detect it's alive
grpcServer := grpc.NewServer(
grpc.MaxRecvMsgSize(16<<20), // 16MB
grpc.MaxSendMsgSize(16<<20), // 16MB
)
// Register ACL service (will be configured below)
service := NewACLService(cfg, db)
orlyaclv1.RegisterACLServiceServer(grpcServer, service)
// Register reflection for debugging with grpcurl
reflection.Register(grpcServer)
// Start listening immediately so launcher knows we're alive
lis, err := net.Listen("tcp", cfg.Listen)
if chk.E(err) {
log.E.F("failed to listen on %s: %v", cfg.Listen, err)
os.Exit(1)
}
log.I.F("gRPC ACL server listening on %s", cfg.Listen)
// Start serving in background while we configure
go func() {
if err := grpcServer.Serve(lis); err != nil {
log.E.F("gRPC server error: %v", err)
}
}()
// Create app config for ACL configuration // Create app config for ACL configuration
appCfg := &config.C{ appCfg := &config.C{
Owners: cfg.GetOwners(), Owners: cfg.GetOwners(),
@ -89,79 +117,54 @@ func main() {
FollowsThrottleMaxDelay: cfg.FollowsThrottleMaxDelay, FollowsThrottleMaxDelay: cfg.FollowsThrottleMaxDelay,
} }
// Set ACL mode and configure the registry // Set ACL mode and configure the registry (may take time to load follow lists)
acl.Registry.SetMode(cfg.ACLMode) acl.Registry.SetMode(cfg.ACLMode)
if err := acl.Registry.Configure(appCfg, db, ctx); chk.E(err) { if err := acl.Registry.Configure(appCfg, db, ctx); chk.E(err) {
log.E.F("failed to configure ACL: %v", err) log.E.F("failed to configure ACL: %v", err)
os.Exit(1) os.Exit(1)
} }
// Mark service as ready now that configuration is complete
service.SetReady(true)
// Start the syncer goroutine for background operations // Start the syncer goroutine for background operations
acl.Registry.Syncer() acl.Registry.Syncer()
log.I.F("ACL syncer started for mode: %s", cfg.ACLMode) log.I.F("ACL syncer started for mode: %s", cfg.ACLMode)
// Create gRPC server // Handle graceful shutdown - block until signal received
grpcServer := grpc.NewServer( sigs := make(chan os.Signal, 1)
grpc.MaxRecvMsgSize(16<<20), // 16MB signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
grpc.MaxSendMsgSize(16<<20), // 16MB sig := <-sigs
) log.I.F("received signal %v, shutting down...", sig)
// Register ACL service // Cancel context to stop all operations
service := NewACLService(cfg, db) cancel()
orlyaclv1.RegisterACLServiceServer(grpcServer, service)
// Register reflection for debugging with grpcurl // Gracefully stop gRPC server with timeout
reflection.Register(grpcServer) stopped := make(chan struct{})
go func() {
grpcServer.GracefulStop()
close(stopped)
}()
// Start listening select {
lis, err := net.Listen("tcp", cfg.Listen) case <-stopped:
if chk.E(err) { log.I.F("gRPC server stopped gracefully")
log.E.F("failed to listen on %s: %v", cfg.Listen, err) case <-time.After(5 * time.Second):
os.Exit(1) log.W.F("gRPC graceful stop timed out, forcing stop")
grpcServer.Stop()
} }
log.I.F("gRPC ACL server listening on %s", cfg.Listen)
// Handle graceful shutdown // Sync and close database (only for direct Badger)
go func() { if cfg.DBType != "grpc" {
sigs := make(chan os.Signal, 1) log.I.F("syncing database...")
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) if err := db.Sync(); chk.E(err) {
sig := <-sigs log.W.F("failed to sync database: %v", err)
log.I.F("received signal %v, shutting down...", sig)
// Cancel context to stop all operations
cancel()
// Gracefully stop gRPC server with timeout
stopped := make(chan struct{})
go func() {
grpcServer.GracefulStop()
close(stopped)
}()
select {
case <-stopped:
log.I.F("gRPC server stopped gracefully")
case <-time.After(5 * time.Second):
log.W.F("gRPC graceful stop timed out, forcing stop")
grpcServer.Stop()
} }
log.I.F("closing database...")
// Sync and close database (only for direct Badger) if err := db.Close(); chk.E(err) {
if cfg.DBType != "grpc" { log.W.F("failed to close database: %v", err)
log.I.F("syncing database...")
if err := db.Sync(); chk.E(err) {
log.W.F("failed to sync database: %v", err)
}
log.I.F("closing database...")
if err := db.Close(); chk.E(err) {
log.W.F("failed to close database: %v", err)
}
} }
log.I.F("shutdown complete")
}()
// Serve gRPC
if err := grpcServer.Serve(lis); err != nil {
log.E.F("gRPC server error: %v", err)
} }
log.I.F("shutdown complete")
} }

27
cmd/orly-acl/service.go

@ -17,15 +17,25 @@ import (
// ACLService implements the orlyaclv1.ACLServiceServer interface. // ACLService implements the orlyaclv1.ACLServiceServer interface.
type ACLService struct { type ACLService struct {
orlyaclv1.UnimplementedACLServiceServer orlyaclv1.UnimplementedACLServiceServer
cfg *Config cfg *Config
db database.Database db database.Database
ready bool
} }
// NewACLService creates a new ACL service. // NewACLService creates a new ACL service.
func NewACLService(cfg *Config, db database.Database) *ACLService { func NewACLService(cfg *Config, db database.Database) *ACLService {
return &ACLService{ return &ACLService{
cfg: cfg, cfg: cfg,
db: db, db: db,
ready: false, // Not ready until Configure completes
}
}
// SetReady marks the service as ready (or not ready).
func (s *ACLService) SetReady(ready bool) {
s.ready = ready
if ready {
log.I.F("ACL service is now ready")
} }
} }
@ -60,13 +70,8 @@ func (s *ACLService) GetMode(ctx context.Context, req *orlyaclv1.Empty) (*orlyac
} }
func (s *ACLService) Ready(ctx context.Context, req *orlyaclv1.Empty) (*orlyaclv1.ReadyResponse, error) { func (s *ACLService) Ready(ctx context.Context, req *orlyaclv1.Empty) (*orlyaclv1.ReadyResponse, error) {
// Check if database is ready // Check if service is fully configured and ready
select { return &orlyaclv1.ReadyResponse{Ready: s.ready}, nil
case <-s.db.Ready():
return &orlyaclv1.ReadyResponse{Ready: true}, nil
default:
return &orlyaclv1.ReadyResponse{Ready: false}, nil
}
} }
// === Follows ACL Methods === // === Follows ACL Methods ===

14
pkg/database/get-serials-by-range.go

@ -8,7 +8,6 @@ import (
"github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4"
"lol.mleku.dev/chk" "lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/pkg/database/indexes/types" "next.orly.dev/pkg/database/indexes/types"
) )
@ -33,27 +32,15 @@ func (d *D) GetSerialsByRange(idx Range) (
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
endBoundary = append(endBoundary, 0xff) endBoundary = append(endBoundary, 0xff)
} }
iterCount := 0
it.Seek(endBoundary) it.Seek(endBoundary)
// log.T.F("GetSerialsByRange: iterator valid=%v, sought to endBoundary", it.Valid())
for it.Valid() { for it.Valid() {
iterCount++
if iterCount > 100 {
// Safety limit to prevent infinite loops in debugging
log.T.F("GetSerialsByRange: hit safety limit of 100 iterations")
break
}
item := it.Item() item := it.Item()
var key []byte var key []byte
key = item.Key() key = item.Key()
keyWithoutSerial := key[:len(key)-5] keyWithoutSerial := key[:len(key)-5]
cmp := bytes.Compare(keyWithoutSerial, idx.Start) cmp := bytes.Compare(keyWithoutSerial, idx.Start)
// log.T.F("GetSerialsByRange: iter %d, key prefix matches=%v, cmp=%d", iterCount, bytes.HasPrefix(key, idx.Start[:len(idx.Start)-8]), cmp)
if cmp < 0 { if cmp < 0 {
// didn't find it within the timestamp range // didn't find it within the timestamp range
// log.T.F("GetSerialsByRange: key out of range (cmp=%d), stopping iteration", cmp)
// log.T.F(" keyWithoutSerial len=%d: %x", len(keyWithoutSerial), keyWithoutSerial)
// log.T.F(" idx.Start len=%d: %x", len(idx.Start), idx.Start)
return return
} }
ser := new(types.Uint40) ser := new(types.Uint40)
@ -64,7 +51,6 @@ func (d *D) GetSerialsByRange(idx Range) (
sers = append(sers, ser) sers = append(sers, ser)
it.Next() it.Next()
} }
// log.T.F("GetSerialsByRange: iteration complete, found %d serials", len(sers))
return return
}, },
); chk.E(err) { ); chk.E(err) {

2
pkg/version/version

@ -1 +1 @@
v0.55.0 v0.55.1

Loading…
Cancel
Save