@ -3,6 +3,8 @@ package acl
@@ -3,6 +3,8 @@ package acl
import (
"bytes"
"context"
"encoding/hex"
"net/http"
"reflect"
"strings"
"sync"
@ -22,9 +24,9 @@ import (
@@ -22,9 +24,9 @@ import (
"next.orly.dev/pkg/encoders/envelopes/reqenvelope"
"next.orly.dev/pkg/encoders/event"
"next.orly.dev/pkg/encoders/filter"
"next.orly.dev/pkg/encoders/hex"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
"next.orly.dev/pkg/encoders/timestamp"
"next.orly.dev/pkg/protocol/publish"
"next.orly.dev/pkg/utils"
"next.orly.dev/pkg/utils/normalize"
@ -108,7 +110,7 @@ func (f *Follows) Configure(cfg ...any) (err error) {
@@ -108,7 +110,7 @@ func (f *Follows) Configure(cfg ...any) (err error) {
for _ , v := range ev . Tags . GetAll ( [ ] byte ( "p" ) ) {
// log.I.F("adding follow: %s", v.Value())
var a [ ] byte
if b , e := hex . Dec ( string ( v . Value ( ) ) ) ; chk . E ( e ) {
if b , e := hex . DecodeString ( string ( v . Value ( ) ) ) ; chk . E ( e ) {
continue
} else {
a = b
@ -158,6 +160,8 @@ func (f *Follows) adminRelays() (urls []string) {
@@ -158,6 +160,8 @@ func (f *Follows) adminRelays() (urls []string) {
copy ( admins , f . admins )
f . followsMx . RUnlock ( )
seen := make ( map [ string ] struct { } )
// First, try to get relay URLs from admin kind 10002 events
for _ , adm := range admins {
fl := & filter . F {
Authors : tag . NewFromAny ( adm ) ,
@ -194,6 +198,29 @@ func (f *Follows) adminRelays() (urls []string) {
@@ -194,6 +198,29 @@ func (f *Follows) adminRelays() (urls []string) {
}
}
}
// If no admin relays found, use bootstrap relays as fallback
if len ( urls ) == 0 {
log . I . F ( "no admin relays found in DB, checking bootstrap relays" )
if len ( f . cfg . BootstrapRelays ) > 0 {
log . I . F ( "using bootstrap relays: %v" , f . cfg . BootstrapRelays )
for _ , relay := range f . cfg . BootstrapRelays {
n := string ( normalize . URL ( relay ) )
if n == "" {
log . W . F ( "invalid bootstrap relay URL: %s" , relay )
continue
}
if _ , ok := seen [ n ] ; ok {
continue
}
seen [ n ] = struct { } { }
urls = append ( urls , n )
}
} else {
log . W . F ( "no bootstrap relays configured" )
}
}
return
}
@ -211,7 +238,7 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
@@ -211,7 +238,7 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
urls := f . adminRelays ( )
log . I . S ( urls )
if len ( urls ) == 0 {
log . W . F ( "follows syncer: no admin relays found in DB (kind 10002)" )
log . W . F ( "follows syncer: no admin relays found in DB (kind 10002) and no bootstrap relays configured " )
return
}
log . T . F (
@ -228,18 +255,45 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
@@ -228,18 +255,45 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
return
default :
}
c , _ , err := websocket . Dial ( ctx , u , nil )
// Create a timeout context for the connection
connCtx , cancel := context . WithTimeout ( ctx , 10 * time . Second )
// Create proper headers for the WebSocket connection
headers := http . Header { }
headers . Set ( "User-Agent" , "ORLY-Relay/0.9.2" )
headers . Set ( "Origin" , "https://orly.dev" )
// Use proper WebSocket dial options
dialOptions := & websocket . DialOptions {
HTTPHeader : headers ,
}
c , _ , err := websocket . Dial ( connCtx , u , dialOptions )
cancel ( )
if err != nil {
log . W . F ( "follows syncer: dial %s failed: %v" , u , err )
if strings . Contains (
err . Error ( ) , "response status code 101 but got 403" ,
) {
// 403 means the relay is not accepting connections from
// us. Forbidden is the meaning, usually used to
// indicate either the IP or user is blocked. so stop
// trying this one.
return
// Handle different types of errors
if strings . Contains ( err . Error ( ) , "response status code 101 but got 403" ) {
// 403 means the relay is not accepting connections from us
// Forbidden is the meaning, usually used to indicate either the IP or user is blocked
// But we should still retry after a longer delay
log . W . F ( "follows syncer: relay %s returned 403, will retry after longer delay" , u )
timer := time . NewTimer ( 5 * time . Minute ) // Wait 5 minutes before retrying 403 errors
select {
case <- ctx . Done ( ) :
return
case <- timer . C :
}
continue
} else if strings . Contains ( err . Error ( ) , "timeout" ) || strings . Contains ( err . Error ( ) , "connection refused" ) {
// Network issues, retry with normal backoff
log . W . F ( "follows syncer: network issue with %s, retrying in %v" , u , backoff )
} else {
// Other errors, retry with normal backoff
log . W . F ( "follows syncer: connection error with %s, retrying in %v" , u , backoff )
}
timer := time . NewTimer ( backoff )
select {
case <- ctx . Done ( ) :
@ -252,21 +306,37 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
@@ -252,21 +306,37 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
continue
}
backoff = time . Second
// send REQ
log . I . F ( "follows syncer: successfully connected to %s" , u )
// send REQ for kind 3 (follow lists), kind 10002 (relay lists), and all events from follows
ff := & filter . S { }
f1 := & filter . F {
Authors : tag . NewFromBytesSlice ( authors ... ) ,
Limit : values . ToUintPointer ( 0 ) ,
Kinds : kind . NewS ( kind . New ( kind . FollowList . K ) ) ,
Limit : values . ToUintPointer ( 100 ) ,
}
f2 := & filter . F {
Authors : tag . NewFromBytesSlice ( authors ... ) ,
Kinds : kind . NewS ( kind . New ( kind . RelayListMetadata . K ) ) ,
Limit : values . ToUintPointer ( 100 ) ,
}
// Add filter for all events from follows (last 30 days)
oneMonthAgo := timestamp . FromUnix ( time . Now ( ) . Add ( - 30 * 24 * time . Hour ) . Unix ( ) )
f3 := & filter . F {
Authors : tag . NewFromBytesSlice ( authors ... ) ,
Since : oneMonthAgo ,
Limit : values . ToUintPointer ( 1000 ) ,
}
* ff = append ( * ff , f1 )
* ff = append ( * ff , f1 , f2 , f3 )
req := reqenvelope . NewFrom ( [ ] byte ( "follows-sync" ) , ff )
if err = c . Write (
ctx , websocket . MessageText , req . Marshal ( nil ) ,
) ; chk . E ( err ) {
log . W . F ( "follows syncer: failed to send REQ to %s: %v" , u , err )
_ = c . Close ( websocket . StatusInternalError , "write failed" )
continue
}
log . T . F ( "sent REQ to %s for follows subscription" , u )
log . I . F ( "follows syncer: sent REQ to %s for kind 3, 10002, and all events (last 30 days) from followed users " , u )
// read loop
for {
select {
@ -294,6 +364,23 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
@@ -294,6 +364,23 @@ func (f *Follows) startSubscriptions(ctx context.Context) {
if ok , err := res . Event . Verify ( ) ; chk . T ( err ) || ! ok {
continue
}
// Process events based on kind
switch res . Event . Kind {
case kind . FollowList . K :
log . I . F ( "follows syncer: received kind 3 (follow list) event from %s on relay %s" ,
hex . EncodeToString ( res . Event . Pubkey ) , u )
// Extract followed pubkeys from 'p' tags in kind 3 events
f . extractFollowedPubkeys ( res . Event )
case kind . RelayListMetadata . K :
log . I . F ( "follows syncer: received kind 10002 (relay list) event from %s on relay %s" ,
hex . EncodeToString ( res . Event . Pubkey ) , u )
default :
// Log all other events from followed users
log . I . F ( "follows syncer: received kind %d event from %s on relay %s" ,
res . Event . Kind , hex . EncodeToString ( res . Event . Pubkey ) , u )
}
if _ , _ , err = f . D . SaveEvent (
ctx , res . Event ,
) ; err != nil {
@ -371,6 +458,20 @@ func (f *Follows) GetFollowedPubkeys() [][]byte {
@@ -371,6 +458,20 @@ func (f *Follows) GetFollowedPubkeys() [][]byte {
return followedPubkeys
}
// extractFollowedPubkeys extracts followed pubkeys from 'p' tags in kind 3 events
func ( f * Follows ) extractFollowedPubkeys ( event * event . E ) {
if event . Kind != kind . FollowList . K {
return
}
// Extract all 'p' tags (followed pubkeys) from the kind 3 event
for _ , tag := range event . Tags . GetAll ( [ ] byte ( "p" ) ) {
if len ( tag . Value ( ) ) == 32 { // Valid pubkey length
f . AddFollow ( tag . Value ( ) )
}
}
}
// AddFollow appends a pubkey to the in-memory follows list if not already present
// and signals the syncer to refresh subscriptions.
func ( f * Follows ) AddFollow ( pub [ ] byte ) {
@ -387,6 +488,7 @@ func (f *Follows) AddFollow(pub []byte) {
@@ -387,6 +488,7 @@ func (f *Follows) AddFollow(pub []byte) {
b := make ( [ ] byte , len ( pub ) )
copy ( b , pub )
f . follows = append ( f . follows , b )
log . I . F ( "follows syncer: added new followed pubkey: %s" , hex . EncodeToString ( pub ) )
// notify syncer if initialized
if f . updated != nil {
select {