Browse Source

Add spider functionality for relay crawling, marker management, and new SpiderMode config.

- Introduced the `spider` package for relay crawling, including periodic tasks and one-time sync capabilities.
- Added `SetMarker`, `GetMarker`, `HasMarker`, and `DeleteMarker` methods in the database for marker management.
- Updated configuration with `SpiderMode` and `SpiderFrequency` options to enable and customize spider behavior.
- Integrated `spider` initialization into the main application flow.
- Improved tag handling, NIP-70 compliance, and protected tag validation in event processing.
- Removed unnecessary logging and replaced `errorf` with `fmt.Errorf` for better error handling.
- Incremented version to `v0.5.0`.
main
mleku 4 months ago
parent
commit
20fbce9263
No known key found for this signature in database
  1. 2
      app/config/config.go
  2. 14
      app/handle-event.go
  3. 10
      app/handle-message.go
  4. 216
      app/handle-req.go
  5. 4
      app/handle-websocket.go
  6. 7
      main.go
  7. 10
      pkg/acl/follows.go
  8. 62
      pkg/database/markers.go
  9. 3
      pkg/encoders/tag/tags.go
  10. 372
      pkg/spider/spider.go
  11. 2
      pkg/version/version

2
app/config/config.go

@ -40,6 +40,8 @@ type C struct {
Admins []string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"` Admins []string `env:"ORLY_ADMINS" usage:"comma-separated list of admin npubs"`
Owners []string `env:"ORLY_OWNERS" usage:"comma-separated list of owner npubs, who have full control of the relay for wipe and restart and other functions"` Owners []string `env:"ORLY_OWNERS" usage:"comma-separated list of owner npubs, who have full control of the relay for wipe and restart and other functions"`
ACLMode string `env:"ORLY_ACL_MODE" usage:"ACL mode: follows,none" default:"none"` ACLMode string `env:"ORLY_ACL_MODE" usage:"ACL mode: follows,none" default:"none"`
SpiderMode string `env:"ORLY_SPIDER_MODE" usage:"spider mode: none,follow" default:"none"`
SpiderFrequency time.Duration `env:"ORLY_SPIDER_FREQUENCY" usage:"spider frequency in seconds" default:"1h"`
} }
// New creates and initializes a new configuration object for the relay // New creates and initializes a new configuration object for the relay

14
app/handle-event.go

