@ -65,6 +65,11 @@ func (c *Client) ConnectToRelay(ctx context.Context, url string) (*nostr.Relay,
// Returns the newest event (highest created_at) if multiple events are found
// Returns the newest event (highest created_at) if multiple events are found
// Rate-limited to prevent overwhelming relays
// Rate-limited to prevent overwhelming relays
func ( c * Client ) FetchEvent ( ctx context . Context , filter nostr . Filter ) ( * nostr . Event , error ) {
func ( c * Client ) FetchEvent ( ctx context . Context , filter nostr . Filter ) ( * nostr . Event , error ) {
return c . FetchEventFromRelays ( ctx , filter , c . relays )
}
// FetchEventFromRelays fetches a single event from specific relays
func ( c * Client ) FetchEventFromRelays ( ctx context . Context , filter nostr . Filter , relays [ ] string ) ( * nostr . Event , error ) {
// Acquire semaphore to limit concurrent requests
// Acquire semaphore to limit concurrent requests
c . requestSem <- struct { } { }
c . requestSem <- struct { } { }
defer func ( ) { <- c . requestSem } ( )
defer func ( ) { <- c . requestSem } ( )
@ -74,16 +79,16 @@ func (c *Client) FetchEvent(ctx context.Context, filter nostr.Filter) (*nostr.Ev
defer cancel ( )
defer cancel ( )
logger . WithFields ( map [ string ] interface { } {
logger . WithFields ( map [ string ] interface { } {
"relays" : c . relays ,
"relays" : relays ,
"kinds" : filter . Kinds ,
"kinds" : filter . Kinds ,
"authors" : filter . Authors ,
"authors" : filter . Authors ,
"ids" : filter . IDs ,
"ids" : filter . IDs ,
"tags" : filter . Tags ,
"tags" : filter . Tags ,
} ) . Debug ( "Querying relays using SimplePool" )
} ) . Debug ( "Querying relays using SimplePool" )
// Use SimplePool's SubManyEose to query all relays in parallel
// Use SimplePool's SubManyEose to query specified relays in parallel
// It automatically handles connection pooling, failover, and deduplication
// It automatically handles connection pooling, failover, and deduplication
eventChan := c . pool . SubManyEose ( queryCtx , c . relays , nostr . Filters { filter } )
eventChan := c . pool . SubManyEose ( queryCtx , relays , nostr . Filters { filter } )
// Collect all events from all relays
// Collect all events from all relays
var allEvents [ ] * nostr . Event
var allEvents [ ] * nostr . Event
@ -121,7 +126,12 @@ func (c *Client) FetchEvent(ctx context.Context, filter nostr.Filter) (*nostr.Ev
// Returns deduplicated events, keeping the newest version of each event
// Returns deduplicated events, keeping the newest version of each event
// Rate-limited to prevent overwhelming relays
// Rate-limited to prevent overwhelming relays
func ( c * Client ) FetchEvents ( ctx context . Context , filter nostr . Filter ) ( [ ] * nostr . Event , error ) {
func ( c * Client ) FetchEvents ( ctx context . Context , filter nostr . Filter ) ( [ ] * nostr . Event , error ) {
return c . FetchEventsBatch ( ctx , [ ] nostr . Filter { filter } )
return c . FetchEventsFromRelays ( ctx , filter , c . relays )
}
// FetchEventsFromRelays fetches multiple events from specific relays
func ( c * Client ) FetchEventsFromRelays ( ctx context . Context , filter nostr . Filter , relays [ ] string ) ( [ ] * nostr . Event , error ) {
return c . FetchEventsBatchFromRelays ( ctx , [ ] nostr . Filter { filter } , relays )
}
}
// FetchEventsBatch fetches multiple events using multiple filters in a single batched query
// FetchEventsBatch fetches multiple events using multiple filters in a single batched query
@ -129,6 +139,11 @@ func (c *Client) FetchEvents(ctx context.Context, filter nostr.Filter) ([]*nostr
// Returns deduplicated events, keeping the newest version of each event
// Returns deduplicated events, keeping the newest version of each event
// Rate-limited to prevent overwhelming relays
// Rate-limited to prevent overwhelming relays
func ( c * Client ) FetchEventsBatch ( ctx context . Context , filters [ ] nostr . Filter ) ( [ ] * nostr . Event , error ) {
func ( c * Client ) FetchEventsBatch ( ctx context . Context , filters [ ] nostr . Filter ) ( [ ] * nostr . Event , error ) {
return c . FetchEventsBatchFromRelays ( ctx , filters , c . relays )
}
// FetchEventsBatchFromRelays fetches multiple events from specific relays using multiple filters
func ( c * Client ) FetchEventsBatchFromRelays ( ctx context . Context , filters [ ] nostr . Filter , relays [ ] string ) ( [ ] * nostr . Event , error ) {
if len ( filters ) == 0 {
if len ( filters ) == 0 {
return nil , fmt . Errorf ( "no filters provided" )
return nil , fmt . Errorf ( "no filters provided" )
}
}
@ -142,13 +157,13 @@ func (c *Client) FetchEventsBatch(ctx context.Context, filters []nostr.Filter) (
defer cancel ( )
defer cancel ( )
logger . WithFields ( map [ string ] interface { } {
logger . WithFields ( map [ string ] interface { } {
"relays" : c . relays ,
"relays" : relays ,
"filters" : len ( filters ) ,
"filters" : len ( filters ) ,
} ) . Debug ( "Querying relays using SimplePool with batched filters" )
} ) . Debug ( "Querying relays using SimplePool with batched filters" )
// Use SimplePool's SubManyEose to query all relays in parallel with all filters
// Use SimplePool's SubManyEose to query specified relays in parallel with all filters
// It automatically handles connection pooling, failover, and deduplication
// It automatically handles connection pooling, failover, and deduplication
eventChan := c . pool . SubManyEose ( queryCtx , c . relays , nostr . Filters ( filters ) )
eventChan := c . pool . SubManyEose ( queryCtx , relays , nostr . Filters ( filters ) )
// Collect all events from all relays, deduplicating by ID and keeping newest
// Collect all events from all relays, deduplicating by ID and keeping newest
eventMap := make ( map [ string ] * nostr . Event )
eventMap := make ( map [ string ] * nostr . Event )
@ -208,6 +223,25 @@ func (c *Client) GetRelays() []string {
return c . relays
return c . relays
}
}
// GetPrimaryRelay returns the primary relay (theforest) for main event fetching.
// Returns the empty string when no relays are configured.
func (c *Client) GetPrimaryRelay() string {
	if len(c.relays) == 0 {
		return ""
	}
	return c.relays[0]
}
// GetProfileRelays returns fallback relays for profile fetching, excluding the
// primary (theforest) relay. Returns an empty, non-nil slice when there are no
// fallback relays configured (shouldn't happen, but handled gracefully).
func (c *Client) GetProfileRelays() []string {
	fallbacks := []string{}
	if len(c.relays) <= 1 {
		// Only the primary relay (or none) is configured.
		return fallbacks
	}
	// Copy everything after the primary relay into a fresh slice.
	return append(fallbacks, c.relays[1:]...)
}
// GetPool returns the underlying SimplePool (for services that need direct access)
// GetPool returns the underlying SimplePool (for services that need direct access)
func ( c * Client ) GetPool ( ) * nostr . SimplePool {
func ( c * Client ) GetPool ( ) * nostr . SimplePool {
return c . pool
return c . pool
@ -228,3 +262,382 @@ func (c *Client) HealthCheck(ctx context.Context, timeout time.Duration) error {
_ , err := c . FetchEvents ( ctx , filter )
_ , err := c . FetchEvents ( ctx , filter )
return err
return err
}
}
// FetchDeletionEvents fetches kind 5 deletion events for the given authors
// from the primary relay only.
// Returns a map of deleted event IDs (event ID -> newest deletion event that
// targets it). Authors are deduplicated and empty pubkeys are ignored.
func (c *Client) FetchDeletionEvents(ctx context.Context, authors []string) (map[string]*nostr.Event, error) {
	if len(authors) == 0 {
		return make(map[string]*nostr.Event), nil
	}

	// Deduplicate authors, dropping empty pubkeys.
	authorSet := make(map[string]bool, len(authors))
	uniqueAuthors := make([]string, 0, len(authors))
	for _, author := range authors {
		if author != "" && !authorSet[author] {
			authorSet[author] = true
			uniqueAuthors = append(uniqueAuthors, author)
		}
	}
	if len(uniqueAuthors) == 0 {
		return make(map[string]*nostr.Event), nil
	}

	// Fetch kind 5 deletion events from theforest only (primary relay).
	primaryRelay := c.GetPrimaryRelay()
	if primaryRelay == "" {
		return nil, fmt.Errorf("primary relay not configured")
	}

	filter := nostr.Filter{
		Kinds:   []int{KindDelete},
		Authors: uniqueAuthors,
		// No limit - fetch all deletion events
	}

	logger.WithFields(map[string]interface{}{
		"authors": len(uniqueAuthors),
	}).Debug("Fetching deletion events")

	deletionEvents, err := c.FetchEventsFromRelays(ctx, filter, []string{primaryRelay})
	if err != nil {
		return nil, fmt.Errorf("failed to fetch deletion events: %w", err)
	}

	// Parse deletion events - kind 5 events reference the events they delete
	// via ["e", <event-id>] tags.
	deletedEventIDs := make(map[string]*nostr.Event)
	for _, deletionEvent := range deletionEvents {
		for _, tag := range deletionEvent.Tags {
			// len(tag) > 1 implies the tag is non-empty, so a single
			// length check suffices.
			if len(tag) > 1 && tag[0] == "e" {
				eventID := tag[1]
				// Keep the newest deletion event if multiple deletions exist.
				existing, exists := deletedEventIDs[eventID]
				if !exists || deletionEvent.CreatedAt > existing.CreatedAt {
					deletedEventIDs[eventID] = deletionEvent
				}
			}
		}
	}

	logger.WithFields(map[string]interface{}{
		"deletion_events": len(deletionEvents),
		"deleted_ids":     len(deletedEventIDs),
	}).Debug("Parsed deletion events")

	return deletedEventIDs, nil
}
// FilterDeletedEvents returns the subset of events whose IDs are not present
// in deletedEventIDs (i.e. events that have not been deleted via kind 5).
// When there are no deletions, the input slice is returned unchanged.
func FilterDeletedEvents(events []*nostr.Event, deletedEventIDs map[string]*nostr.Event) []*nostr.Event {
	if len(deletedEventIDs) == 0 {
		return events
	}

	kept := make([]*nostr.Event, 0, len(events))
	for _, ev := range events {
		if _, deleted := deletedEventIDs[ev.ID]; deleted {
			logger.WithFields(map[string]interface{}{
				"event_id": ev.ID,
				"kind":     ev.Kind,
			}).Debug("Filtering out deleted event")
			continue
		}
		kept = append(kept, ev)
	}

	logger.WithFields(map[string]interface{}{
		"original": len(events),
		"filtered": len(kept),
		"removed":  len(events) - len(kept),
	}).Debug("Filtered deleted events")

	return kept
}
// ProcessEventsWithCacheResult contains the processed events and their profiles
type ProcessEventsWithCacheResult struct {
	// Events holds the final deduplicated, deletion-filtered events, sorted
	// newest-first and truncated to the display limit.
	Events []*nostr.Event
	// Profiles maps author pubkey -> profile for the authors of Events.
	Profiles map[string]*Profile
}
// ProcessEventsWithCache is the standard process for fetching and processing events
// for kinds 1, 30023, 30040, 30041, 30818. It:
//  1. If indexEventID is provided, fetches the index event and only queries for events referenced in it
//  2. Fetches 2x the display limit (or all index-referenced events if indexEventID is provided)
//  3. Merges with existing cache map
//  4. Deduplicates (keeping newest)
//  5. Fetches deletion events
//  6. Removes deleted events
//  7. Sorts newest-first
//  8. Applies display limit
//  9. Fetches profiles for displayed events
//  10. Returns final events and profiles ready for display
func (c *Client) ProcessEventsWithCache(
	ctx context.Context,
	kind int,
	displayLimit int,
	existingEvents map[string]*nostr.Event, // Existing cache map (event ID -> event)
	relayURL string, // Relay to fetch from (empty = use primary relay)
	indexEventID string, // Optional: index event ID to fetch and filter by
	indexKind int, // Optional: kind of the index event (required if indexEventID is provided)
) (*ProcessEventsWithCacheResult, error) {
	// Calculate fetch limit (2x display limit, minimum 50) so that
	// deduplication and deletion-filtering still leave enough events
	// to fill the display.
	fetchLimit := displayLimit * 2
	if fetchLimit < 50 {
		fetchLimit = 50
	}

	// Determine which relay to use: an explicit relayURL wins, otherwise
	// fall back to the configured primary relay.
	var relays []string
	if relayURL != "" {
		relays = []string{relayURL}
	} else {
		primaryRelay := c.GetPrimaryRelay()
		if primaryRelay == "" {
			return nil, fmt.Errorf("primary relay not configured")
		}
		relays = []string{primaryRelay}
	}

	var fetchedEvents []*nostr.Event
	var err error
	var indexItems []IndexItem

	// Step 1: If indexEventID is provided, fetch the index event and extract referenced items
	if indexEventID != "" {
		// Fetch the index event itself.
		indexFilter := nostr.Filter{
			IDs: []string{indexEventID},
		}
		indexEvents, err := c.FetchEventsFromRelays(ctx, indexFilter, relays)
		if err != nil {
			return nil, fmt.Errorf("failed to fetch index event: %w", err)
		}
		if len(indexEvents) == 0 {
			return nil, fmt.Errorf("index event not found: %s", indexEventID)
		}

		// Parse the index event
		index, err := ParseIndexEvent(indexEvents[0], indexKind)
		if err != nil {
			return nil, fmt.Errorf("failed to parse index event: %w", err)
		}

		// Extract items of the target kind from the index
		indexItems = make([]IndexItem, 0)
		for _, item := range index.Items {
			if item.Kind == kind {
				indexItems = append(indexItems, item)
			}
		}
		if len(indexItems) == 0 {
			// No items of this kind in the index, return empty result
			return &ProcessEventsWithCacheResult{
				Events:   []*nostr.Event{},
				Profiles: make(map[string]*Profile),
			}, nil
		}

		logger.WithFields(map[string]interface{}{
			"kind":           kind,
			"index_items":    len(indexItems),
			"index_event_id": indexEventID,
		}).Debug("Fetched index event, querying referenced events")

		// Build filters for events referenced in the index.
		// Group d tags by author to create efficient per-author queries.
		authorDTags := make(map[string][]string) // author -> list of d tags
		for _, item := range indexItems {
			if item.Pubkey != "" && item.DTag != "" {
				authorDTags[item.Pubkey] = append(authorDTags[item.Pubkey], item.DTag)
			}
		}

		// Fetch events for each author. Nostr filters don't support OR across
		// d tags, so query all events of this kind by the author and filter by
		// d tag locally.
		allFetchedEvents := make([]*nostr.Event, 0)
		for author, dTags := range authorDTags {
			filter := nostr.Filter{
				Kinds:   []int{kind},
				Authors: []string{author},
				// We'll filter by d tags locally after fetching
			}
			authorEvents, err := c.FetchEventsFromRelays(ctx, filter, relays)
			if err != nil {
				// Best-effort: a single failing author must not abort the
				// whole index fetch.
				logger.WithFields(map[string]interface{}{
					"author": author,
					"error":  err,
				}).Warn("Failed to fetch events for author, continuing")
				continue
			}

			// Filter by d tags locally
			dTagSet := make(map[string]bool, len(dTags))
			for _, dTag := range dTags {
				dTagSet[dTag] = true
			}
			for _, event := range authorEvents {
				// Extract the ["d", <identifier>] tag from the event.
				var eventDTag string
				for _, tag := range event.Tags {
					// len(tag) > 1 implies the tag is non-empty, so a
					// single length check suffices.
					if len(tag) > 1 && tag[0] == "d" {
						eventDTag = tag[1]
						break
					}
				}
				if eventDTag != "" && dTagSet[eventDTag] {
					allFetchedEvents = append(allFetchedEvents, event)
				}
			}
		}
		fetchedEvents = allFetchedEvents

		logger.WithFields(map[string]interface{}{
			"kind":           kind,
			"fetched":        len(fetchedEvents),
			"index_items":    len(indexItems),
			"index_event_id": indexEventID,
		}).Debug("Fetched events from index")
	} else {
		// Step 1: Fetch 2x display limit (standard process)
		filter := nostr.Filter{
			Kinds: []int{kind},
			Limit: fetchLimit,
		}
		logger.WithFields(map[string]interface{}{
			"kind":          kind,
			"fetch_limit":   fetchLimit,
			"display_limit": displayLimit,
			"relay":         relays[0],
		}).Debug("Fetching events for processing")

		// Use client's FetchEventsFromRelays (works with both specific relay and primary)
		fetchedEvents, err = c.FetchEventsFromRelays(ctx, filter, relays)
		if err != nil {
			return nil, fmt.Errorf("failed to fetch events: %w", err)
		}
	}

	logger.WithFields(map[string]interface{}{
		"kind":    kind,
		"fetched": len(fetchedEvents),
	}).Debug("Fetched events from relay")

	// Step 2: Merge with existing cache map; a fetched event replaces a cached
	// one only when it is strictly newer.
	eventMap := make(map[string]*nostr.Event, len(existingEvents)+len(fetchedEvents))
	for id, event := range existingEvents {
		eventMap[id] = event
	}
	for _, event := range fetchedEvents {
		existing, exists := eventMap[event.ID]
		if !exists || event.CreatedAt > existing.CreatedAt {
			eventMap[event.ID] = event
		}
	}

	logger.WithFields(map[string]interface{}{
		"existing": len(existingEvents),
		"fetched":  len(fetchedEvents),
		"merged":   len(eventMap),
	}).Debug("Merged events with cache")

	// Step 3: Convert map to slice (already deduplicated)
	allEvents := make([]*nostr.Event, 0, len(eventMap))
	for _, event := range eventMap {
		allEvents = append(allEvents, event)
	}

	// Step 4: Fetch deletion events for all authors
	authors := make([]string, 0, len(allEvents))
	authorSet := make(map[string]bool)
	for _, event := range allEvents {
		if !authorSet[event.PubKey] {
			authors = append(authors, event.PubKey)
			authorSet[event.PubKey] = true
		}
	}
	deletedEventIDs, err := c.FetchDeletionEvents(ctx, authors)
	if err != nil {
		// Deletion filtering is best-effort: prefer showing events over failing.
		logger.WithField("error", err).Warn("Failed to fetch deletion events, continuing without filtering")
		deletedEventIDs = make(map[string]*nostr.Event)
	}

	// Step 5: Remove deleted events
	allEvents = FilterDeletedEvents(allEvents, deletedEventIDs)

	// Step 6: Sort newest-first (by created_at descending).
	// NOTE(review): O(n^2) exchange sort — fine for display-sized lists, but
	// switch to sort.Slice if fetch limits ever grow significantly.
	for i := 0; i < len(allEvents)-1; i++ {
		for j := i + 1; j < len(allEvents); j++ {
			if allEvents[i].CreatedAt < allEvents[j].CreatedAt {
				allEvents[i], allEvents[j] = allEvents[j], allEvents[i]
			}
		}
	}

	logger.WithFields(map[string]interface{}{
		"after_deletion": len(allEvents),
		"sorted":         true,
	}).Debug("Removed deletions and sorted events")

	// Step 7: Apply display limit
	if displayLimit > 0 && len(allEvents) > displayLimit {
		allEvents = allEvents[:displayLimit]
		logger.WithFields(map[string]interface{}{
			"limited": displayLimit,
		}).Debug("Applied display limit")
	}

	// Log when fewer events are available than requested (informational only).
	if len(allEvents) < displayLimit && len(allEvents) > 0 {
		logger.WithFields(map[string]interface{}{
			"available": len(allEvents),
			"requested": displayLimit,
		}).Debug("Fewer events available than display limit")
	}

	logger.WithFields(map[string]interface{}{
		"kind":          kind,
		"final_count":   len(allEvents),
		"display_limit": displayLimit,
	}).Info("Processed events with cache")

	// Step 8: Fetch profiles for displayed events
	profileAuthors := make([]string, 0, len(allEvents))
	profileAuthorSet := make(map[string]bool)
	for _, event := range allEvents {
		if !profileAuthorSet[event.PubKey] {
			profileAuthors = append(profileAuthors, event.PubKey)
			profileAuthorSet[event.PubKey] = true
		}
	}
	profiles, err := c.FetchProfilesBatch(ctx, profileAuthors)
	if err != nil {
		// Profiles are optional for display; continue without them.
		logger.WithField("error", err).Warn("Failed to fetch profiles, continuing without profiles")
		profiles = make(map[string]*Profile)
	}

	logger.WithFields(map[string]interface{}{
		"profiles_fetched": len(profiles),
		// Fixed: report the displayed-event author count (previously logged
		// the pre-filter deletion-author count under this key).
		"authors": len(profileAuthors),
	}).Debug("Fetched profiles for displayed events")

	return &ProcessEventsWithCacheResult{
		Events:   allEvents,
		Profiles: profiles,
	}, nil
}