From 8a2aa009d6d891ac315d8a4df27736804c42b69c Mon Sep 17 00:00:00 2001 From: woikos Date: Mon, 26 Jan 2026 16:15:40 +0100 Subject: [PATCH] Add embedded negentropy for monolithic mode NIP-77 support (v0.57.1) - Create negentropy.Handler interface in pkg/interfaces/negentropy - Add EmbeddedHandler that wraps negentropy.Manager for embedded mode - Update app/handle-negentropy.go to use interface instead of gRPC client - Update gRPC client to implement the Handler interface - Update startup.go to initialize embedded negentropy when not in gRPC mode - Enables NIP-77 set reconciliation in monolithic binary without gRPC Files modified: - pkg/interfaces/negentropy/negentropy.go: New interface definition - pkg/sync/negentropy/embedded.go: New embedded handler implementation - pkg/sync/negentropy/grpc/client.go: Implement Handler interface - app/handle-negentropy.go: Use interface type - pkg/relay/startup.go: Initialize embedded negentropy - pkg/version/version: Bump to v0.57.1 Co-Authored-By: Claude Opus 4.5 --- app/handle-negentropy.go | 32 +-- pkg/interfaces/negentropy/negentropy.go | 44 +++++ pkg/relay/startup.go | 32 ++- pkg/sync/negentropy/embedded.go | 253 ++++++++++++++++++++++++ pkg/sync/negentropy/grpc/client.go | 19 +- pkg/version/version | 2 +- 6 files changed, 345 insertions(+), 37 deletions(-) create mode 100644 pkg/interfaces/negentropy/negentropy.go create mode 100644 pkg/sync/negentropy/embedded.go diff --git a/app/handle-negentropy.go b/app/handle-negentropy.go index 7acaf74..6b866a4 100644 --- a/app/handle-negentropy.go +++ b/app/handle-negentropy.go @@ -13,8 +13,8 @@ import ( "git.mleku.dev/mleku/nostr/encoders/envelopes/eventenvelope" "git.mleku.dev/mleku/nostr/encoders/filter" "git.mleku.dev/mleku/nostr/encoders/tag" + negentropyiface "next.orly.dev/pkg/interfaces/negentropy" commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1" - negentropygrpc "next.orly.dev/pkg/sync/negentropy/grpc" ) // NIP-77 Negentropy envelope constants @@ -25,13 +25,13 @@ const ( NegErrLabel = "NEG-ERR" ) -// negentropyClient is the gRPC client for negentropy operations -// This should be set via Server.SetNegentropyClient() when using gRPC mode -var negentropyClient *negentropygrpc.Client +// negentropyHandler handles NIP-77 negentropy operations +// This can be either a gRPC client or an embedded handler +var negentropyHandler negentropyiface.Handler -// SetNegentropyClient sets the negentropy gRPC client for NIP-77 WebSocket handling -func SetNegentropyClient(client *negentropygrpc.Client) { - negentropyClient = client +// SetNegentropyHandler sets the negentropy handler for NIP-77 WebSocket handling +func SetNegentropyHandler(handler negentropyiface.Handler) { + negentropyHandler = handler } // IsNegentropyEnvelope checks if a message starts with a NEG-* envelope type @@ -63,7 +63,7 @@ func IdentifyNegentropyEnvelope(msg []byte) (envelopeType string, ok bool) { // Format: ["NEG-OPEN", subscription_id, filter, initial_message?] func (l *Listener) HandleNegOpen(msg []byte) error { log.I.F("HandleNegOpen called from %s", l.connectionID) - if negentropyClient == nil { + if negentropyHandler == nil { log.E.F("negentropy client is nil") return l.sendNegErr("", "negentropy not enabled") } @@ -109,7 +109,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error { // Call gRPC service ctx := context.Background() - respMsg, haveIDs, needIDs, complete, errStr, err := negentropyClient.HandleNegOpen( + respMsg, haveIDs, needIDs, complete, errStr, err := negentropyHandler.HandleNegOpen( ctx, l.connectionID, subscriptionID, @@ -153,7 +153,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error { // HandleNegMsg processes NEG-MSG messages // Format: ["NEG-MSG", subscription_id, message] func (l *Listener) HandleNegMsg(msg []byte) error { - if negentropyClient == nil { + if negentropyHandler == nil { return l.sendNegErr("", "negentropy not enabled") } @@ -187,7 +187,7 @@ func (l *Listener) HandleNegMsg(msg []byte) error { // Call gRPC service ctx := context.Background() - respMsg, haveIDs, needIDs, complete, errStr, err := negentropyClient.HandleNegMsg( + respMsg, haveIDs, needIDs, complete, errStr, err := negentropyHandler.HandleNegMsg( ctx, l.connectionID, subscriptionID, @@ -230,7 +230,7 @@ func (l *Listener) HandleNegMsg(msg []byte) error { // HandleNegClose processes NEG-CLOSE messages // Format: ["NEG-CLOSE", subscription_id] func (l *Listener) HandleNegClose(msg []byte) error { - if negentropyClient == nil { + if negentropyHandler == nil { return nil // Silently ignore if not enabled } @@ -252,7 +252,7 @@ func (l *Listener) HandleNegClose(msg []byte) error { // Call gRPC service to close the session ctx := context.Background() - if err := negentropyClient.HandleNegClose(ctx, l.connectionID, subscriptionID); err != nil { + if err := negentropyHandler.HandleNegClose(ctx, l.connectionID, subscriptionID); err != nil { log.E.F("NEG-CLOSE gRPC error: %v", err) } @@ -390,19 +390,19 @@ func filterToProto(f *filter.F) *commonv1.Filter { // CloseAllNegentropySessions closes all negentropy sessions for a connection // Called when a WebSocket connection is closed func (l *Listener) CloseAllNegentropySessions() { - if negentropyClient == nil { + if negentropyHandler == nil { return } ctx := context.Background() - sessions, err := negentropyClient.ListSessions(ctx) + sessions, err := negentropyHandler.ListSessions(ctx) if chk.E(err) { return } for _, sess := range sessions { if sess.ConnectionID == l.connectionID { - negentropyClient.CloseSession(ctx, l.connectionID, sess.SubscriptionID) + negentropyHandler.CloseSession(ctx, l.connectionID, sess.SubscriptionID) } } } diff --git a/pkg/interfaces/negentropy/negentropy.go b/pkg/interfaces/negentropy/negentropy.go new file mode 100644 index 0000000..a7fa177 --- /dev/null +++ b/pkg/interfaces/negentropy/negentropy.go @@ -0,0 +1,44 @@ +// Package negentropy defines the interface for NIP-77 negentropy operations. +package negentropy + +import ( + "context" + + commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1" +) + +// ClientSession represents an active client negentropy session. +type ClientSession struct { + SubscriptionID string + ConnectionID string + CreatedAt int64 + LastActivity int64 + RoundCount int32 +} + +// Handler defines the interface for handling NIP-77 negentropy messages. +// This interface is implemented by both the gRPC client and the embedded handler. +type Handler interface { + // HandleNegOpen processes a NEG-OPEN message from a client. + // Returns: message, haveIDs, needIDs, complete, errorStr, error + HandleNegOpen(ctx context.Context, connectionID, subscriptionID string, filter *commonv1.Filter, initialMessage []byte) ([]byte, [][]byte, [][]byte, bool, string, error) + + // HandleNegMsg processes a NEG-MSG message from a client. + // Returns: message, haveIDs, needIDs, complete, errorStr, error + HandleNegMsg(ctx context.Context, connectionID, subscriptionID string, message []byte) ([]byte, [][]byte, [][]byte, bool, string, error) + + // HandleNegClose processes a NEG-CLOSE message from a client. + HandleNegClose(ctx context.Context, connectionID, subscriptionID string) error + + // ListSessions returns active client negentropy sessions. + ListSessions(ctx context.Context) ([]*ClientSession, error) + + // CloseSession forcefully closes a client session. + CloseSession(ctx context.Context, connectionID, subscriptionID string) error + + // Ready returns a channel that closes when the handler is ready. + Ready() <-chan struct{} + + // Close cleans up resources. + Close() error +} diff --git a/pkg/relay/startup.go b/pkg/relay/startup.go index 2ccddbd..6f7122b 100644 --- a/pkg/relay/startup.go +++ b/pkg/relay/startup.go @@ -29,6 +29,7 @@ import ( _ "next.orly.dev/pkg/database/grpc" // Import for grpc factory registration neo4jdb "next.orly.dev/pkg/neo4j" "next.orly.dev/pkg/ratelimit" + "next.orly.dev/pkg/sync/negentropy" negentropygrpc "next.orly.dev/pkg/sync/negentropy/grpc" "next.orly.dev/pkg/utils/interrupt" ) @@ -71,8 +72,8 @@ func Startup(cfg *config.C) (*StartupResult, error) { return nil, fmt.Errorf("failed to initialize ACL: %w", err) } - // Initialize negentropy client if enabled - initializeNegentropy(ctx, cfg) + // Initialize negentropy handler (embedded or gRPC client) + initializeNegentropy(ctx, cfg, db) // Create rate limiter limiter := createRateLimiter(cfg, db) @@ -294,11 +295,16 @@ func initializeACL(ctx context.Context, cfg *config.C, db database.Database) err return nil } -// initializeNegentropy sets up the negentropy gRPC client if enabled. -func initializeNegentropy(ctx context.Context, cfg *config.C) { +// initializeNegentropy sets up negentropy handling (embedded or gRPC client). +func initializeNegentropy(ctx context.Context, cfg *config.C, db database.Database) { syncType, _, _, _, negentropyAddr, syncTimeout, negentropyEnabled := cfg.GetGRPCSyncConfigValues() - if negentropyEnabled && syncType == "grpc" && negentropyAddr != "" { + if !negentropyEnabled { + return + } + + if syncType == "grpc" && negentropyAddr != "" { + // Use gRPC client to connect to remote negentropy server log.I.F("connecting to gRPC negentropy server at %s", negentropyAddr) negClient, err := negentropygrpc.New(ctx, &negentropygrpc.ClientConfig{ ServerAddress: negentropyAddr, @@ -313,12 +319,22 @@ func initializeNegentropy(ctx context.Context, cfg *config.C) { select { case <-negClient.Ready(): log.I.F("gRPC negentropy client connected") - app.SetNegentropyClient(negClient) + app.SetNegentropyHandler(negClient) case <-time.After(30 * time.Second): log.W.F("timeout waiting for gRPC negentropy server (NIP-77 disabled)") } - } else if negentropyEnabled { - log.I.F("negentropy enabled but sync type is %q, skipping gRPC client", syncType) + } else { + // Use embedded negentropy handler (monolithic mode) + log.I.F("initializing embedded negentropy handler") + negHandler := negentropy.NewEmbeddedHandler(db, &negentropy.Config{ + SyncInterval: 60 * time.Second, + FrameSize: 128 * 1024, + IDSize: 16, + ClientSessionTimeout: 5 * time.Minute, + }) + negHandler.Start() + app.SetNegentropyHandler(negHandler) + log.I.F("embedded negentropy handler initialized (NIP-77 enabled)") } } diff --git a/pkg/sync/negentropy/embedded.go b/pkg/sync/negentropy/embedded.go new file mode 100644 index 0000000..5f07da9 --- /dev/null +++ b/pkg/sync/negentropy/embedded.go @@ -0,0 +1,253 @@ +package negentropy + +import ( + "context" + "encoding/hex" + "fmt" + + "lol.mleku.dev/log" + + "git.mleku.dev/mleku/nostr/encoders/filter" + negentropylib "git.mleku.dev/mleku/nostr/negentropy" + "next.orly.dev/pkg/database" + negentropyiface "next.orly.dev/pkg/interfaces/negentropy" + commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1" +) + +// EmbeddedHandler wraps the negentropy Manager to implement the Handler interface. +// This allows negentropy to run embedded in monolithic mode without gRPC. +type EmbeddedHandler struct { + mgr *Manager + db database.Database + ready chan struct{} +} + +// NewEmbeddedHandler creates a new embedded negentropy handler. +func NewEmbeddedHandler(db database.Database, cfg *Config) *EmbeddedHandler { + h := &EmbeddedHandler{ + mgr: NewManager(db, cfg), + db: db, + ready: make(chan struct{}), + } + close(h.ready) // Immediately ready in embedded mode + return h +} + +// Start starts the background sync loop. +func (h *EmbeddedHandler) Start() { + h.mgr.Start() +} + +// Stop stops the background sync. +func (h *EmbeddedHandler) Stop() { + h.mgr.Stop() +} + +// Ready returns a channel that closes when the handler is ready. +func (h *EmbeddedHandler) Ready() <-chan struct{} { + return h.ready +} + +// Close cleans up resources. +func (h *EmbeddedHandler) Close() error { + h.mgr.Stop() + return nil +} + +// HandleNegOpen processes a NEG-OPEN message from a client. +func (h *EmbeddedHandler) HandleNegOpen(ctx context.Context, connectionID, subscriptionID string, protoFilter *commonv1.Filter, initialMessage []byte) ([]byte, [][]byte, [][]byte, bool, string, error) { + // Open a session for this client + session := h.mgr.OpenSession(connectionID, subscriptionID) + + // Build storage from local events matching the filter + storage, err := h.buildStorageForFilter(ctx, protoFilter) + if err != nil { + log.E.F("NEG-OPEN: failed to build storage: %v", err) + return nil, nil, nil, false, fmt.Sprintf("failed to build storage: %v", err), nil + } + + log.I.F("NEG-OPEN: built storage with %d events", storage.Size()) + + // Create negentropy instance for this session + neg := negentropylib.New(storage, negentropylib.DefaultFrameSizeLimit) + + // Store in session for later use + session.SetNegentropy(neg, storage) + + // If we have an initial message from client, process it + var respMsg []byte + var complete bool + if len(initialMessage) > 0 { + respMsg, complete, err = neg.Reconcile(initialMessage) + if err != nil { + log.E.F("NEG-OPEN: reconcile failed: %v", err) + return nil, nil, nil, false, fmt.Sprintf("reconcile failed: %v", err), nil + } + log.I.F("NEG-OPEN: reconcile complete=%v, response len=%d", complete, len(respMsg)) + } else { + // No initial message, start as server (initiator) + respMsg, err = neg.Start() + if err != nil { + log.E.F("NEG-OPEN: failed to start: %v", err) + return nil, nil, nil, false, fmt.Sprintf("failed to start: %v", err), nil + } + log.D.F("NEG-OPEN: started negentropy, initial msg len=%d", len(respMsg)) + } + + // Collect IDs we have that client needs (to send as events) + haveIDs := neg.CollectHaves() + var haveIDBytes [][]byte + for _, id := range haveIDs { + if decoded, err := hex.DecodeString(id); err == nil { + haveIDBytes = append(haveIDBytes, decoded) + } + } + + // Collect IDs we need from client + needIDs := neg.CollectHaveNots() + var needIDBytes [][]byte + for _, id := range needIDs { + if decoded, err := hex.DecodeString(id); err == nil { + needIDBytes = append(needIDBytes, decoded) + } + } + + log.I.F("NEG-OPEN: complete=%v, haves=%d, needs=%d, response len=%d", + complete, len(haveIDs), len(needIDs), len(respMsg)) + + return respMsg, haveIDBytes, needIDBytes, complete, "", nil +} + +// HandleNegMsg processes a NEG-MSG message from a client. +func (h *EmbeddedHandler) HandleNegMsg(ctx context.Context, connectionID, subscriptionID string, message []byte) ([]byte, [][]byte, [][]byte, bool, string, error) { + // Update session activity + h.mgr.UpdateSessionActivity(connectionID, subscriptionID) + + // Look up session + session, ok := h.mgr.GetSession(connectionID, subscriptionID) + if !ok { + return nil, nil, nil, false, "session not found", nil + } + + neg := session.GetNegentropy() + if neg == nil { + return nil, nil, nil, false, "session has no negentropy state", nil + } + + // Process the message + respMsg, complete, err := neg.Reconcile(message) + if err != nil { + log.E.F("NEG-MSG: reconcile failed: %v", err) + return nil, nil, nil, false, fmt.Sprintf("reconcile failed: %v", err), nil + } + + // Collect IDs we have that client needs + haveIDs := neg.CollectHaves() + var haveIDBytes [][]byte + for _, id := range haveIDs { + if decoded, err := hex.DecodeString(id); err == nil { + haveIDBytes = append(haveIDBytes, decoded) + } + } + + // Collect IDs we need from client + needIDs := neg.CollectHaveNots() + var needIDBytes [][]byte + for _, id := range needIDs { + if decoded, err := hex.DecodeString(id); err == nil { + needIDBytes = append(needIDBytes, decoded) + } + } + + log.I.F("NEG-MSG: complete=%v, haves=%d, needs=%d, response len=%d", + complete, len(haveIDs), len(needIDs), len(respMsg)) + + return respMsg, haveIDBytes, needIDBytes, complete, "", nil +} + +// HandleNegClose processes a NEG-CLOSE message from a client. +func (h *EmbeddedHandler) HandleNegClose(ctx context.Context, connectionID, subscriptionID string) error { + h.mgr.CloseSession(connectionID, subscriptionID) + return nil +} + +// ListSessions returns active client negentropy sessions. +func (h *EmbeddedHandler) ListSessions(ctx context.Context) ([]*negentropyiface.ClientSession, error) { + sessions := h.mgr.ListSessions() + + result := make([]*negentropyiface.ClientSession, 0, len(sessions)) + for _, sess := range sessions { + result = append(result, &negentropyiface.ClientSession{ + SubscriptionID: sess.SubscriptionID, + ConnectionID: sess.ConnectionID, + CreatedAt: sess.CreatedAt.Unix(), + LastActivity: sess.LastActivity.Unix(), + RoundCount: sess.RoundCount, + }) + } + return result, nil +} + +// CloseSession forcefully closes a client session. +func (h *EmbeddedHandler) CloseSession(ctx context.Context, connectionID, subscriptionID string) error { + if connectionID == "" { + // Close all sessions with this subscription ID + sessions := h.mgr.ListSessions() + for _, sess := range sessions { + if sess.SubscriptionID == subscriptionID { + h.mgr.CloseSession(sess.ConnectionID, sess.SubscriptionID) + } + } + } else { + h.mgr.CloseSession(connectionID, subscriptionID) + } + return nil +} + +// buildStorageForFilter creates a negentropy Vector from local events matching the filter. +func (h *EmbeddedHandler) buildStorageForFilter(ctx context.Context, protoFilter *commonv1.Filter) (*negentropylib.Vector, error) { + storage := negentropylib.NewVector() + + // Convert proto filter to nostr filter + f := protoToFilter(protoFilter) + + // If no filter provided, use a reasonable limit + if f == nil { + limit := uint(1000000) + f = &filter.F{Limit: &limit} + } + if f.Limit == nil { + limit := uint(1000000) + f.Limit = &limit + } + + // Query events from database + idPkTs, err := h.db.QueryForIds(ctx, f) + if err != nil { + return nil, fmt.Errorf("failed to query events: %w", err) + } + + for _, item := range idPkTs { + storage.Insert(item.Ts, item.IDHex()) + } + + storage.Seal() + return storage, nil +} + +// protoToFilter converts a proto filter to a nostr filter. +func protoToFilter(pf *commonv1.Filter) *filter.F { + if pf == nil { + return nil + } + + f := &filter.F{} + + // Convert Limit + if pf.Limit != nil { + limit := uint(*pf.Limit) + f.Limit = &limit + } + + return f +} diff --git a/pkg/sync/negentropy/grpc/client.go b/pkg/sync/negentropy/grpc/client.go index 23b1cdd..866f77e 100644 --- a/pkg/sync/negentropy/grpc/client.go +++ b/pkg/sync/negentropy/grpc/client.go @@ -10,10 +10,14 @@ import ( "google.golang.org/grpc/credentials/insecure" "lol.mleku.dev/log" + negentropyiface "next.orly.dev/pkg/interfaces/negentropy" commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1" negentropyv1 "next.orly.dev/pkg/proto/orlysync/negentropy/v1" ) +// Verify Client implements the Handler interface +var _ negentropyiface.Handler = (*Client)(nil) + // Client is a gRPC client for the negentropy sync service. type Client struct { conn *grpc.ClientConn @@ -293,25 +297,16 @@ func (c *Client) GetPeerSyncState(ctx context.Context, peerURL string) (*PeerSyn }, true, nil } -// ClientSession represents an active client negentropy session. -type ClientSession struct { - SubscriptionID string - ConnectionID string - CreatedAt int64 - LastActivity int64 - RoundCount int32 -} - // ListSessions returns active client negentropy sessions. -func (c *Client) ListSessions(ctx context.Context) ([]*ClientSession, error) { +func (c *Client) ListSessions(ctx context.Context) ([]*negentropyiface.ClientSession, error) { resp, err := c.client.ListSessions(ctx, &commonv1.Empty{}) if err != nil { return nil, err } - sessions := make([]*ClientSession, 0, len(resp.Sessions)) + sessions := make([]*negentropyiface.ClientSession, 0, len(resp.Sessions)) for _, s := range resp.Sessions { - sessions = append(sessions, &ClientSession{ + sessions = append(sessions, &negentropyiface.ClientSession{ SubscriptionID: s.SubscriptionId, ConnectionID: s.ConnectionId, CreatedAt: s.CreatedAt, diff --git a/pkg/version/version b/pkg/version/version index 7e16869..9ed446c 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.57.0 +v0.57.1