diff --git a/app/handle-negentropy.go b/app/handle-negentropy.go index 892e6e9..191cff7 100644 --- a/app/handle-negentropy.go +++ b/app/handle-negentropy.go @@ -108,7 +108,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error { // Call gRPC service ctx := context.Background() - respMsg, errStr, err := negentropyClient.HandleNegOpen( + respMsg, haveIDs, needIDs, complete, errStr, err := negentropyClient.HandleNegOpen( ctx, l.connectionID, subscriptionID, @@ -124,8 +124,29 @@ func (l *Listener) HandleNegOpen(msg []byte) error { return l.sendNegErr(subscriptionID, errStr) } - // Send NEG-MSG response with initial message - return l.sendNegMsg(subscriptionID, respMsg) + // Log need_ids (events client should send us) + if len(needIDs) > 0 { + log.D.F("NEG-OPEN: client needs to send %d events", len(needIDs)) + } + + // Send NEG-MSG response FIRST (before events) + if err := l.sendNegMsg(subscriptionID, respMsg); err != nil { + return err + } + + // If reconciliation is complete, log it + if complete { + log.D.F("NEG-OPEN: reconciliation complete for %s", subscriptionID) + } + + // AFTER sending response, send events for IDs we have that client needs + if len(haveIDs) > 0 { + if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil { + log.E.F("failed to send events for NEG-OPEN: %v", err) + } + } + + return nil } // HandleNegMsg processes NEG-MSG messages @@ -180,19 +201,12 @@ func (l *Listener) HandleNegMsg(msg []byte) error { return l.sendNegErr(subscriptionID, errStr) } - // If we have events to send (have_ids), fetch and send them - if len(haveIDs) > 0 { - if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil { - log.E.F("failed to send events for NEG-MSG: %v", err) - } - } - // Log need_ids (events client should send us) if len(needIDs) > 0 { log.D.F("NEG-MSG: client needs to send %d events", len(needIDs)) } - // Send NEG-MSG response + // Send NEG-MSG response FIRST (before events) if err := l.sendNegMsg(subscriptionID, respMsg); err != nil { return err } @@ -202,6 +216,13 @@ func (l *Listener) HandleNegMsg(msg []byte) error { log.D.F("NEG-MSG: reconciliation complete for %s", subscriptionID) } + // AFTER sending response, send events for IDs we have that client needs + if len(haveIDs) > 0 { + if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil { + log.E.F("failed to send events for NEG-MSG: %v", err) + } + } + return nil } @@ -275,13 +296,12 @@ func (l *Listener) sendEventsForIDs(subscriptionID string, ids [][]byte) error { log.I.F("NEG: sending %d events for subscription %s", len(ids), subscriptionID) - // Build filter with hex-encoded IDs + // Build filter with binary IDs (32 bytes each) f := &filter.F{} f.Ids = &tag.T{} for _, id := range ids { - // IDs can be 32 bytes (full) or 16 bytes (truncated per NIP-77) - hexID := hex.EncodeToString(id) - f.Ids.T = append(f.Ids.T, []byte(hexID)) + // IDs are binary (32 bytes full or 16 bytes truncated per NIP-77) + f.Ids.T = append(f.Ids.T, id) } // Query events by IDs diff --git a/pkg/proto/orlysync/negentropy/v1/service.pb.go b/pkg/proto/orlysync/negentropy/v1/service.pb.go index cf17adc..31baa56 100644 --- a/pkg/proto/orlysync/negentropy/v1/service.pb.go +++ b/pkg/proto/orlysync/negentropy/v1/service.pb.go @@ -95,8 +95,11 @@ func (x *NegOpenRequest) GetConnectionId() string { // NegOpenResponse returns the initial negentropy response type NegOpenResponse struct { state protoimpl.MessageState `protogen:"open.v1"` - Message []byte `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` // Negentropy protocol message to send back - Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` // Error message if failed + Message []byte `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` // Negentropy protocol message to send back + HaveIds [][]byte `protobuf:"bytes,2,rep,name=have_ids,json=haveIds,proto3" json:"have_ids,omitempty"` // Event IDs we have that client needs (if initial reconciliation completes) + NeedIds [][]byte `protobuf:"bytes,3,rep,name=need_ids,json=needIds,proto3" json:"need_ids,omitempty"` // Event IDs we need from client (if initial reconciliation completes) + Complete bool `protobuf:"varint,4,opt,name=complete,proto3" json:"complete,omitempty"` // True if reconciliation completed in first round + Error string `protobuf:"bytes,5,opt,name=error,proto3" json:"error,omitempty"` // Error message if failed unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -138,6 +141,27 @@ func (x *NegOpenResponse) GetMessage() []byte { return nil } +func (x *NegOpenResponse) GetHaveIds() [][]byte { + if x != nil { + return x.HaveIds + } + return nil +} + +func (x *NegOpenResponse) GetNeedIds() [][]byte { + if x != nil { + return x.NeedIds + } + return nil +} + +func (x *NegOpenResponse) GetComplete() bool { + if x != nil { + return x.Complete + } + return false +} + func (x *NegOpenResponse) GetError() string { if x != nil { return x.Error @@ -1124,10 +1148,13 @@ const file_orlysync_negentropy_v1_service_proto_rawDesc = "" + "\x0fsubscription_id\x18\x01 \x01(\tR\x0esubscriptionId\x122\n" + "\x06filter\x18\x02 \x01(\v2\x1a.orlysync.common.v1.FilterR\x06filter\x12'\n" + "\x0finitial_message\x18\x03 \x01(\fR\x0einitialMessage\x12#\n" + - "\rconnection_id\x18\x04 \x01(\tR\fconnectionId\"A\n" + + "\rconnection_id\x18\x04 \x01(\tR\fconnectionId\"\x93\x01\n" + "\x0fNegOpenResponse\x12\x18\n" + - "\amessage\x18\x01 \x01(\fR\amessage\x12\x14\n" + - "\x05error\x18\x02 \x01(\tR\x05error\"w\n" + + "\amessage\x18\x01 \x01(\fR\amessage\x12\x19\n" + + "\bhave_ids\x18\x02 \x03(\fR\ahaveIds\x12\x19\n" + + "\bneed_ids\x18\x03 \x03(\fR\aneedIds\x12\x1a\n" + + "\bcomplete\x18\x04 \x01(\bR\bcomplete\x12\x14\n" + + "\x05error\x18\x05 \x01(\tR\x05error\"w\n" + "\rNegMsgRequest\x12'\n" + "\x0fsubscription_id\x18\x01 \x01(\tR\x0esubscriptionId\x12\x18\n" + "\amessage\x18\x02 \x01(\fR\amessage\x12#\n" + diff --git a/pkg/sync/negentropy/grpc/client.go b/pkg/sync/negentropy/grpc/client.go index fa0369e..23b1cdd 100644 --- a/pkg/sync/negentropy/grpc/client.go +++ b/pkg/sync/negentropy/grpc/client.go @@ -102,7 +102,8 @@ func (c *Client) Stop(ctx context.Context) error { } // HandleNegOpen processes a NEG-OPEN message from a client. -func (c *Client) HandleNegOpen(ctx context.Context, connectionID, subscriptionID string, filter *commonv1.Filter, initialMessage []byte) ([]byte, string, error) { +// Returns: message, haveIDs, needIDs, complete, errorStr, error +func (c *Client) HandleNegOpen(ctx context.Context, connectionID, subscriptionID string, filter *commonv1.Filter, initialMessage []byte) ([]byte, [][]byte, [][]byte, bool, string, error) { resp, err := c.client.HandleNegOpen(ctx, &negentropyv1.NegOpenRequest{ ConnectionId: connectionID, SubscriptionId: subscriptionID, @@ -110,9 +111,9 @@ func (c *Client) HandleNegOpen(ctx context.Context, connectionID, subscriptionID InitialMessage: initialMessage, }) if err != nil { - return nil, "", err + return nil, nil, nil, false, "", err } - return resp.Message, resp.Error, nil + return resp.Message, resp.HaveIds, resp.NeedIds, resp.Complete, resp.Error, nil } // HandleNegMsg processes a NEG-MSG message from a client. diff --git a/pkg/sync/negentropy/manager.go b/pkg/sync/negentropy/manager.go index aa85241..42ba3f2 100644 --- a/pkg/sync/negentropy/manager.go +++ b/pkg/sync/negentropy/manager.go @@ -260,6 +260,9 @@ func (m *Manager) performNegentropy(ctx context.Context, peerURL string) (int64, var eventsSynced int64 var needIDs []string var haveIDs []string + var waitingForEvents bool + var expectedEvents int + var receivedEvents int64 // Exchange messages until complete for i := 0; i < 20; i++ { // Max 20 rounds @@ -308,10 +311,20 @@ func (m *Manager) performNegentropy(ctx context.Context, peerURL string) (int64, haveIDs = append(haveIDs, neg.CollectHaves()...) if complete { - // Send NEG-CLOSE - negClose := []any{"NEG-CLOSE", subID} - conn.WriteJSON(negClose) - goto done + log.I.F("negentropy: reconciliation complete, waiting for %d events from peer", len(needIDs)) + // Don't send NEG-CLOSE yet - wait for EVENT messages + // The server will send events we need after this + waitingForEvents = true + expectedEvents = len(needIDs) + if expectedEvents == 0 { + // No events to receive, close now + negClose := []any{"NEG-CLOSE", subID} + conn.WriteJSON(negClose) + goto done + } + // Set a read deadline to avoid waiting forever + conn.SetReadDeadline(time.Now().Add(30 * time.Second)) + continue } // Send NEG-MSG response @@ -329,7 +342,20 @@ func (m *Manager) performNegentropy(ctx context.Context, peerURL string) (int64, case "EVENT": // Peer is sending us an event - eventsSynced++ + if len(msg) >= 3 { + if err := m.storeEventFromJSON(ctx, msg[2]); err != nil { + log.W.F("negentropy: failed to store event from peer: %v", err) + } else { + eventsSynced++ + receivedEvents++ + } + } + // If we've received all expected events (or more), we can close + if waitingForEvents && receivedEvents >= int64(expectedEvents) { + negClose := []any{"NEG-CLOSE", subID} + conn.WriteJSON(negClose) + goto done + } } } diff --git a/pkg/sync/negentropy/server/service.go b/pkg/sync/negentropy/server/service.go index 4bde5ab..f82a309 100644 --- a/pkg/sync/negentropy/server/service.go +++ b/pkg/sync/negentropy/server/service.go @@ -3,6 +3,7 @@ package server import ( "context" + "encoding/hex" "fmt" "google.golang.org/grpc" @@ -75,8 +76,8 @@ func (s *Service) HandleNegOpen(ctx context.Context, req *negentropyv1.NegOpenRe // If we have an initial message from client, process it var respMsg []byte + var complete bool if len(req.InitialMessage) > 0 { - var complete bool respMsg, complete, err = neg.Reconcile(req.InitialMessage) if err != nil { log.E.F("NEG-OPEN: reconcile failed: %v", err) @@ -99,9 +100,35 @@ func (s *Service) HandleNegOpen(ctx context.Context, req *negentropyv1.NegOpenRe log.D.F("NEG-OPEN: started negentropy, initial msg len=%d", len(respMsg)) } + // Collect IDs we have that client needs (to send as events) + haveIDs := neg.CollectHaves() + var haveIDBytes [][]byte + for _, id := range haveIDs { + // ID is a hex string, decode to binary + if decoded, err := hex.DecodeString(id); err == nil { + haveIDBytes = append(haveIDBytes, decoded) + } + } + + // Collect IDs we need from client + needIDs := neg.CollectHaveNots() + var needIDBytes [][]byte + for _, id := range needIDs { + // ID is a hex string, decode to binary + if decoded, err := hex.DecodeString(id); err == nil { + needIDBytes = append(needIDBytes, decoded) + } + } + + log.I.F("NEG-OPEN: complete=%v, haves=%d, needs=%d, response len=%d", + complete, len(haveIDs), len(needIDs), len(respMsg)) + return &negentropyv1.NegOpenResponse{ - Message: respMsg, - Error: "", + Message: respMsg, + HaveIds: haveIDBytes, + NeedIds: needIDBytes, + Complete: complete, + Error: "", }, nil } @@ -138,14 +165,20 @@ func (s *Service) HandleNegMsg(ctx context.Context, req *negentropyv1.NegMsgRequ haveIDs := neg.CollectHaves() var haveIDBytes [][]byte for _, id := range haveIDs { - haveIDBytes = append(haveIDBytes, []byte(id)) + // ID is a hex string, decode to binary + if decoded, err := hex.DecodeString(id); err == nil { + haveIDBytes = append(haveIDBytes, decoded) + } } // Collect IDs we need from client needIDs := neg.CollectHaveNots() var needIDBytes [][]byte for _, id := range needIDs { - needIDBytes = append(needIDBytes, []byte(id)) + // ID is a hex string, decode to binary + if decoded, err := hex.DecodeString(id); err == nil { + needIDBytes = append(needIDBytes, decoded) + } } log.I.F("NEG-MSG: complete=%v, haves=%d, needs=%d, response len=%d", diff --git a/pkg/version/version b/pkg/version/version index 97efdeb..a6fe2e6 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.55.6 +v0.55.7 diff --git a/proto/orlysync/negentropy/v1/service.proto b/proto/orlysync/negentropy/v1/service.proto index faaa637..c359754 100644 --- a/proto/orlysync/negentropy/v1/service.proto +++ b/proto/orlysync/negentropy/v1/service.proto @@ -80,7 +80,10 @@ message NegOpenRequest { // NegOpenResponse returns the initial negentropy response message NegOpenResponse { bytes message = 1; // Negentropy protocol message to send back - string error = 2; // Error message if failed + repeated bytes have_ids = 2; // Event IDs we have that client needs (if initial reconciliation completes) + repeated bytes need_ids = 3; // Event IDs we need from client (if initial reconciliation completes) + bool complete = 4; // True if reconciliation completed in first round + string error = 5; // Error message if failed } // NegMsgRequest processes a NEG-MSG from client