11 changed files with 1098 additions and 800 deletions
@ -0,0 +1,230 @@
@@ -0,0 +1,230 @@
|
||||
<?php |
||||
|
||||
declare(strict_types=1); |
||||
|
||||
namespace App\Service; |
||||
|
||||
use Psr\Log\LoggerInterface; |
||||
use swentel\nostr\Message\RequestMessage; |
||||
use swentel\nostr\Relay\RelaySet; |
||||
use Symfony\Component\Process\PhpExecutableFinder; |
||||
use Symfony\Component\Process\Process; |
||||
|
||||
/** |
||||
* Multi-relay REQ fan-out: one in-process sequential {@see Request::send()} vs. one CLI worker per wss |
||||
* ({@see bin/nostr_relay_request_worker.php}). Used for article discussion and kind-9802 highlight fetches. |
||||
*/ |
||||
final readonly class NostrRelayFanoutTransport |
||||
{ |
||||
/** Extra wall time for {@see bin/nostr_relay_request_worker.php} vs. WebSocket timeout. */ |
||||
private const DISCUSSION_WORKER_GRACE_SEC = 5.0; |
||||
|
||||
/** Soft wall-time before stopping still-running parallel workers. */ |
||||
private const DISCUSSION_PARALLEL_SOFT_DEADLINE_SEC = 3.5; |
||||
|
||||
/** |
||||
* {@see Request::send()} visits relays sequentially; cap how many wss URLs we chain in one process |
||||
* so HTTP /fragment/comments do not hit long proxy timeouts. |
||||
*/ |
||||
private const MAX_SEQUENTIAL_RELAY_URLS = 3; |
||||
|
||||
public function __construct( |
||||
private LoggerInterface $logger, |
||||
private NostrRelayRequestFactory $relayRequestFactory, |
||||
private string $projectDir, |
||||
) { |
||||
} |
||||
|
||||
/** |
||||
* @param list<string> $relayUrls |
||||
* |
||||
* @return list<string> |
||||
*/ |
||||
public function capUrlsForSequential(array $relayUrls): array |
||||
{ |
||||
if (\count($relayUrls) <= self::MAX_SEQUENTIAL_RELAY_URLS) { |
||||
return $relayUrls; |
||||
} |
||||
$this->logger->notice('nostr.article_discussion.sequential_relay_cap', [ |
||||
'used' => self::MAX_SEQUENTIAL_RELAY_URLS, |
||||
'had' => \count($relayUrls), |
||||
]); |
||||
|
||||
return \array_slice($relayUrls, 0, self::MAX_SEQUENTIAL_RELAY_URLS); |
||||
} |
||||
|
||||
/** |
||||
* One {@see Request} over all relays in the set (library visits each wss:// in series). |
||||
* |
||||
* @return array<string, mixed> Same shape as {@see Request::send()} |
||||
*/ |
||||
public function sendSequential(RelaySet $relaySet, RequestMessage $requestMessage): array |
||||
{ |
||||
$request = $this->relayRequestFactory->createTimedRequest($relaySet, $requestMessage); |
||||
|
||||
return $request->send(); |
||||
} |
||||
|
||||
/** |
||||
* One short-lived CLI worker per relay URL (parallel WebSocket I/O). |
||||
* |
||||
* @param list<string> $relayUrls |
||||
* |
||||
* @return array<string, mixed> Same shape as {@see Request::send()} |
||||
*/ |
||||
public function sendParallelWorkers(array $relayUrls, RequestMessage $requestMessage): array |
||||
{ |
||||
$worker = $this->projectDir.'/bin/nostr_relay_request_worker.php'; |
||||
$phpBinary = (new PhpExecutableFinder())->find() ?: 'php'; |
||||
$timeout = $this->relayRequestFactory->getRelayRequestTimeoutSec() + (int) self::DISCUSSION_WORKER_GRACE_SEC; |
||||
$workerTimeoutEnv = ['NOSTR_RELAY_REQUEST_TIMEOUT' => (string) $this->relayRequestFactory->getRelayRequestTimeoutSec()]; |
||||
|
||||
$rawPayload = serialize($requestMessage); |
||||
$tmp = tempnam(sys_get_temp_dir(), 'nrq_'); |
||||
if ($tmp === false) { |
||||
throw new \RuntimeException('tempnam failed for Nostr discussion payload'); |
||||
} |
||||
try { |
||||
if (file_put_contents($tmp, $rawPayload) === false) { |
||||
throw new \RuntimeException('Could not write Nostr discussion temp payload'); |
||||
} |
||||
|
||||
/** @var array<string, Process> $procs */ |
||||
$procs = []; |
||||
foreach ($relayUrls as $wss) { |
||||
if ($wss === '') { |
||||
continue; |
||||
} |
||||
$p = new Process( |
||||
[$phpBinary, $worker, $wss, $tmp], |
||||
$this->projectDir, |
||||
null, |
||||
null, |
||||
(float) $timeout |
||||
); |
||||
$p->start(null, $workerTimeoutEnv); |
||||
$procs[$wss] = $p; |
||||
} |
||||
|
||||
$merged = []; |
||||
$pending = $procs; |
||||
$deadlineAt = microtime(true) + self::DISCUSSION_PARALLEL_SOFT_DEADLINE_SEC; |
||||
while ($pending !== []) { |
||||
foreach ($pending as $wss => $p) { |
||||
if ($p->isRunning()) { |
||||
continue; |
||||
} |
||||
unset($pending[$wss]); |
||||
if (!$p->isSuccessful()) { |
||||
$err = $p->getErrorOutput(); |
||||
$this->logger->warning('nostr.article_discussion.relay_worker_failed', [ |
||||
'relay' => $wss, |
||||
'exit_code' => $p->getExitCode(), |
||||
'stderr' => $err !== '' ? $err : null, |
||||
]); |
||||
|
||||
continue; |
||||
} |
||||
$out = trim($p->getOutput()); |
||||
if ($out === '') { |
||||
continue; |
||||
} |
||||
$decoded = base64_decode($out, true); |
||||
if ($decoded === false || $decoded === '') { |
||||
continue; |
||||
} |
||||
$chunk = unserialize($decoded, ['allowed_classes' => true]); |
||||
if (!\is_array($chunk)) { |
||||
continue; |
||||
} |
||||
$merged = array_replace($merged, $chunk); |
||||
} |
||||
if ($pending === []) { |
||||
break; |
||||
} |
||||
if (microtime(true) >= $deadlineAt) { |
||||
foreach ($pending as $wss => $p) { |
||||
$this->logger->warning('nostr.article_discussion.relay_worker_soft_timeout', [ |
||||
'relay' => $wss, |
||||
'soft_deadline_sec' => self::DISCUSSION_PARALLEL_SOFT_DEADLINE_SEC, |
||||
]); |
||||
$p->stop(0.2); |
||||
} |
||||
break; |
||||
} |
||||
usleep(100_000); |
||||
} |
||||
|
||||
return $merged; |
||||
} finally { |
||||
if (\is_file($tmp)) { |
||||
@unlink($tmp); |
||||
} |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* One line per relay after {@see Request::send()}: errors vs message-type counts (EVENT, EOSE, …). |
||||
* |
||||
* @param array<string, mixed> $response |
||||
*/ |
||||
public function logWireResponseSummary(string $context, array $response): void |
||||
{ |
||||
foreach ($response as $relayUrl => $relayRes) { |
||||
if ($relayRes instanceof \Throwable) { |
||||
$this->logger->warning(sprintf( |
||||
'nostr.wire.relay_throwable [%s]: %s', |
||||
NostrRelayQuery::relayLogLabel($relayUrl), |
||||
$relayRes->getMessage() |
||||
), [ |
||||
'context' => $context, |
||||
'relay' => $relayUrl, |
||||
'message' => $relayRes->getMessage(), |
||||
'class' => \get_class($relayRes), |
||||
]); |
||||
|
||||
continue; |
||||
} |
||||
if (!\is_iterable($relayRes)) { |
||||
$this->logger->warning(sprintf( |
||||
'nostr.wire.relay_not_iterable [%s]: %s', |
||||
NostrRelayQuery::relayLogLabel($relayUrl), |
||||
\get_debug_type($relayRes) |
||||
), [ |
||||
'context' => $context, |
||||
'relay' => $relayUrl, |
||||
'php_type' => \get_debug_type($relayRes), |
||||
]); |
||||
|
||||
continue; |
||||
} |
||||
$counts = [ |
||||
'EVENT' => 0, |
||||
'EOSE' => 0, |
||||
'NOTICE' => 0, |
||||
'ERROR' => 0, |
||||
'AUTH' => 0, |
||||
'CLOSED' => 0, |
||||
'other' => 0, |
||||
]; |
||||
foreach ($relayRes as $item) { |
||||
if (!\is_object($item)) { |
||||
++$counts['other']; |
||||
|
||||
continue; |
||||
} |
||||
$t = (string) ($item->type ?? 'other'); |
||||
if (\array_key_exists($t, $counts)) { |
||||
++$counts[$t]; |
||||
} else { |
||||
++$counts['other']; |
||||
} |
||||
} |
||||
$this->logger->info(sprintf('nostr.wire.relay_messages [%s]', NostrRelayQuery::relayLogLabel($relayUrl)), [ |
||||
'context' => $context, |
||||
'relay' => $relayUrl, |
||||
'counts' => $counts, |
||||
]); |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,317 @@
@@ -0,0 +1,317 @@
|
||||
<?php |
||||
|
||||
declare(strict_types=1); |
||||
|
||||
namespace App\Service; |
||||
|
||||
use App\Entity\User; |
||||
use Psr\Log\LoggerInterface; |
||||
use swentel\nostr\Relay\Relay; |
||||
use swentel\nostr\Relay\RelaySet; |
||||
use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface; |
||||
|
||||
/** |
||||
* Config-driven relay URL lists and {@link RelaySet} construction: default + article + profile URLs, |
||||
* profile fetch ordering, sequential cap for slow in-process {@see \swentel\nostr\Request\Request::send()}, |
||||
* and Nostr Land → aggr.nostr.land for logged-in readers who list the former. |
||||
*/ |
||||
final readonly class NostrRelayListFactory |
||||
{ |
||||
/** When a logged-in user lists this relay, also use {@see self::AGGR_NOSTR_LAND} for comment + profile reads. */ |
||||
private const NOSTR_LAND = 'wss://nostr.land'; |
||||
|
||||
/** |
||||
* Aggregated / subscription relay (not for anonymous visitors). Only added when the session user |
||||
* has {@see self::NOSTR_LAND} in their NIP-65-style relay list. |
||||
*/ |
||||
private const AGGR_NOSTR_LAND = 'wss://aggr.nostr.land'; |
||||
|
||||
/** |
||||
* {@see \swentel\nostr\Request\Request::send()} hits relays sequentially; profile pages (metadata, long-form list, 10133) used |
||||
* the full default+article+profile list (~8–9 wss) → 2 slow relays can exceed PHP’s 30s default max_execution_time. |
||||
*/ |
||||
private const MAX_PROFILE_SEQUENTIAL_RELAY_URLS = 3; |
||||
|
||||
/** |
||||
* @param list<string> $articleRelayUrls |
||||
* @param list<string> $profileRelayUrls kind-0 / profile; merged for metadata (see {@see getProfileMetadataQueryRelayUrlList()}) |
||||
*/ |
||||
public function __construct( |
||||
private string $defaultRelayUrl, |
||||
private array $articleRelayUrls, |
||||
private array $profileRelayUrls, |
||||
private TokenStorageInterface $tokenStorage, |
||||
private LoggerInterface $logger, |
||||
) { |
||||
} |
||||
|
||||
public function getDefaultRelayUrl(): string |
||||
{ |
||||
return $this->defaultRelayUrl; |
||||
} |
||||
|
||||
/** |
||||
* default_relay + article_relays from config, in order, deduplicated. Used for the static |
||||
* default set and as the base when merging author/extra relay URLs in {@see createRelaySetMergedWithArticleList()}. |
||||
* |
||||
* @return list<string> |
||||
*/ |
||||
public function getConfiguredArticleRelayUrlList(): array |
||||
{ |
||||
$seen = []; |
||||
$out = []; |
||||
if ($this->defaultRelayUrl !== '') { |
||||
$seen[$this->defaultRelayUrl] = true; |
||||
$out[] = $this->defaultRelayUrl; |
||||
} |
||||
foreach ($this->articleRelayUrls as $url) { |
||||
if ($url === '' || isset($seen[$url])) { |
||||
continue; |
||||
} |
||||
$seen[$url] = true; |
||||
$out[] = $url; |
||||
} |
||||
if ($out === []) { |
||||
$out[] = $this->defaultRelayUrl; |
||||
} |
||||
|
||||
return $out; |
||||
} |
||||
|
||||
public function getDefaultArticleRelaySet(): RelaySet |
||||
{ |
||||
$relaySet = new RelaySet(); |
||||
foreach ($this->getConfiguredArticleRelayUrlList() as $url) { |
||||
$relaySet->addRelay(new Relay($url)); |
||||
} |
||||
|
||||
return $relaySet; |
||||
} |
||||
|
||||
/** |
||||
* Configured profile relays (kind-0 / NIP-05 hints) that are not already in the article relay list. |
||||
* Used as a second pass for magazine 30040 and category long-form ingest when article relays return nothing. |
||||
* Intentionally excludes merging article URLs again — {@see createRelaySetMergedWithArticleList()} prepends article relays. |
||||
* |
||||
* @return list<string> |
||||
*/ |
||||
public function getProfileRelayUrlsExcludedFromArticleRelays(): array |
||||
{ |
||||
$article = array_fill_keys($this->getConfiguredArticleRelayUrlList(), true); |
||||
$out = []; |
||||
foreach ($this->getProfileRelayUrlList() as $u) { |
||||
if (!isset($article[$u])) { |
||||
$out[] = $u; |
||||
} |
||||
} |
||||
|
||||
return $out; |
||||
} |
||||
|
||||
/** |
||||
* Relay set built only from the given URLs (no implicit article-relay merge). |
||||
*/ |
||||
public function createRelaySetFromUrlsOnly(array $relayUrls): RelaySet |
||||
{ |
||||
$relaySet = new RelaySet(); |
||||
$seen = []; |
||||
foreach ($relayUrls as $relayUrl) { |
||||
if (!\is_string($relayUrl) || $relayUrl === '' || isset($seen[$relayUrl])) { |
||||
continue; |
||||
} |
||||
$seen[$relayUrl] = true; |
||||
$relaySet->addRelay(new Relay($relayUrl)); |
||||
} |
||||
|
||||
return $relaySet; |
||||
} |
||||
|
||||
/** |
||||
* Merges all configured article relays (default + article_relays) with the given URLs in order, deduped. |
||||
* Used for comment threads, per-author fetches, etc. |
||||
*/ |
||||
public function createRelaySetMergedWithArticleList(array $relayUrls): RelaySet |
||||
{ |
||||
$relaySet = new RelaySet(); |
||||
$seen = []; |
||||
foreach (array_merge($this->getConfiguredArticleRelayUrlList(), $relayUrls) as $relayUrl) { |
||||
if (!\is_string($relayUrl) || $relayUrl === '' || isset($seen[$relayUrl])) { |
||||
continue; |
||||
} |
||||
$seen[$relayUrl] = true; |
||||
$relaySet->addRelay(new Relay($relayUrl)); |
||||
} |
||||
|
||||
return $relaySet; |
||||
} |
||||
|
||||
/** |
||||
* Suffix to segregate HTTP caches: aggr is only used for some logged-in readers, so results differ. |
||||
* |
||||
* @return string empty when aggr is not used, else a short token |
||||
*/ |
||||
public function getNostrLandAggrReaderCacheSuffix(): string |
||||
{ |
||||
return $this->loggedInUserHasNostrLandInRelayList() ? 'a1' : ''; |
||||
} |
||||
|
||||
public function loggedInUserHasNostrLandInRelayList(): bool |
||||
{ |
||||
$token = $this->tokenStorage->getToken(); |
||||
if ($token === null) { |
||||
return false; |
||||
} |
||||
$user = $token->getUser(); |
||||
if (!$user instanceof User) { |
||||
return false; |
||||
} |
||||
|
||||
return $this->userRelayListContainsNostrLand($user->getRelays()); |
||||
} |
||||
|
||||
/** |
||||
* @param list<array{0?: string, 1?: string, 2?: string}>|array<array-key, mixed>|null $relays |
||||
*/ |
||||
private function userRelayListContainsNostrLand(?array $relays): bool |
||||
{ |
||||
if ($relays === null || $relays === []) { |
||||
return false; |
||||
} |
||||
$target = $this->normalizeWssUrlForNostrLandMatch(self::NOSTR_LAND); |
||||
foreach ($relays as $row) { |
||||
if (!\is_array($row) || !isset($row[1]) || !\is_string($row[1])) { |
||||
continue; |
||||
} |
||||
if ($this->normalizeWssUrlForNostrLandMatch($row[1]) === $target) { |
||||
return true; |
||||
} |
||||
} |
||||
|
||||
return false; |
||||
} |
||||
|
||||
private function normalizeWssUrlForNostrLandMatch(string $url): string |
||||
{ |
||||
return rtrim(trim($url), '/'); |
||||
} |
||||
|
||||
/** |
||||
* Appends wss://aggr.nostr.land when the current user listed wss://nostr.land (session). |
||||
* |
||||
* @param list<string> $urls |
||||
* |
||||
* @return list<string> |
||||
*/ |
||||
public function withAggrNostrLandIfUserSubscribesNostrLand(array $urls): array |
||||
{ |
||||
if (!$this->loggedInUserHasNostrLandInRelayList()) { |
||||
return $urls; |
||||
} |
||||
$seen = array_fill_keys($urls, true); |
||||
if (isset($seen[self::AGGR_NOSTR_LAND])) { |
||||
return $urls; |
||||
} |
||||
$this->logger->debug('nostr.relay.append_aggr_nostr_land', [ |
||||
'user_has_nostr_land' => true, |
||||
]); |
||||
$out = $urls; |
||||
$out[] = self::AGGR_NOSTR_LAND; |
||||
|
||||
return $out; |
||||
} |
||||
|
||||
/** |
||||
* @param list<string> $urls |
||||
*/ |
||||
public function relaySetFromDistinctUrlList(array $urls): RelaySet |
||||
{ |
||||
$relaySet = new RelaySet(); |
||||
$seen = []; |
||||
foreach ($urls as $relayUrl) { |
||||
if ($relayUrl === '' || isset($seen[$relayUrl])) { |
||||
continue; |
||||
} |
||||
$seen[$relayUrl] = true; |
||||
$relaySet->addRelay(new Relay($relayUrl)); |
||||
} |
||||
|
||||
return $relaySet; |
||||
} |
||||
|
||||
/** |
||||
* @param list<string> $urls |
||||
* |
||||
* @return list<string> |
||||
*/ |
||||
public function capSequentialRelaysForProfileFetches(array $urls): array |
||||
{ |
||||
if (\count($urls) <= self::MAX_PROFILE_SEQUENTIAL_RELAY_URLS) { |
||||
return $urls; |
||||
} |
||||
$this->logger->notice('nostr.relay_list_capped', [ |
||||
'context' => 'profile_sequential', |
||||
'max' => self::MAX_PROFILE_SEQUENTIAL_RELAY_URLS, |
||||
'had' => \count($urls), |
||||
]); |
||||
|
||||
return \array_slice($urls, 0, self::MAX_PROFILE_SEQUENTIAL_RELAY_URLS); |
||||
} |
||||
|
||||
/** |
||||
* @return list<string> Deduplicated profile relay URLs from config |
||||
*/ |
||||
public function getProfileRelayUrlList(): array |
||||
{ |
||||
$seen = []; |
||||
$out = []; |
||||
foreach ($this->profileRelayUrls as $url) { |
||||
if ($url === '' || isset($seen[$url])) { |
||||
continue; |
||||
} |
||||
if (!str_starts_with($url, 'wss:')) { |
||||
continue; |
||||
} |
||||
$seen[$url] = true; |
||||
$out[] = $url; |
||||
} |
||||
|
||||
return $out; |
||||
} |
||||
|
||||
/** |
||||
* Profile (kind-0) queries: {@see getProfileRelayUrlList()} first (Damus, nos.lol, …), then default + article set. |
||||
* Order matters: {@see \swentel\nostr\Request\Request::send()} walks relays sequentially. |
||||
* |
||||
* @return list<string> |
||||
*/ |
||||
public function getProfileMetadataQueryRelayUrlList(): array |
||||
{ |
||||
$seen = []; |
||||
$ordered = []; |
||||
foreach (array_merge($this->getProfileRelayUrlList(), $this->getConfiguredArticleRelayUrlList()) as $u) { |
||||
if ($u === '' || isset($seen[$u])) { |
||||
continue; |
||||
} |
||||
$seen[$u] = true; |
||||
$ordered[] = $u; |
||||
} |
||||
if ($ordered === []) { |
||||
$ordered[] = $this->defaultRelayUrl; |
||||
} |
||||
|
||||
return $this->withAggrNostrLandIfUserSubscribesNostrLand($ordered); |
||||
} |
||||
|
||||
/** |
||||
* Same relays for kind-0 metadata, without mutating the default article relay set from {@see NostrClient}. |
||||
*/ |
||||
public function getRelaySetForProfileMetadataFetch(): RelaySet |
||||
{ |
||||
$relaySet = new RelaySet(); |
||||
foreach ($this->getProfileMetadataQueryRelayUrlList() as $url) { |
||||
$relaySet->addRelay(new Relay($url)); |
||||
} |
||||
|
||||
return $relaySet; |
||||
} |
||||
} |
||||
@ -0,0 +1,165 @@
@@ -0,0 +1,165 @@
|
||||
<?php |
||||
|
||||
declare(strict_types=1); |
||||
|
||||
namespace App\Service; |
||||
|
||||
use Psr\Log\LoggerInterface; |
||||
use swentel\nostr\Filter\Filter; |
||||
use swentel\nostr\Message\RequestMessage; |
||||
use swentel\nostr\Relay\RelaySet; |
||||
use swentel\nostr\Request\Request; |
||||
use swentel\nostr\Subscription\Subscription; |
||||
|
||||
/** |
||||
* NIP-01 style REQ construction and per-relay response iteration (EVENT / ERROR / …). |
||||
* Extracted from {@see NostrClient} for reuse; logging stays on this service. |
||||
*/ |
||||
final readonly class NostrRelayQuery |
||||
{ |
||||
public function __construct( |
||||
private LoggerInterface $logger, |
||||
private NostrRelayRequestFactory $relayRequestFactory, |
||||
) { |
||||
} |
||||
|
||||
/** |
||||
* Short host/URL for logs (e.g. fragment comments / prewarm) without full wss:// noise. |
||||
*/ |
||||
public static function relayLogLabel(string $relayUrl): string |
||||
{ |
||||
$host = parse_url($relayUrl, \PHP_URL_HOST); |
||||
if (\is_string($host) && $host !== '') { |
||||
return $host; |
||||
} |
||||
|
||||
return $relayUrl; |
||||
} |
||||
|
||||
/** |
||||
* @param list<int|\BackedEnum> $kinds Integers or PHP 8.1 enums backed by int (e.g. {@see \App\Enum\KindsEnum}) |
||||
* @param array<string, mixed> $filters Filter builder keys (e.g. authors, ids, tag, limit, …) |
||||
*/ |
||||
public function createNostrRequest( |
||||
RelaySet $defaultRelaySet, |
||||
?RelaySet $relaySet = null, |
||||
array $kinds = [], |
||||
array $filters = [], |
||||
): Request { |
||||
$subscription = new Subscription(); |
||||
$subscriptionId = $subscription->setId(); |
||||
$filter = new Filter(); |
||||
$kindInts = []; |
||||
foreach ($kinds as $k) { |
||||
$kindInts[] = $k instanceof \BackedEnum ? (int) $k->value : (int) $k; |
||||
} |
||||
$filter->setKinds($kindInts); |
||||
|
||||
foreach ($filters as $key => $value) { |
||||
$method = 'set' . ucfirst($key); |
||||
if (method_exists($filter, $method)) { |
||||
if ($key === 'tag') { |
||||
$filter->setTag($value[0], $value[1]); |
||||
} else { |
||||
$filter->$method($value); |
||||
} |
||||
} |
||||
} |
||||
|
||||
$requestMessage = new RequestMessage($subscriptionId, [$filter]); |
||||
$set = $relaySet ?? $defaultRelaySet; |
||||
|
||||
return $this->relayRequestFactory->createTimedRequest($set, $requestMessage); |
||||
} |
||||
|
||||
/** |
||||
* @param array<string, mixed> $response Return value of {@see Request::send()}: relay URL → message list|Throwable |
||||
* @return list<mixed> |
||||
*/ |
||||
public function processResponse(array $response, callable $eventHandler): array |
||||
{ |
||||
$results = []; |
||||
foreach ($response as $relayUrl => $relayRes) { |
||||
if ($relayRes instanceof \Throwable) { |
||||
$this->logger->error(sprintf( |
||||
'Relay error at %s: %s', |
||||
self::relayLogLabel($relayUrl), |
||||
$relayRes->getMessage() |
||||
), [ |
||||
'relay' => $relayUrl, |
||||
'error' => $relayRes->getMessage(), |
||||
]); |
||||
continue; |
||||
} |
||||
|
||||
$itemEstimate = \is_countable($relayRes) ? \count($relayRes) : null; |
||||
$this->logger->debug(sprintf('Processing relay response from %s', self::relayLogLabel($relayUrl)), [ |
||||
'relay' => $relayUrl, |
||||
'item_count' => $itemEstimate, |
||||
]); |
||||
|
||||
foreach ($relayRes as $item) { |
||||
try { |
||||
if (!\is_object($item)) { |
||||
$this->logger->warning(sprintf( |
||||
'Invalid response item from %s', |
||||
self::relayLogLabel($relayUrl) |
||||
), [ |
||||
'relay' => $relayUrl, |
||||
'item' => $item, |
||||
]); |
||||
continue; |
||||
} |
||||
|
||||
switch ($item->type) { |
||||
case 'EVENT': |
||||
$this->logger->debug(sprintf('Processing event from %s', self::relayLogLabel($relayUrl)), [ |
||||
'relay' => $relayUrl, |
||||
'event_id' => $item->event->id ?? 'unknown', |
||||
]); |
||||
$result = $eventHandler($item->event); |
||||
if ($result !== null) { |
||||
$results[] = $result; |
||||
} |
||||
break; |
||||
case 'AUTH': |
||||
$this->logger->warning(sprintf( |
||||
'Relay %s requires authentication', |
||||
self::relayLogLabel($relayUrl) |
||||
), [ |
||||
'relay' => $relayUrl, |
||||
'response' => $item, |
||||
]); |
||||
break; |
||||
case 'ERROR': |
||||
case 'NOTICE': |
||||
$msg = (string) ($item->message ?? 'No message'); |
||||
$this->logger->warning(sprintf( |
||||
'[%s] %s: %s', |
||||
self::relayLogLabel($relayUrl), |
||||
$item->type, |
||||
$msg |
||||
), [ |
||||
'relay' => $relayUrl, |
||||
'type' => $item->type, |
||||
'message' => $msg, |
||||
]); |
||||
break; |
||||
} |
||||
} catch (\Exception $e) { |
||||
$this->logger->error(sprintf( |
||||
'Error processing event from relay %s: %s', |
||||
self::relayLogLabel($relayUrl), |
||||
$e->getMessage() |
||||
), [ |
||||
'relay' => $relayUrl, |
||||
'error' => $e->getMessage(), |
||||
]); |
||||
continue; |
||||
} |
||||
} |
||||
} |
||||
|
||||
return $results; |
||||
} |
||||
} |
||||
@ -0,0 +1,49 @@
@@ -0,0 +1,49 @@
|
||||
<?php |
||||
|
||||
declare(strict_types=1); |
||||
|
||||
namespace App\Service; |
||||
|
||||
use swentel\nostr\Message\RequestMessage; |
||||
use swentel\nostr\Relay\RelaySet; |
||||
use swentel\nostr\Request\Request; |
||||
|
||||
/** |
||||
* Builds swentel {@see Request} instances with per-relay I/O timeout (config: `nostr_relay_request_timeout_sec`). |
||||
* Shared by {@see NostrClient} so request wiring stays in one place. |
||||
*/ |
||||
final readonly class NostrRelayRequestFactory |
||||
{ |
||||
public function __construct( |
||||
private int $relayRequestTimeoutSec = 12, |
||||
) { |
||||
} |
||||
|
||||
public function getRelayRequestTimeoutSec(): int |
||||
{ |
||||
return $this->relayRequestTimeoutSec; |
||||
} |
||||
|
||||
/** |
||||
* {@see Request::setTimeout()} drives per-relay WebSocket I/O for {@see Request::send()}. |
||||
*/ |
||||
public function createTimedRequest(RelaySet $relaySet, RequestMessage $requestMessage): Request |
||||
{ |
||||
$request = new Request($relaySet, $requestMessage); |
||||
|
||||
return $request->setTimeout($this->relayRequestTimeoutSec); |
||||
} |
||||
|
||||
/** |
||||
* For paths that use {@see RelaySet::send()} with a custom message and bypass {@see Request}. |
||||
*/ |
||||
public function applySocketTimeoutToRelaySet(RelaySet $relaySet): void |
||||
{ |
||||
foreach ($relaySet->getRelays() as $relay) { |
||||
$client = $relay->getClient(); |
||||
if (method_exists($client, 'setTimeout')) { |
||||
$client->setTimeout($this->relayRequestTimeoutSec); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,37 @@
@@ -0,0 +1,37 @@
|
||||
<?php |
||||
|
||||
declare(strict_types=1); |
||||
|
||||
namespace App\Tests\Service; |
||||
|
||||
use App\Service\NostrRelayFanoutTransport; |
||||
use App\Service\NostrRelayRequestFactory; |
||||
use PHPUnit\Framework\TestCase; |
||||
use Psr\Log\NullLogger; |
||||
|
||||
final class NostrRelayFanoutTransportTest extends TestCase |
||||
{ |
||||
public function testCapUrlsForSequentialLeavesShortListsUnchanged(): void |
||||
{ |
||||
$t = $this->makeTransport(); |
||||
$in = ['wss://a', 'wss://b']; |
||||
$this->assertSame($in, $t->capUrlsForSequential($in)); |
||||
} |
||||
|
||||
public function testCapUrlsForSequentialTrimsToThreeRelays(): void |
||||
{ |
||||
$t = $this->makeTransport(); |
||||
$in = ['wss://1', 'wss://2', 'wss://3', 'wss://4']; |
||||
$out = $t->capUrlsForSequential($in); |
||||
$this->assertSame(['wss://1', 'wss://2', 'wss://3'], $out); |
||||
} |
||||
|
||||
private function makeTransport(): NostrRelayFanoutTransport |
||||
{ |
||||
return new NostrRelayFanoutTransport( |
||||
new NullLogger(), |
||||
new NostrRelayRequestFactory(10), |
||||
\sys_get_temp_dir() |
||||
); |
||||
} |
||||
} |
||||
@ -0,0 +1,35 @@
@@ -0,0 +1,35 @@
|
||||
<?php |
||||
|
||||
declare(strict_types=1); |
||||
|
||||
namespace App\Tests\Service; |
||||
|
||||
use App\Service\NostrRelayListFactory; |
||||
use PHPUnit\Framework\TestCase; |
||||
use Psr\Log\NullLogger; |
||||
use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface; |
||||
|
||||
final class NostrRelayListFactoryTest extends TestCase |
||||
{ |
||||
public function testGetConfiguredArticleRelayUrlListDeduplicatesAndPreservesOrder(): void |
||||
{ |
||||
$tokenStorage = $this->createMock(TokenStorageInterface::class); |
||||
$tokenStorage->method('getToken')->willReturn(null); |
||||
$f = new NostrRelayListFactory( |
||||
'wss://main', |
||||
['wss://extra', 'wss://main', 'wss://extra'], |
||||
['wss://profile'], |
||||
$tokenStorage, |
||||
new NullLogger() |
||||
); |
||||
$this->assertSame(['wss://main', 'wss://extra'], $f->getConfiguredArticleRelayUrlList()); |
||||
} |
||||
|
||||
public function testGetDefaultRelayUrl(): void |
||||
{ |
||||
$ts = $this->createMock(TokenStorageInterface::class); |
||||
$ts->method('getToken')->willReturn(null); |
||||
$f = new NostrRelayListFactory('wss://d', [], [], $ts, new NullLogger()); |
||||
$this->assertSame('wss://d', $f->getDefaultRelayUrl()); |
||||
} |
||||
} |
||||
@ -0,0 +1,38 @@
@@ -0,0 +1,38 @@
|
||||
<?php |
||||
|
||||
declare(strict_types=1); |
||||
|
||||
namespace App\Tests\Service; |
||||
|
||||
use App\Enum\KindsEnum; |
||||
use App\Service\NostrRelayRequestFactory; |
||||
use App\Service\NostrRelayQuery; |
||||
use PHPUnit\Framework\TestCase; |
||||
use Psr\Log\NullLogger; |
||||
use swentel\nostr\Relay\Relay; |
||||
use swentel\nostr\Relay\RelaySet; |
||||
|
||||
final class NostrRelayQueryTest extends TestCase |
||||
{ |
||||
public function testRelayLogLabelUsesHost(): void |
||||
{ |
||||
$this->assertSame( |
||||
'relay.example.com', |
||||
NostrRelayQuery::relayLogLabel('wss://relay.example.com/nostr') |
||||
); |
||||
} |
||||
|
||||
public function testCreateNostrRequestAcceptsBackedEnumKinds(): void |
||||
{ |
||||
$factory = new NostrRelayRequestFactory(12); |
||||
$q = new NostrRelayQuery(new NullLogger(), $factory); |
||||
$set = new RelaySet(); |
||||
$set->addRelay(new Relay('wss://127.0.0.1:0')); |
||||
$req = $q->createNostrRequest( |
||||
defaultRelaySet: $set, |
||||
kinds: [KindsEnum::METADATA], |
||||
filters: [], |
||||
); |
||||
$this->assertInstanceOf(\swentel\nostr\Request\Request::class, $req); |
||||
} |
||||
} |
||||
@ -0,0 +1,33 @@
@@ -0,0 +1,33 @@
|
||||
<?php |
||||
|
||||
declare(strict_types=1); |
||||
|
||||
namespace App\Tests\Service; |
||||
|
||||
use App\Service\NostrRelayRequestFactory; |
||||
use PHPUnit\Framework\TestCase; |
||||
use swentel\nostr\Message\RequestMessage; |
||||
use swentel\nostr\Relay\Relay; |
||||
use swentel\nostr\Relay\RelaySet; |
||||
use swentel\nostr\Subscription\Subscription; |
||||
|
||||
final class NostrRelayRequestFactoryTest extends TestCase |
||||
{ |
||||
public function testExposesConfiguredTimeout(): void |
||||
{ |
||||
$f = new NostrRelayRequestFactory(21); |
||||
$this->assertSame(21, $f->getRelayRequestTimeoutSec()); |
||||
} |
||||
|
||||
public function testCreateTimedRequestReturnsWiredRequest(): void |
||||
{ |
||||
$f = new NostrRelayRequestFactory(12); |
||||
$sub = new Subscription(); |
||||
$msg = new RequestMessage($sub->getId(), []); |
||||
$set = new RelaySet(); |
||||
$set->addRelay(new Relay('wss://127.0.0.1:0')); |
||||
|
||||
$req = $f->createTimedRequest($set, $msg); |
||||
$this->assertInstanceOf(\swentel\nostr\Request\Request::class, $req); |
||||
} |
||||
} |
||||
Loading…
Reference in new issue