|
|
|
|
@ -28,6 +28,7 @@ type Subscription struct {
@@ -28,6 +28,7 @@ type Subscription struct {
|
|
|
|
|
remote string |
|
|
|
|
AuthedPubkey []byte |
|
|
|
|
Receiver event.C // Channel for delivering events to this subscription
|
|
|
|
|
AuthRequired bool // Whether ACL requires authentication for privileged events
|
|
|
|
|
*filter.S |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -58,6 +59,11 @@ type W struct {
@@ -58,6 +59,11 @@ type W struct {
|
|
|
|
|
|
|
|
|
|
// AuthedPubkey is the authenticated pubkey associated with the listener (if any).
|
|
|
|
|
AuthedPubkey []byte |
|
|
|
|
|
|
|
|
|
// AuthRequired indicates whether the ACL in operation requires auth. If
|
|
|
|
|
// this is set to true, the publisher will not publish privileged or other
|
|
|
|
|
// restricted events to non-authed listeners, otherwise, it will.
|
|
|
|
|
AuthRequired bool |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *W) Type() (typeName string) { return Type } |
|
|
|
|
@ -87,7 +93,6 @@ func NewPublisher(c context.Context) (publisher *P) {
@@ -87,7 +93,6 @@ func NewPublisher(c context.Context) (publisher *P) {
|
|
|
|
|
|
|
|
|
|
func (p *P) Type() (typeName string) { return Type } |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Receive handles incoming messages to manage websocket listener subscriptions
|
|
|
|
|
// and associated filters.
|
|
|
|
|
//
|
|
|
|
|
@ -120,12 +125,14 @@ func (p *P) Receive(msg typer.T) {
@@ -120,12 +125,14 @@ func (p *P) Receive(msg typer.T) {
|
|
|
|
|
if subs, ok := p.Map[m.Conn]; !ok { |
|
|
|
|
subs = make(map[string]Subscription) |
|
|
|
|
subs[m.Id] = Subscription{ |
|
|
|
|
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey, Receiver: m.Receiver, |
|
|
|
|
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey, |
|
|
|
|
Receiver: m.Receiver, AuthRequired: m.AuthRequired, |
|
|
|
|
} |
|
|
|
|
p.Map[m.Conn] = subs |
|
|
|
|
} else { |
|
|
|
|
subs[m.Id] = Subscription{ |
|
|
|
|
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey, Receiver: m.Receiver, |
|
|
|
|
S: m.Filters, remote: m.remote, AuthedPubkey: m.AuthedPubkey, |
|
|
|
|
Receiver: m.Receiver, AuthRequired: m.AuthRequired, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -174,11 +181,14 @@ func (p *P) Deliver(ev *event.E) {
@@ -174,11 +181,14 @@ func (p *P) Deliver(ev *event.E) {
|
|
|
|
|
for _, d := range deliveries { |
|
|
|
|
// If the event is privileged, enforce that the subscriber's authed pubkey matches
|
|
|
|
|
// either the event pubkey or appears in any 'p' tag of the event.
|
|
|
|
|
if kind.IsPrivileged(ev.Kind) { |
|
|
|
|
// Only check authentication if AuthRequired is true (ACL is active)
|
|
|
|
|
if kind.IsPrivileged(ev.Kind) && d.sub.AuthRequired { |
|
|
|
|
if len(d.sub.AuthedPubkey) == 0 { |
|
|
|
|
// Not authenticated - cannot see privileged events
|
|
|
|
|
log.D.F("subscription delivery DENIED for privileged event %s to %s (not authenticated)", |
|
|
|
|
hex.Enc(ev.ID), d.sub.remote) |
|
|
|
|
log.D.F( |
|
|
|
|
"subscription delivery DENIED for privileged event %s to %s (not authenticated)", |
|
|
|
|
hex.Enc(ev.ID), d.sub.remote, |
|
|
|
|
) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -201,8 +211,10 @@ func (p *P) Deliver(ev *event.E) {
@@ -201,8 +211,10 @@ func (p *P) Deliver(ev *event.E) {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if !allowed { |
|
|
|
|
log.D.F("subscription delivery DENIED for privileged event %s to %s (auth mismatch)", |
|
|
|
|
hex.Enc(ev.ID), d.sub.remote) |
|
|
|
|
log.D.F( |
|
|
|
|
"subscription delivery DENIED for privileged event %s to %s (auth mismatch)", |
|
|
|
|
hex.Enc(ev.ID), d.sub.remote, |
|
|
|
|
) |
|
|
|
|
// Skip delivery for this subscriber
|
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
@ -225,26 +237,37 @@ func (p *P) Deliver(ev *event.E) {
@@ -225,26 +237,37 @@ func (p *P) Deliver(ev *event.E) {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if hasPrivateTag { |
|
|
|
|
canSeePrivate := p.canSeePrivateEvent(d.sub.AuthedPubkey, privatePubkey, d.sub.remote) |
|
|
|
|
canSeePrivate := p.canSeePrivateEvent( |
|
|
|
|
d.sub.AuthedPubkey, privatePubkey, d.sub.remote, |
|
|
|
|
) |
|
|
|
|
if !canSeePrivate { |
|
|
|
|
log.D.F("subscription delivery DENIED for private event %s to %s (unauthorized)", |
|
|
|
|
hex.Enc(ev.ID), d.sub.remote) |
|
|
|
|
log.D.F( |
|
|
|
|
"subscription delivery DENIED for private event %s to %s (unauthorized)", |
|
|
|
|
hex.Enc(ev.ID), d.sub.remote, |
|
|
|
|
) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
log.D.F("subscription delivery ALLOWED for private event %s to %s (authorized)", |
|
|
|
|
hex.Enc(ev.ID), d.sub.remote) |
|
|
|
|
log.D.F( |
|
|
|
|
"subscription delivery ALLOWED for private event %s to %s (authorized)", |
|
|
|
|
hex.Enc(ev.ID), d.sub.remote, |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Send event to the subscription's receiver channel
|
|
|
|
|
// The consumer goroutine (in handle-req.go) will read from this channel
|
|
|
|
|
// and forward it to the client via the write channel
|
|
|
|
|
log.D.F("attempting delivery of event %s (kind=%d) to subscription %s @ %s", |
|
|
|
|
hex.Enc(ev.ID), ev.Kind, d.id, d.sub.remote) |
|
|
|
|
log.D.F( |
|
|
|
|
"attempting delivery of event %s (kind=%d) to subscription %s @ %s", |
|
|
|
|
hex.Enc(ev.ID), ev.Kind, d.id, d.sub.remote, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// Check if receiver channel exists
|
|
|
|
|
if d.sub.Receiver == nil { |
|
|
|
|
log.E.F("subscription %s has nil receiver channel for %s", d.id, d.sub.remote) |
|
|
|
|
log.E.F( |
|
|
|
|
"subscription %s has nil receiver channel for %s", d.id, |
|
|
|
|
d.sub.remote, |
|
|
|
|
) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -253,11 +276,15 @@ func (p *P) Deliver(ev *event.E) {
@@ -253,11 +276,15 @@ func (p *P) Deliver(ev *event.E) {
|
|
|
|
|
case <-p.c.Done(): |
|
|
|
|
continue |
|
|
|
|
case d.sub.Receiver <- ev: |
|
|
|
|
log.D.F("subscription delivery QUEUED: event=%s to=%s sub=%s", |
|
|
|
|
hex.Enc(ev.ID), d.sub.remote, d.id) |
|
|
|
|
log.D.F( |
|
|
|
|
"subscription delivery QUEUED: event=%s to=%s sub=%s", |
|
|
|
|
hex.Enc(ev.ID), d.sub.remote, d.id, |
|
|
|
|
) |
|
|
|
|
case <-time.After(DefaultWriteTimeout): |
|
|
|
|
log.E.F("subscription delivery TIMEOUT: event=%s to=%s sub=%s", |
|
|
|
|
hex.Enc(ev.ID), d.sub.remote, d.id) |
|
|
|
|
log.E.F( |
|
|
|
|
"subscription delivery TIMEOUT: event=%s to=%s sub=%s", |
|
|
|
|
hex.Enc(ev.ID), d.sub.remote, d.id, |
|
|
|
|
) |
|
|
|
|
// Receiver channel is full - subscription consumer is stuck or slow
|
|
|
|
|
// The subscription should be removed by the cleanup logic
|
|
|
|
|
} |
|
|
|
|
@ -285,7 +312,9 @@ func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
@@ -285,7 +312,9 @@ func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
|
|
|
|
|
|
|
|
|
|
// SetWriteChan stores the write channel for a websocket connection
|
|
|
|
|
// If writeChan is nil, the entry is removed from the map
|
|
|
|
|
func (p *P) SetWriteChan(conn *websocket.Conn, writeChan chan publish.WriteRequest) { |
|
|
|
|
func (p *P) SetWriteChan( |
|
|
|
|
conn *websocket.Conn, writeChan chan publish.WriteRequest, |
|
|
|
|
) { |
|
|
|
|
p.Mx.Lock() |
|
|
|
|
defer p.Mx.Unlock() |
|
|
|
|
if writeChan == nil { |
|
|
|
|
@ -296,7 +325,9 @@ func (p *P) SetWriteChan(conn *websocket.Conn, writeChan chan publish.WriteReque
@@ -296,7 +325,9 @@ func (p *P) SetWriteChan(conn *websocket.Conn, writeChan chan publish.WriteReque
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetWriteChan returns the write channel for a websocket connection
|
|
|
|
|
func (p *P) GetWriteChan(conn *websocket.Conn) (chan publish.WriteRequest, bool) { |
|
|
|
|
func (p *P) GetWriteChan(conn *websocket.Conn) ( |
|
|
|
|
chan publish.WriteRequest, bool, |
|
|
|
|
) { |
|
|
|
|
p.Mx.RLock() |
|
|
|
|
defer p.Mx.RUnlock() |
|
|
|
|
ch, ok := p.WriteChans[conn] |
|
|
|
|
@ -313,7 +344,9 @@ func (p *P) removeSubscriber(ws *websocket.Conn) {
@@ -313,7 +344,9 @@ func (p *P) removeSubscriber(ws *websocket.Conn) {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// canSeePrivateEvent checks if the authenticated user can see an event with a private tag
|
|
|
|
|
func (p *P) canSeePrivateEvent(authedPubkey, privatePubkey []byte, remote string) (canSee bool) { |
|
|
|
|
func (p *P) canSeePrivateEvent( |
|
|
|
|
authedPubkey, privatePubkey []byte, remote string, |
|
|
|
|
) (canSee bool) { |
|
|
|
|
// If no authenticated user, deny access
|
|
|
|
|
if len(authedPubkey) == 0 { |
|
|
|
|
return false |
|
|
|
|
|