diff --git a/src/Service/NostrClient.php b/src/Service/NostrClient.php index 436e443..e14453b 100644 --- a/src/Service/NostrClient.php +++ b/src/Service/NostrClient.php @@ -44,22 +44,31 @@ class NostrClient private readonly ArticleFactory $articleFactory, private readonly TokenStorageInterface $tokenStorage, private readonly LoggerInterface $logger, - private readonly CacheItemPoolInterface $npubCache) + private readonly CacheItemPoolInterface $npubCache, + private readonly NostrRelayPool $relayPool) { + // Initialize default relay set using the relay pool to avoid duplicate connections + $defaultRelayUrls = [ + 'wss://theforest.nostr1.com', + 'wss://nostr.land', + 'wss://relay.primal.net' + ]; $this->defaultRelaySet = new RelaySet(); - $this->defaultRelaySet->addRelay(new Relay('wss://theforest.nostr1.com')); // public aggregator relay - $this->defaultRelaySet->addRelay(new Relay('wss://nostr.land')); // aggregator relay, has AUTH - $this->defaultRelaySet->addRelay(new Relay('wss://relay.primal.net')); // profile aggregator + foreach ($defaultRelayUrls as $url) { + $this->defaultRelaySet->addRelay($this->relayPool->getRelay($url)); + } } /** - * Creates a RelaySet from a list of relay URLs + * Creates a RelaySet from a list of relay URLs using the connection pool */ private function createRelaySet(array $relayUrls): RelaySet { - $relaySet = $this->defaultRelaySet; + $relaySet = new RelaySet(); foreach ($relayUrls as $relayUrl) { - $relaySet->addRelay(new Relay($relayUrl)); + // Use the pool to get persistent relay connections + $relay = $this->relayPool->getRelay($relayUrl); + $relaySet->addRelay($relay); } return $relaySet; } @@ -107,9 +116,15 @@ class NostrClient */ public function getPubkeyMetadata($pubkey): \stdClass { - $relaySet = $this->defaultRelaySet; - // $relaySet->addRelay(new Relay('wss://profiles.nostr1.com')); // profile aggregator - $relaySet->addRelay(new Relay('wss://purplepag.es')); // profile aggregator + // Use relay pool for all relays including purplepag.es + $relayUrls = [ + 'wss://theforest.nostr1.com', + 'wss://nostr.land', + 'wss://relay.primal.net', + 'wss://purplepag.es' + ]; + $relaySet = $this->createRelaySet($relayUrls); + $this->logger->info('Getting metadata for pubkey ' . $pubkey ); $request = $this->createNostrRequest( kinds: [KindsEnum::METADATA], @@ -142,11 +157,8 @@ class NostrClient $relays = $this->getTopReputableRelaysForAuthor($key->convertPublicKeyToBech32($event->getPublicKey()), 5); } - $relaySet = new RelaySet(); - foreach ($relays as $relayWss) { - $relay = new Relay($relayWss); - $relaySet->addRelay($relay); - } + // Use relay pool instead of creating new Relay instances + $relaySet = $this->createRelaySet($relays); $relaySet->setMessage($eventMessage); // TODO handle responses appropriately return $relaySet->send(); @@ -470,32 +482,38 @@ class NostrClient $kind = (int)$parts[0]; $pubkey = $parts[1]; $identifier = end($parts); + // Get relays for the author $authorRelays = $this->getTopReputableRelaysForAuthor($pubkey); - // Turn into a relaySet - $relaySet = $this->createRelaySet($authorRelays); - // filters - $filters = [ - 'tag' => ['#A', [$coordinate]], // #A means root event - ]; + // Build the request message + $subscription = new Subscription(); + $subscriptionId = $subscription->setId(); + $filter = new Filter(); + $filter->setKinds([ + KindsEnum::COMMENTS->value, + // KindsEnum::ZAP_RECEIPT->value // Not yet + ]); + $filter->setTag('#A', [$coordinate]); + if (is_int($since) && $since > 0) { - $filters['since'] = $since; + $filter->setSince($since); } - // Create request using the helper method - $request = $this->createNostrRequest( - kinds: [ - KindsEnum::COMMENTS->value, - // KindsEnum::ZAP_RECEIPT->value // Not yet - ], - filters: $filters, - relaySet: $relaySet + $requestMessage = new RequestMessage($subscriptionId, [$filter]); + + // Use the relay pool to send the request + // Pass subscription ID so the pool can send CLOSE message after receiving responses + $responses = $this->relayPool->sendToRelays( + $authorRelays, + fn() => $requestMessage, + 30, // timeout in seconds + $subscriptionId // close subscription after receiving responses ); // Process the response and deduplicate by eventId $uniqueEvents = []; - $this->processResponse($request->send(), function($event) use (&$uniqueEvents, $pubkey) { + $this->processResponse($responses, function($event) use (&$uniqueEvents) { $this->logger->debug('Received comment event', ['event_id' => $event->id]); $uniqueEvents[$event->id] = $event; return null; @@ -725,9 +743,9 @@ class NostrClient public function getMediaEventsByHashtags(array $hashtags): array { $allEvents = []; - $relayset = new RelaySet(); - $relayset->addRelay(new Relay('wss://theforest.nostr1.com')); - $relayset->addRelay(new Relay('wss://relay.nostr.band')); + // Use relay pool for media relays + $relayUrls = ['wss://theforest.nostr1.com', 'wss://relay.nostr.band']; + $relayset = $this->createRelaySet($relayUrls); // Fetch events for each hashtag foreach ($hashtags as $hashtag) { diff --git a/src/Service/NostrRelayPool.php b/src/Service/NostrRelayPool.php new file mode 100644 index 0000000..7d0729a --- /dev/null +++ b/src/Service/NostrRelayPool.php @@ -0,0 +1,310 @@ + Map of relay URLs to Relay instances */ + private array $relays = []; + + /** @var array Track connection attempts for backoff */ + private array $connectionAttempts = []; + + /** @var array Track last connection time */ + private array $lastConnected = []; + + private const MAX_RETRIES = 3; + private const RETRY_DELAY = 5; // seconds + private const CONNECTION_TIMEOUT = 30; // seconds + private const KEEPALIVE_INTERVAL = 60; // seconds + + public function __construct( + private readonly LoggerInterface $logger, + private readonly array $defaultRelays = [ + 'wss://theforest.nostr1.com', + 'wss://nostr.land', + 'wss://relay.primal.net', + ] + ) {} + + /** + * Normalize relay URL to ensure consistency + */ + private function normalizeRelayUrl(string $url): string + { + $url = trim($url); + // Remove trailing slash for consistency + $url = rtrim($url, '/'); + return $url; + } + + /** + * Get or create a relay connection + */ + public function getRelay(string $relayUrl): Relay + { + // Normalize the URL to avoid duplicates (e.g., with/without trailing slash) + $relayUrl = $this->normalizeRelayUrl($relayUrl); + + if (!isset($this->relays[$relayUrl])) { + $this->logger->debug('Creating new relay connection', ['relay' => $relayUrl]); + $relay = new Relay($relayUrl); + $this->relays[$relayUrl] = $relay; + $this->connectionAttempts[$relayUrl] = 0; + $this->lastConnected[$relayUrl] = time(); + } else { + // Check if connection might be stale (older than keepalive interval) + $age = time() - ($this->lastConnected[$relayUrl] ?? 0); + if ($age > self::KEEPALIVE_INTERVAL) { + $this->logger->debug('Relay connection might be stale, will validate', [ + 'relay' => $relayUrl, + 'age' => $age + ]); + // The nostr library handles reconnection internally, so we just update timestamp + $this->lastConnected[$relayUrl] = time(); + } + } + + return $this->relays[$relayUrl]; + } + + /** + * Get multiple relay connections + * + * @param array $relayUrls + * @return array + */ + public function getRelays(array $relayUrls): array + { + $relays = []; + foreach ($relayUrls as $url) { + try { + $relays[] = $this->getRelay($url); + } catch (\Throwable $e) { + $this->logger->warning('Failed to get relay connection', [ + 'relay' => $url, + 'error' => $e->getMessage() + ]); + } + } + return $relays; + } + + /** + * Send a request to multiple relays and collect responses + * + * @param array $relayUrls Array of relay URLs to query + * @param callable $messageBuilder Function that builds the message to send + * @param int|null $timeout Optional timeout in seconds + * @param string|null $subscriptionId Optional subscription ID to close after receiving responses + * @return array Responses from all relays + */ + public function sendToRelays(array $relayUrls, callable $messageBuilder, ?int $timeout = null, ?string $subscriptionId = null): array + { + $timeout = $timeout ?? self::CONNECTION_TIMEOUT; + $relays = $this->getRelays($relayUrls); + + if (empty($relays)) { + $this->logger->error('No relays available for request'); + return []; + } + + $responses = []; + foreach ($relays as $relay) { + $relayResponses = []; + try { + $message = $messageBuilder(); + $payload = $message->generate(); + + $this->logger->debug('Sending request to relay', [ + 'relay' => $relay->getUrl(), + 'subscription_id' => $subscriptionId + ]); + + // Ensure relay is connected + if (!$relay->isConnected()) { + $relay->connect(); + } + + $client = $relay->getClient(); + $client->setTimeout(15); // 15 seconds timeout for receiving + + // Send the request + $client->text($payload); + + // Receive and parse responses + while ($resp = $client->receive()) { + // Handle PING/PONG + if ($resp instanceof \WebSocket\Message\Ping) { + $client->text((new \WebSocket\Message\Pong())->getPayload()); + continue; + } + + if (!$resp instanceof \WebSocket\Message\Text) { + continue; + } + + $decoded = json_decode($resp->getContent()); + if (!$decoded) { + $this->logger->debug('Failed to decode response from relay', [ + 'relay' => $relay->getUrl(), + 'content' => $resp->getContent() + ]); + continue; + } + + $relayResponse = \swentel\nostr\RelayResponse\RelayResponse::create($decoded); + $relayResponses[] = $relayResponse; + + // Check for EOSE (End of Stored Events) or CLOSED + if (in_array($relayResponse->type, ['EOSE', 'CLOSED'])) { + $this->logger->debug('Received EOSE/CLOSED from relay', [ + 'relay' => $relay->getUrl(), + 'type' => $relayResponse->type + ]); + break; + } + } + + // Key responses by relay URL so processResponse can identify which relay they came from + $responses[$relay->getUrl()] = $relayResponses; + + // If subscription ID provided, send CLOSE message to clean up + if ($subscriptionId !== null) { + try { + $closeMessage = new \swentel\nostr\Message\CloseMessage($subscriptionId); + $client->text($closeMessage->generate()); + $this->logger->debug('Sent CLOSE message to relay', [ + 'relay' => $relay->getUrl(), + 'subscription_id' => $subscriptionId + ]); + } catch (\Throwable $closeError) { + $this->logger->warning('Failed to send CLOSE message', [ + 'relay' => $relay->getUrl(), + 'subscription_id' => $subscriptionId, + 'error' => $closeError->getMessage() + ]); + } + } + + // Update last connected time on successful send + $this->lastConnected[$relay->getUrl()] = time(); + + } catch (\WebSocket\TimeoutException $e) { + // Timeout is normal - relay has sent all events + $this->logger->debug('Relay timeout (normal - all events received)', [ + 'relay' => $relay->getUrl() + ]); + $responses[$relay->getUrl()] = $relayResponses; + $this->lastConnected[$relay->getUrl()] = time(); + + } catch (\Throwable $e) { + $this->logger->error('Error sending to relay', [ + 'relay' => $relay->getUrl(), + 'error' => $e->getMessage(), + 'class' => get_class($e) + ]); + + // Track failed attempts + $url = $relay->getUrl(); + $this->connectionAttempts[$url] = ($this->connectionAttempts[$url] ?? 0) + 1; + + // If too many failures, remove from pool + if ($this->connectionAttempts[$url] >= self::MAX_RETRIES) { + $this->logger->warning('Removing relay from pool due to repeated failures', [ + 'relay' => $url, + 'attempts' => $this->connectionAttempts[$url] + ]); + unset($this->relays[$url]); + } + } + } + + return $responses; + } + + /** + * Close a specific relay connection + */ + public function closeRelay(string $relayUrl): void + { + if (isset($this->relays[$relayUrl])) { + $this->logger->debug('Closing relay connection', ['relay' => $relayUrl]); + unset($this->relays[$relayUrl]); + unset($this->connectionAttempts[$relayUrl]); + unset($this->lastConnected[$relayUrl]); + } + } + + /** + * Close all relay connections + */ + public function closeAll(): void + { + $this->logger->info('Closing all relay connections', [ + 'count' => count($this->relays) + ]); + + $this->relays = []; + $this->connectionAttempts = []; + $this->lastConnected = []; + } + + /** + * Get statistics about the connection pool + */ + public function getStats(): array + { + return [ + 'active_connections' => count($this->relays), + 'relays' => array_map(function($url) { + return [ + 'url' => $url, + 'attempts' => $this->connectionAttempts[$url] ?? 0, + 'last_connected' => $this->lastConnected[$url] ?? null, + 'age' => time() - ($this->lastConnected[$url] ?? time()), + ]; + }, array_keys($this->relays)) + ]; + } + + /** + * Cleanup stale connections (call this periodically) + */ + public function cleanupStaleConnections(int $maxAge = 300): int + { + $now = time(); + $cleaned = 0; + + foreach ($this->relays as $url => $relay) { + $age = $now - ($this->lastConnected[$url] ?? 0); + if ($age > $maxAge) { + $this->logger->info('Removing stale relay connection', [ + 'relay' => $url, + 'age' => $age + ]); + $this->closeRelay($url); + $cleaned++; + } + } + + return $cleaned; + } + + /** + * Get default relays + */ + public function getDefaultRelays(): array + { + return $this->defaultRelays; + } +} + diff --git a/src/Twig/Components/Organisms/Comments.php b/src/Twig/Components/Organisms/Comments.php index a1578ad..155ab58 100644 --- a/src/Twig/Components/Organisms/Comments.php +++ b/src/Twig/Components/Organisms/Comments.php @@ -84,19 +84,21 @@ final class Comments { foreach ($this->list as $comment) { $content = $comment['content'] ?? ''; - if (empty($content)) { + $commentId = $comment['id'] ?? null; + + if (empty($content) || empty($commentId)) { continue; } // Store the original content - $this->processedContent[$comment->id] = $content; + $this->processedContent[$commentId] = $content; // Parse the content for Nostr links $links = $this->nostrLinkParser->parseLinks($content); if (!empty($links)) { // Save the links for the client-side to fetch - $this->commentLinks[$comment->id] = $links; + $this->commentLinks[$commentId] = $links; } } } @@ -106,9 +108,16 @@ final class Comments */ public function parseZaps(): void { - foreach ($this->list as $comment) { + // Use reference (&$comment) so changes to content are saved back to $this->list + foreach ($this->list as &$comment) { + $commentId = $comment['id'] ?? null; + + if (empty($commentId)) { + continue; + } + // check if kind is 9735 to get zaps - if ($comment['kind'] !== 9735) { + if (($comment['kind'] ?? null) !== 9735) { continue; } @@ -125,15 +134,15 @@ final class Comments } $description = json_decode($descriptionJson); - // 2) If description has content, add it to the comment + // 2) If description has content, add it to the comment (now this actually saves!) if (!empty($description->content)) { - $comment->content = $description->content; + $comment['content'] = $description->content; } // 3) Get amount: prefer explicit 'amount' tag (msat), fallback to BOLT11 invoice parsing $amountSats = null; - $amountMsatStr = $this->findTagValue($description->tags, 'amount'); + $amountMsatStr = $this->findTagValue($description->tags ?? [], 'amount'); if (is_numeric($amountMsatStr)) { // amount in millisats per NIP-57 $msats = (int) $amountMsatStr; @@ -149,11 +158,13 @@ final class Comments } } - $this->zappers[$comment->id] = $this->findTagValue($tags, 'P'); + $this->zappers[$commentId] = $this->findTagValue($tags, 'P'); if ($amountSats !== null && $amountSats > 0) { - $this->zapAmounts[$comment->id] = $amountSats; + $this->zapAmounts[$commentId] = $amountSats; } } + // Unset reference to avoid accidental modification later + unset($comment); } // --- Helpers ---