Browse Source

Add embedded negentropy for monolithic mode NIP-77 support (v0.57.1)

- Create negentropy.Handler interface in pkg/interfaces/negentropy
- Add EmbeddedHandler that wraps negentropy.Manager for embedded mode
- Update app/handle-negentropy.go to use interface instead of gRPC client
- Update gRPC client to implement the Handler interface
- Update startup.go to initialize embedded negentropy when not in gRPC mode
- Enables NIP-77 set reconciliation in monolithic binary without gRPC

Files modified:
- pkg/interfaces/negentropy/negentropy.go: New interface definition
- pkg/sync/negentropy/embedded.go: New embedded handler implementation
- pkg/sync/negentropy/grpc/client.go: Implement Handler interface
- app/handle-negentropy.go: Use interface type
- pkg/relay/startup.go: Initialize embedded negentropy
- pkg/version/version: Bump to v0.57.1

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
main v0.57.1
woikos 4 months ago
parent
commit
8a2aa009d6
No known key found for this signature in database
  1. 32
      app/handle-negentropy.go
  2. 44
      pkg/interfaces/negentropy/negentropy.go
  3. 32
      pkg/relay/startup.go
  4. 253
      pkg/sync/negentropy/embedded.go
  5. 19
      pkg/sync/negentropy/grpc/client.go
  6. 2
      pkg/version/version

32
app/handle-negentropy.go

