@ -8,18 +8,21 @@ import (
"encoders.orly/envelopes/eventenvelope"
"encoders.orly/envelopes/eventenvelope"
"encoders.orly/event"
"encoders.orly/event"
"encoders.orly/filter"
"encoders.orly/filter"
"encoders.orly/hex"
"encoders.orly/kind"
"encoders.orly/kind"
"github.com/coder/websocket"
"github.com/coder/websocket"
"interfaces.orly/publisher"
"interfaces.orly/publisher"
"interfaces.orly/typer"
"interfaces.orly/typer"
"lol.mleku.dev/chk"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"lol.mleku.dev/log"
utils "utils.orly"
)
)
const Type = "socketapi"
const Type = "socketapi"
type Subscription struct {
type Subscription struct {
remote string
remote string
AuthedPubkey [ ] byte
* filter . S
* filter . S
}
}
@ -47,6 +50,9 @@ type W struct {
// associated with this WebSocket connection. It is used to determine which
// associated with this WebSocket connection. It is used to determine which
// notifications or data should be received by the subscriber.
// notifications or data should be received by the subscriber.
Filters * filter . S
Filters * filter . S
// AuthedPubkey is the authenticated pubkey associated with the listener (if any).
AuthedPubkey [ ] byte
}
}
func ( w * W ) Type ( ) ( typeName string ) { return Type }
func ( w * W ) Type ( ) ( typeName string ) { return Type }
@ -113,7 +119,7 @@ func (p *P) Receive(msg typer.T) {
defer p . Mx . Unlock ( )
defer p . Mx . Unlock ( )
if subs , ok := p . Map [ m . Conn ] ; ! ok {
if subs , ok := p . Map [ m . Conn ] ; ! ok {
subs = make ( map [ string ] Subscription )
subs = make ( map [ string ] Subscription )
subs [ m . Id ] = Subscription { S : m . Filters , remote : m . remote }
subs [ m . Id ] = Subscription { S : m . Filters , remote : m . remote , AuthedPubkey : m . AuthedPubkey }
p . Map [ m . Conn ] = subs
p . Map [ m . Conn ] = subs
log . D . C (
log . D . C (
func ( ) string {
func ( ) string {
@ -125,7 +131,7 @@ func (p *P) Receive(msg typer.T) {
} ,
} ,
)
)
} else {
} else {
subs [ m . Id ] = Subscription { S : m . Filters , remote : m . remote }
subs [ m . Id ] = Subscription { S : m . Filters , remote : m . remote , AuthedPubkey : m . AuthedPubkey }
log . D . C (
log . D . C (
func ( ) string {
func ( ) string {
return fmt . Sprintf (
return fmt . Sprintf (
@ -151,52 +157,85 @@ func (p *P) Receive(msg typer.T) {
// for unauthenticated users when events are privileged.
// for unauthenticated users when events are privileged.
func ( p * P ) Deliver ( ev * event . E ) {
func ( p * P ) Deliver ( ev * event . E ) {
var err error
var err error
// Snapshot the deliveries under read lock to avoid holding locks during I/O
p . Mx . RLock ( )
p . Mx . RLock ( )
defer p . Mx . RUnlock ( )
type delivery struct {
w * websocket . Conn
id string
sub Subscription
}
var deliveries [ ] delivery
for w , subs := range p . Map {
for id , subscriber := range subs {
if subscriber . Match ( ev ) {
deliveries = append ( deliveries , delivery { w : w , id : id , sub : subscriber } )
}
}
}
p . Mx . RUnlock ( )
log . D . C (
log . D . C (
func ( ) string {
func ( ) string {
return fmt . Sprintf (
return fmt . Sprintf (
"delivering event %0x to websocket subscribers %d" , ev . ID ,
"delivering event %0x to websocket subscribers %d" , ev . ID ,
len ( p . Map ) ,
len ( deliveries ) ,
)
)
} ,
} ,
)
)
for w , subs := range p . Map {
for _ , d := range deliveries {
for id , subscriber := range subs {
// If the event is privileged, enforce that the subscriber's authed pubkey matches
if ! subscriber . Match ( ev ) {
// either the event pubkey or appears in any 'p' tag of the event.
continue
if kind . IsPrivileged ( ev . Kind ) && len ( d . sub . AuthedPubkey ) > 0 {
}
pk := d . sub . AuthedPubkey
if kind . IsPrivileged ( ev . Kind ) {
allowed := false
// Direct author match
}
if utils . FastEqual ( ev . Pubkey , pk ) {
var res * eventenvelope . Result
allowed = true
if res , err = eventenvelope . NewResultWith ( id , ev ) ; chk . E ( err ) {
} else if ev . Tags != nil {
continue
for _ , pTag := range ev . Tags . GetAll ( [ ] byte ( "p" ) ) {
}
// pTag.Value() returns []byte hex string; decode to bytes
if err = w . Write (
dec , derr := hex . Dec ( string ( pTag . Value ( ) ) )
p . c , websocket . MessageText , res . Marshal ( nil ) ,
if derr != nil {
) ; chk . E ( err ) {
continue
p . removeSubscriber ( w )
}
if err = w . CloseNow ( ) ; chk . E ( err ) {
if utils . FastEqual ( dec , pk ) {
continue
allowed = true
break
}
}
}
}
if ! allowed {
// Skip delivery for this subscriber
continue
continue
}
}
log . D . C (
func ( ) string {
return fmt . Sprintf (
"dispatched event %0x to subscription %s, %s" ,
ev . ID , id , subscriber . remote ,
)
} ,
)
}
}
var res * eventenvelope . Result
if res , err = eventenvelope . NewResultWith ( d . id , ev ) ; chk . E ( err ) {
continue
}
if err = d . w . Write (
p . c , websocket . MessageText , res . Marshal ( nil ) ,
) ; chk . E ( err ) {
// On error, remove the subscriber connection safely
p . removeSubscriber ( d . w )
_ = d . w . CloseNow ( )
continue
}
log . D . C (
func ( ) string {
return fmt . Sprintf (
"dispatched event %0x to subscription %s, %s" ,
ev . ID , d . id , d . sub . remote ,
)
} ,
)
}
}
}
}
// removeSubscriberId removes a specific subscription from a subscriber
// removeSubscriberId removes a specific subscription from a subscriber
// websocket.
// websocket.
func ( p * P ) removeSubscriberId ( ws * websocket . Conn , id string ) {
func ( p * P ) removeSubscriberId ( ws * websocket . Conn , id string ) {
p . Mx . Lock ( )
defer p . Mx . Unlock ( )
var subs map [ string ] Subscription
var subs map [ string ] Subscription
var ok bool
var ok bool
if subs , ok = p . Map [ ws ] ; ok {
if subs , ok = p . Map [ ws ] ; ok {
@ -210,6 +249,8 @@ func (p *P) removeSubscriberId(ws *websocket.Conn, id string) {
// removeSubscriber removes a websocket from the P collection.
// removeSubscriber removes a websocket from the P collection.
func ( p * P ) removeSubscriber ( ws * websocket . Conn ) {
func ( p * P ) removeSubscriber ( ws * websocket . Conn ) {
p . Mx . Lock ( )
defer p . Mx . Unlock ( )
clear ( p . Map [ ws ] )
clear ( p . Map [ ws ] )
delete ( p . Map , ws )
delete ( p . Map , ws )
}
}