From ccf35a1c9af81a4895d89cbfa64cae04ad4a3532 Mon Sep 17 00:00:00 2001 From: Silberengel Date: Mon, 27 Apr 2026 21:02:34 +0200 Subject: [PATCH] more refactoring --- config/services.yaml | 11 +- phpstan-baseline.neon | 12 +- src/Service/NostrClient.php | 971 ++++-------------- src/Service/NostrRelayFanoutTransport.php | 230 +++++ src/Service/NostrRelayListFactory.php | 317 ++++++ src/Service/NostrRelayQuery.php | 165 +++ src/Service/NostrRelayRequestFactory.php | 49 + .../Service/NostrRelayFanoutTransportTest.php | 37 + tests/Service/NostrRelayListFactoryTest.php | 35 + tests/Service/NostrRelayQueryTest.php | 38 + .../Service/NostrRelayRequestFactoryTest.php | 33 + 11 files changed, 1098 insertions(+), 800 deletions(-) create mode 100644 src/Service/NostrRelayFanoutTransport.php create mode 100644 src/Service/NostrRelayListFactory.php create mode 100644 src/Service/NostrRelayQuery.php create mode 100644 src/Service/NostrRelayRequestFactory.php create mode 100644 tests/Service/NostrRelayFanoutTransportTest.php create mode 100644 tests/Service/NostrRelayListFactoryTest.php create mode 100644 tests/Service/NostrRelayQueryTest.php create mode 100644 tests/Service/NostrRelayRequestFactoryTest.php diff --git a/config/services.yaml b/config/services.yaml index 8a34537..1462a13 100644 --- a/config/services.yaml +++ b/config/services.yaml @@ -35,13 +35,20 @@ services: Symfony\Component\HttpFoundation\Session\Storage\Handler\PdoSessionHandler: arguments: - '%env(DATABASE_URL)%' - App\Service\NostrClient: + App\Service\NostrRelayRequestFactory: + arguments: + $relayRequestTimeoutSec: '%nostr_relay_request_timeout_sec%' + App\Service\NostrRelayFanoutTransport: + arguments: + $projectDir: '%kernel.project_dir%' + App\Service\NostrRelayListFactory: arguments: $defaultRelayUrl: '%default_relay%' $articleRelayUrls: '%article_relays%' $profileRelayUrls: '%profile_relays%' + App\Service\NostrClient: + arguments: $projectDir: '%kernel.project_dir%' - $relayRequestTimeoutSec: '%nostr_relay_request_timeout_sec%' App\Service\ArticleCommentThreadLoader: arguments: $appCachePool: '@cache.replies' diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 0875db9..1041dcb 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -501,7 +501,7 @@ parameters: - message: '#^Call to function is_array\(\) with array will always evaluate to true\.$#' identifier: function.alreadyNarrowedType - count: 2 + count: 1 path: src/Service/NostrClient.php - @@ -525,13 +525,7 @@ parameters: - message: '#^Call to function is_string\(\) with string will always evaluate to true\.$#' identifier: function.alreadyNarrowedType - count: 11 - path: src/Service/NostrClient.php - - - - message: '#^Call to function method_exists\(\) with swentel\\nostr\\Request\\Request and ''setTimeout'' will always evaluate to true\.$#' - identifier: function.alreadyNarrowedType - count: 1 + count: 6 path: src/Service/NostrClient.php - @@ -579,7 +573,7 @@ parameters: - message: '#^Parameter \#1 \$array \(non\-empty\-list\\) of array_values is already a list, call has no effect\.$#' identifier: arrayValues.list - count: 4 + count: 2 path: src/Service/NostrClient.php - diff --git a/src/Service/NostrClient.php b/src/Service/NostrClient.php index 9221764..70c845b 100644 --- a/src/Service/NostrClient.php +++ b/src/Service/NostrClient.php @@ -3,7 +3,6 @@ namespace App\Service; use App\Entity\Article; -use App\Entity\User; use App\Entity\Event as PublicationEventEntity; use App\Enum\EventStatusEnum; use App\Enum\KindsEnum; @@ -19,8 +18,6 @@ use swentel\nostr\Relay\Relay; use swentel\nostr\Relay\RelaySet; use swentel\nostr\Request\Request; use swentel\nostr\Subscription\Subscription; -use Symfony\Component\Process\PhpExecutableFinder; -use Symfony\Component\Process\Process; use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface; use Symfony\Contracts\Cache\CacheInterface; use Symfony\Contracts\Cache\ItemInterface; @@ -29,15 +26,13 @@ use Symfony\Contracts\Cache\ItemInterface; * Main integration point for swentel/nostr against configured relays: long-form fetch, kind-0 profile * metadata, article discussion and comment publish relay lists, magazine 30040 / highlight 9802 ingest, * and related REQ flows. Tuned via `default_relay`, `article_relays`, `profile_relays`, and - * `nostr_relay_request_timeout_sec` (see `config/unfold.yaml`). + * `nostr_relay_request_timeout_sec` (see `config/unfold.yaml`). Shared building blocks: + * {@see NostrRelayRequestFactory} (timeouts), {@see NostrRelayQuery} (REQ + response fan-in), + * {@see NostrRelayFanoutTransport} (sequential vs parallel multi-relay REQ), {@see NostrRelayListFactory} + * (config relay lists, merge/dedupe, {@link RelaySet} for profile fetches, Nostr Land + aggr). */ class NostrClient { - /** Extra wall time for {@see bin/nostr_relay_request_worker.php} process vs. WebSocket timeout. */ - private const DISCUSSION_WORKER_GRACE_SEC = 5.0; - /** Soft wall-time for parallel discussion collection before returning partial results. */ - private const DISCUSSION_PARALLEL_SOFT_DEADLINE_SEC = 3.5; - /** * Hard cap on unique relay URLs for article discussion. More relays do not help much (indexers duplicate) * but blow up wall time when we fall back to sequential in-process {@see Request::send()}. @@ -50,34 +45,11 @@ class NostrClient */ private const MAX_HIGHLIGHT_RELAY_URLS = 32; - /** - * {@see Request::send()} hits relays sequentially; profile pages (metadata, long-form list, 10133) used - * the full default+article+profile list (~8–9 wss) → 2 slow relays can exceed PHP’s 30s default max_execution_time. - */ - private const MAX_PROFILE_SEQUENTIAL_RELAY_URLS = 3; - - /** - * {@see sendArticleDiscussionToRelaysSequential} visits relays one after another - * (~{@see NostrClient::$relayRequestTimeoutSec} s each). Keep this low so HTTP /fragment/comments - * and browsers do not hit 60–90s proxy cuts. - */ - private const MAX_SEQUENTIAL_RELAY_URLS = 3; - - /** When a logged-in user lists this relay, also use {@see self::AGGR_NOSTR_LAND} for comment + profile reads. */ - private const NOSTR_LAND = 'wss://nostr.land'; - - /** - * Aggregated / subscription relay (not for anonymous visitors). Only added when the session user - * has {@see self::NOSTR_LAND} in their NIP-65-style relay list. - */ - private const AGGR_NOSTR_LAND = 'wss://aggr.nostr.land'; - private RelaySet $defaultRelaySet; /** - * @param list $articleRelayUrls extra relays for the default set (default_relay is always first) - * @param list $profileRelayUrls kind-0 / profile; merged for metadata (see {@see profileMetadataQueryRelayUrlList()}) - * @param int $relayRequestTimeoutSec Per-relay WebSocket I/O cap (see `nostr_relay_request_timeout_sec` in `config/unfold.yaml`) + * @param NostrRelayRequestFactory $relayRequestFactory Per-relay WebSocket I/O cap (see `nostr_relay_request_timeout_sec` in `config/unfold.yaml`) + * @param NostrRelayFanoutTransport $relayFanout Multi-relay sequential/parallel + wire logging */ public function __construct( private readonly EntityManagerInterface $entityManager, @@ -85,14 +57,14 @@ class NostrClient private readonly ArticleFactory $articleFactory, private readonly TokenStorageInterface $tokenStorage, private readonly LoggerInterface $logger, - private readonly string $defaultRelayUrl, - private readonly array $articleRelayUrls, - private readonly array $profileRelayUrls, private readonly CacheInterface $relayQueryCache, private readonly string $projectDir, - private readonly int $relayRequestTimeoutSec = 12, + private readonly NostrRelayRequestFactory $relayRequestFactory, + private readonly NostrRelayQuery $nostrRelayQuery, + private readonly NostrRelayFanoutTransport $relayFanout, + private readonly NostrRelayListFactory $relayListFactory, ) { - $this->defaultRelaySet = $this->buildArticleRelaySet(); + $this->defaultRelaySet = $this->relayListFactory->getDefaultArticleRelaySet(); } /** @@ -102,7 +74,7 @@ class NostrClient */ public function getArticleWriteRelayUrls(): array { - return $this->configuredArticleRelayUrlList(); + return $this->relayListFactory->getConfiguredArticleRelayUrlList(); } /** @@ -117,7 +89,7 @@ class NostrClient */ public function getRelayUrlsForCommentPublish(string $articleCoordinate, string $parentEventAuthorHex): array { - $base = $this->configuredArticleRelayUrlList(); + $base = $this->relayListFactory->getConfiguredArticleRelayUrlList(); $parts = explode(':', $articleCoordinate, 3); $articlePk = \count($parts) >= 2 ? strtolower((string) $parts[1]) : ''; if (64 !== \strlen($articlePk) || !ctype_xdigit($articlePk)) { @@ -150,149 +122,6 @@ class NostrClient return $out; } - /** - * default_relay + article_relays from config, in order, deduplicated. Used for the static - * default set and as the base when merging author/extra relay URLs in {@see createRelaySet()}. - * - * @return list - */ - private function configuredArticleRelayUrlList(): array - { - $seen = []; - $out = []; - foreach (array_merge([$this->defaultRelayUrl], $this->articleRelayUrls) as $url) { - if (!\is_string($url) || $url === '' || isset($seen[$url])) { - continue; - } - $seen[$url] = true; - $out[] = $url; - } - if ($out === []) { - $out[] = $this->defaultRelayUrl; - } - - return $out; - } - - private function buildArticleRelaySet(): RelaySet - { - $relaySet = new RelaySet(); - foreach ($this->configuredArticleRelayUrlList() as $url) { - $relaySet->addRelay(new Relay($url)); - } - - return $relaySet; - } - - /** - * Configured profile relays (kind-0 / NIP-05 hints) that are not already in the article relay list. - * Used as a second pass for magazine 30040 and category long-form ingest when article relays return nothing. - * Intentionally excludes merging article URLs again — {@see createRelaySet()} prepends article relays. - * - * @return list - */ - private function profileRelayUrlsExcludedFromArticleRelays(): array - { - $article = array_fill_keys($this->configuredArticleRelayUrlList(), true); - $out = []; - foreach ($this->profileRelayUrlList() as $u) { - if (!isset($article[$u])) { - $out[] = $u; - } - } - - return $out; - } - - /** - * Relay set built only from the given URLs (no implicit article-relay merge). - */ - private function createRelaySetFromUrlsOnly(array $relayUrls): RelaySet - { - $relaySet = new RelaySet(); - $seen = []; - foreach ($relayUrls as $relayUrl) { - if (!\is_string($relayUrl) || $relayUrl === '' || isset($seen[$relayUrl])) { - continue; - } - $seen[$relayUrl] = true; - $relaySet->addRelay(new Relay($relayUrl)); - } - - return $relaySet; - } - - /** - * Single-relay set for I/O that intentionally hits one wss (e.g. longform ingest). Magazine - * 30040 resolution uses the full article relay set so all relays can contribute the latest - * NIP-33 replaceable per address. - */ - private function buildSingleRelaySet(string $wssUrl): RelaySet - { - $rs = new RelaySet(); - $rs->addRelay(new Relay($wssUrl)); - - return $rs; - } - - /** - * Host (or full URL) for log messages so console output shows which relay without opening context. - */ - private static function relayLogLabel(string $relayUrl): string - { - $host = parse_url($relayUrl, \PHP_URL_HOST); - if (\is_string($host) && $host !== '') { - return $host; - } - - return $relayUrl; - } - - private function newTimedRequest(RelaySet $relaySet, RequestMessage $requestMessage): Request - { - $request = new Request($relaySet, $requestMessage); - // 1.9.4+: Request::setTimeout() drives getResponseFromRelay(). Older: only WebSocket client on Relay. - if (method_exists($request, 'setTimeout')) { - $request->setTimeout($this->relayRequestTimeoutSec); - } else { - $this->applyRelaySocketTimeoutToSet($relaySet); - } - - return $request; - } - - /** - * Set per-relay WebSocket I/O cap. {@see RelaySet::send()} bypasses {@see Request}; use this there too. - */ - private function applyRelaySocketTimeoutToSet(RelaySet $relaySet): void - { - foreach ($relaySet->getRelays() as $relay) { - $client = $relay->getClient(); - if (method_exists($client, 'setTimeout')) { - $client->setTimeout($this->relayRequestTimeoutSec); - } - } - } - - /** - * Merges all configured article relays (default + article_relays) with the given URLs in order, deduped. - * Used for comment threads (getArticleDiscussion), per-author fetches, etc. - */ - private function createRelaySet(array $relayUrls): RelaySet - { - $relaySet = new RelaySet(); - $seen = []; - foreach (array_merge($this->configuredArticleRelayUrlList(), $relayUrls) as $relayUrl) { - if (!\is_string($relayUrl) || $relayUrl === '' || isset($seen[$relayUrl])) { - continue; - } - $seen[$relayUrl] = true; - $relaySet->addRelay(new Relay($relayUrl)); - } - - return $relaySet; - } - /** * Suffix to segregate HTTP caches: aggr is only used for some logged-in readers, so results differ. * @@ -300,108 +129,7 @@ class NostrClient */ public function getNostrLandAggrReaderCacheSuffix(): string { - return $this->loggedInUserHasNostrLandInRelayList() ? 'a1' : ''; - } - - private function loggedInUserHasNostrLandInRelayList(): bool - { - $token = $this->tokenStorage->getToken(); - if ($token === null) { - return false; - } - $user = $token->getUser(); - if (!$user instanceof User) { - return false; - } - - return $this->userRelayListContainsNostrLand($user->getRelays()); - } - - /** - * @param list|array|null $relays - */ - private function userRelayListContainsNostrLand(?array $relays): bool - { - if ($relays === null || $relays === []) { - return false; - } - $target = $this->normalizeWssUrlForNostrLandMatch(self::NOSTR_LAND); - foreach ($relays as $row) { - if (!\is_array($row) || !isset($row[1]) || !\is_string($row[1])) { - continue; - } - if ($this->normalizeWssUrlForNostrLandMatch($row[1]) === $target) { - return true; - } - } - - return false; - } - - private function normalizeWssUrlForNostrLandMatch(string $url): string - { - return rtrim(trim($url), '/'); - } - - /** - * Appends wss://aggr.nostr.land when the current user listed wss://nostr.land (session). - * - * @param list $urls - * @return list - */ - private function withAggrNostrLandIfUserSubscribesNostrLand(array $urls): array - { - if (!$this->loggedInUserHasNostrLandInRelayList()) { - return $urls; - } - $seen = array_fill_keys($urls, true); - if (isset($seen[self::AGGR_NOSTR_LAND])) { - return $urls; - } - $this->logger->debug('nostr.relay.append_aggr_nostr_land', [ - 'user_has_nostr_land' => true, - ]); - $out = $urls; - $out[] = self::AGGR_NOSTR_LAND; - - return $out; - } - - /** - * @param list $urls - */ - private function relaySetFromDistinctUrlList(array $urls): RelaySet - { - $relaySet = new RelaySet(); - $seen = []; - foreach ($urls as $relayUrl) { - if (!\is_string($relayUrl) || $relayUrl === '' || isset($seen[$relayUrl])) { - continue; - } - $seen[$relayUrl] = true; - $relaySet->addRelay(new Relay($relayUrl)); - } - - return $relaySet; - } - - /** - * @param list $urls - * - * @return list - */ - private function capSequentialRelaysForProfileFetches(array $urls): array - { - if (\count($urls) <= self::MAX_PROFILE_SEQUENTIAL_RELAY_URLS) { - return $urls; - } - $this->logger->notice('nostr.relay_list_capped', [ - 'context' => 'profile_sequential', - 'max' => self::MAX_PROFILE_SEQUENTIAL_RELAY_URLS, - 'had' => \count($urls), - ]); - - return \array_values(\array_slice($urls, 0, self::MAX_PROFILE_SEQUENTIAL_RELAY_URLS)); + return $this->relayListFactory->getNostrLandAggrReaderCacheSuffix(); } /** @@ -457,7 +185,7 @@ class NostrClient { $all = $this->getAuthorNip65RelaysList($pubkey); if ($all === []) { - return [$this->defaultRelayUrl]; + return [$this->relayListFactory->getDefaultRelayUrl()]; } if ($limit < 1) { $limit = 1; @@ -466,64 +194,6 @@ class NostrClient return \array_values(\array_slice($all, 0, $limit)); } - /** - * @return list Deduplicated profile relay URLs from config - */ - private function profileRelayUrlList(): array - { - $seen = []; - $out = []; - foreach ($this->profileRelayUrls as $url) { - if (!\is_string($url) || $url === '' || isset($seen[$url])) { - continue; - } - if (!str_starts_with($url, 'wss:')) { - continue; - } - $seen[$url] = true; - $out[] = $url; - } - - return $out; - } - - /** - * Profile (kind-0) queries: {@see profileRelayUrlList()} first (Damus, nos.lol, …), then default + article set. - * Order matters: {@see Request::send()} walks relays sequentially. - * - * @return list - */ - private function profileMetadataQueryRelayUrlList(): array - { - $seen = []; - $ordered = []; - foreach (array_merge($this->profileRelayUrlList(), $this->configuredArticleRelayUrlList()) as $u) { - if (!\is_string($u) || $u === '' || isset($seen[$u])) { - continue; - } - $seen[$u] = true; - $ordered[] = $u; - } - if ($ordered === []) { - $ordered[] = $this->defaultRelayUrl; - } - - return $this->withAggrNostrLandIfUserSubscribesNostrLand($ordered); - } - - /** - * Same relays for kind-0 metadata, without mutating {@see $this->defaultRelaySet}. - */ - private function relaySetForProfileMetadataFetch(): RelaySet - { - $relaySet = new RelaySet(); - foreach ($this->profileMetadataQueryRelayUrlList() as $url) { - $relaySet->addRelay(new Relay($url)); - } - - return $relaySet; - } - /** * NIP kind-range convention: kind 0, 3, and 10_000–19_999 are replaceable by (kind, pubkey) only; * 30_000–39_999 are addressable by (kind, pubkey, d). On equal {@see created_at}, the @@ -654,18 +324,19 @@ class NostrClient } $authorsPerRequest = max(1, min(200, $authorsPerRequest)); $byPub = []; - $relaysTried = $this->profileMetadataQueryRelayUrlList(); - $relaysTriedStr = implode(', ', array_map(self::relayLogLabel(...), $relaysTried)); - $relaySet = $this->relaySetForProfileMetadataFetch(); + $relaysTried = $this->relayListFactory->getProfileMetadataQueryRelayUrlList(); + $relaysTriedStr = implode(', ', array_map(NostrRelayQuery::relayLogLabel(...), $relaysTried)); + $relaySet = $this->relayListFactory->getRelaySetForProfileMetadataFetch(); $chunks = array_chunk($authorPubkeyHex, $authorsPerRequest); foreach ($chunks as $i => $chunk) { $t0 = microtime(true); - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [KindsEnum::METADATA], filters: ['authors' => $chunk], relaySet: $relaySet ); - $events = $this->processResponse( + $events = $this->nostrRelayQuery->processResponse( $request->send(), static fn ($ev) => $ev, ); @@ -713,16 +384,17 @@ class NostrClient } $authorsPerRequest = max(1, min(200, $authorsPerRequest)); $byPub = []; - $relaysTried = $this->profileMetadataQueryRelayUrlList(); - $relaySet = $this->relaySetForProfileMetadataFetch(); + $relaysTried = $this->relayListFactory->getProfileMetadataQueryRelayUrlList(); + $relaySet = $this->relayListFactory->getRelaySetForProfileMetadataFetch(); $chunks = array_chunk($authorPubkeyHex, $authorsPerRequest); foreach ($chunks as $chunk) { - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [KindsEnum::METADATA], filters: ['authors' => $chunk], relaySet: $relaySet ); - $events = $this->processResponse( + $events = $this->nostrRelayQuery->processResponse( $request->send(), static fn ($ev) => $ev, ); @@ -766,7 +438,8 @@ class NostrClient $chunks = array_chunk($authorPubkeyHex, $authorsPerRequest); $numChunks = \count($chunks); foreach ($chunks as $i => $chunk) { - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [KindsEnum::DELETION_REQUEST], filters: [ 'authors' => $chunk, @@ -775,7 +448,7 @@ class NostrClient ], ); $t0 = microtime(true); - $events = $this->processResponse( + $events = $this->nostrRelayQuery->processResponse( $request->send(), static fn (object $event) => $event, ); @@ -853,17 +526,18 @@ class NostrClient */ public function getNpubMetadata($npub): \stdClass { - $relaysTried = $this->capSequentialRelaysForProfileFetches($this->profileMetadataQueryRelayUrlList()); - $relaysTriedStr = implode(', ', array_map(self::relayLogLabel(...), $relaysTried)); - $relaySet = $this->relaySetFromDistinctUrlList($relaysTried); + $relaysTried = $this->relayListFactory->capSequentialRelaysForProfileFetches($this->relayListFactory->getProfileMetadataQueryRelayUrlList()); + $relaysTriedStr = implode(', ', array_map(NostrRelayQuery::relayLogLabel(...), $relaysTried)); + $relaySet = $this->relayListFactory->relaySetFromDistinctUrlList($relaysTried); $this->logger->info(sprintf('Getting metadata for npub (relays: %s)', $relaysTriedStr), ['npub' => $npub, 'relays' => $relaysTried]); - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [KindsEnum::METADATA], filters: ['authors' => [$npub]], relaySet: $relaySet ); - $events = $this->processResponse( + $events = $this->nostrRelayQuery->processResponse( $request->send(), function ($received) { $this->logger->debug('nostr.metadata.relay_event', ['event' => $received]); @@ -897,16 +571,17 @@ class NostrClient */ public function getKind10133PaymentTargetEventsForNpub(string $npub, int $limit = 20): array { - $relaysTried = $this->capSequentialRelaysForProfileFetches($this->profileMetadataQueryRelayUrlList()); - $relaysTriedStr = implode(', ', array_map(self::relayLogLabel(...), $relaysTried)); - $relaySet = $this->relaySetFromDistinctUrlList($relaysTried); + $relaysTried = $this->relayListFactory->capSequentialRelaysForProfileFetches($this->relayListFactory->getProfileMetadataQueryRelayUrlList()); + $relaysTriedStr = implode(', ', array_map(NostrRelayQuery::relayLogLabel(...), $relaysTried)); + $relaySet = $this->relayListFactory->relaySetFromDistinctUrlList($relaysTried); try { - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [KindsEnum::PAYMENT_TARGETS], filters: ['authors' => [$npub], 'limit' => max(1, min(50, $limit))], relaySet: $relaySet ); - $events = $this->processResponse( + $events = $this->nostrRelayQuery->processResponse( $request->send(), static fn ($ev) => $ev, ); @@ -919,7 +594,7 @@ class NostrClient return []; } - if (!\is_array($events) || $events === []) { + if ($events === []) { return []; } @@ -949,9 +624,9 @@ class NostrClient } } - $request = $this->newTimedRequest($relays, $requestMessage); + $request = $this->relayRequestFactory->createTimedRequest($relays, $requestMessage); - $wrappers = $this->processResponse($request->send(), function (object $event) { + $wrappers = $this->nostrRelayQuery->processResponse($request->send(), function (object $event) { $w = new \stdClass(); $w->event = $event; @@ -975,7 +650,7 @@ class NostrClient $relaySet = new RelaySet(); $relaySet->addRelay(new Relay($relayWss)); $relaySet->setMessage($eventMessage); - $this->applyRelaySocketTimeoutToSet($relaySet); + $this->relayRequestFactory->applySocketTimeoutToRelaySet($relaySet); $sent = $relaySet->send(); if (\is_array($sent) && \array_key_exists($relayWss, $sent)) { $results[$relayWss] = $sent[$relayWss]; @@ -1031,9 +706,9 @@ class NostrClient $filter->setUntil($until); $requestMessage = new RequestMessage($subscriptionId, [$filter]); - $request = $this->newTimedRequest($this->defaultRelaySet, $requestMessage); + $request = $this->relayRequestFactory->createTimedRequest($this->defaultRelaySet, $requestMessage); - $wrappers = $this->processResponse($request->send(), function (object $event) { + $wrappers = $this->nostrRelayQuery->processResponse($request->send(), function (object $event) { $w = new \stdClass(); $w->event = $event; @@ -1051,17 +726,18 @@ class NostrClient { if (empty($relayList)) { $topAuthorRelays = $this->getTopReputableRelaysForAuthor($author); - $authorRelaySet = $this->createRelaySet($topAuthorRelays); + $authorRelaySet = $this->relayListFactory->createRelaySetMergedWithArticleList($topAuthorRelays); $relaysTried = $this->plannedRelayUrlsForSet($topAuthorRelays); } else { - $authorRelaySet = $this->createRelaySet($relayList); + $authorRelaySet = $this->relayListFactory->createRelaySetMergedWithArticleList($relayList); $relaysTried = $this->plannedRelayUrlsForSet($relayList); } - $relaysTriedStr = implode(', ', array_map(self::relayLogLabel(...), $relaysTried)); + $relaysTriedStr = implode(', ', array_map(NostrRelayQuery::relayLogLabel(...), $relaysTried)); try { // Create request using the helper method for forest relay set - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [$kind], filters: [ 'authors' => [$author], @@ -1071,7 +747,7 @@ class NostrClient ); // Process the response - $events = $this->processResponse($request->send(), function($event) { + $events = $this->nostrRelayQuery->processResponse($request->send(), function($event) { return $event; }); @@ -1111,17 +787,18 @@ class NostrClient $this->logger->info('Getting event by ID', ['event_id' => $eventId, 'relays' => $relays]); // Use provided relays or default if empty - $relaySet = empty($relays) ? $this->defaultRelaySet : $this->createRelaySet($relays); + $relaySet = empty($relays) ? $this->defaultRelaySet : $this->relayListFactory->createRelaySetMergedWithArticleList($relays); // Create request using the helper method - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [], // Leave empty to accept any kind filters: ['ids' => [$eventId]], relaySet: $relaySet ); // Process the response - $events = $this->processResponse($request->send(), function($event) { + $events = $this->nostrRelayQuery->processResponse($request->send(), function($event) { $this->logger->debug('Received event', ['event' => $event]); return $event; }); @@ -1157,10 +834,11 @@ class NostrClient // Try author's relays first $authorRelays = empty($relays) ? $this->getTopReputableRelaysForAuthor($pubkey) : $relays; - $relaySet = $this->createRelaySet($authorRelays); + $relaySet = $this->relayListFactory->createRelaySetMergedWithArticleList($authorRelays); // Create request using the helper method - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [$kind], filters: [ 'authors' => [$pubkey], @@ -1170,7 +848,7 @@ class NostrClient ); // Process the response - $events = $this->processResponse($request->send(), function($event) { + $events = $this->nostrRelayQuery->processResponse($request->send(), function($event) { return $event; }); @@ -1179,7 +857,8 @@ class NostrClient } // Try default relays as fallback - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [$kind], filters: [ 'authors' => [$pubkey], @@ -1187,7 +866,7 @@ class NostrClient ] ); - $events = $this->processResponse($request->send(), function($event) { + $events = $this->nostrRelayQuery->processResponse($request->send(), function($event) { return $event; }); @@ -1225,12 +904,13 @@ class NostrClient */ public function getNpubRelayList10002Wire($npub): ?object { - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [KindsEnum::RELAY_LIST], filters: ['authors' => [$npub]], relaySet: $this->defaultRelaySet ); - $response = $this->processResponse($request->send(), function ($received) { + $response = $this->nostrRelayQuery->processResponse($request->send(), function ($received) { return $received; }); if (empty($response)) { @@ -1315,8 +995,8 @@ class NostrClient 'author_relay_count' => \count($authorRelays), ]); - $baseForDiscussion = $this->configuredArticleRelayUrlList(); - $mergedForDiscussion = $this->withAggrNostrLandIfUserSubscribesNostrLand( + $baseForDiscussion = $this->relayListFactory->getConfiguredArticleRelayUrlList(); + $mergedForDiscussion = $this->relayListFactory->withAggrNostrLandIfUserSubscribesNostrLand( array_merge($baseForDiscussion, $authorRelays) ); $plannedRelayUrls = array_values(array_unique($mergedForDiscussion, \SORT_REGULAR)); @@ -1344,21 +1024,27 @@ class NostrClient $tSend = microtime(true); $workerPath = $this->projectDir.'/bin/nostr_relay_request_worker.php'; if (!\is_file($workerPath) || \count($plannedRelayUrls) <= 1) { - $forSeq = $this->capRelayUrlsForSequentialPath($plannedRelayUrls); - $response = $this->sendArticleDiscussionToRelaysSequential($forSeq, $requestMessage); + $forSeq = $this->relayFanout->capUrlsForSequential($plannedRelayUrls); + $response = $this->relayFanout->sendSequential( + $this->relayListFactory->relaySetFromDistinctUrlList($forSeq), + $requestMessage + ); } else { try { - $response = $this->sendArticleDiscussionToRelaysParallel($plannedRelayUrls, $requestMessage); + $response = $this->relayFanout->sendParallelWorkers($plannedRelayUrls, $requestMessage); } catch (\Throwable $e) { $this->logger->warning('nostr.article_discussion.parallel_failed', [ 'message' => $e->getMessage(), 'exception_class' => \get_class($e), ]); - $forSeq = $this->capRelayUrlsForSequentialPath($plannedRelayUrls); + $forSeq = $this->relayFanout->capUrlsForSequential($plannedRelayUrls); $this->logger->warning('nostr.article_discussion.sequential_fallback', [ 'relays' => $forSeq, ]); - $response = $this->sendArticleDiscussionToRelaysSequential($forSeq, $requestMessage); + $response = $this->relayFanout->sendSequential( + $this->relayListFactory->relaySetFromDistinctUrlList($forSeq), + $requestMessage + ); } } $sendMs = (int) round((microtime(true) - $tSend) * 1000); @@ -1366,11 +1052,11 @@ class NostrClient 'elapsed_ms' => $sendMs, 'subscription_id' => $subscriptionId, ]); - $this->logNostrWireResponseSummary('article_discussion', $response); + $this->relayFanout->logWireResponseSummary('article_discussion', $response); } catch (\Throwable $e) { $this->logger->error(sprintf( 'nostr.article_discussion.req_send_failed (relays: %s): %s', - implode(', ', array_map(self::relayLogLabel(...), $plannedRelayUrls)), + implode(', ', array_map(NostrRelayQuery::relayLogLabel(...), $plannedRelayUrls)), $e->getMessage() ), [ 'coordinate' => $coordinate, @@ -1387,7 +1073,7 @@ class NostrClient $respondedRelayCount = \count($response); $partial = $respondedRelayCount < \count($plannedRelayUrls); $tParse = microtime(true); - $this->processResponse($response, function ($event) use (&$byId) { + $this->nostrRelayQuery->processResponse($response, function ($event) use (&$byId) { if (\is_object($event) && isset($event->id)) { $byId[(string) $event->id] = $event; } @@ -1450,8 +1136,8 @@ class NostrClient /** * Fetches kind 9802 (highlights) that reference the long-form address. Used for DB ingest only - * ({@see HighlightSyncService} / prewarm). Relays: {@see configuredArticleRelayUrlList} (main + - * article_relays), then config {@see profileRelayUrlList}, then author NIP-65, deduped (cap + * ({@see HighlightSyncService} / prewarm). Relays: {@see NostrRelayListFactory::getConfiguredArticleRelayUrlList()} (main + + * article_relays), then config {@see NostrRelayListFactory::getProfileRelayUrlList()}, then author NIP-65, deduped (cap * {@see MAX_HIGHLIGHT_RELAY_URLS}). * * @return list unique wire events by id @@ -1471,9 +1157,9 @@ class NostrClient 'author_relay_count' => \count($authorRelays), ]); - $baseArticle = $this->configuredArticleRelayUrlList(); - $profileConfigured = $this->profileRelayUrlList(); - $mergedForDiscussion = $this->withAggrNostrLandIfUserSubscribesNostrLand( + $baseArticle = $this->relayListFactory->getConfiguredArticleRelayUrlList(); + $profileConfigured = $this->relayListFactory->getProfileRelayUrlList(); + $mergedForDiscussion = $this->relayListFactory->withAggrNostrLandIfUserSubscribesNostrLand( array_merge($baseArticle, $profileConfigured, $authorRelays) ); $plannedRelayUrls = array_values(array_unique($mergedForDiscussion, \SORT_REGULAR)); @@ -1510,18 +1196,24 @@ class NostrClient try { if (!\is_file($this->projectDir.'/bin/nostr_relay_request_worker.php') || \count($plannedRelayUrls) <= 1) { - $forSeq = $this->capRelayUrlsForSequentialPath($plannedRelayUrls); - $response = $this->sendArticleDiscussionToRelaysSequential($forSeq, $requestMessage); + $forSeq = $this->relayFanout->capUrlsForSequential($plannedRelayUrls); + $response = $this->relayFanout->sendSequential( + $this->relayListFactory->relaySetFromDistinctUrlList($forSeq), + $requestMessage + ); } else { try { - $response = $this->sendArticleDiscussionToRelaysParallel($plannedRelayUrls, $requestMessage); + $response = $this->relayFanout->sendParallelWorkers($plannedRelayUrls, $requestMessage); } catch (\Throwable $e) { $this->logger->warning('nostr.highlight.parallel_failed', [ 'message' => $e->getMessage(), 'exception_class' => \get_class($e), ]); - $forSeq = $this->capRelayUrlsForSequentialPath($plannedRelayUrls); - $response = $this->sendArticleDiscussionToRelaysSequential($forSeq, $requestMessage); + $forSeq = $this->relayFanout->capUrlsForSequential($plannedRelayUrls); + $response = $this->relayFanout->sendSequential( + $this->relayListFactory->relaySetFromDistinctUrlList($forSeq), + $requestMessage + ); } } } catch (\Throwable $e) { @@ -1532,7 +1224,7 @@ class NostrClient } $byId = []; - $this->processResponse($response, function ($event) use (&$byId) { + $this->nostrRelayQuery->processResponse($response, function ($event) use (&$byId) { if (\is_object($event) && isset($event->id) && (int) ($event->kind ?? 0) === KindsEnum::HIGHLIGHTS->value) { $byId[(string) $event->id] = $event; } @@ -1546,139 +1238,7 @@ class NostrClient } /** - * @param list $relayUrls - * - * @return list - */ - private function capRelayUrlsForSequentialPath(array $relayUrls): array - { - if (\count($relayUrls) <= self::MAX_SEQUENTIAL_RELAY_URLS) { - return $relayUrls; - } - $this->logger->notice('nostr.article_discussion.sequential_relay_cap', [ - 'used' => self::MAX_SEQUENTIAL_RELAY_URLS, - 'had' => \count($relayUrls), - ]); - - return \array_values(\array_slice($relayUrls, 0, self::MAX_SEQUENTIAL_RELAY_URLS)); - } - - /** - * One {@see Request} over all relays (library visits each wss:// in series). - * - * @param list $relayUrls - * - * @return array Same shape as {@see Request::send()} (relay url → message list) - */ - private function sendArticleDiscussionToRelaysSequential(array $relayUrls, RequestMessage $requestMessage): array - { - $relaySet = $this->relaySetFromDistinctUrlList($relayUrls); - $request = $this->newTimedRequest($relaySet, $requestMessage); - - return $request->send(); - } - - /** - * One short-lived CLI worker per relay so WebSocket I/O is parallel (vs. a single in-process - * {@see Request::send() loop). - * - * @param list $relayUrls - * - * @return array Same shape as {@see Request::send()} - */ - private function sendArticleDiscussionToRelaysParallel(array $relayUrls, RequestMessage $requestMessage): array - { - $worker = $this->projectDir.'/bin/nostr_relay_request_worker.php'; - $phpBinary = (new PhpExecutableFinder())->find() ?: 'php'; - $timeout = $this->relayRequestTimeoutSec + (int) self::DISCUSSION_WORKER_GRACE_SEC; - $workerTimeoutEnv = ['NOSTR_RELAY_REQUEST_TIMEOUT' => (string) $this->relayRequestTimeoutSec]; - - $rawPayload = serialize($requestMessage); - $tmp = tempnam(sys_get_temp_dir(), 'nrq_'); - if ($tmp === false) { - throw new \RuntimeException('tempnam failed for Nostr discussion payload'); - } - try { - if (file_put_contents($tmp, $rawPayload) === false) { - throw new \RuntimeException('Could not write Nostr discussion temp payload'); - } - - /** @var array $procs */ - $procs = []; - foreach ($relayUrls as $wss) { - if (!\is_string($wss) || $wss === '') { - continue; - } - $p = new Process( - [$phpBinary, $worker, $wss, $tmp], - $this->projectDir, - null, - null, - (float) $timeout - ); - $p->start(null, $workerTimeoutEnv); - $procs[$wss] = $p; - } - - $merged = []; - $pending = $procs; - $deadlineAt = microtime(true) + self::DISCUSSION_PARALLEL_SOFT_DEADLINE_SEC; - while ($pending !== []) { - foreach ($pending as $wss => $p) { - if ($p->isRunning()) { - continue; - } - unset($pending[$wss]); - if (!$p->isSuccessful()) { - $err = $p->getErrorOutput(); - $this->logger->warning('nostr.article_discussion.relay_worker_failed', [ - 'relay' => $wss, - 'exit_code' => $p->getExitCode(), - 'stderr' => $err !== '' ? $err : null, - ]); - - continue; - } - $out = trim($p->getOutput()); - if ($out === '') { - continue; - } - $decoded = base64_decode($out, true); - if ($decoded === false || $decoded === '') { - continue; - } - $chunk = unserialize($decoded, ['allowed_classes' => true]); - if (!\is_array($chunk)) { - continue; - } - $merged = array_replace($merged, $chunk); - } - if ($pending === []) { - break; - } - if (microtime(true) >= $deadlineAt) { - foreach ($pending as $wss => $p) { - $this->logger->warning('nostr.article_discussion.relay_worker_soft_timeout', [ - 'relay' => $wss, - 'soft_deadline_sec' => self::DISCUSSION_PARALLEL_SOFT_DEADLINE_SEC, - ]); - $p->stop(0.2); - } - break; - } - usleep(100_000); - } - - return $merged; - } finally { - if (\is_file($tmp)) { - @unlink($tmp); - } - } - } - - /** - * Same merge/dedupe rules as {@see createRelaySet()} — used only for logging planned relay URLs. + * Same merge/dedupe rules as {@see NostrRelayListFactory::createRelaySetMergedWithArticleList()} — used only for logging planned relay URLs. * * @param array $relayUrls * @@ -1688,7 +1248,7 @@ class NostrClient { $seen = []; $out = []; - foreach (array_merge($this->configuredArticleRelayUrlList(), $relayUrls) as $relayUrl) { + foreach (array_merge($this->relayListFactory->getConfiguredArticleRelayUrlList(), $relayUrls) as $relayUrl) { if (!\is_string($relayUrl) || $relayUrl === '' || isset($seen[$relayUrl])) { continue; } @@ -1699,71 +1259,6 @@ class NostrClient return $out; } - /** - * One line per relay after {@see Request::send()}: errors vs message-type counts (EVENT, EOSE, …). - * - * @param array $response - */ - private function logNostrWireResponseSummary(string $context, array $response): void - { - foreach ($response as $relayUrl => $relayRes) { - if ($relayRes instanceof \Throwable) { - $this->logger->warning(sprintf( - 'nostr.wire.relay_throwable [%s]: %s', - self::relayLogLabel($relayUrl), - $relayRes->getMessage() - ), [ - 'context' => $context, - 'relay' => $relayUrl, - 'message' => $relayRes->getMessage(), - 'class' => \get_class($relayRes), - ]); - - continue; - } - if (!\is_iterable($relayRes)) { - $this->logger->warning(sprintf( - 'nostr.wire.relay_not_iterable [%s]: %s', - self::relayLogLabel($relayUrl), - \get_debug_type($relayRes) - ), [ - 'context' => $context, - 'relay' => $relayUrl, - 'php_type' => \get_debug_type($relayRes), - ]); - - continue; - } - $counts = [ - 'EVENT' => 0, - 'EOSE' => 0, - 'NOTICE' => 0, - 'ERROR' => 0, - 'AUTH' => 0, - 'CLOSED' => 0, - 'other' => 0, - ]; - foreach ($relayRes as $item) { - if (!\is_object($item)) { - ++$counts['other']; - - continue; - } - $t = (string) ($item->type ?? 'other'); - if (\array_key_exists($t, $counts)) { - ++$counts[$t]; - } else { - ++$counts['other']; - } - } - $this->logger->info(sprintf('nostr.wire.relay_messages [%s]', self::relayLogLabel($relayUrl)), [ - 'context' => $context, - 'relay' => $relayUrl, - 'counts' => $counts, - ]); - } - } - private function eventIsNip22ArticleThreadReply(object $event, string $coordinate): bool { if ((int) ($event->kind ?? 0) !== KindsEnum::COMMENTS->value) { @@ -1939,18 +1434,19 @@ class NostrClient // Get author's relays for better chances of finding zaps $authorRelays = $this->getTopReputableRelaysForAuthor($pubkey); - $relaySet = $this->createRelaySet($authorRelays); + $relaySet = $this->relayListFactory->createRelaySetMergedWithArticleList($authorRelays); // Create request using the helper method // Zaps are kind 9735 - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [KindsEnum::ZAP], filters: ['tag' => ['#a', [$coordinate]]], relaySet: $relaySet ); // Process the response - return $this->processResponse($request->send(), function($event) { + return $this->nostrRelayQuery->processResponse($request->send(), function($event) { $this->logger->debug('Received zap event', ['event_id' => $event->id]); return $event; }); @@ -1962,7 +1458,7 @@ class NostrClient public function getLongFormContentForPubkey(string $ident): array { $authorRelays = $this->getTopReputableRelaysForAuthor($ident); - $base = $this->configuredArticleRelayUrlList(); + $base = $this->relayListFactory->getConfiguredArticleRelayUrlList(); $merged = $authorRelays !== [] ? array_merge($base, $authorRelays) : $base; $seen = []; $deduped = []; @@ -1973,11 +1469,12 @@ class NostrClient $seen[$url] = true; $deduped[] = $url; } - $capped = $this->capSequentialRelaysForProfileFetches($deduped); - $relaySet = $this->relaySetFromDistinctUrlList($capped); + $capped = $this->relayListFactory->capSequentialRelaysForProfileFetches($deduped); + $relaySet = $this->relayListFactory->relaySetFromDistinctUrlList($capped); // Create request using the helper method - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [KindsEnum::LONGFORM], filters: [ 'authors' => [$ident], @@ -1986,7 +1483,7 @@ class NostrClient relaySet: $relaySet ); - $events = $this->processResponse( + $events = $this->nostrRelayQuery->processResponse( $request->send(), static fn (object $event) => $event, ); @@ -2012,7 +1509,7 @@ class NostrClient $requestMessage = new RequestMessage($subscriptionId, [$filter]); try { - $request = $this->newTimedRequest($this->defaultRelaySet, $requestMessage); + $request = $this->relayRequestFactory->createTimedRequest($this->defaultRelaySet, $requestMessage); $response = $request->send(); $hasEvents = false; @@ -2021,7 +1518,7 @@ class NostrClient if ($value instanceof \Throwable) { $this->logger->warning(sprintf( '[%s] getArticles: %s', - self::relayLogLabel($relayUrl), + NostrRelayQuery::relayLogLabel($relayUrl), $value->getMessage() ), ['relay' => $relayUrl]); @@ -2044,14 +1541,14 @@ class NostrClient if (!$hasEvents && !empty($slugs)) { $this->logger->info('No results from theforest, trying default relays'); - $request = $this->newTimedRequest($this->defaultRelaySet, $requestMessage); + $request = $this->relayRequestFactory->createTimedRequest($this->defaultRelaySet, $requestMessage); $response = $request->send(); foreach ($response as $relayUrl => $value) { if ($value instanceof \Throwable) { $this->logger->warning(sprintf( '[%s] getArticles: %s', - self::relayLogLabel($relayUrl), + NostrRelayQuery::relayLogLabel($relayUrl), $value->getMessage() ), ['relay' => $relayUrl]); @@ -2069,7 +1566,7 @@ class NostrClient $msg = (string) ($item->message ?? ''); $this->logger->error(sprintf( '[%s] %s while getting articles: %s', - self::relayLogLabel($relayUrl), + NostrRelayQuery::relayLogLabel($relayUrl), $item->type, $msg !== '' ? $msg : '(no message)' ), ['relay' => $relayUrl, 'response' => $item]); @@ -2078,15 +1575,15 @@ class NostrClient } } } catch (\Exception $e) { - $relaysTried = $this->configuredArticleRelayUrlList(); - $relaysStr = implode(', ', array_map(self::relayLogLabel(...), $relaysTried)); + $relaysTried = $this->relayListFactory->getConfiguredArticleRelayUrlList(); + $relaysStr = implode(', ', array_map(NostrRelayQuery::relayLogLabel(...), $relaysTried)); $this->logger->error(sprintf('Error querying relays (%s): %s', $relaysStr, $e->getMessage()), [ 'error' => $e->getMessage(), 'relays' => $relaysTried, ]); // Fall back to default relay set - $request = $this->newTimedRequest($this->defaultRelaySet, $requestMessage); + $request = $this->relayRequestFactory->createTimedRequest($this->defaultRelaySet, $requestMessage); $response = $request->send(); foreach ($response as $relayUrl => $value) { @@ -2154,7 +1651,7 @@ class NostrClient } // Ensure we use a RelaySet - $relaySet = $this->createRelaySet($relayList); + $relaySet = $this->relayListFactory->createRelaySetMergedWithArticleList($relayList); // Create subscription and filter $subscription = new Subscription(); @@ -2165,11 +1662,11 @@ class NostrClient $filter->setTag('#d', [$slug]); $requestMessage = new RequestMessage($subscriptionId, [$filter]); $relaysForLog = $this->plannedRelayUrlsForSet($relayList); - $relaysLogStr = implode(', ', array_map(self::relayLogLabel(...), $relaysForLog)); + $relaysLogStr = implode(', ', array_map(NostrRelayQuery::relayLogLabel(...), $relaysForLog)); try { - $request = $this->newTimedRequest($relaySet, $requestMessage); - $events = $this->processResponse( + $request = $this->relayRequestFactory->createTimedRequest($relaySet, $requestMessage); + $events = $this->nostrRelayQuery->processResponse( $request->send(), static fn (object $event) => $event, ); @@ -2182,8 +1679,8 @@ class NostrClient $this->logger->info('Article not found in author relays, trying default relays', [ 'coordinate' => $coordinate ]); - $request2 = $this->newTimedRequest($this->defaultRelaySet, $requestMessage); - $events2 = $this->processResponse( + $request2 = $this->relayRequestFactory->createTimedRequest($this->defaultRelaySet, $requestMessage); + $events2 = $this->nostrRelayQuery->processResponse( $request2->send(), static fn (object $event) => $event, ); @@ -2208,117 +1705,6 @@ class NostrClient return $articlesMap; } - private function createNostrRequest(array $kinds, array $filters = [], ?RelaySet $relaySet = null): Request - { - $subscription = new Subscription(); - $subscriptionId = $subscription->setId(); - $filter = new Filter(); - $filter->setKinds($kinds); - - foreach ($filters as $key => $value) { - $method = 'set' . ucfirst($key); - if (method_exists($filter, $method)) { - // If it's tags, we need to handle it differently - if ($key === 'tag') { - $filter->setTag($value[0], $value[1]); - } else { - // Call the method with the value - $filter->$method($value); - } - } - } - - $requestMessage = new RequestMessage($subscriptionId, [$filter]); - - return $this->newTimedRequest($relaySet ?? $this->defaultRelaySet, $requestMessage); - } - - private function processResponse(array $response, callable $eventHandler): array - { - $results = []; - foreach ($response as $relayUrl => $relayRes) { - if ($relayRes instanceof \Throwable) { - $this->logger->error(sprintf( - 'Relay error at %s: %s', - self::relayLogLabel($relayUrl), - $relayRes->getMessage() - ), [ - 'relay' => $relayUrl, - 'error' => $relayRes->getMessage(), - ]); - continue; - } - - $itemEstimate = \is_countable($relayRes) ? \count($relayRes) : null; - $this->logger->debug(sprintf('Processing relay response from %s', self::relayLogLabel($relayUrl)), [ - 'relay' => $relayUrl, - 'item_count' => $itemEstimate, - ]); - - foreach ($relayRes as $item) { - try { - if (!is_object($item)) { - $this->logger->warning(sprintf( - 'Invalid response item from %s', - self::relayLogLabel($relayUrl) - ), [ - 'relay' => $relayUrl, - 'item' => $item, - ]); - continue; - } - - switch ($item->type) { - case 'EVENT': - $this->logger->debug(sprintf('Processing event from %s', self::relayLogLabel($relayUrl)), [ - 'relay' => $relayUrl, - 'event_id' => $item->event->id ?? 'unknown', - ]); - $result = $eventHandler($item->event); - if ($result !== null) { - $results[] = $result; - } - break; - case 'AUTH': - $this->logger->warning(sprintf( - 'Relay %s requires authentication', - self::relayLogLabel($relayUrl) - ), [ - 'relay' => $relayUrl, - 'response' => $item, - ]); - break; - case 'ERROR': - case 'NOTICE': - $msg = (string) ($item->message ?? 'No message'); - $this->logger->warning(sprintf( - '[%s] %s: %s', - self::relayLogLabel($relayUrl), - $item->type, - $msg - ), [ - 'relay' => $relayUrl, - 'type' => $item->type, - 'message' => $msg, - ]); - break; - } - } catch (\Exception $e) { - $this->logger->error(sprintf( - 'Error processing event from relay %s: %s', - self::relayLogLabel($relayUrl), - $e->getMessage() - ), [ - 'relay' => $relayUrl, - 'error' => $e->getMessage(), - ]); - continue; // Skip this item but continue processing others - } - } - } - return $results; - } - /** * @param Article $article * @return void @@ -2547,7 +1933,8 @@ class NostrClient if ($byEventId) { // NIP-01: filter by "ids", not "#e" (which matches *tags* named "e"). $kind = isset($data->kind) ? (int) $data->kind : 1; - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [$kind], filters: ['ids' => [$data->id]], relaySet: $this->defaultRelaySet @@ -2563,7 +1950,8 @@ class NostrClient return null; } $kind = (int) ($data->kind ?? KindsEnum::LONGFORM->value); - $request = $this->createNostrRequest( + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, kinds: [$kind], filters: [ 'authors' => [$pubkey], @@ -2573,7 +1961,7 @@ class NostrClient ); } - $events = $this->processResponse($request->send(), function($received) { + $events = $this->nostrRelayQuery->processResponse($request->send(), function($received) { $this->logger->info('Getting event', ['item' => $received]); return $received; @@ -2826,18 +2214,18 @@ class NostrClient */ public function getMagazineIndex(mixed $npub, mixed $dTag): ?PublicationEventEntity { - $urls = $this->configuredArticleRelayUrlList(); - $relaysForLog = implode(', ', array_map(self::relayLogLabel(...), $urls)); + $urls = $this->relayListFactory->getConfiguredArticleRelayUrlList(); + $relaysForLog = implode(', ', array_map(NostrRelayQuery::relayLogLabel(...), $urls)); $result = $this->queryMagazineIndex($npub, $dTag, $this->defaultRelaySet, $relaysForLog); if ($result !== null) { return $result; } - $profileExtra = $this->profileRelayUrlsExcludedFromArticleRelays(); + $profileExtra = $this->relayListFactory->getProfileRelayUrlsExcludedFromArticleRelays(); if ($profileExtra === []) { return null; } - $pfSet = $this->createRelaySetFromUrlsOnly($profileExtra); - $relaysForLog2 = implode(', ', array_map(self::relayLogLabel(...), $profileExtra)).' (profile_relays)'; + $pfSet = $this->relayListFactory->createRelaySetFromUrlsOnly($profileExtra); + $relaysForLog2 = implode(', ', array_map(NostrRelayQuery::relayLogLabel(...), $profileExtra)).' (profile_relays)'; return $this->queryMagazineIndex($npub, $dTag, $pfSet, $relaysForLog2); } @@ -2853,10 +2241,11 @@ class NostrClient return null; } - $request = $this->createNostrRequest( - [KindsEnum::PUBLICATION_INDEX], - ['authors' => [(string) $npub], 'tag' => ['#d', [(string) $dTag]]], - $relaySet, + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, + relaySet: $relaySet, + kinds: [KindsEnum::PUBLICATION_INDEX], + filters: ['authors' => [(string) $npub], 'tag' => ['#d', [(string) $dTag]]], ); $this->logger->info(sprintf('Magazine index query (relays: %s)', $relaysForLog), [ 'npub' => $npub, @@ -2864,7 +2253,7 @@ class NostrClient 'relays' => $relaysForLog, ]); $response = $request->send(); - $events = $this->processResponse($response, function ($received) { + $events = $this->nostrRelayQuery->processResponse($response, function ($received) { return $received; }); if (empty($events)) { @@ -2895,7 +2284,7 @@ class NostrClient */ private function tryFetchLongformCoordinateOnProfileRelays(string $coordinate): ?object { - $extra = $this->profileRelayUrlsExcludedFromArticleRelays(); + $extra = $this->relayListFactory->getProfileRelayUrlsExcludedFromArticleRelays(); if ($extra === []) { return null; } @@ -2910,14 +2299,15 @@ class NostrClient if ($kindEnum === null || $pubkey === '' || $slug === '') { return null; } - $pfSet = $this->createRelaySetFromUrlsOnly($extra); + $pfSet = $this->relayListFactory->createRelaySetFromUrlsOnly($extra); try { - $request = $this->createNostrRequest( - [$kindEnum], - ['authors' => [$pubkey], 'tag' => ['#d', [$slug]]], - $pfSet, + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, + relaySet: $pfSet, + kinds: [$kindEnum], + filters: ['authors' => [$pubkey], 'tag' => ['#d', [$slug]]], ); - $events = $this->processResponse( + $events = $this->nostrRelayQuery->processResponse( $request->send(), static fn (object $event) => $event, ); @@ -2925,12 +2315,13 @@ class NostrClient if ($ev !== null) { return $ev; } - $fallbackReq = $this->createNostrRequest( - [$kindEnum], - ['tag' => ['#d', [$slug]]], - $pfSet, + $fallbackReq = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, + relaySet: $pfSet, + kinds: [$kindEnum], + filters: ['tag' => ['#d', [$slug]]], ); - $fallbackEvents = $this->processResponse( + $fallbackEvents = $this->nostrRelayQuery->processResponse( $fallbackReq->send(), static fn (object $event) => $event, ); @@ -2975,7 +2366,7 @@ class NostrClient return; } - $relaysForLog = implode(', ', array_map(self::relayLogLabel(...), $this->configuredArticleRelayUrlList())); + $relaysForLog = implode(', ', array_map(NostrRelayQuery::relayLogLabel(...), $this->relayListFactory->getConfiguredArticleRelayUrlList())); $this->logger->info('[longform_ingest] ingestLongform: start', [ 'address_count' => \count($addresses), 'relays' => $relaysForLog, @@ -3026,13 +2417,13 @@ class NostrClient $dTags ), ]); - $request = $this->createNostrRequest( - [$kindEnum], - ['authors' => [(string) $g['pubkey']], 'tag' => ['#d', $dTags]], - $this->defaultRelaySet, + $request = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, + kinds: [$kindEnum], + filters: ['authors' => [(string) $g['pubkey']], 'tag' => ['#d', $dTags]], ); try { - $events = $this->processResponse( + $events = $this->nostrRelayQuery->processResponse( $request->send(), static fn (object $event) => $event, ); @@ -3059,12 +2450,12 @@ class NostrClient ]); // Some relays fail to satisfy combined authors+#d filters for parameterized replaceables. // Fallback: query by #d only, then enforce author and d-tag match client-side. - $fallbackReq = $this->createNostrRequest( - [$kindEnum], - ['tag' => ['#d', $dTags]], - $this->defaultRelaySet, + $fallbackReq = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, + kinds: [$kindEnum], + filters: ['tag' => ['#d', $dTags]], ); - $fallbackEvents = $this->processResponse( + $fallbackEvents = $this->nostrRelayQuery->processResponse( $fallbackReq->send(), static fn (object $event) => $event, ); @@ -3096,30 +2487,32 @@ class NostrClient } } if ($rawCount === 0) { - $profileExtra = $this->profileRelayUrlsExcludedFromArticleRelays(); + $profileExtra = $this->relayListFactory->getProfileRelayUrlsExcludedFromArticleRelays(); if ($profileExtra !== []) { - $pfSet = $this->createRelaySetFromUrlsOnly($profileExtra); + $pfSet = $this->relayListFactory->createRelaySetFromUrlsOnly($profileExtra); $this->logger->info('[longform_ingest] ingestLongform: no rows on article relays; trying profile_relays', [ 'group_key' => $gkey, - 'relays' => implode(', ', array_map(self::relayLogLabel(...), $profileExtra)), + 'relays' => implode(', ', array_map(NostrRelayQuery::relayLogLabel(...), $profileExtra)), ]); - $requestPf = $this->createNostrRequest( - [$kindEnum], - ['authors' => [(string) $g['pubkey']], 'tag' => ['#d', $dTags]], - $pfSet, + $requestPf = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, + relaySet: $pfSet, + kinds: [$kindEnum], + filters: ['authors' => [(string) $g['pubkey']], 'tag' => ['#d', $dTags]], ); - $events = $this->processResponse( + $events = $this->nostrRelayQuery->processResponse( $requestPf->send(), static fn (object $event) => $event, ); $rawCount = \count($events); if ($rawCount === 0) { - $fallbackPf = $this->createNostrRequest( - [$kindEnum], - ['tag' => ['#d', $dTags]], - $pfSet, + $fallbackPf = $this->nostrRelayQuery->createNostrRequest( + defaultRelaySet: $this->defaultRelaySet, + relaySet: $pfSet, + kinds: [$kindEnum], + filters: ['tag' => ['#d', $dTags]], ); - $fallbackEventsPf = $this->processResponse( + $fallbackEventsPf = $this->nostrRelayQuery->processResponse( $fallbackPf->send(), static fn (object $event) => $event, ); diff --git a/src/Service/NostrRelayFanoutTransport.php b/src/Service/NostrRelayFanoutTransport.php new file mode 100644 index 0000000..2dbd294 --- /dev/null +++ b/src/Service/NostrRelayFanoutTransport.php @@ -0,0 +1,230 @@ + $relayUrls + * + * @return list + */ + public function capUrlsForSequential(array $relayUrls): array + { + if (\count($relayUrls) <= self::MAX_SEQUENTIAL_RELAY_URLS) { + return $relayUrls; + } + $this->logger->notice('nostr.article_discussion.sequential_relay_cap', [ + 'used' => self::MAX_SEQUENTIAL_RELAY_URLS, + 'had' => \count($relayUrls), + ]); + + return \array_slice($relayUrls, 0, self::MAX_SEQUENTIAL_RELAY_URLS); + } + + /** + * One {@see Request} over all relays in the set (library visits each wss:// in series). + * + * @return array Same shape as {@see Request::send()} + */ + public function sendSequential(RelaySet $relaySet, RequestMessage $requestMessage): array + { + $request = $this->relayRequestFactory->createTimedRequest($relaySet, $requestMessage); + + return $request->send(); + } + + /** + * One short-lived CLI worker per relay URL (parallel WebSocket I/O). + * + * @param list $relayUrls + * + * @return array Same shape as {@see Request::send()} + */ + public function sendParallelWorkers(array $relayUrls, RequestMessage $requestMessage): array + { + $worker = $this->projectDir.'/bin/nostr_relay_request_worker.php'; + $phpBinary = (new PhpExecutableFinder())->find() ?: 'php'; + $timeout = $this->relayRequestFactory->getRelayRequestTimeoutSec() + (int) self::DISCUSSION_WORKER_GRACE_SEC; + $workerTimeoutEnv = ['NOSTR_RELAY_REQUEST_TIMEOUT' => (string) $this->relayRequestFactory->getRelayRequestTimeoutSec()]; + + $rawPayload = serialize($requestMessage); + $tmp = tempnam(sys_get_temp_dir(), 'nrq_'); + if ($tmp === false) { + throw new \RuntimeException('tempnam failed for Nostr discussion payload'); + } + try { + if (file_put_contents($tmp, $rawPayload) === false) { + throw new \RuntimeException('Could not write Nostr discussion temp payload'); + } + + /** @var array $procs */ + $procs = []; + foreach ($relayUrls as $wss) { + if ($wss === '') { + continue; + } + $p = new Process( + [$phpBinary, $worker, $wss, $tmp], + $this->projectDir, + null, + null, + (float) $timeout + ); + $p->start(null, $workerTimeoutEnv); + $procs[$wss] = $p; + } + + $merged = []; + $pending = $procs; + $deadlineAt = microtime(true) + self::DISCUSSION_PARALLEL_SOFT_DEADLINE_SEC; + while ($pending !== []) { + foreach ($pending as $wss => $p) { + if ($p->isRunning()) { + continue; + } + unset($pending[$wss]); + if (!$p->isSuccessful()) { + $err = $p->getErrorOutput(); + $this->logger->warning('nostr.article_discussion.relay_worker_failed', [ + 'relay' => $wss, + 'exit_code' => $p->getExitCode(), + 'stderr' => $err !== '' ? $err : null, + ]); + + continue; + } + $out = trim($p->getOutput()); + if ($out === '') { + continue; + } + $decoded = base64_decode($out, true); + if ($decoded === false || $decoded === '') { + continue; + } + $chunk = unserialize($decoded, ['allowed_classes' => true]); + if (!\is_array($chunk)) { + continue; + } + $merged = array_replace($merged, $chunk); + } + if ($pending === []) { + break; + } + if (microtime(true) >= $deadlineAt) { + foreach ($pending as $wss => $p) { + $this->logger->warning('nostr.article_discussion.relay_worker_soft_timeout', [ + 'relay' => $wss, + 'soft_deadline_sec' => self::DISCUSSION_PARALLEL_SOFT_DEADLINE_SEC, + ]); + $p->stop(0.2); + } + break; + } + usleep(100_000); + } + + return $merged; + } finally { + if (\is_file($tmp)) { + @unlink($tmp); + } + } + } + + /** + * One line per relay after {@see Request::send()}: errors vs message-type counts (EVENT, EOSE, …). + * + * @param array $response + */ + public function logWireResponseSummary(string $context, array $response): void + { + foreach ($response as $relayUrl => $relayRes) { + if ($relayRes instanceof \Throwable) { + $this->logger->warning(sprintf( + 'nostr.wire.relay_throwable [%s]: %s', + NostrRelayQuery::relayLogLabel($relayUrl), + $relayRes->getMessage() + ), [ + 'context' => $context, + 'relay' => $relayUrl, + 'message' => $relayRes->getMessage(), + 'class' => \get_class($relayRes), + ]); + + continue; + } + if (!\is_iterable($relayRes)) { + $this->logger->warning(sprintf( + 'nostr.wire.relay_not_iterable [%s]: %s', + NostrRelayQuery::relayLogLabel($relayUrl), + \get_debug_type($relayRes) + ), [ + 'context' => $context, + 'relay' => $relayUrl, + 'php_type' => \get_debug_type($relayRes), + ]); + + continue; + } + $counts = [ + 'EVENT' => 0, + 'EOSE' => 0, + 'NOTICE' => 0, + 'ERROR' => 0, + 'AUTH' => 0, + 'CLOSED' => 0, + 'other' => 0, + ]; + foreach ($relayRes as $item) { + if (!\is_object($item)) { + ++$counts['other']; + + continue; + } + $t = (string) ($item->type ?? 'other'); + if (\array_key_exists($t, $counts)) { + ++$counts[$t]; + } else { + ++$counts['other']; + } + } + $this->logger->info(sprintf('nostr.wire.relay_messages [%s]', NostrRelayQuery::relayLogLabel($relayUrl)), [ + 'context' => $context, + 'relay' => $relayUrl, + 'counts' => $counts, + ]); + } + } +} diff --git a/src/Service/NostrRelayListFactory.php b/src/Service/NostrRelayListFactory.php new file mode 100644 index 0000000..1a9c2f8 --- /dev/null +++ b/src/Service/NostrRelayListFactory.php @@ -0,0 +1,317 @@ + $articleRelayUrls + * @param list $profileRelayUrls kind-0 / profile; merged for metadata (see {@see getProfileMetadataQueryRelayUrlList()}) + */ + public function __construct( + private string $defaultRelayUrl, + private array $articleRelayUrls, + private array $profileRelayUrls, + private TokenStorageInterface $tokenStorage, + private LoggerInterface $logger, + ) { + } + + public function getDefaultRelayUrl(): string + { + return $this->defaultRelayUrl; + } + + /** + * default_relay + article_relays from config, in order, deduplicated. Used for the static + * default set and as the base when merging author/extra relay URLs in {@see createRelaySetMergedWithArticleList()}. + * + * @return list + */ + public function getConfiguredArticleRelayUrlList(): array + { + $seen = []; + $out = []; + if ($this->defaultRelayUrl !== '') { + $seen[$this->defaultRelayUrl] = true; + $out[] = $this->defaultRelayUrl; + } + foreach ($this->articleRelayUrls as $url) { + if ($url === '' || isset($seen[$url])) { + continue; + } + $seen[$url] = true; + $out[] = $url; + } + if ($out === []) { + $out[] = $this->defaultRelayUrl; + } + + return $out; + } + + public function getDefaultArticleRelaySet(): RelaySet + { + $relaySet = new RelaySet(); + foreach ($this->getConfiguredArticleRelayUrlList() as $url) { + $relaySet->addRelay(new Relay($url)); + } + + return $relaySet; + } + + /** + * Configured profile relays (kind-0 / NIP-05 hints) that are not already in the article relay list. + * Used as a second pass for magazine 30040 and category long-form ingest when article relays return nothing. + * Intentionally excludes merging article URLs again — {@see createRelaySetMergedWithArticleList()} prepends article relays. + * + * @return list + */ + public function getProfileRelayUrlsExcludedFromArticleRelays(): array + { + $article = array_fill_keys($this->getConfiguredArticleRelayUrlList(), true); + $out = []; + foreach ($this->getProfileRelayUrlList() as $u) { + if (!isset($article[$u])) { + $out[] = $u; + } + } + + return $out; + } + + /** + * Relay set built only from the given URLs (no implicit article-relay merge). + */ + public function createRelaySetFromUrlsOnly(array $relayUrls): RelaySet + { + $relaySet = new RelaySet(); + $seen = []; + foreach ($relayUrls as $relayUrl) { + if (!\is_string($relayUrl) || $relayUrl === '' || isset($seen[$relayUrl])) { + continue; + } + $seen[$relayUrl] = true; + $relaySet->addRelay(new Relay($relayUrl)); + } + + return $relaySet; + } + + /** + * Merges all configured article relays (default + article_relays) with the given URLs in order, deduped. + * Used for comment threads, per-author fetches, etc. + */ + public function createRelaySetMergedWithArticleList(array $relayUrls): RelaySet + { + $relaySet = new RelaySet(); + $seen = []; + foreach (array_merge($this->getConfiguredArticleRelayUrlList(), $relayUrls) as $relayUrl) { + if (!\is_string($relayUrl) || $relayUrl === '' || isset($seen[$relayUrl])) { + continue; + } + $seen[$relayUrl] = true; + $relaySet->addRelay(new Relay($relayUrl)); + } + + return $relaySet; + } + + /** + * Suffix to segregate HTTP caches: aggr is only used for some logged-in readers, so results differ. + * + * @return string empty when aggr is not used, else a short token + */ + public function getNostrLandAggrReaderCacheSuffix(): string + { + return $this->loggedInUserHasNostrLandInRelayList() ? 'a1' : ''; + } + + public function loggedInUserHasNostrLandInRelayList(): bool + { + $token = $this->tokenStorage->getToken(); + if ($token === null) { + return false; + } + $user = $token->getUser(); + if (!$user instanceof User) { + return false; + } + + return $this->userRelayListContainsNostrLand($user->getRelays()); + } + + /** + * @param list|array|null $relays + */ + private function userRelayListContainsNostrLand(?array $relays): bool + { + if ($relays === null || $relays === []) { + return false; + } + $target = $this->normalizeWssUrlForNostrLandMatch(self::NOSTR_LAND); + foreach ($relays as $row) { + if (!\is_array($row) || !isset($row[1]) || !\is_string($row[1])) { + continue; + } + if ($this->normalizeWssUrlForNostrLandMatch($row[1]) === $target) { + return true; + } + } + + return false; + } + + private function normalizeWssUrlForNostrLandMatch(string $url): string + { + return rtrim(trim($url), '/'); + } + + /** + * Appends wss://aggr.nostr.land when the current user listed wss://nostr.land (session). + * + * @param list $urls + * + * @return list + */ + public function withAggrNostrLandIfUserSubscribesNostrLand(array $urls): array + { + if (!$this->loggedInUserHasNostrLandInRelayList()) { + return $urls; + } + $seen = array_fill_keys($urls, true); + if (isset($seen[self::AGGR_NOSTR_LAND])) { + return $urls; + } + $this->logger->debug('nostr.relay.append_aggr_nostr_land', [ + 'user_has_nostr_land' => true, + ]); + $out = $urls; + $out[] = self::AGGR_NOSTR_LAND; + + return $out; + } + + /** + * @param list $urls + */ + public function relaySetFromDistinctUrlList(array $urls): RelaySet + { + $relaySet = new RelaySet(); + $seen = []; + foreach ($urls as $relayUrl) { + if ($relayUrl === '' || isset($seen[$relayUrl])) { + continue; + } + $seen[$relayUrl] = true; + $relaySet->addRelay(new Relay($relayUrl)); + } + + return $relaySet; + } + + /** + * @param list $urls + * + * @return list + */ + public function capSequentialRelaysForProfileFetches(array $urls): array + { + if (\count($urls) <= self::MAX_PROFILE_SEQUENTIAL_RELAY_URLS) { + return $urls; + } + $this->logger->notice('nostr.relay_list_capped', [ + 'context' => 'profile_sequential', + 'max' => self::MAX_PROFILE_SEQUENTIAL_RELAY_URLS, + 'had' => \count($urls), + ]); + + return \array_slice($urls, 0, self::MAX_PROFILE_SEQUENTIAL_RELAY_URLS); + } + + /** + * @return list Deduplicated profile relay URLs from config + */ + public function getProfileRelayUrlList(): array + { + $seen = []; + $out = []; + foreach ($this->profileRelayUrls as $url) { + if ($url === '' || isset($seen[$url])) { + continue; + } + if (!str_starts_with($url, 'wss:')) { + continue; + } + $seen[$url] = true; + $out[] = $url; + } + + return $out; + } + + /** + * Profile (kind-0) queries: {@see getProfileRelayUrlList()} first (Damus, nos.lol, …), then default + article set. + * Order matters: {@see \swentel\nostr\Request\Request::send()} walks relays sequentially. + * + * @return list + */ + public function getProfileMetadataQueryRelayUrlList(): array + { + $seen = []; + $ordered = []; + foreach (array_merge($this->getProfileRelayUrlList(), $this->getConfiguredArticleRelayUrlList()) as $u) { + if ($u === '' || isset($seen[$u])) { + continue; + } + $seen[$u] = true; + $ordered[] = $u; + } + if ($ordered === []) { + $ordered[] = $this->defaultRelayUrl; + } + + return $this->withAggrNostrLandIfUserSubscribesNostrLand($ordered); + } + + /** + * Same relays for kind-0 metadata, without mutating the default article relay set from {@see NostrClient}. + */ + public function getRelaySetForProfileMetadataFetch(): RelaySet + { + $relaySet = new RelaySet(); + foreach ($this->getProfileMetadataQueryRelayUrlList() as $url) { + $relaySet->addRelay(new Relay($url)); + } + + return $relaySet; + } +} diff --git a/src/Service/NostrRelayQuery.php b/src/Service/NostrRelayQuery.php new file mode 100644 index 0000000..f8e1c62 --- /dev/null +++ b/src/Service/NostrRelayQuery.php @@ -0,0 +1,165 @@ + $kinds Integers or PHP 8.1 enums backed by int (e.g. {@see \App\Enum\KindsEnum}) + * @param array $filters Filter builder keys (e.g. authors, ids, tag, limit, …) + */ + public function createNostrRequest( + RelaySet $defaultRelaySet, + ?RelaySet $relaySet = null, + array $kinds = [], + array $filters = [], + ): Request { + $subscription = new Subscription(); + $subscriptionId = $subscription->setId(); + $filter = new Filter(); + $kindInts = []; + foreach ($kinds as $k) { + $kindInts[] = $k instanceof \BackedEnum ? (int) $k->value : (int) $k; + } + $filter->setKinds($kindInts); + + foreach ($filters as $key => $value) { + $method = 'set' . ucfirst($key); + if (method_exists($filter, $method)) { + if ($key === 'tag') { + $filter->setTag($value[0], $value[1]); + } else { + $filter->$method($value); + } + } + } + + $requestMessage = new RequestMessage($subscriptionId, [$filter]); + $set = $relaySet ?? $defaultRelaySet; + + return $this->relayRequestFactory->createTimedRequest($set, $requestMessage); + } + + /** + * @param array $response Return value of {@see Request::send()}: relay URL → message list|Throwable + * @return list + */ + public function processResponse(array $response, callable $eventHandler): array + { + $results = []; + foreach ($response as $relayUrl => $relayRes) { + if ($relayRes instanceof \Throwable) { + $this->logger->error(sprintf( + 'Relay error at %s: %s', + self::relayLogLabel($relayUrl), + $relayRes->getMessage() + ), [ + 'relay' => $relayUrl, + 'error' => $relayRes->getMessage(), + ]); + continue; + } + + $itemEstimate = \is_countable($relayRes) ? \count($relayRes) : null; + $this->logger->debug(sprintf('Processing relay response from %s', self::relayLogLabel($relayUrl)), [ + 'relay' => $relayUrl, + 'item_count' => $itemEstimate, + ]); + + foreach ($relayRes as $item) { + try { + if (!\is_object($item)) { + $this->logger->warning(sprintf( + 'Invalid response item from %s', + self::relayLogLabel($relayUrl) + ), [ + 'relay' => $relayUrl, + 'item' => $item, + ]); + continue; + } + + switch ($item->type) { + case 'EVENT': + $this->logger->debug(sprintf('Processing event from %s', self::relayLogLabel($relayUrl)), [ + 'relay' => $relayUrl, + 'event_id' => $item->event->id ?? 'unknown', + ]); + $result = $eventHandler($item->event); + if ($result !== null) { + $results[] = $result; + } + break; + case 'AUTH': + $this->logger->warning(sprintf( + 'Relay %s requires authentication', + self::relayLogLabel($relayUrl) + ), [ + 'relay' => $relayUrl, + 'response' => $item, + ]); + break; + case 'ERROR': + case 'NOTICE': + $msg = (string) ($item->message ?? 'No message'); + $this->logger->warning(sprintf( + '[%s] %s: %s', + self::relayLogLabel($relayUrl), + $item->type, + $msg + ), [ + 'relay' => $relayUrl, + 'type' => $item->type, + 'message' => $msg, + ]); + break; + } + } catch (\Exception $e) { + $this->logger->error(sprintf( + 'Error processing event from relay %s: %s', + self::relayLogLabel($relayUrl), + $e->getMessage() + ), [ + 'relay' => $relayUrl, + 'error' => $e->getMessage(), + ]); + continue; + } + } + } + + return $results; + } +} diff --git a/src/Service/NostrRelayRequestFactory.php b/src/Service/NostrRelayRequestFactory.php new file mode 100644 index 0000000..5466fea --- /dev/null +++ b/src/Service/NostrRelayRequestFactory.php @@ -0,0 +1,49 @@ +relayRequestTimeoutSec; + } + + /** + * {@see Request::setTimeout()} drives per-relay WebSocket I/O for {@see Request::send()}. + */ + public function createTimedRequest(RelaySet $relaySet, RequestMessage $requestMessage): Request + { + $request = new Request($relaySet, $requestMessage); + + return $request->setTimeout($this->relayRequestTimeoutSec); + } + + /** + * For paths that use {@see RelaySet::send()} with a custom message and bypass {@see Request}. + */ + public function applySocketTimeoutToRelaySet(RelaySet $relaySet): void + { + foreach ($relaySet->getRelays() as $relay) { + $client = $relay->getClient(); + if (method_exists($client, 'setTimeout')) { + $client->setTimeout($this->relayRequestTimeoutSec); + } + } + } +} diff --git a/tests/Service/NostrRelayFanoutTransportTest.php b/tests/Service/NostrRelayFanoutTransportTest.php new file mode 100644 index 0000000..fce881c --- /dev/null +++ b/tests/Service/NostrRelayFanoutTransportTest.php @@ -0,0 +1,37 @@ +makeTransport(); + $in = ['wss://a', 'wss://b']; + $this->assertSame($in, $t->capUrlsForSequential($in)); + } + + public function testCapUrlsForSequentialTrimsToThreeRelays(): void + { + $t = $this->makeTransport(); + $in = ['wss://1', 'wss://2', 'wss://3', 'wss://4']; + $out = $t->capUrlsForSequential($in); + $this->assertSame(['wss://1', 'wss://2', 'wss://3'], $out); + } + + private function makeTransport(): NostrRelayFanoutTransport + { + return new NostrRelayFanoutTransport( + new NullLogger(), + new NostrRelayRequestFactory(10), + \sys_get_temp_dir() + ); + } +} diff --git a/tests/Service/NostrRelayListFactoryTest.php b/tests/Service/NostrRelayListFactoryTest.php new file mode 100644 index 0000000..2449bcc --- /dev/null +++ b/tests/Service/NostrRelayListFactoryTest.php @@ -0,0 +1,35 @@ +createMock(TokenStorageInterface::class); + $tokenStorage->method('getToken')->willReturn(null); + $f = new NostrRelayListFactory( + 'wss://main', + ['wss://extra', 'wss://main', 'wss://extra'], + ['wss://profile'], + $tokenStorage, + new NullLogger() + ); + $this->assertSame(['wss://main', 'wss://extra'], $f->getConfiguredArticleRelayUrlList()); + } + + public function testGetDefaultRelayUrl(): void + { + $ts = $this->createMock(TokenStorageInterface::class); + $ts->method('getToken')->willReturn(null); + $f = new NostrRelayListFactory('wss://d', [], [], $ts, new NullLogger()); + $this->assertSame('wss://d', $f->getDefaultRelayUrl()); + } +} diff --git a/tests/Service/NostrRelayQueryTest.php b/tests/Service/NostrRelayQueryTest.php new file mode 100644 index 0000000..ac35878 --- /dev/null +++ b/tests/Service/NostrRelayQueryTest.php @@ -0,0 +1,38 @@ +assertSame( + 'relay.example.com', + NostrRelayQuery::relayLogLabel('wss://relay.example.com/nostr') + ); + } + + public function testCreateNostrRequestAcceptsBackedEnumKinds(): void + { + $factory = new NostrRelayRequestFactory(12); + $q = new NostrRelayQuery(new NullLogger(), $factory); + $set = new RelaySet(); + $set->addRelay(new Relay('wss://127.0.0.1:0')); + $req = $q->createNostrRequest( + defaultRelaySet: $set, + kinds: [KindsEnum::METADATA], + filters: [], + ); + $this->assertInstanceOf(\swentel\nostr\Request\Request::class, $req); + } +} diff --git a/tests/Service/NostrRelayRequestFactoryTest.php b/tests/Service/NostrRelayRequestFactoryTest.php new file mode 100644 index 0000000..9791474 --- /dev/null +++ b/tests/Service/NostrRelayRequestFactoryTest.php @@ -0,0 +1,33 @@ +assertSame(21, $f->getRelayRequestTimeoutSec()); + } + + public function testCreateTimedRequestReturnsWiredRequest(): void + { + $f = new NostrRelayRequestFactory(12); + $sub = new Subscription(); + $msg = new RequestMessage($sub->getId(), []); + $set = new RelaySet(); + $set->addRelay(new Relay('wss://127.0.0.1:0')); + + $req = $f->createTimedRequest($set, $msg); + $this->assertInstanceOf(\swentel\nostr\Request\Request::class, $req); + } +}