@ -13,8 +13,8 @@ import ( @@ -13,8 +13,8 @@ import (
"git.mleku.dev/mleku/nostr/encoders/envelopes/eventenvelope"
"git.mleku.dev/mleku/nostr/encoders/filter"
"git.mleku.dev/mleku/nostr/encoders/tag"
negentropyiface "next.orly.dev/pkg/interfaces/negentropy"
commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
negentropygrpc "next.orly.dev/pkg/sync/negentropy/grpc"
)
// NIP-77 Negentropy envelope constants
@ -25,13 +25,13 @@ const ( @@ -25,13 +25,13 @@ const (
NegErrLabel = "NEG-ERR"
)
// negentropyClient is the gRPC client for negentropy operations
// This should be set via Server.SetNegentropyClient() when using gRPC mode
var negentropyClient *negentropygrpc.Client
// negentropyHandler handles NIP-77 negentropy operations
// This can be either a gRPC client or an embedded handler
var negentropyHandler negentropyiface.Handler
// SetNegentropyClient sets the negentropy gRPC client for NIP-77 WebSocket handling
func SetNegentropyClient(client *negentropygrpc.Client) {
negentropyClient = client
// SetNegentropyHandler sets the negentropy handler for NIP-77 WebSocket handling
func SetNegentropyHandler(handler negentropyiface.Handler) {
negentropyHandler = handler
}
// IsNegentropyEnvelope checks if a message starts with a NEG-* envelope type
@ -63,7 +63,7 @@ func IdentifyNegentropyEnvelope(msg []byte) (envelopeType string, ok bool) { @@ -63,7 +63,7 @@ func IdentifyNegentropyEnvelope(msg []byte) (envelopeType string, ok bool) {
// Format: ["NEG-OPEN", subscription_id, filter, initial_message?]
func (l *Listener) HandleNegOpen(msg []byte) error {
log.I.F("HandleNegOpen called from %s", l.connectionID)
if negentropyClient == nil {
if negentropyHandler == nil {
log.E.F("negentropy client is nil")
return l.sendNegErr("", "negentropy not enabled")
}
@ -109,7 +109,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error { @@ -109,7 +109,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error {
// Call gRPC service
ctx := context.Background()
respMsg, haveIDs, needIDs, complete, errStr, err := negentropyClient.HandleNegOpen(
respMsg, haveIDs, needIDs, complete, errStr, err := negentropyHandler.HandleNegOpen(
ctx,
l.connectionID,
subscriptionID,
@ -153,7 +153,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error { @@ -153,7 +153,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error {
// HandleNegMsg processes NEG-MSG messages
// Format: ["NEG-MSG", subscription_id, message]
func (l *Listener) HandleNegMsg(msg []byte) error {
if negentropyClient == nil {
if negentropyHandler == nil {
return l.sendNegErr("", "negentropy not enabled")
}
@ -187,7 +187,7 @@ func (l *Listener) HandleNegMsg(msg []byte) error { @@ -187,7 +187,7 @@ func (l *Listener) HandleNegMsg(msg []byte) error {
// Call gRPC service
ctx := context.Background()
respMsg, haveIDs, needIDs, complete, errStr, err := negentropyClient.HandleNegMsg(
respMsg, haveIDs, needIDs, complete, errStr, err := negentropyHandler.HandleNegMsg(
ctx,
l.connectionID,
subscriptionID,
@ -230,7 +230,7 @@ func (l *Listener) HandleNegMsg(msg []byte) error { @@ -230,7 +230,7 @@ func (l *Listener) HandleNegMsg(msg []byte) error {
// HandleNegClose processes NEG-CLOSE messages
// Format: ["NEG-CLOSE", subscription_id]
func (l *Listener) HandleNegClose(msg []byte) error {
if negentropyClient == nil {
if negentropyHandler == nil {
return nil // Silently ignore if not enabled
}
@ -252,7 +252,7 @@ func (l *Listener) HandleNegClose(msg []byte) error { @@ -252,7 +252,7 @@ func (l *Listener) HandleNegClose(msg []byte) error {
// Call gRPC service to close the session
ctx := context.Background()
if err := negentropyClient.HandleNegClose(ctx, l.connectionID, subscriptionID); err != nil {
if err := negentropyHandler.HandleNegClose(ctx, l.connectionID, subscriptionID); err != nil {
log.E.F("NEG-CLOSE gRPC error: %v", err)
}
@ -390,19 +390,19 @@ func filterToProto(f *filter.F) *commonv1.Filter { @@ -390,19 +390,19 @@ func filterToProto(f *filter.F) *commonv1.Filter {
// CloseAllNegentropySessions closes all negentropy sessions for a connection
// Called when a WebSocket connection is closed
func (l *Listener) CloseAllNegentropySessions() {
if negentropyClient == nil {
if negentropyHandler == nil {
return
}
ctx := context.Background()
sessions, err := negentropyClient.ListSessions(ctx)
sessions, err := negentropyHandler.ListSessions(ctx)
if chk.E(err) {
return
}
for _, sess := range sessions {
if sess.ConnectionID == l.connectionID {
negentropyClient.CloseSession(ctx, l.connectionID, sess.SubscriptionID)
negentropyHandler.CloseSession(ctx, l.connectionID, sess.SubscriptionID)
}
}
}

44
pkg/interfaces/negentropy/negentropy.go

@ -0,0 +1,44 @@ @@ -0,0 +1,44 @@
// Package negentropy defines the interface for NIP-77 negentropy operations.
package negentropy
import (
"context"
commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
)
// ClientSession represents an active client negentropy session.
type ClientSession struct {
SubscriptionID string
ConnectionID string
CreatedAt int64
LastActivity int64
RoundCount int32
}
// Handler defines the interface for handling NIP-77 negentropy messages.
// This interface is implemented by both the gRPC client and the embedded handler.
type Handler interface {
// HandleNegOpen processes a NEG-OPEN message from a client.
// Returns: message, haveIDs, needIDs, complete, errorStr, error
HandleNegOpen(ctx context.Context, connectionID, subscriptionID string, filter *commonv1.Filter, initialMessage []byte) ([]byte, [][]byte, [][]byte, bool, string, error)
// HandleNegMsg processes a NEG-MSG message from a client.
// Returns: message, haveIDs, needIDs, complete, errorStr, error
HandleNegMsg(ctx context.Context, connectionID, subscriptionID string, message []byte) ([]byte, [][]byte, [][]byte, bool, string, error)
// HandleNegClose processes a NEG-CLOSE message from a client.
HandleNegClose(ctx context.Context, connectionID, subscriptionID string) error
// ListSessions returns active client negentropy sessions.
ListSessions(ctx context.Context) ([]*ClientSession, error)
// CloseSession forcefully closes a client session.
CloseSession(ctx context.Context, connectionID, subscriptionID string) error
// Ready returns a channel that closes when the handler is ready.
Ready() <-chan struct{}
// Close cleans up resources.
Close() error
}

32
pkg/relay/startup.go

@ -29,6 +29,7 @@ import ( @@ -29,6 +29,7 @@ import (
_ "next.orly.dev/pkg/database/grpc" // Import for grpc factory registration
neo4jdb "next.orly.dev/pkg/neo4j"
"next.orly.dev/pkg/ratelimit"
"next.orly.dev/pkg/sync/negentropy"
negentropygrpc "next.orly.dev/pkg/sync/negentropy/grpc"
"next.orly.dev/pkg/utils/interrupt"
)
@ -71,8 +72,8 @@ func Startup(cfg *config.C) (*StartupResult, error) { @@ -71,8 +72,8 @@ func Startup(cfg *config.C) (*StartupResult, error) {
return nil, fmt.Errorf("failed to initialize ACL: %w", err)
}
// Initialize negentropy client if enabled
initializeNegentropy(ctx, cfg)
// Initialize negentropy handler (embedded or gRPC client)
initializeNegentropy(ctx, cfg, db)
// Create rate limiter
limiter := createRateLimiter(cfg, db)
@ -294,11 +295,16 @@ func initializeACL(ctx context.Context, cfg *config.C, db database.Database) err @@ -294,11 +295,16 @@ func initializeACL(ctx context.Context, cfg *config.C, db database.Database) err
return nil
}
// initializeNegentropy sets up the negentropy gRPC client if enabled.
func initializeNegentropy(ctx context.Context, cfg *config.C) {
// initializeNegentropy sets up negentropy handling (embedded or gRPC client).
func initializeNegentropy(ctx context.Context, cfg *config.C, db database.Database) {
syncType, _, _, _, negentropyAddr, syncTimeout, negentropyEnabled := cfg.GetGRPCSyncConfigValues()
if negentropyEnabled && syncType == "grpc" && negentropyAddr != "" {
if !negentropyEnabled {
return
}
if syncType == "grpc" && negentropyAddr != "" {
// Use gRPC client to connect to remote negentropy server
log.I.F("connecting to gRPC negentropy server at %s", negentropyAddr)
negClient, err := negentropygrpc.New(ctx, &negentropygrpc.ClientConfig{
ServerAddress: negentropyAddr,
@ -313,12 +319,22 @@ func initializeNegentropy(ctx context.Context, cfg *config.C) { @@ -313,12 +319,22 @@ func initializeNegentropy(ctx context.Context, cfg *config.C) {
select {
case <-negClient.Ready():
log.I.F("gRPC negentropy client connected")
app.SetNegentropyClient(negClient)
app.SetNegentropyHandler(negClient)
case <-time.After(30 * time.Second):
log.W.F("timeout waiting for gRPC negentropy server (NIP-77 disabled)")
}
} else if negentropyEnabled {
log.I.F("negentropy enabled but sync type is %q, skipping gRPC client", syncType)
} else {
// Use embedded negentropy handler (monolithic mode)
log.I.F("initializing embedded negentropy handler")
negHandler := negentropy.NewEmbeddedHandler(db, &negentropy.Config{
SyncInterval: 60 * time.Second,
FrameSize: 128 * 1024,
IDSize: 16,
ClientSessionTimeout: 5 * time.Minute,
})
negHandler.Start()
app.SetNegentropyHandler(negHandler)
log.I.F("embedded negentropy handler initialized (NIP-77 enabled)")
}
}

253
pkg/sync/negentropy/embedded.go

@ -0,0 +1,253 @@ @@ -0,0 +1,253 @@
package negentropy
import (
"context"
"encoding/hex"
"fmt"
"lol.mleku.dev/log"
"git.mleku.dev/mleku/nostr/encoders/filter"
negentropylib "git.mleku.dev/mleku/nostr/negentropy"
"next.orly.dev/pkg/database"
negentropyiface "next.orly.dev/pkg/interfaces/negentropy"
commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
)
// EmbeddedHandler wraps the negentropy Manager to implement the Handler interface.
// This allows negentropy to run embedded in monolithic mode without gRPC.
type EmbeddedHandler struct {
mgr *Manager
db database.Database
ready chan struct{}
}
// NewEmbeddedHandler creates a new embedded negentropy handler.
func NewEmbeddedHandler(db database.Database, cfg *Config) *EmbeddedHandler {
h := &EmbeddedHandler{
mgr: NewManager(db, cfg),
db: db,
ready: make(chan struct{}),
}
close(h.ready) // Immediately ready in embedded mode
return h
}
// Start starts the background sync loop.
func (h *EmbeddedHandler) Start() {
h.mgr.Start()
}
// Stop stops the background sync.
func (h *EmbeddedHandler) Stop() {
h.mgr.Stop()
}
// Ready returns a channel that closes when the handler is ready.
func (h *EmbeddedHandler) Ready() <-chan struct{} {
return h.ready
}
// Close cleans up resources.
func (h *EmbeddedHandler) Close() error {
h.mgr.Stop()
return nil
}
// HandleNegOpen processes a NEG-OPEN message from a client.
func (h *EmbeddedHandler) HandleNegOpen(ctx context.Context, connectionID, subscriptionID string, protoFilter *commonv1.Filter, initialMessage []byte) ([]byte, [][]byte, [][]byte, bool, string, error) {
// Open a session for this client
session := h.mgr.OpenSession(connectionID, subscriptionID)
// Build storage from local events matching the filter
storage, err := h.buildStorageForFilter(ctx, protoFilter)
if err != nil {
log.E.F("NEG-OPEN: failed to build storage: %v", err)
return nil, nil, nil, false, fmt.Sprintf("failed to build storage: %v", err), nil
}
log.I.F("NEG-OPEN: built storage with %d events", storage.Size())
// Create negentropy instance for this session
neg := negentropylib.New(storage, negentropylib.DefaultFrameSizeLimit)
// Store in session for later use
session.SetNegentropy(neg, storage)
// If we have an initial message from client, process it
var respMsg []byte
var complete bool
if len(initialMessage) > 0 {
respMsg, complete, err = neg.Reconcile(initialMessage)
if err != nil {
log.E.F("NEG-OPEN: reconcile failed: %v", err)
return nil, nil, nil, false, fmt.Sprintf("reconcile failed: %v", err), nil
}
log.I.F("NEG-OPEN: reconcile complete=%v, response len=%d", complete, len(respMsg))
} else {
// No initial message, start as server (initiator)
respMsg, err = neg.Start()
if err != nil {
log.E.F("NEG-OPEN: failed to start: %v", err)
return nil, nil, nil, false, fmt.Sprintf("failed to start: %v", err), nil
}
log.D.F("NEG-OPEN: started negentropy, initial msg len=%d", len(respMsg))
}
// Collect IDs we have that client needs (to send as events)
haveIDs := neg.CollectHaves()
var haveIDBytes [][]byte
for _, id := range haveIDs {
if decoded, err := hex.DecodeString(id); err == nil {
haveIDBytes = append(haveIDBytes, decoded)
}
}
// Collect IDs we need from client
needIDs := neg.CollectHaveNots()
var needIDBytes [][]byte
for _, id := range needIDs {
if decoded, err := hex.DecodeString(id); err == nil {
needIDBytes = append(needIDBytes, decoded)
}
}
log.I.F("NEG-OPEN: complete=%v, haves=%d, needs=%d, response len=%d",
complete, len(haveIDs), len(needIDs), len(respMsg))
return respMsg, haveIDBytes, needIDBytes, complete, "", nil
}
// HandleNegMsg processes a NEG-MSG message from a client.
func (h *EmbeddedHandler) HandleNegMsg(ctx context.Context, connectionID, subscriptionID string, message []byte) ([]byte, [][]byte, [][]byte, bool, string, error) {
// Update session activity
h.mgr.UpdateSessionActivity(connectionID, subscriptionID)
// Look up session
session, ok := h.mgr.GetSession(connectionID, subscriptionID)
if !ok {
return nil, nil, nil, false, "session not found", nil
}
neg := session.GetNegentropy()
if neg == nil {
return nil, nil, nil, false, "session has no negentropy state", nil
}
// Process the message
respMsg, complete, err := neg.Reconcile(message)
if err != nil {
log.E.F("NEG-MSG: reconcile failed: %v", err)
return nil, nil, nil, false, fmt.Sprintf("reconcile failed: %v", err), nil
}
// Collect IDs we have that client needs
haveIDs := neg.CollectHaves()
var haveIDBytes [][]byte
for _, id := range haveIDs {
if decoded, err := hex.DecodeString(id); err == nil {
haveIDBytes = append(haveIDBytes, decoded)
}
}
// Collect IDs we need from client
needIDs := neg.CollectHaveNots()
var needIDBytes [][]byte
for _, id := range needIDs {
if decoded, err := hex.DecodeString(id); err == nil {
needIDBytes = append(needIDBytes, decoded)
}
}
log.I.F("NEG-MSG: complete=%v, haves=%d, needs=%d, response len=%d",
complete, len(haveIDs), len(needIDs), len(respMsg))
return respMsg, haveIDBytes, needIDBytes, complete, "", nil
}
// HandleNegClose processes a NEG-CLOSE message from a client.
func (h *EmbeddedHandler) HandleNegClose(ctx context.Context, connectionID, subscriptionID string) error {
h.mgr.CloseSession(connectionID, subscriptionID)
return nil
}
// ListSessions returns active client negentropy sessions.
func (h *EmbeddedHandler) ListSessions(ctx context.Context) ([]*negentropyiface.ClientSession, error) {
sessions := h.mgr.ListSessions()
result := make([]*negentropyiface.ClientSession, 0, len(sessions))
for _, sess := range sessions {
result = append(result, &negentropyiface.ClientSession{
SubscriptionID: sess.SubscriptionID,
ConnectionID: sess.ConnectionID,
CreatedAt: sess.CreatedAt.Unix(),
LastActivity: sess.LastActivity.Unix(),
RoundCount: sess.RoundCount,
})
}
return result, nil
}
// CloseSession forcefully closes a client session.
func (h *EmbeddedHandler) CloseSession(ctx context.Context, connectionID, subscriptionID string) error {
if connectionID == "" {
// Close all sessions with this subscription ID
sessions := h.mgr.ListSessions()
for _, sess := range sessions {
if sess.SubscriptionID == subscriptionID {
h.mgr.CloseSession(sess.ConnectionID, sess.SubscriptionID)
}
}
} else {
h.mgr.CloseSession(connectionID, subscriptionID)
}
return nil
}
// buildStorageForFilter creates a negentropy Vector from local events matching the filter.
func (h *EmbeddedHandler) buildStorageForFilter(ctx context.Context, protoFilter *commonv1.Filter) (*negentropylib.Vector, error) {
storage := negentropylib.NewVector()
// Convert proto filter to nostr filter
f := protoToFilter(protoFilter)
// If no filter provided, use a reasonable limit
if f == nil {
limit := uint(1000000)
f = &filter.F{Limit: &limit}
}
if f.Limit == nil {
limit := uint(1000000)
f.Limit = &limit
}
// Query events from database
idPkTs, err := h.db.QueryForIds(ctx, f)
if err != nil {
return nil, fmt.Errorf("failed to query events: %w", err)
}
for _, item := range idPkTs {
storage.Insert(item.Ts, item.IDHex())
}
storage.Seal()
return storage, nil
}
// protoToFilter converts a proto filter to a nostr filter.
func protoToFilter(pf *commonv1.Filter) *filter.F {
if pf == nil {
return nil
}
f := &filter.F{}
// Convert Limit
if pf.Limit != nil {
limit := uint(*pf.Limit)
f.Limit = &limit
}
return f
}

19
pkg/sync/negentropy/grpc/client.go

@ -10,10 +10,14 @@ import ( @@ -10,10 +10,14 @@ import (
"google.golang.org/grpc/credentials/insecure"
"lol.mleku.dev/log"
negentropyiface "next.orly.dev/pkg/interfaces/negentropy"
commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
negentropyv1 "next.orly.dev/pkg/proto/orlysync/negentropy/v1"
)
// Verify Client implements the Handler interface
var _ negentropyiface.Handler = (*Client)(nil)
// Client is a gRPC client for the negentropy sync service.
type Client struct {
conn *grpc.ClientConn
@ -293,25 +297,16 @@ func (c *Client) GetPeerSyncState(ctx context.Context, peerURL string) (*PeerSyn @@ -293,25 +297,16 @@ func (c *Client) GetPeerSyncState(ctx context.Context, peerURL string) (*PeerSyn
}, true, nil
}
// ClientSession represents an active client negentropy session.
type ClientSession struct {
SubscriptionID string
ConnectionID string
CreatedAt int64
LastActivity int64
RoundCount int32
}
// ListSessions returns active client negentropy sessions.
func (c *Client) ListSessions(ctx context.Context) ([]*ClientSession, error) {
func (c *Client) ListSessions(ctx context.Context) ([]*negentropyiface.ClientSession, error) {
resp, err := c.client.ListSessions(ctx, &commonv1.Empty{})
if err != nil {
return nil, err
}
sessions := make([]*ClientSession, 0, len(resp.Sessions))
sessions := make([]*negentropyiface.ClientSession, 0, len(resp.Sessions))
for _, s := range resp.Sessions {
sessions = append(sessions, &ClientSession{
sessions = append(sessions, &negentropyiface.ClientSession{
SubscriptionID: s.SubscriptionId,
ConnectionID: s.ConnectionId,
CreatedAt: s.CreatedAt,

2
pkg/version/version

@ -1 +1 @@ @@ -1 +1 @@
v0.57.0
v0.57.1

Loading…
Cancel
Save