diff --git a/app/handle-req.go b/app/handle-req.go index 0eb508b..4f6da2f 100644 --- a/app/handle-req.go +++ b/app/handle-req.go @@ -172,11 +172,12 @@ privCheck: if !cancel { l.publishers.Receive( &W{ - Conn: l.conn, - remote: l.remote, - Id: string(env.Subscription), - Receiver: receiver, - Filters: env.Filters, + Conn: l.conn, + remote: l.remote, + Id: string(env.Subscription), + Receiver: receiver, + Filters: env.Filters, + AuthedPubkey: l.authedPubkey.Load(), }, ) } else { diff --git a/app/publisher.go b/app/publisher.go index daf692c..b6f808a 100644 --- a/app/publisher.go +++ b/app/publisher.go @@ -8,18 +8,21 @@ import ( "encoders.orly/envelopes/eventenvelope" "encoders.orly/event" "encoders.orly/filter" + "encoders.orly/hex" "encoders.orly/kind" "github.com/coder/websocket" "interfaces.orly/publisher" "interfaces.orly/typer" "lol.mleku.dev/chk" "lol.mleku.dev/log" + utils "utils.orly" ) const Type = "socketapi" type Subscription struct { - remote string + remote string + AuthedPubkey []byte *filter.S } @@ -47,6 +50,9 @@ type W struct { // associated with this WebSocket connection. It is used to determine which // notifications or data should be received by the subscriber. Filters *filter.S + + // AuthedPubkey is the authenticated pubkey associated with the listener (if any). + AuthedPubkey []byte } func (w *W) Type() (typeName string) { return Type } @@ -113,7 +119,7 @@ func (p *P) Receive(msg typer.T) { defer p.Mx.Unlock() if subs, ok := p.Map[m.Conn]; !ok { subs = make(map[string]Subscription) - subs[m.Id] = Subscription{S: m.Filters, remote: m.remote} + subs[m.Id] = Subscription{S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey} p.Map[m.Conn] = subs log.D.C( func() string { @@ -125,7 +131,7 @@ func (p *P) Receive(msg typer.T) { }, ) } else { - subs[m.Id] = Subscription{S: m.Filters, remote: m.remote} + subs[m.Id] = Subscription{S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey} log.D.C( func() string { return fmt.Sprintf( @@ -151,52 +157,85 @@ func (p *P) Receive(msg typer.T) { // for unauthenticated users when events are privileged. func (p *P) Deliver(ev *event.E) { var err error + // Snapshot the deliveries under read lock to avoid holding locks during I/O p.Mx.RLock() - defer p.Mx.RUnlock() + type delivery struct { + w *websocket.Conn + id string + sub Subscription + } + var deliveries []delivery + for w, subs := range p.Map { + for id, subscriber := range subs { + if subscriber.Match(ev) { + deliveries = append(deliveries, delivery{w: w, id: id, sub: subscriber}) + } + } + } + p.Mx.RUnlock() log.D.C( func() string { return fmt.Sprintf( "delivering event %0x to websocket subscribers %d", ev.ID, - len(p.Map), + len(deliveries), ) }, ) - for w, subs := range p.Map { - for id, subscriber := range subs { - if !subscriber.Match(ev) { - continue - } - if kind.IsPrivileged(ev.Kind) { - - } - var res *eventenvelope.Result - if res, err = eventenvelope.NewResultWith(id, ev); chk.E(err) { - continue - } - if err = w.Write( - p.c, websocket.MessageText, res.Marshal(nil), - ); chk.E(err) { - p.removeSubscriber(w) - if err = w.CloseNow(); chk.E(err) { - continue + for _, d := range deliveries { + // If the event is privileged, enforce that the subscriber's authed pubkey matches + // either the event pubkey or appears in any 'p' tag of the event. + if kind.IsPrivileged(ev.Kind) && len(d.sub.AuthedPubkey) > 0 { + pk := d.sub.AuthedPubkey + allowed := false + // Direct author match + if utils.FastEqual(ev.Pubkey, pk) { + allowed = true + } else if ev.Tags != nil { + for _, pTag := range ev.Tags.GetAll([]byte("p")) { + // pTag.Value() returns []byte hex string; decode to bytes + dec, derr := hex.Dec(string(pTag.Value())) + if derr != nil { + continue + } + if utils.FastEqual(dec, pk) { + allowed = true + break + } } + } + if !allowed { + // Skip delivery for this subscriber continue } - log.D.C( - func() string { - return fmt.Sprintf( - "dispatched event %0x to subscription %s, %s", - ev.ID, id, subscriber.remote, - ) - }, - ) } + var res *eventenvelope.Result + if res, err = eventenvelope.NewResultWith(d.id, ev); chk.E(err) { + continue + } + if err = d.w.Write( + p.c, websocket.MessageText, res.Marshal(nil), + ); chk.E(err) { + // On error, remove the subscriber connection safely + p.removeSubscriber(d.w) + _ = d.w.CloseNow() + continue + } + log.D.C( + func() string { + return fmt.Sprintf( + "dispatched event %0x to subscription %s, %s", + ev.ID, d.id, d.sub.remote, + ) + }, + ) } } // removeSubscriberId removes a specific subscription from a subscriber // websocket. func (p *P) removeSubscriberId(ws *websocket.Conn, id string) { + p.Mx.Lock() + defer p.Mx.Unlock() var subs map[string]Subscription var ok bool if subs, ok = p.Map[ws]; ok { @@ -210,6 +249,8 @@ func (p *P) removeSubscriberId(ws *websocket.Conn, id string) { // removeSubscriber removes a websocket from the P collection. func (p *P) removeSubscriber(ws *websocket.Conn) { + p.Mx.Lock() + defer p.Mx.Unlock() clear(p.Map[ws]) delete(p.Map, ws) }