@ -103,6 +103,20 @@ func (l *Listener) HandleEvent(msg []byte) (err error) {
// user has write access or better, continue // user has write access or better, continue
// log.D.F("user has %s access", accessLevel) // log.D.F("user has %s access", accessLevel)
} }
// check for protected tag (NIP-70)
protectedTag := env.E.Tags.GetFirst([]byte("-"))
if protectedTag != nil && acl.Registry.Active.Load() != "none" {
// check that the pubkey of the event matches the authed pubkey
if !utils.FastEqual(l.authedPubkey.Load(), env.E.Pubkey) {
if err = Ok.Blocked(
l, env,
"protected tag may only be published by user authed to the same pubkey",
); chk.E(err) {
return
}
return
}
}
// if the event is a delete, process the delete // if the event is a delete, process the delete
if env.E.Kind == kind.EventDeletion.K { if env.E.Kind == kind.EventDeletion.K {
if err = l.HandleDelete(env); err != nil { if err = l.HandleDelete(env); err != nil {

10
app/handle-message.go

@ -1,9 +1,9 @@
package app package app
import ( import (
"fmt"
"lol.mleku.dev/chk" "lol.mleku.dev/chk"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"next.orly.dev/pkg/encoders/envelopes" "next.orly.dev/pkg/encoders/envelopes"
"next.orly.dev/pkg/encoders/envelopes/authenvelope" "next.orly.dev/pkg/encoders/envelopes/authenvelope"
"next.orly.dev/pkg/encoders/envelopes/closeenvelope" "next.orly.dev/pkg/encoders/envelopes/closeenvelope"
@ -13,7 +13,7 @@ import (
) )
func (l *Listener) HandleMessage(msg []byte, remote string) { func (l *Listener) HandleMessage(msg []byte, remote string) {
log.D.F("%s received message:\n%s", remote, msg) // log.D.F("%s received message:\n%s", remote, msg)
var err error var err error
var t string var t string
var rem []byte var rem []byte
@ -32,7 +32,7 @@ func (l *Listener) HandleMessage(msg []byte, remote string) {
// log.D.F("authenvelope: %s %s", remote, rem) // log.D.F("authenvelope: %s %s", remote, rem)
err = l.HandleAuth(rem) err = l.HandleAuth(rem)
default: default:
err = errorf.E("unknown envelope type %s\n%s", t, rem) err = fmt.Errorf("unknown envelope type %s\n%s", t, rem)
} }
} }
if err != nil { if err != nil {
@ -43,7 +43,7 @@ func (l *Listener) HandleMessage(msg []byte, remote string) {
// ) // )
// }, // },
// ) // )
if err = noticeenvelope.NewFrom(err.Error()).Write(l); chk.E(err) { if err = noticeenvelope.NewFrom(err.Error()).Write(l); err != nil {
return return
} }
} }

216
app/handle-req.go

@ -9,7 +9,7 @@ import (
"github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4"
"lol.mleku.dev/chk" "lol.mleku.dev/chk"
"lol.mleku.dev/log" "lol.mleku.dev/log"
acl "next.orly.dev/pkg/acl" "next.orly.dev/pkg/acl"
"next.orly.dev/pkg/encoders/envelopes/authenvelope" "next.orly.dev/pkg/encoders/envelopes/authenvelope"
"next.orly.dev/pkg/encoders/envelopes/closedenvelope" "next.orly.dev/pkg/encoders/envelopes/closedenvelope"
"next.orly.dev/pkg/encoders/envelopes/eoseenvelope" "next.orly.dev/pkg/encoders/envelopes/eoseenvelope"
@ -22,21 +22,21 @@ import (
"next.orly.dev/pkg/encoders/kind" "next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/reason" "next.orly.dev/pkg/encoders/reason"
"next.orly.dev/pkg/encoders/tag" "next.orly.dev/pkg/encoders/tag"
utils "next.orly.dev/pkg/utils" "next.orly.dev/pkg/utils"
"next.orly.dev/pkg/utils/normalize" "next.orly.dev/pkg/utils/normalize"
"next.orly.dev/pkg/utils/pointers" "next.orly.dev/pkg/utils/pointers"
) )
func (l *Listener) HandleReq(msg []byte) (err error) { func (l *Listener) HandleReq(msg []byte) (err error) {
log.T.F("HandleReq: START processing from %s\n%s\n", l.remote, msg) // log.T.F("HandleReq: START processing from %s\n%s\n", l.remote, msg)
var rem []byte // var rem []byte
env := reqenvelope.New() env := reqenvelope.New()
if rem, err = env.Unmarshal(msg); chk.E(err) { if _, err = env.Unmarshal(msg); chk.E(err) {
return normalize.Error.Errorf(err.Error()) return normalize.Error.Errorf(err.Error())
} }
if len(rem) > 0 { // if len(rem) > 0 {
log.I.F("REQ extra bytes: '%s'", rem) // log.I.F("REQ extra bytes: '%s'", rem)
} // }
// send a challenge to the client to auth if an ACL is active // send a challenge to the client to auth if an ACL is active
if acl.Registry.Active.Load() != "none" { if acl.Registry.Active.Load() != "none" {
if err = authenvelope.NewChallengeWith(l.challenge.Load()). if err = authenvelope.NewChallengeWith(l.challenge.Load()).
@ -57,59 +57,59 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
return return
default: default:
// user has read access or better, continue // user has read access or better, continue
log.D.F("user has %s access", accessLevel) // log.D.F("user has %s access", accessLevel)
} }
var events event.S var events event.S
for _, f := range *env.Filters { for _, f := range *env.Filters {
idsLen := 0 // idsLen := 0
kindsLen := 0 // kindsLen := 0
authorsLen := 0 // authorsLen := 0
tagsLen := 0 // tagsLen := 0
if f != nil { // if f != nil {
if f.Ids != nil { // if f.Ids != nil {
idsLen = f.Ids.Len() // idsLen = f.Ids.Len()
} // }
if f.Kinds != nil { // if f.Kinds != nil {
kindsLen = f.Kinds.Len() // kindsLen = f.Kinds.Len()
} // }
if f.Authors != nil { // if f.Authors != nil {
authorsLen = f.Authors.Len() // authorsLen = f.Authors.Len()
} // }
if f.Tags != nil { // if f.Tags != nil {
tagsLen = f.Tags.Len() // tagsLen = f.Tags.Len()
} // }
} // }
log.T.F( // log.T.F(
"REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d", // "REQ %s: filter summary ids=%d kinds=%d authors=%d tags=%d",
env.Subscription, idsLen, kindsLen, authorsLen, tagsLen, // env.Subscription, idsLen, kindsLen, authorsLen, tagsLen,
) // )
if f != nil && f.Authors != nil && f.Authors.Len() > 0 { if f != nil && f.Authors != nil && f.Authors.Len() > 0 {
var authors []string var authors []string
for _, a := range f.Authors.T { for _, a := range f.Authors.T {
authors = append(authors, hex.Enc(a)) authors = append(authors, hex.Enc(a))
} }
log.T.F("REQ %s: authors=%v", env.Subscription, authors) // log.T.F("REQ %s: authors=%v", env.Subscription, authors)
} }
if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 { // if f != nil && f.Kinds != nil && f.Kinds.Len() > 0 {
log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16()) // log.T.F("REQ %s: kinds=%v", env.Subscription, f.Kinds.ToUint16())
} // }
if f != nil && f.Ids != nil && f.Ids.Len() > 0 { // if f != nil && f.Ids != nil && f.Ids.Len() > 0 {
var ids []string // var ids []string
for _, id := range f.Ids.T { // for _, id := range f.Ids.T {
ids = append(ids, hex.Enc(id)) // ids = append(ids, hex.Enc(id))
} // }
var lim any // // var lim any
if pointers.Present(f.Limit) { // // if pointers.Present(f.Limit) {
lim = *f.Limit // // lim = *f.Limit
} else { // // } else {
lim = nil // // lim = nil
} // // }
log.T.F( // // log.T.F(
"REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription, // // "REQ %s: ids filter count=%d ids=%v limit=%v", env.Subscription,
f.Ids.Len(), ids, lim, // // f.Ids.Len(), ids, lim,
) // // )
} // }
if pointers.Present(f.Limit) { if f != nil && pointers.Present(f.Limit) {
if *f.Limit == 0 { if *f.Limit == 0 {
continue continue
} }
@ -119,15 +119,15 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
context.Background(), 30*time.Second, context.Background(), 30*time.Second,
) )
defer cancel() defer cancel()
log.T.F( // log.T.F(
"HandleReq: About to QueryEvents for %s, main context done: %v", // "HandleReq: About to QueryEvents for %s, main context done: %v",
l.remote, l.ctx.Err() != nil, // l.remote, l.ctx.Err() != nil,
) // )
if events, err = l.QueryEvents(queryCtx, f); chk.E(err) { if events, err = l.QueryEvents(queryCtx, f); chk.E(err) {
if errors.Is(err, badger.ErrDBClosed) { if errors.Is(err, badger.ErrDBClosed) {
return return
} }
log.T.F("HandleReq: QueryEvents error for %s: %v", l.remote, err) // log.T.F("HandleReq: QueryEvents error for %s: %v", l.remote, err)
err = nil err = nil
} }
defer func() { defer func() {
@ -135,23 +135,23 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
ev.Free() ev.Free()
} }
}() }()
log.T.F( // log.T.F(
"HandleReq: QueryEvents completed for %s, found %d events", // "HandleReq: QueryEvents completed for %s, found %d events",
l.remote, len(events), // l.remote, len(events),
) // )
} }
var tmp event.S var tmp event.S
privCheck: privCheck:
for _, ev := range events { for _, ev := range events {
if kind.IsPrivileged(ev.Kind) && if kind.IsPrivileged(ev.Kind) &&
accessLevel != "admin" { // admins can see all events accessLevel != "admin" { // admins can see all events
log.T.C( // log.T.C(
func() string { // func() string {
return fmt.Sprintf( // return fmt.Sprintf(
"checking privileged event %0x", ev.ID, // "checking privileged event %0x", ev.ID,
) // )
}, // },
) // )
pk := l.authedPubkey.Load() pk := l.authedPubkey.Load()
if pk == nil { if pk == nil {
continue continue
@ -175,26 +175,26 @@ privCheck:
continue continue
} }
if utils.FastEqual(pt, pk) { if utils.FastEqual(pt, pk) {
log.T.C( // log.T.C(
func() string { // func() string {
return fmt.Sprintf( // return fmt.Sprintf(
"privileged event %s is for logged in pubkey %0x", // "privileged event %s is for logged in pubkey %0x",
ev.ID, pk, // ev.ID, pk,
) // )
}, // },
) // )
tmp = append(tmp, ev) tmp = append(tmp, ev)
continue privCheck continue privCheck
} }
} }
log.T.C( // log.T.C(
func() string { // func() string {
return fmt.Sprintf( // return fmt.Sprintf(
"privileged event %s does not contain the logged in pubkey %0x", // "privileged event %s does not contain the logged in pubkey %0x",
ev.ID, pk, // ev.ID, pk,
) // )
}, // },
) // )
} else { } else {
tmp = append(tmp, ev) tmp = append(tmp, ev)
} }
@ -202,19 +202,19 @@ privCheck:
events = tmp events = tmp
seen := make(map[string]struct{}) seen := make(map[string]struct{})
for _, ev := range events { for _, ev := range events {
log.D.C( // log.D.C(
func() string { // func() string {
return fmt.Sprintf( // return fmt.Sprintf(
"REQ %s: sending EVENT id=%s kind=%d", env.Subscription, // "REQ %s: sending EVENT id=%s kind=%d", env.Subscription,
hex.Enc(ev.ID), ev.Kind, // hex.Enc(ev.ID), ev.Kind,
) // )
}, // },
) // )
log.T.C( // log.T.C(
func() string { // func() string {
return fmt.Sprintf("event:\n%s\n", ev.Serialize()) // return fmt.Sprintf("event:\n%s\n", ev.Serialize())
}, // },
) // )
var res *eventenvelope.Result var res *eventenvelope.Result
if res, err = eventenvelope.NewResultWith( if res, err = eventenvelope.NewResultWith(
env.Subscription, ev, env.Subscription, ev,
@ -229,7 +229,7 @@ privCheck:
} }
// write the EOSE to signal to the client that all events found have been // write the EOSE to signal to the client that all events found have been
// sent. // sent.
log.T.F("sending EOSE to %s", l.remote) // log.T.F("sending EOSE to %s", l.remote)
if err = eoseenvelope.NewFrom(env.Subscription). if err = eoseenvelope.NewFrom(env.Subscription).
Write(l); chk.E(err) { Write(l); chk.E(err) {
return return
@ -237,10 +237,10 @@ privCheck:
// if the query was for just Ids, we know there can't be any more results, // if the query was for just Ids, we know there can't be any more results,
// so cancel the subscription. // so cancel the subscription.
cancel := true cancel := true
log.T.F( // log.T.F(
"REQ %s: computing cancel/subscription; events_sent=%d", // "REQ %s: computing cancel/subscription; events_sent=%d",
env.Subscription, len(events), // env.Subscription, len(events),
) // )
var subbedFilters filter.S var subbedFilters filter.S
for _, f := range *env.Filters { for _, f := range *env.Filters {
if f.Ids.Len() < 1 { if f.Ids.Len() < 1 {
@ -255,10 +255,10 @@ privCheck:
} }
notFounds = append(notFounds, id) notFounds = append(notFounds, id)
} }
log.T.F( // log.T.F(
"REQ %s: ids outstanding=%d of %d", env.Subscription, // "REQ %s: ids outstanding=%d of %d", env.Subscription,
len(notFounds), f.Ids.Len(), // len(notFounds), f.Ids.Len(),
) // )
// if all were found, don't add to subbedFilters // if all were found, don't add to subbedFilters
if len(notFounds) == 0 { if len(notFounds) == 0 {
continue continue
@ -295,6 +295,6 @@ privCheck:
return return
} }
} }
log.T.F("HandleReq: COMPLETED processing from %s", l.remote) // log.T.F("HandleReq: COMPLETED processing from %s", l.remote)
return return
} }

4
app/handle-websocket.go

@ -97,7 +97,7 @@ whitelist:
} }
var typ websocket.MessageType var typ websocket.MessageType
var msg []byte var msg []byte
log.T.F("waiting for message from %s", remote) // log.T.F("waiting for message from %s", remote)
// Create a read context with timeout to prevent indefinite blocking // Create a read context with timeout to prevent indefinite blocking
readCtx, readCancel := context.WithTimeout(ctx, DefaultReadTimeout) readCtx, readCancel := context.WithTimeout(ctx, DefaultReadTimeout)
@ -152,7 +152,7 @@ whitelist:
writeCancel() writeCancel()
continue continue
} }
log.T.F("received message from %s: %s", remote, string(msg)) // log.T.F("received message from %s: %s", remote, string(msg))
go listener.HandleMessage(msg, remote) go listener.HandleMessage(msg, remote)
} }
} }

