You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

513 lines
12 KiB

package nrc
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"git.mleku.dev/mleku/nostr/crypto/encryption"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter"
"git.mleku.dev/mleku/nostr/encoders/hex"
"git.mleku.dev/mleku/nostr/encoders/kind"
"git.mleku.dev/mleku/nostr/encoders/tag"
"git.mleku.dev/mleku/nostr/encoders/timestamp"
"git.mleku.dev/mleku/nostr/interfaces/signer"
"git.mleku.dev/mleku/nostr/ws"
"github.com/google/uuid"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
)
// Client connects to a private relay through the NRC tunnel.
type Client struct {
uri *ConnectionURI
sessionID string
rendezvousConn *ws.Client
responseSub *ws.Subscription
conversationKey []byte
clientSigner signer.I
// pending maps request event IDs to response channels.
pending map[string]chan *ResponseMessage
pendingMu sync.Mutex
// subscriptions maps subscription IDs to event channels.
subscriptions map[string]chan *event.E
subscriptionsMu sync.Mutex
ctx context.Context
cancel context.CancelFunc
}
// NewClient creates a new NRC client from a connection URI.
func NewClient(connectionURI string) (*Client, error) {
uri, err := ParseConnectionURI(connectionURI)
if err != nil {
return nil, fmt.Errorf("invalid URI: %w", err)
}
if uri.AuthMode != AuthModeSecret {
return nil, fmt.Errorf("CAT authentication not yet supported in client")
}
ctx, cancel := context.WithCancel(context.Background())
return &Client{
uri: uri,
sessionID: uuid.New().String(),
conversationKey: uri.GetConversationKey(),
clientSigner: uri.GetClientSigner(),
pending: make(map[string]chan *ResponseMessage),
subscriptions: make(map[string]chan *event.E),
ctx: ctx,
cancel: cancel,
}, nil
}
// Connect establishes the connection to the rendezvous relay.
func (c *Client) Connect(ctx context.Context) error {
// Connect to rendezvous relay
conn, err := ws.RelayConnect(ctx, c.uri.RendezvousRelay)
if chk.E(err) {
return fmt.Errorf("%w: %v", ErrRendezvousConnectionFailed, err)
}
c.rendezvousConn = conn
// Subscribe to response events
clientPubkeyHex := hex.Enc(c.clientSigner.Pub())
sub, err := conn.Subscribe(
ctx,
filter.NewS(&filter.F{
Kinds: kind.NewS(kind.New(KindNRCResponse)),
Tags: tag.NewS(
tag.NewFromAny("p", clientPubkeyHex),
),
Since: &timestamp.T{V: time.Now().Unix()},
}),
)
if chk.E(err) {
conn.Close()
return fmt.Errorf("subscription failed: %w", err)
}
c.responseSub = sub
// Start response handler
go c.handleResponses()
log.I.F("NRC client connected to %s via %s",
hex.Enc(c.uri.RelayPubkey), c.uri.RendezvousRelay)
return nil
}
// Close closes the client connection.
func (c *Client) Close() {
c.cancel()
if c.responseSub != nil {
c.responseSub.Unsub()
}
if c.rendezvousConn != nil {
c.rendezvousConn.Close()
}
// Close all pending channels
c.pendingMu.Lock()
for _, ch := range c.pending {
close(ch)
}
c.pending = make(map[string]chan *ResponseMessage)
c.pendingMu.Unlock()
// Close all subscription channels
c.subscriptionsMu.Lock()
for _, ch := range c.subscriptions {
close(ch)
}
c.subscriptions = make(map[string]chan *event.E)
c.subscriptionsMu.Unlock()
}
// handleResponses processes incoming NRC response events.
func (c *Client) handleResponses() {
for {
select {
case <-c.ctx.Done():
return
case ev := <-c.responseSub.Events:
if ev == nil {
return
}
c.processResponse(ev)
}
}
}
// processResponse decrypts and routes a response event.
func (c *Client) processResponse(ev *event.E) {
// Decrypt content
decrypted, err := encryption.Decrypt(c.conversationKey, string(ev.Content))
if err != nil {
log.W.F("NRC response decryption failed: %v", err)
return
}
// Parse response
var resp struct {
Type string `json:"type"`
Payload []any `json:"payload"`
}
if err := json.Unmarshal([]byte(decrypted), &resp); err != nil {
log.W.F("NRC response parse failed: %v", err)
return
}
// Extract request event ID for routing
var requestEventID string
eTag := ev.Tags.GetFirst([]byte("e"))
if eTag != nil && eTag.Len() >= 2 {
requestEventID = string(eTag.ValueHex())
}
// Route based on response type
switch resp.Type {
case "EVENT":
c.handleEventResponse(resp.Payload)
case "EOSE":
c.handleEOSEResponse(resp.Payload, requestEventID)
case "OK":
c.handleOKResponse(resp.Payload, requestEventID)
case "NOTICE":
c.handleNoticeResponse(resp.Payload)
case "CLOSED":
c.handleClosedResponse(resp.Payload)
case "COUNT":
c.handleCountResponse(resp.Payload, requestEventID)
case "AUTH":
c.handleAuthResponse(resp.Payload, requestEventID)
}
}
// handleEventResponse routes an EVENT to the appropriate subscription.
func (c *Client) handleEventResponse(payload []any) {
if len(payload) < 3 {
return
}
// Payload: ["EVENT", "<sub_id>", {...event...}]
subID, ok := payload[1].(string)
if !ok {
return
}
c.subscriptionsMu.Lock()
ch, exists := c.subscriptions[subID]
c.subscriptionsMu.Unlock()
if !exists {
return
}
// Parse event from payload
eventData, ok := payload[2].(map[string]any)
if !ok {
return
}
eventBytes, err := json.Marshal(eventData)
if err != nil {
return
}
var ev event.E
if err := json.Unmarshal(eventBytes, &ev); err != nil {
return
}
select {
case ch <- &ev:
default:
// Channel full, drop event
}
}
// handleEOSEResponse handles an EOSE response.
func (c *Client) handleEOSEResponse(payload []any, requestEventID string) {
// Route to pending request
c.pendingMu.Lock()
ch, exists := c.pending[requestEventID]
c.pendingMu.Unlock()
if exists {
resp := &ResponseMessage{Type: "EOSE", Payload: payload}
select {
case ch <- resp:
default:
}
}
}
// handleOKResponse handles an OK response.
func (c *Client) handleOKResponse(payload []any, requestEventID string) {
c.pendingMu.Lock()
ch, exists := c.pending[requestEventID]
c.pendingMu.Unlock()
if exists {
resp := &ResponseMessage{Type: "OK", Payload: payload}
select {
case ch <- resp:
default:
}
}
}
// handleNoticeResponse logs a NOTICE.
func (c *Client) handleNoticeResponse(payload []any) {
if len(payload) >= 2 {
if msg, ok := payload[1].(string); ok {
log.W.F("NRC NOTICE: %s", msg)
}
}
}
// handleClosedResponse handles a subscription close.
func (c *Client) handleClosedResponse(payload []any) {
if len(payload) >= 2 {
if subID, ok := payload[1].(string); ok {
c.subscriptionsMu.Lock()
if ch, exists := c.subscriptions[subID]; exists {
close(ch)
delete(c.subscriptions, subID)
}
c.subscriptionsMu.Unlock()
}
}
}
// handleCountResponse handles a COUNT response.
func (c *Client) handleCountResponse(payload []any, requestEventID string) {
c.pendingMu.Lock()
ch, exists := c.pending[requestEventID]
c.pendingMu.Unlock()
if exists {
resp := &ResponseMessage{Type: "COUNT", Payload: payload}
select {
case ch <- resp:
default:
}
}
}
// handleAuthResponse handles an AUTH challenge.
func (c *Client) handleAuthResponse(payload []any, requestEventID string) {
c.pendingMu.Lock()
ch, exists := c.pending[requestEventID]
c.pendingMu.Unlock()
if exists {
resp := &ResponseMessage{Type: "AUTH", Payload: payload}
select {
case ch <- resp:
default:
}
}
}
// sendRequest sends an NRC request and waits for response.
func (c *Client) sendRequest(ctx context.Context, msgType string, payload []any) (*ResponseMessage, error) {
// Build request content
reqContent := struct {
Type string `json:"type"`
Payload []any `json:"payload"`
}{
Type: msgType,
Payload: payload,
}
contentBytes, err := json.Marshal(reqContent)
if err != nil {
return nil, fmt.Errorf("marshal failed: %w", err)
}
// Encrypt content
encrypted, err := encryption.Encrypt(c.conversationKey, contentBytes, nil)
if err != nil {
return nil, fmt.Errorf("%w: %v", ErrEncryptionFailed, err)
}
// Build request event
reqEvent := &event.E{
Content: []byte(encrypted),
CreatedAt: time.Now().Unix(),
Kind: KindNRCRequest,
Tags: tag.NewS(
tag.NewFromAny("p", hex.Enc(c.uri.RelayPubkey)),
tag.NewFromAny("encryption", "nip44_v2"),
tag.NewFromAny("session", c.sessionID),
),
}
// Sign with client key
if err := reqEvent.Sign(c.clientSigner); chk.E(err) {
return nil, fmt.Errorf("signing failed: %w", err)
}
// Set up response channel
responseCh := make(chan *ResponseMessage, 1)
requestEventID := string(hex.Enc(reqEvent.ID[:]))
c.pendingMu.Lock()
c.pending[requestEventID] = responseCh
c.pendingMu.Unlock()
defer func() {
c.pendingMu.Lock()
delete(c.pending, requestEventID)
c.pendingMu.Unlock()
}()
// Publish request
if err := c.rendezvousConn.Publish(ctx, reqEvent); chk.E(err) {
return nil, fmt.Errorf("publish failed: %w", err)
}
// Wait for response
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-responseCh:
if resp == nil {
return nil, fmt.Errorf("response channel closed")
}
return resp, nil
}
}
// Publish publishes an event to the private relay.
func (c *Client) Publish(ctx context.Context, ev *event.E) (bool, string, error) {
// Convert event to JSON for payload
eventBytes, err := json.Marshal(ev)
if err != nil {
return false, "", fmt.Errorf("marshal event failed: %w", err)
}
var eventMap map[string]any
if err := json.Unmarshal(eventBytes, &eventMap); err != nil {
return false, "", fmt.Errorf("unmarshal event failed: %w", err)
}
payload := []any{"EVENT", eventMap}
resp, err := c.sendRequest(ctx, "EVENT", payload)
if err != nil {
return false, "", err
}
// Parse OK response: ["OK", "<event_id>", <success>, "<message>"]
if resp.Type != "OK" || len(resp.Payload) < 4 {
return false, "", fmt.Errorf("unexpected response type: %s", resp.Type)
}
success, _ := resp.Payload[2].(bool)
message, _ := resp.Payload[3].(string)
return success, message, nil
}
// Subscribe creates a subscription to the private relay.
func (c *Client) Subscribe(ctx context.Context, subID string, filters ...*filter.F) (<-chan *event.E, error) {
// Build payload: ["REQ", "<sub_id>", filter1, filter2, ...]
payload := []any{"REQ", subID}
for _, f := range filters {
filterBytes, err := json.Marshal(f)
if err != nil {
return nil, fmt.Errorf("marshal filter failed: %w", err)
}
var filterMap map[string]any
if err := json.Unmarshal(filterBytes, &filterMap); err != nil {
return nil, fmt.Errorf("unmarshal filter failed: %w", err)
}
payload = append(payload, filterMap)
}
// Create event channel for this subscription
eventCh := make(chan *event.E, 100)
c.subscriptionsMu.Lock()
c.subscriptions[subID] = eventCh
c.subscriptionsMu.Unlock()
// Send request (don't wait for EOSE, events will come asynchronously)
go func() {
reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
_, err := c.sendRequest(reqCtx, "REQ", payload)
if err != nil {
log.W.F("NRC subscribe failed: %v", err)
}
}()
return eventCh, nil
}
// Unsubscribe closes a subscription.
func (c *Client) Unsubscribe(ctx context.Context, subID string) error {
// Remove from local tracking
c.subscriptionsMu.Lock()
if ch, exists := c.subscriptions[subID]; exists {
close(ch)
delete(c.subscriptions, subID)
}
c.subscriptionsMu.Unlock()
// Send CLOSE to relay
payload := []any{"CLOSE", subID}
_, err := c.sendRequest(ctx, "CLOSE", payload)
return err
}
// Count sends a COUNT request to the private relay.
func (c *Client) Count(ctx context.Context, subID string, filters ...*filter.F) (int64, error) {
// Build payload: ["COUNT", "<sub_id>", filter1, filter2, ...]
payload := []any{"COUNT", subID}
for _, f := range filters {
filterBytes, err := json.Marshal(f)
if err != nil {
return 0, fmt.Errorf("marshal filter failed: %w", err)
}
var filterMap map[string]any
if err := json.Unmarshal(filterBytes, &filterMap); err != nil {
return 0, fmt.Errorf("unmarshal filter failed: %w", err)
}
payload = append(payload, filterMap)
}
resp, err := c.sendRequest(ctx, "COUNT", payload)
if err != nil {
return 0, err
}
// Parse COUNT response: ["COUNT", "<sub_id>", {"count": N}]
if resp.Type != "COUNT" || len(resp.Payload) < 3 {
return 0, fmt.Errorf("unexpected response type: %s", resp.Type)
}
countData, ok := resp.Payload[2].(map[string]any)
if !ok {
return 0, fmt.Errorf("invalid count response")
}
count, ok := countData["count"].(float64)
if !ok {
return 0, fmt.Errorf("missing count field")
}
return int64(count), nil
}
// RelayURL returns a pseudo-URL for this NRC connection.
func (c *Client) RelayURL() string {
return "nrc://" + string(hex.Enc(c.uri.RelayPubkey))
}