Browse Source

Fix NIP-77 negentropy server-side event pushing (v0.55.7)

- Add have_ids, need_ids, complete fields to NegOpenResponse proto
- Fix ID encoding in server: decode hex strings to binary before sending
- Fix event sending order: send NEG-MSG response before EVENT envelopes
- Client now waits for expected events after reconciliation completes
- Store received events from peer in sync client message loop

This enables bidirectional event sync via NIP-77 negentropy. The server
can now push events it has that the client needs, in addition to the
client pushing events to the server.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
main
woikos 4 months ago
parent
commit
1711345c64
No known key found for this signature in database
  1. 50
      app/handle-negentropy.go
  2. 37
      pkg/proto/orlysync/negentropy/v1/service.pb.go
  3. 7
      pkg/sync/negentropy/grpc/client.go
  4. 36
      pkg/sync/negentropy/manager.go
  5. 43
      pkg/sync/negentropy/server/service.go
  6. 2
      pkg/version/version
  7. 5
      proto/orlysync/negentropy/v1/service.proto

50
app/handle-negentropy.go

@ -108,7 +108,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error { @@ -108,7 +108,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error {
// Call gRPC service
ctx := context.Background()
respMsg, errStr, err := negentropyClient.HandleNegOpen(
respMsg, haveIDs, needIDs, complete, errStr, err := negentropyClient.HandleNegOpen(
ctx,
l.connectionID,
subscriptionID,
@ -124,8 +124,29 @@ func (l *Listener) HandleNegOpen(msg []byte) error { @@ -124,8 +124,29 @@ func (l *Listener) HandleNegOpen(msg []byte) error {
return l.sendNegErr(subscriptionID, errStr)
}
// Send NEG-MSG response with initial message
return l.sendNegMsg(subscriptionID, respMsg)
// Log need_ids (events client should send us)
if len(needIDs) > 0 {
log.D.F("NEG-OPEN: client needs to send %d events", len(needIDs))
}
// Send NEG-MSG response FIRST (before events)
if err := l.sendNegMsg(subscriptionID, respMsg); err != nil {
return err
}
// If reconciliation is complete, log it
if complete {
log.D.F("NEG-OPEN: reconciliation complete for %s", subscriptionID)
}
// AFTER sending response, send events for IDs we have that client needs
if len(haveIDs) > 0 {
if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil {
log.E.F("failed to send events for NEG-OPEN: %v", err)
}
}
return nil
}
// HandleNegMsg processes NEG-MSG messages
@ -180,19 +201,12 @@ func (l *Listener) HandleNegMsg(msg []byte) error { @@ -180,19 +201,12 @@ func (l *Listener) HandleNegMsg(msg []byte) error {
return l.sendNegErr(subscriptionID, errStr)
}
// If we have events to send (have_ids), fetch and send them
if len(haveIDs) > 0 {
if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil {
log.E.F("failed to send events for NEG-MSG: %v", err)
}
}
// Log need_ids (events client should send us)
if len(needIDs) > 0 {
log.D.F("NEG-MSG: client needs to send %d events", len(needIDs))
}
// Send NEG-MSG response
// Send NEG-MSG response FIRST (before events)
if err := l.sendNegMsg(subscriptionID, respMsg); err != nil {
return err
}
@ -202,6 +216,13 @@ func (l *Listener) HandleNegMsg(msg []byte) error { @@ -202,6 +216,13 @@ func (l *Listener) HandleNegMsg(msg []byte) error {
log.D.F("NEG-MSG: reconciliation complete for %s", subscriptionID)
}
// AFTER sending response, send events for IDs we have that client needs
if len(haveIDs) > 0 {
if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil {
log.E.F("failed to send events for NEG-MSG: %v", err)
}
}
return nil
}
@ -275,13 +296,12 @@ func (l *Listener) sendEventsForIDs(subscriptionID string, ids [][]byte) error { @@ -275,13 +296,12 @@ func (l *Listener) sendEventsForIDs(subscriptionID string, ids [][]byte) error {
log.I.F("NEG: sending %d events for subscription %s", len(ids), subscriptionID)
// Build filter with hex-encoded IDs
// Build filter with binary IDs (32 bytes each)
f := &filter.F{}
f.Ids = &tag.T{}
for _, id := range ids {
// IDs can be 32 bytes (full) or 16 bytes (truncated per NIP-77)
hexID := hex.EncodeToString(id)
f.Ids.T = append(f.Ids.T, []byte(hexID))
// IDs are binary (32 bytes full or 16 bytes truncated per NIP-77)
f.Ids.T = append(f.Ids.T, id)
}
// Query events by IDs

37
pkg/proto/orlysync/negentropy/v1/service.pb.go

@ -95,8 +95,11 @@ func (x *NegOpenRequest) GetConnectionId() string { @@ -95,8 +95,11 @@ func (x *NegOpenRequest) GetConnectionId() string {
// NegOpenResponse returns the initial negentropy response
type NegOpenResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Message []byte `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` // Negentropy protocol message to send back
Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` // Error message if failed
Message []byte `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` // Negentropy protocol message to send back
HaveIds [][]byte `protobuf:"bytes,2,rep,name=have_ids,json=haveIds,proto3" json:"have_ids,omitempty"` // Event IDs we have that client needs (if initial reconciliation completes)
NeedIds [][]byte `protobuf:"bytes,3,rep,name=need_ids,json=needIds,proto3" json:"need_ids,omitempty"` // Event IDs we need from client (if initial reconciliation completes)
Complete bool `protobuf:"varint,4,opt,name=complete,proto3" json:"complete,omitempty"` // True if reconciliation completed in first round
Error string `protobuf:"bytes,5,opt,name=error,proto3" json:"error,omitempty"` // Error message if failed
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@ -138,6 +141,27 @@ func (x *NegOpenResponse) GetMessage() []byte { @@ -138,6 +141,27 @@ func (x *NegOpenResponse) GetMessage() []byte {
return nil
}
func (x *NegOpenResponse) GetHaveIds() [][]byte {
if x != nil {
return x.HaveIds
}
return nil
}
func (x *NegOpenResponse) GetNeedIds() [][]byte {
if x != nil {
return x.NeedIds
}
return nil
}
func (x *NegOpenResponse) GetComplete() bool {
if x != nil {
return x.Complete
}
return false
}
func (x *NegOpenResponse) GetError() string {
if x != nil {
return x.Error
@ -1124,10 +1148,13 @@ const file_orlysync_negentropy_v1_service_proto_rawDesc = "" + @@ -1124,10 +1148,13 @@ const file_orlysync_negentropy_v1_service_proto_rawDesc = "" +
"\x0fsubscription_id\x18\x01 \x01(\tR\x0esubscriptionId\x122\n" +
"\x06filter\x18\x02 \x01(\v2\x1a.orlysync.common.v1.FilterR\x06filter\x12'\n" +
"\x0finitial_message\x18\x03 \x01(\fR\x0einitialMessage\x12#\n" +
"\rconnection_id\x18\x04 \x01(\tR\fconnectionId\"A\n" +
"\rconnection_id\x18\x04 \x01(\tR\fconnectionId\"\x93\x01\n" +
"\x0fNegOpenResponse\x12\x18\n" +
"\amessage\x18\x01 \x01(\fR\amessage\x12\x14\n" +
"\x05error\x18\x02 \x01(\tR\x05error\"w\n" +
"\amessage\x18\x01 \x01(\fR\amessage\x12\x19\n" +
"\bhave_ids\x18\x02 \x03(\fR\ahaveIds\x12\x19\n" +
"\bneed_ids\x18\x03 \x03(\fR\aneedIds\x12\x1a\n" +
"\bcomplete\x18\x04 \x01(\bR\bcomplete\x12\x14\n" +
"\x05error\x18\x05 \x01(\tR\x05error\"w\n" +
"\rNegMsgRequest\x12'\n" +
"\x0fsubscription_id\x18\x01 \x01(\tR\x0esubscriptionId\x12\x18\n" +
"\amessage\x18\x02 \x01(\fR\amessage\x12#\n" +

7
pkg/sync/negentropy/grpc/client.go

@ -102,7 +102,8 @@ func (c *Client) Stop(ctx context.Context) error { @@ -102,7 +102,8 @@ func (c *Client) Stop(ctx context.Context) error {
}
// HandleNegOpen processes a NEG-OPEN message from a client.
func (c *Client) HandleNegOpen(ctx context.Context, connectionID, subscriptionID string, filter *commonv1.Filter, initialMessage []byte) ([]byte, string, error) {
// Returns: message, haveIDs, needIDs, complete, errorStr, error
func (c *Client) HandleNegOpen(ctx context.Context, connectionID, subscriptionID string, filter *commonv1.Filter, initialMessage []byte) ([]byte, [][]byte, [][]byte, bool, string, error) {
resp, err := c.client.HandleNegOpen(ctx, &negentropyv1.NegOpenRequest{
ConnectionId: connectionID,
SubscriptionId: subscriptionID,
@ -110,9 +111,9 @@ func (c *Client) HandleNegOpen(ctx context.Context, connectionID, subscriptionID @@ -110,9 +111,9 @@ func (c *Client) HandleNegOpen(ctx context.Context, connectionID, subscriptionID
InitialMessage: initialMessage,
})
if err != nil {
return nil, "", err
return nil, nil, nil, false, "", err
}
return resp.Message, resp.Error, nil
return resp.Message, resp.HaveIds, resp.NeedIds, resp.Complete, resp.Error, nil
}
// HandleNegMsg processes a NEG-MSG message from a client.

36
pkg/sync/negentropy/manager.go

@ -260,6 +260,9 @@ func (m *Manager) performNegentropy(ctx context.Context, peerURL string) (int64, @@ -260,6 +260,9 @@ func (m *Manager) performNegentropy(ctx context.Context, peerURL string) (int64,
var eventsSynced int64
var needIDs []string
var haveIDs []string
var waitingForEvents bool
var expectedEvents int
var receivedEvents int64
// Exchange messages until complete
for i := 0; i < 20; i++ { // Max 20 rounds
@ -308,10 +311,20 @@ func (m *Manager) performNegentropy(ctx context.Context, peerURL string) (int64, @@ -308,10 +311,20 @@ func (m *Manager) performNegentropy(ctx context.Context, peerURL string) (int64,
haveIDs = append(haveIDs, neg.CollectHaves()...)
if complete {
// Send NEG-CLOSE
negClose := []any{"NEG-CLOSE", subID}
conn.WriteJSON(negClose)
goto done
log.I.F("negentropy: reconciliation complete, waiting for %d events from peer", len(needIDs))
// Don't send NEG-CLOSE yet - wait for EVENT messages
// The server will send events we need after this
waitingForEvents = true
expectedEvents = len(needIDs)
if expectedEvents == 0 {
// No events to receive, close now
negClose := []any{"NEG-CLOSE", subID}
conn.WriteJSON(negClose)
goto done
}
// Set a read deadline to avoid waiting forever
conn.SetReadDeadline(time.Now().Add(30 * time.Second))
continue
}
// Send NEG-MSG response
@ -329,7 +342,20 @@ func (m *Manager) performNegentropy(ctx context.Context, peerURL string) (int64, @@ -329,7 +342,20 @@ func (m *Manager) performNegentropy(ctx context.Context, peerURL string) (int64,
case "EVENT":
// Peer is sending us an event
eventsSynced++
if len(msg) >= 3 {
if err := m.storeEventFromJSON(ctx, msg[2]); err != nil {
log.W.F("negentropy: failed to store event from peer: %v", err)
} else {
eventsSynced++
receivedEvents++
}
}
// If we've received all expected events (or more), we can close
if waitingForEvents && receivedEvents >= int64(expectedEvents) {
negClose := []any{"NEG-CLOSE", subID}
conn.WriteJSON(negClose)
goto done
}
}
}

43
pkg/sync/negentropy/server/service.go

@ -3,6 +3,7 @@ package server @@ -3,6 +3,7 @@ package server
import (
"context"
"encoding/hex"
"fmt"
"google.golang.org/grpc"
@ -75,8 +76,8 @@ func (s *Service) HandleNegOpen(ctx context.Context, req *negentropyv1.NegOpenRe @@ -75,8 +76,8 @@ func (s *Service) HandleNegOpen(ctx context.Context, req *negentropyv1.NegOpenRe
// If we have an initial message from client, process it
var respMsg []byte
var complete bool
if len(req.InitialMessage) > 0 {
var complete bool
respMsg, complete, err = neg.Reconcile(req.InitialMessage)
if err != nil {
log.E.F("NEG-OPEN: reconcile failed: %v", err)
@ -99,9 +100,35 @@ func (s *Service) HandleNegOpen(ctx context.Context, req *negentropyv1.NegOpenRe @@ -99,9 +100,35 @@ func (s *Service) HandleNegOpen(ctx context.Context, req *negentropyv1.NegOpenRe
log.D.F("NEG-OPEN: started negentropy, initial msg len=%d", len(respMsg))
}
// Collect IDs we have that client needs (to send as events)
haveIDs := neg.CollectHaves()
var haveIDBytes [][]byte
for _, id := range haveIDs {
// ID is a hex string, decode to binary
if decoded, err := hex.DecodeString(id); err == nil {
haveIDBytes = append(haveIDBytes, decoded)
}
}
// Collect IDs we need from client
needIDs := neg.CollectHaveNots()
var needIDBytes [][]byte
for _, id := range needIDs {
// ID is a hex string, decode to binary
if decoded, err := hex.DecodeString(id); err == nil {
needIDBytes = append(needIDBytes, decoded)
}
}
log.I.F("NEG-OPEN: complete=%v, haves=%d, needs=%d, response len=%d",
complete, len(haveIDs), len(needIDs), len(respMsg))
return &negentropyv1.NegOpenResponse{
Message: respMsg,
Error: "",
Message: respMsg,
HaveIds: haveIDBytes,
NeedIds: needIDBytes,
Complete: complete,
Error: "",
}, nil
}
@ -138,14 +165,20 @@ func (s *Service) HandleNegMsg(ctx context.Context, req *negentropyv1.NegMsgRequ @@ -138,14 +165,20 @@ func (s *Service) HandleNegMsg(ctx context.Context, req *negentropyv1.NegMsgRequ
haveIDs := neg.CollectHaves()
var haveIDBytes [][]byte
for _, id := range haveIDs {
haveIDBytes = append(haveIDBytes, []byte(id))
// ID is a hex string, decode to binary
if decoded, err := hex.DecodeString(id); err == nil {
haveIDBytes = append(haveIDBytes, decoded)
}
}
// Collect IDs we need from client
needIDs := neg.CollectHaveNots()
var needIDBytes [][]byte
for _, id := range needIDs {
needIDBytes = append(needIDBytes, []byte(id))
// ID is a hex string, decode to binary
if decoded, err := hex.DecodeString(id); err == nil {
needIDBytes = append(needIDBytes, decoded)
}
}
log.I.F("NEG-MSG: complete=%v, haves=%d, needs=%d, response len=%d",

2
pkg/version/version

@ -1 +1 @@ @@ -1 +1 @@
v0.55.6
v0.55.7

5
proto/orlysync/negentropy/v1/service.proto

@ -80,7 +80,10 @@ message NegOpenRequest { @@ -80,7 +80,10 @@ message NegOpenRequest {
// NegOpenResponse returns the initial negentropy response
message NegOpenResponse {
bytes message = 1; // Negentropy protocol message to send back
string error = 2; // Error message if failed
repeated bytes have_ids = 2; // Event IDs we have that client needs (if initial reconciliation completes)
repeated bytes need_ids = 3; // Event IDs we need from client (if initial reconciliation completes)
bool complete = 4; // True if reconciliation completed in first round
string error = 5; // Error message if failed
}
// NegMsgRequest processes a NEG-MSG from client

Loading…
Cancel
Save