7
main.go

@ -18,6 +18,7 @@ import (
"next.orly.dev/app/config" "next.orly.dev/app/config"
"next.orly.dev/pkg/acl" "next.orly.dev/pkg/acl"
"next.orly.dev/pkg/database" "next.orly.dev/pkg/database"
"next.orly.dev/pkg/spider"
"next.orly.dev/pkg/version" "next.orly.dev/pkg/version"
) )
@ -158,6 +159,12 @@ func main() {
} }
acl.Registry.Syncer() acl.Registry.Syncer()
// Initialize and start spider functionality if enabled
spiderCtx, spiderCancel := context.WithCancel(ctx)
spiderInstance := spider.New(db, cfg, spiderCtx, spiderCancel)
spiderInstance.Start()
defer spiderInstance.Stop()
// Start HTTP pprof server if enabled // Start HTTP pprof server if enabled
if cfg.PprofHTTP { if cfg.PprofHTTP {
pprofAddr := fmt.Sprintf("%s:%d", cfg.Listen, 6060) pprofAddr := fmt.Sprintf("%s:%d", cfg.Listen, 6060)

10
pkg/acl/follows.go

@ -360,6 +360,16 @@ func (f *Follows) Syncer() {
f.updated <- struct{}{} f.updated <- struct{}{}
} }
// GetFollowedPubkeys returns a copy of the followed pubkeys list.
//
// The outer slice is copied under the read lock, so callers may append to or
// reorder the returned slice freely. NOTE(review): this is a shallow copy —
// the inner []byte elements still alias the ACL's internal state, so callers
// must not mutate their contents; confirm whether a deep copy is needed.
func (f *Follows) GetFollowedPubkeys() [][]byte {
	f.followsMx.RLock()
	defer f.followsMx.RUnlock()
	followedPubkeys := make([][]byte, len(f.follows))
	copy(followedPubkeys, f.follows)
	return followedPubkeys
}
func init() { func init() {
log.T.F("registering follows ACL") log.T.F("registering follows ACL")
Registry.Register(new(Follows)) Registry.Register(new(Follows))

62
pkg/database/markers.go

@ -0,0 +1,62 @@
package database
import (
	"errors"

	"github.com/dgraph-io/badger/v4"
	"lol.mleku.dev/chk"
)
const (
markerPrefix = "MARKER:"
)
// SetMarker stores an arbitrary marker in the database.
//
// The key is namespaced with the marker prefix so markers cannot collide
// with other record types. Returns any error from the write transaction.
func (d *D) SetMarker(key string, value []byte) (err error) {
	k := []byte(markerPrefix + key)
	err = d.Update(func(txn *badger.Txn) error {
		return txn.Set(k, value)
	})
	return
}
// GetMarker retrieves an arbitrary marker from the database.
//
// Returns the stored value, or an error (badger.ErrKeyNotFound when the
// marker has never been set).
func (d *D) GetMarker(key string) (value []byte, err error) {
	k := []byte(markerPrefix + key)
	err = d.View(func(txn *badger.Txn) error {
		var item *badger.Item
		var e error
		if item, e = txn.Get(k); e != nil {
			return e
		}
		// Copy out of the transaction so the value stays valid afterwards.
		value, e = item.ValueCopy(nil)
		return e
	})
	return
}
// HasMarker checks if a marker exists in the database.
//
// A missing key is the normal "not set" case and is not logged as an
// error; any other database failure (I/O error, closed DB, ...) is logged
// via chk.E and reported as "does not exist" so callers degrade gracefully.
func (d *D) HasMarker(key string) (exists bool) {
	markerKey := []byte(markerPrefix + key)
	err := d.View(func(txn *badger.Txn) error {
		_, err := txn.Get(markerKey)
		return err
	})
	switch {
	case err == nil:
		exists = true
	case errors.Is(err, badger.ErrKeyNotFound):
		// marker simply absent; not an error condition, do not log
	default:
		// unexpected failure: log it, treat the marker as absent
		chk.E(err)
	}
	return
}
// DeleteMarker removes a marker from the database.
//
// Returns any error from the delete transaction.
func (d *D) DeleteMarker(key string) (err error) {
	k := []byte(markerPrefix + key)
	err = d.Update(func(txn *badger.Txn) error {
		return txn.Delete(k)
	})
	return
}

3
pkg/encoders/tag/tags.go

@ -24,7 +24,8 @@ func NewSWithCap(c int) (s *S) {
func (s *S) Len() int { func (s *S) Len() int {
if s == nil { if s == nil {
panic("tags cannot be used without initialization") return 0
// panic("tags cannot be used without initialization")
} }
return len(*s) return len(*s)
} }

372
pkg/spider/spider.go

@ -0,0 +1,372 @@
package spider
import (
"context"
"strconv"
"strings"
"time"
"lol.mleku.dev/chk"
"lol.mleku.dev/log"
"next.orly.dev/app/config"
"next.orly.dev/pkg/acl"
"next.orly.dev/pkg/database"
"next.orly.dev/pkg/database/indexes/types"
"next.orly.dev/pkg/encoders/filter"
"next.orly.dev/pkg/encoders/kind"
"next.orly.dev/pkg/encoders/tag"
"next.orly.dev/pkg/encoders/timestamp"
"next.orly.dev/pkg/protocol/ws"
"next.orly.dev/pkg/utils/normalize"
)
// Marker keys used to persist spider progress across restarts (see
// database.SetMarker / HasMarker).
const (
	// OneTimeSpiderSyncMarker is set once the initial one-month backfill
	// has been performed, so it is not repeated on subsequent startups.
	OneTimeSpiderSyncMarker = "spider_one_time_sync_completed"
	// SpiderLastScanMarker records the unix time of the most recent
	// periodic scan.
	SpiderLastScanMarker = "spider_last_scan_time"
)
// Spider crawls remote relays for events authored by followed pubkeys and
// stores them in the local database. Its goroutines run until the supplied
// context is cancelled (see Stop).
type Spider struct {
	db     *database.D        // local event store; also persists progress markers
	cfg    *config.C          // relay configuration (SpiderMode, SpiderFrequency, ACLMode)
	ctx    context.Context    // bounds the lifetime of all spider goroutines
	cancel context.CancelFunc // cancels ctx; invoked by Stop
}
// New constructs a Spider over the given database and configuration.
//
// ctx bounds the lifetime of all spider goroutines and cancel is invoked by
// Stop to terminate them. The spider does nothing until Start is called.
func New(
	db *database.D, cfg *config.C, ctx context.Context,
	cancel context.CancelFunc,
) *Spider {
	return &Spider{
		db:     db,
		cfg:    cfg,
		ctx:    ctx,
		cancel: cancel,
	}
}
// Start initializes the spider functionality based on configuration.
//
// It is a no-op unless SpiderMode is "follows". Otherwise it launches (in
// background goroutines) a one-time backfill — skipped when the completion
// marker is already present — and the periodic scanning loop. Start returns
// immediately; shutdown is via Stop / context cancellation.
//
// NOTE(review): the config usage string advertises "none,follow" but this
// check requires the value "follows" — confirm which spelling is intended.
func (s *Spider) Start() {
	if s.cfg.SpiderMode != "follows" {
		log.D.Ln("Spider mode is not set to 'follows', skipping spider functionality")
		return
	}
	log.I.Ln("Starting spider in follow mode")
	// Check if one-time sync has been completed
	if !s.db.HasMarker(OneTimeSpiderSyncMarker) {
		log.I.Ln("Performing one-time spider sync back one month")
		go s.performOneTimeSync()
	} else {
		log.D.Ln("One-time spider sync already completed, skipping")
	}
	// Start periodic scanning
	go s.startPeriodicScanning()
}
// performOneTimeSync performs the initial sync going back one month.
//
// The completion marker is written only after a successful sync, so a failed
// backfill is retried on the next startup instead of being silently recorded
// as done (the previous deferred write marked it complete unconditionally).
func (s *Spider) performOneTimeSync() {
	// Calculate the time one month ago
	oneMonthAgo := time.Now().AddDate(0, -1, 0)
	log.I.F("Starting one-time spider sync from %v", oneMonthAgo)
	// Perform the sync (placeholder - would need actual implementation based on follows)
	if err := s.performSync(oneMonthAgo, time.Now()); err != nil {
		// Leave the marker unset: Start will attempt the backfill again
		// on the next run.
		log.E.F("One-time spider sync failed: %v", err)
		return
	}
	// Mark the one-time sync as completed
	timestamp := strconv.FormatInt(time.Now().Unix(), 10)
	if err := s.db.SetMarker(
		OneTimeSpiderSyncMarker, []byte(timestamp),
	); err != nil {
		log.E.F("Failed to set one-time sync marker: %v", err)
		return
	}
	log.I.Ln("One-time spider sync completed and marked")
}
// startPeriodicScanning starts the regular scanning process.
//
// It ticks at the configured SpiderFrequency, running one scan per tick,
// and exits when the spider's context is cancelled.
func (s *Spider) startPeriodicScanning() {
	ticker := time.NewTicker(s.cfg.SpiderFrequency)
	defer ticker.Stop()
	log.I.F("Starting periodic spider scanning every %v", s.cfg.SpiderFrequency)
	done := s.ctx.Done()
	for {
		select {
		case <-ticker.C:
			s.performPeriodicScan()
		case <-done:
			log.D.Ln("Spider periodic scanning stopped due to context cancellation")
			return
		}
	}
}
// performPeriodicScan performs the regular scan of the last two hours (double the frequency window)
//
// The window is twice the tick interval so consecutive scans overlap and
// events arriving near a tick boundary are not missed. On success the last
// scan marker is refreshed; a failed scan leaves it untouched.
func (s *Spider) performPeriodicScan() {
	// Window is double the configured frequency period.
	window := 2 * s.cfg.SpiderFrequency
	from := time.Now().Add(-window)
	until := time.Now()
	log.D.F(
		"Performing periodic spider scan from %v to %v (window: %v)", from,
		until, window,
	)
	if err := s.performSync(from, until); err != nil {
		log.E.F("Periodic spider scan failed: %v", err)
		return
	}
	// Record when this scan finished.
	ts := strconv.FormatInt(time.Now().Unix(), 10)
	if err := s.db.SetMarker(SpiderLastScanMarker, []byte(ts)); err != nil {
		log.E.F("Failed to update last scan marker: %v", err)
	}
	log.D.F("Periodic spider scan completed successfully")
}
// performSync performs the actual sync operation for the given time range.
//
// Pipeline: (1) bail out unless the ACL mode is "follows"; (2) collect the
// followed pubkeys from the active ACL; (3) discover relay URLs from their
// kind 10002 relay lists; (4) query each relay for events in [startTime,
// endTime] and store them. Per-relay failures are logged and skipped so one
// bad relay cannot abort the whole sync; a nil return therefore does not
// mean every relay succeeded.
func (s *Spider) performSync(startTime, endTime time.Time) error {
	log.D.F(
		"Spider sync from %v to %v - starting implementation", startTime,
		endTime,
	)
	// 1. Check ACL mode is set to "follows"
	if s.cfg.ACLMode != "follows" {
		log.D.F(
			"Spider sync skipped - ACL mode is not 'follows' (current: %s)",
			s.cfg.ACLMode,
		)
		return nil
	}
	// 2. Get the list of followed users from the ACL system
	followedPubkeys, err := s.getFollowedPubkeys()
	if err != nil {
		return err
	}
	if len(followedPubkeys) == 0 {
		// Nothing to crawl; not an error.
		log.D.Ln("Spider sync: no followed pubkeys found")
		return nil
	}
	log.D.F("Spider sync: found %d followed pubkeys", len(followedPubkeys))
	// 3. Discover relay lists from followed users
	relayURLs, err := s.discoverRelays(followedPubkeys)
	if err != nil {
		return err
	}
	if len(relayURLs) == 0 {
		log.W.Ln("Spider sync: no relays discovered from followed users")
		return nil
	}
	log.I.F("Spider sync: discovered %d relay URLs", len(relayURLs))
	// 4. Query each relay for events from followed pubkeys in the time range
	eventsFound := 0
	for _, relayURL := range relayURLs {
		count, err := s.queryRelayForEvents(
			relayURL, followedPubkeys, startTime, endTime,
		)
		if err != nil {
			// Skip this relay and continue with the rest.
			log.E.F("Spider sync: error querying relay %s: %v", relayURL, err)
			continue
		}
		eventsFound += count
	}
	log.I.F(
		"Spider sync completed: found %d new events from %d relays",
		eventsFound, len(relayURLs),
	)
	return nil
}
// getFollowedPubkeys retrieves the list of followed pubkeys from the ACL system.
//
// It locates the active ACL instance in the registry and, when it is a
// *Follows ACL, returns its followed-pubkey snapshot. Any other (or no)
// active ACL yields an empty result. The error return is always nil at
// present but kept for future use.
func (s *Spider) getFollowedPubkeys() ([][]byte, error) {
	active := acl.Registry.Active.Load()
	// Scan registered ACLs for the one currently active.
	for _, instance := range acl.Registry.ACL {
		if instance.Type() != active {
			continue
		}
		// Only the follows ACL exposes a followed-pubkeys list.
		if follows, ok := instance.(*acl.Follows); ok {
			return follows.GetFollowedPubkeys(), nil
		}
	}
	return nil, nil
}
// discoverRelays discovers relay URLs from kind 10002 events of followed users.
//
// For each followed pubkey it looks up stored RelayListMetadata (kind 10002)
// events and collects the URLs from their 'r' tags, normalized and
// de-duplicated. Per-pubkey lookup failures are logged via chk.E and skipped
// so one bad index cannot abort discovery. The error return is currently
// always nil but kept for future use.
func (s *Spider) discoverRelays(followedPubkeys [][]byte) ([]string, error) {
	seen := make(map[string]struct{})
	var urls []string
	for _, pubkey := range followedPubkeys {
		// Query for kind 10002 (RelayListMetadata) events from this pubkey
		fl := &filter.F{
			Authors: tag.NewFromAny(pubkey),
			Kinds:   kind.NewS(kind.New(kind.RelayListMetadata.K)),
		}
		idxs, err := database.GetIndexesFromFilter(fl)
		if chk.E(err) {
			continue
		}
		var sers types.Uint40s
		for _, idx := range idxs {
			// Named "serials" rather than "s" to avoid shadowing the
			// method receiver.
			serials, err := s.db.GetSerialsByRange(idx)
			if chk.E(err) {
				continue
			}
			sers = append(sers, serials...)
		}
		for _, ser := range sers {
			ev, err := s.db.FetchEventBySerial(ser)
			if chk.E(err) || ev == nil {
				continue
			}
			// Extract relay URLs from 'r' tags
			for _, v := range ev.Tags.GetAll([]byte("r")) {
				u := string(v.Value())
				n := string(normalize.URL(u))
				if n == "" {
					// unparseable / empty URL; skip
					continue
				}
				if _, ok := seen[n]; ok {
					continue
				}
				seen[n] = struct{}{}
				urls = append(urls, n)
			}
		}
	}
	return urls, nil
}
// queryRelayForEvents connects to a relay and queries for events from followed pubkeys.
//
// It subscribes with a single filter covering all followed authors over
// [startTime, endTime] (capped at 1000 results), verifies each received
// event's signature, and saves valid events to the local database. It
// returns the number of events saved. The receive loop ends on the first of:
// context done (30s connection budget), a 10s receive timeout, or the
// relay's end-of-stored-events signal — all of which return success with
// whatever was saved so far; only connect/subscribe failures return an
// error.
func (s *Spider) queryRelayForEvents(
	relayURL string, followedPubkeys [][]byte, startTime, endTime time.Time,
) (int, error) {
	log.T.F("Spider sync: querying relay %s", relayURL)
	// Connect to the relay with a timeout context
	ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
	defer cancel()
	client, err := ws.RelayConnect(ctx, relayURL)
	if err != nil {
		return 0, err
	}
	defer client.Close()
	// Create filter for the time range and followed pubkeys
	f := &filter.F{
		Authors: tag.NewFromBytesSlice(followedPubkeys...),
		Since:   timestamp.FromUnix(startTime.Unix()),
		Until:   timestamp.FromUnix(endTime.Unix()),
		Limit:   func() *uint { l := uint(1000); return &l }(), // Limit to avoid overwhelming
	}
	// Subscribe to get events
	sub, err := client.Subscribe(ctx, filter.NewS(f))
	if err != nil {
		return 0, err
	}
	defer sub.Unsub()
	eventsCount := 0 // events received (valid or not)
	eventsSaved := 0 // events verified and stored
	timeout := time.After(10 * time.Second) // Timeout for receiving events
	for {
		select {
		case <-ctx.Done():
			log.T.F(
				"Spider sync: context done for relay %s, saved %d/%d events",
				relayURL, eventsSaved, eventsCount,
			)
			return eventsSaved, nil
		case <-timeout:
			log.T.F(
				"Spider sync: timeout for relay %s, saved %d/%d events",
				relayURL, eventsSaved, eventsCount,
			)
			return eventsSaved, nil
		case <-sub.EndOfStoredEvents:
			// Relay signalled EOSE; stored history is exhausted.
			log.T.F(
				"Spider sync: end of stored events for relay %s, saved %d/%d events",
				relayURL, eventsSaved, eventsCount,
			)
			return eventsSaved, nil
		case ev := <-sub.Events:
			if ev == nil {
				// presumably the events channel was closed — TODO confirm
				// ws subscription close semantics
				continue
			}
			eventsCount++
			// Verify the event signature
			if ok, err := ev.Verify(); !ok || err != nil {
				log.T.F(
					"Spider sync: invalid event signature from relay %s",
					relayURL,
				)
				ev.Free()
				continue
			}
			// Save the event to the database
			if _, _, err := s.db.SaveEvent(s.ctx, ev); err != nil {
				// "blocked:" errors are expected policy rejections and
				// not worth logging.
				if !strings.HasPrefix(err.Error(), "blocked:") {
					log.T.F(
						"Spider sync: error saving event from relay %s: %v",
						relayURL, err,
					)
				}
				// Event might already exist, which is fine for deduplication
			} else {
				eventsSaved++
				if eventsSaved%10 == 0 {
					// Progress log every 10 saves.
					log.T.F(
						"Spider sync: saved %d events from relay %s",
						eventsSaved, relayURL,
					)
				}
			}
			// NOTE(review): ev is freed after SaveEvent — assumes SaveEvent
			// copies what it needs; confirm against the database package.
			ev.Free()
		}
	}
}
// Stop stops the spider functionality.
//
// It cancels the spider's context, signalling all spider goroutines to exit.
// NOTE(review): Stop does not wait for in-flight goroutines to finish —
// confirm callers do not require synchronous shutdown.
func (s *Spider) Stop() {
	log.D.Ln("Stopping spider")
	s.cancel()
}

2
pkg/version/version

@ -1 +1 @@
v0.4.9 v0.5.0
Loading…
Cancel
Save