Browse Source

Supposedly better relay comms

imwald
Nuša Pukšič 2 months ago
parent
commit
402c7b98bc
  1. 86
      src/Service/NostrClient.php
  2. 310
      src/Service/NostrRelayPool.php
  3. 31
      src/Twig/Components/Organisms/Comments.php

86
src/Service/NostrClient.php

@ -44,22 +44,31 @@ class NostrClient
private readonly ArticleFactory $articleFactory, private readonly ArticleFactory $articleFactory,
private readonly TokenStorageInterface $tokenStorage, private readonly TokenStorageInterface $tokenStorage,
private readonly LoggerInterface $logger, private readonly LoggerInterface $logger,
private readonly CacheItemPoolInterface $npubCache) private readonly CacheItemPoolInterface $npubCache,
private readonly NostrRelayPool $relayPool)
{ {
// Initialize default relay set using the relay pool to avoid duplicate connections
$defaultRelayUrls = [
'wss://theforest.nostr1.com',
'wss://nostr.land',
'wss://relay.primal.net'
];
$this->defaultRelaySet = new RelaySet(); $this->defaultRelaySet = new RelaySet();
$this->defaultRelaySet->addRelay(new Relay('wss://theforest.nostr1.com')); // public aggregator relay foreach ($defaultRelayUrls as $url) {
$this->defaultRelaySet->addRelay(new Relay('wss://nostr.land')); // aggregator relay, has AUTH $this->defaultRelaySet->addRelay($this->relayPool->getRelay($url));
$this->defaultRelaySet->addRelay(new Relay('wss://relay.primal.net')); // profile aggregator }
} }
/** /**
* Creates a RelaySet from a list of relay URLs * Creates a RelaySet from a list of relay URLs using the connection pool
*/ */
private function createRelaySet(array $relayUrls): RelaySet private function createRelaySet(array $relayUrls): RelaySet
{ {
$relaySet = $this->defaultRelaySet; $relaySet = new RelaySet();
foreach ($relayUrls as $relayUrl) { foreach ($relayUrls as $relayUrl) {
$relaySet->addRelay(new Relay($relayUrl)); // Use the pool to get persistent relay connections
$relay = $this->relayPool->getRelay($relayUrl);
$relaySet->addRelay($relay);
} }
return $relaySet; return $relaySet;
} }
@ -107,9 +116,15 @@ class NostrClient
*/ */
public function getPubkeyMetadata($pubkey): \stdClass public function getPubkeyMetadata($pubkey): \stdClass
{ {
$relaySet = $this->defaultRelaySet; // Use relay pool for all relays including purplepag.es
// $relaySet->addRelay(new Relay('wss://profiles.nostr1.com')); // profile aggregator $relayUrls = [
$relaySet->addRelay(new Relay('wss://purplepag.es')); // profile aggregator 'wss://theforest.nostr1.com',
'wss://nostr.land',
'wss://relay.primal.net',
'wss://purplepag.es'
];
$relaySet = $this->createRelaySet($relayUrls);
$this->logger->info('Getting metadata for pubkey ' . $pubkey ); $this->logger->info('Getting metadata for pubkey ' . $pubkey );
$request = $this->createNostrRequest( $request = $this->createNostrRequest(
kinds: [KindsEnum::METADATA], kinds: [KindsEnum::METADATA],
@ -142,11 +157,8 @@ class NostrClient
$relays = $this->getTopReputableRelaysForAuthor($key->convertPublicKeyToBech32($event->getPublicKey()), 5); $relays = $this->getTopReputableRelaysForAuthor($key->convertPublicKeyToBech32($event->getPublicKey()), 5);
} }
$relaySet = new RelaySet(); // Use relay pool instead of creating new Relay instances
foreach ($relays as $relayWss) { $relaySet = $this->createRelaySet($relays);
$relay = new Relay($relayWss);
$relaySet->addRelay($relay);
}
$relaySet->setMessage($eventMessage); $relaySet->setMessage($eventMessage);
// TODO handle responses appropriately // TODO handle responses appropriately
return $relaySet->send(); return $relaySet->send();
@ -470,32 +482,38 @@ class NostrClient
$kind = (int)$parts[0]; $kind = (int)$parts[0];
$pubkey = $parts[1]; $pubkey = $parts[1];
$identifier = end($parts); $identifier = end($parts);
// Get relays for the author // Get relays for the author
$authorRelays = $this->getTopReputableRelaysForAuthor($pubkey); $authorRelays = $this->getTopReputableRelaysForAuthor($pubkey);
// Turn into a relaySet
$relaySet = $this->createRelaySet($authorRelays);
// filters // Build the request message
$filters = [ $subscription = new Subscription();
'tag' => ['#A', [$coordinate]], // #A means root event $subscriptionId = $subscription->setId();
]; $filter = new Filter();
$filter->setKinds([
KindsEnum::COMMENTS->value,
// KindsEnum::ZAP_RECEIPT->value // Not yet
]);
$filter->setTag('#A', [$coordinate]);
if (is_int($since) && $since > 0) { if (is_int($since) && $since > 0) {
$filters['since'] = $since; $filter->setSince($since);
} }
// Create request using the helper method $requestMessage = new RequestMessage($subscriptionId, [$filter]);
$request = $this->createNostrRequest(
kinds: [ // Use the relay pool to send the request
KindsEnum::COMMENTS->value, // Pass subscription ID so the pool can send CLOSE message after receiving responses
// KindsEnum::ZAP_RECEIPT->value // Not yet $responses = $this->relayPool->sendToRelays(
], $authorRelays,
filters: $filters, fn() => $requestMessage,
relaySet: $relaySet 30, // timeout in seconds
$subscriptionId // close subscription after receiving responses
); );
// Process the response and deduplicate by eventId // Process the response and deduplicate by eventId
$uniqueEvents = []; $uniqueEvents = [];
$this->processResponse($request->send(), function($event) use (&$uniqueEvents, $pubkey) { $this->processResponse($responses, function($event) use (&$uniqueEvents) {
$this->logger->debug('Received comment event', ['event_id' => $event->id]); $this->logger->debug('Received comment event', ['event_id' => $event->id]);
$uniqueEvents[$event->id] = $event; $uniqueEvents[$event->id] = $event;
return null; return null;
@ -725,9 +743,9 @@ class NostrClient
public function getMediaEventsByHashtags(array $hashtags): array public function getMediaEventsByHashtags(array $hashtags): array
{ {
$allEvents = []; $allEvents = [];
$relayset = new RelaySet(); // Use relay pool for media relays
$relayset->addRelay(new Relay('wss://theforest.nostr1.com')); $relayUrls = ['wss://theforest.nostr1.com', 'wss://relay.nostr.band'];
$relayset->addRelay(new Relay('wss://relay.nostr.band')); $relayset = $this->createRelaySet($relayUrls);
// Fetch events for each hashtag // Fetch events for each hashtag
foreach ($hashtags as $hashtag) { foreach ($hashtags as $hashtag) {

310
src/Service/NostrRelayPool.php

@ -0,0 +1,310 @@
<?php
namespace App\Service;
use Psr\Log\LoggerInterface;
use swentel\nostr\Relay\Relay;
use swentel\nostr\RelayResponse\RelayResponse;
/**
* Manages persistent WebSocket connections to Nostr relays
* Keeps connections alive across multiple requests to avoid reconnection overhead
*/
class NostrRelayPool
{
/** @var array<string, Relay> Map of relay URLs to Relay instances */
private array $relays = [];
/** @var array<string, int> Track connection attempts for backoff */
private array $connectionAttempts = [];
/** @var array<string, int> Track last connection time */
private array $lastConnected = [];
private const MAX_RETRIES = 3;
private const RETRY_DELAY = 5; // seconds
private const CONNECTION_TIMEOUT = 30; // seconds
private const KEEPALIVE_INTERVAL = 60; // seconds
public function __construct(
private readonly LoggerInterface $logger,
private readonly array $defaultRelays = [
'wss://theforest.nostr1.com',
'wss://nostr.land',
'wss://relay.primal.net',
]
) {}
/**
* Normalize relay URL to ensure consistency
*/
private function normalizeRelayUrl(string $url): string
{
$url = trim($url);
// Remove trailing slash for consistency
$url = rtrim($url, '/');
return $url;
}
/**
* Get or create a relay connection
*/
public function getRelay(string $relayUrl): Relay
{
// Normalize the URL to avoid duplicates (e.g., with/without trailing slash)
$relayUrl = $this->normalizeRelayUrl($relayUrl);
if (!isset($this->relays[$relayUrl])) {
$this->logger->debug('Creating new relay connection', ['relay' => $relayUrl]);
$relay = new Relay($relayUrl);
$this->relays[$relayUrl] = $relay;
$this->connectionAttempts[$relayUrl] = 0;
$this->lastConnected[$relayUrl] = time();
} else {
// Check if connection might be stale (older than keepalive interval)
$age = time() - ($this->lastConnected[$relayUrl] ?? 0);
if ($age > self::KEEPALIVE_INTERVAL) {
$this->logger->debug('Relay connection might be stale, will validate', [
'relay' => $relayUrl,
'age' => $age
]);
// The nostr library handles reconnection internally, so we just update timestamp
$this->lastConnected[$relayUrl] = time();
}
}
return $this->relays[$relayUrl];
}
/**
* Get multiple relay connections
*
* @param array $relayUrls
* @return array<Relay>
*/
public function getRelays(array $relayUrls): array
{
$relays = [];
foreach ($relayUrls as $url) {
try {
$relays[] = $this->getRelay($url);
} catch (\Throwable $e) {
$this->logger->warning('Failed to get relay connection', [
'relay' => $url,
'error' => $e->getMessage()
]);
}
}
return $relays;
}
/**
* Send a request to multiple relays and collect responses
*
* @param array $relayUrls Array of relay URLs to query
* @param callable $messageBuilder Function that builds the message to send
* @param int|null $timeout Optional timeout in seconds
* @param string|null $subscriptionId Optional subscription ID to close after receiving responses
* @return array Responses from all relays
*/
public function sendToRelays(array $relayUrls, callable $messageBuilder, ?int $timeout = null, ?string $subscriptionId = null): array
{
$timeout = $timeout ?? self::CONNECTION_TIMEOUT;
$relays = $this->getRelays($relayUrls);
if (empty($relays)) {
$this->logger->error('No relays available for request');
return [];
}
$responses = [];
foreach ($relays as $relay) {
$relayResponses = [];
try {
$message = $messageBuilder();
$payload = $message->generate();
$this->logger->debug('Sending request to relay', [
'relay' => $relay->getUrl(),
'subscription_id' => $subscriptionId
]);
// Ensure relay is connected
if (!$relay->isConnected()) {
$relay->connect();
}
$client = $relay->getClient();
$client->setTimeout(15); // 15 seconds timeout for receiving
// Send the request
$client->text($payload);
// Receive and parse responses
while ($resp = $client->receive()) {
// Handle PING/PONG
if ($resp instanceof \WebSocket\Message\Ping) {
$client->text((new \WebSocket\Message\Pong())->getPayload());
continue;
}
if (!$resp instanceof \WebSocket\Message\Text) {
continue;
}
$decoded = json_decode($resp->getContent());
if (!$decoded) {
$this->logger->debug('Failed to decode response from relay', [
'relay' => $relay->getUrl(),
'content' => $resp->getContent()
]);
continue;
}
$relayResponse = \swentel\nostr\RelayResponse\RelayResponse::create($decoded);
$relayResponses[] = $relayResponse;
// Check for EOSE (End of Stored Events) or CLOSED
if (in_array($relayResponse->type, ['EOSE', 'CLOSED'])) {
$this->logger->debug('Received EOSE/CLOSED from relay', [
'relay' => $relay->getUrl(),
'type' => $relayResponse->type
]);
break;
}
}
// Key responses by relay URL so processResponse can identify which relay they came from
$responses[$relay->getUrl()] = $relayResponses;
// If subscription ID provided, send CLOSE message to clean up
if ($subscriptionId !== null) {
try {
$closeMessage = new \swentel\nostr\Message\CloseMessage($subscriptionId);
$client->text($closeMessage->generate());
$this->logger->debug('Sent CLOSE message to relay', [
'relay' => $relay->getUrl(),
'subscription_id' => $subscriptionId
]);
} catch (\Throwable $closeError) {
$this->logger->warning('Failed to send CLOSE message', [
'relay' => $relay->getUrl(),
'subscription_id' => $subscriptionId,
'error' => $closeError->getMessage()
]);
}
}
// Update last connected time on successful send
$this->lastConnected[$relay->getUrl()] = time();
} catch (\WebSocket\TimeoutException $e) {
// Timeout is normal - relay has sent all events
$this->logger->debug('Relay timeout (normal - all events received)', [
'relay' => $relay->getUrl()
]);
$responses[$relay->getUrl()] = $relayResponses;
$this->lastConnected[$relay->getUrl()] = time();
} catch (\Throwable $e) {
$this->logger->error('Error sending to relay', [
'relay' => $relay->getUrl(),
'error' => $e->getMessage(),
'class' => get_class($e)
]);
// Track failed attempts
$url = $relay->getUrl();
$this->connectionAttempts[$url] = ($this->connectionAttempts[$url] ?? 0) + 1;
// If too many failures, remove from pool
if ($this->connectionAttempts[$url] >= self::MAX_RETRIES) {
$this->logger->warning('Removing relay from pool due to repeated failures', [
'relay' => $url,
'attempts' => $this->connectionAttempts[$url]
]);
unset($this->relays[$url]);
}
}
}
return $responses;
}
/**
* Close a specific relay connection
*/
public function closeRelay(string $relayUrl): void
{
if (isset($this->relays[$relayUrl])) {
$this->logger->debug('Closing relay connection', ['relay' => $relayUrl]);
unset($this->relays[$relayUrl]);
unset($this->connectionAttempts[$relayUrl]);
unset($this->lastConnected[$relayUrl]);
}
}
/**
* Close all relay connections
*/
public function closeAll(): void
{
$this->logger->info('Closing all relay connections', [
'count' => count($this->relays)
]);
$this->relays = [];
$this->connectionAttempts = [];
$this->lastConnected = [];
}
/**
* Get statistics about the connection pool
*/
public function getStats(): array
{
return [
'active_connections' => count($this->relays),
'relays' => array_map(function($url) {
return [
'url' => $url,
'attempts' => $this->connectionAttempts[$url] ?? 0,
'last_connected' => $this->lastConnected[$url] ?? null,
'age' => time() - ($this->lastConnected[$url] ?? time()),
];
}, array_keys($this->relays))
];
}
/**
* Cleanup stale connections (call this periodically)
*/
public function cleanupStaleConnections(int $maxAge = 300): int
{
$now = time();
$cleaned = 0;
foreach ($this->relays as $url => $relay) {
$age = $now - ($this->lastConnected[$url] ?? 0);
if ($age > $maxAge) {
$this->logger->info('Removing stale relay connection', [
'relay' => $url,
'age' => $age
]);
$this->closeRelay($url);
$cleaned++;
}
}
return $cleaned;
}
/**
* Get default relays
*/
public function getDefaultRelays(): array
{
return $this->defaultRelays;
}
}

31
src/Twig/Components/Organisms/Comments.php

@ -84,19 +84,21 @@ final class Comments
{ {
foreach ($this->list as $comment) { foreach ($this->list as $comment) {
$content = $comment['content'] ?? ''; $content = $comment['content'] ?? '';
if (empty($content)) { $commentId = $comment['id'] ?? null;
if (empty($content) || empty($commentId)) {
continue; continue;
} }
// Store the original content // Store the original content
$this->processedContent[$comment->id] = $content; $this->processedContent[$commentId] = $content;
// Parse the content for Nostr links // Parse the content for Nostr links
$links = $this->nostrLinkParser->parseLinks($content); $links = $this->nostrLinkParser->parseLinks($content);
if (!empty($links)) { if (!empty($links)) {
// Save the links for the client-side to fetch // Save the links for the client-side to fetch
$this->commentLinks[$comment->id] = $links; $this->commentLinks[$commentId] = $links;
} }
} }
} }
@ -106,9 +108,16 @@ final class Comments
*/ */
public function parseZaps(): void public function parseZaps(): void
{ {
foreach ($this->list as $comment) { // Use reference (&$comment) so changes to content are saved back to $this->list
foreach ($this->list as &$comment) {
$commentId = $comment['id'] ?? null;
if (empty($commentId)) {
continue;
}
// check if kind is 9735 to get zaps // check if kind is 9735 to get zaps
if ($comment['kind'] !== 9735) { if (($comment['kind'] ?? null) !== 9735) {
continue; continue;
} }
@ -125,15 +134,15 @@ final class Comments
} }
$description = json_decode($descriptionJson); $description = json_decode($descriptionJson);
// 2) If description has content, add it to the comment // 2) If description has content, add it to the comment (now this actually saves!)
if (!empty($description->content)) { if (!empty($description->content)) {
$comment->content = $description->content; $comment['content'] = $description->content;
} }
// 3) Get amount: prefer explicit 'amount' tag (msat), fallback to BOLT11 invoice parsing // 3) Get amount: prefer explicit 'amount' tag (msat), fallback to BOLT11 invoice parsing
$amountSats = null; $amountSats = null;
$amountMsatStr = $this->findTagValue($description->tags, 'amount'); $amountMsatStr = $this->findTagValue($description->tags ?? [], 'amount');
if (is_numeric($amountMsatStr)) { if (is_numeric($amountMsatStr)) {
// amount in millisats per NIP-57 // amount in millisats per NIP-57
$msats = (int) $amountMsatStr; $msats = (int) $amountMsatStr;
@ -149,11 +158,13 @@ final class Comments
} }
} }
$this->zappers[$comment->id] = $this->findTagValue($tags, 'P'); $this->zappers[$commentId] = $this->findTagValue($tags, 'P');
if ($amountSats !== null && $amountSats > 0) { if ($amountSats !== null && $amountSats > 0) {
$this->zapAmounts[$comment->id] = $amountSats; $this->zapAmounts[$commentId] = $amountSats;
} }
} }
// Unset reference to avoid accidental modification later
unset($comment);
} }
// --- Helpers --- // --- Helpers ---

Loading…
Cancel
Save