From 2d011a05533c4780ce96e9bb2b387ad5773742b0 Mon Sep 17 00:00:00 2001 From: Silberengel Date: Thu, 23 Apr 2026 12:06:54 +0200 Subject: [PATCH] bug-fixes outboxes create parallel relay subprocesses --- .../controllers/comment_reply_controller.js | 1 + bin/nostr_relay_request_worker.php | 60 +++++ composer.json | 1 + config/services.yaml | 1 + src/Command/PrewarmCommand.php | 15 +- src/Controller/ArticleController.php | 6 +- src/Controller/SeoController.php | 21 +- src/Service/CommentReplyService.php | 22 +- src/Service/MagazineContentService.php | 55 +++++ src/Service/NostrClient.php | 233 ++++++++++++++++-- 10 files changed, 362 insertions(+), 53 deletions(-) create mode 100644 bin/nostr_relay_request_worker.php diff --git a/assets/controllers/comment_reply_controller.js b/assets/controllers/comment_reply_controller.js index 24856f0..6b4a885 100644 --- a/assets/controllers/comment_reply_controller.js +++ b/assets/controllers/comment_reply_controller.js @@ -74,6 +74,7 @@ export default class extends Controller { expected_coordinate: this.expectedCoordinateValue, parent_kind: parseInt(String(this.parentKindValue), 10), parent_id: this.parentIdValue, + parent_author_pubkey: this.authorPubkeyValue, article_event_id: this.articleEventIdValue || null, csrf: this.csrfValue, }; diff --git a/bin/nostr_relay_request_worker.php b/bin/nostr_relay_request_worker.php new file mode 100644 index 0000000..334e947 --- /dev/null +++ b/bin/nostr_relay_request_worker.php @@ -0,0 +1,60 @@ + + */ +declare(strict_types=1); + +if (PHP_SAPI !== 'cli') { + fwrite(STDERR, "CLI only\n"); + exit(1); +} +if ($argc < 3) { + fwrite(STDERR, "Usage: nostr_relay_request_worker.php \n"); + exit(1); +} + +$relayUrl = $argv[1]; +$path = $argv[2]; +if (!\is_file($path) || !\is_readable($path)) { + fwrite(STDERR, "Payload file missing or unreadable\n"); + exit(1); +} + +$root = \dirname(__DIR__); +require $root.'/vendor/autoload.php'; + +$raw = file_get_contents($path); +if ($raw === false || $raw === 
'') { + exit(1); +} + +try { + /** @var mixed $msg */ + $msg = unserialize($raw, ['allowed_classes' => true]); +} catch (\Throwable $e) { + fwrite(STDERR, 'unserialize: '.$e->getMessage()."\n"); + exit(1); +} + +if (!\is_object($msg) || !($msg instanceof \swentel\nostr\Message\RequestMessage)) { + fwrite(STDERR, "Invalid request message\n"); + exit(1); +} + +$relaySet = new \swentel\nostr\Relay\RelaySet(); +$relaySet->addRelay(new \swentel\nostr\Relay\Relay($relayUrl)); +$request = new \swentel\nostr\Request\Request($relaySet, $msg); +if (method_exists($request, 'setTimeout')) { + $request->setTimeout(15); +} + +try { + $out = $request->send(); +} catch (\Throwable $e) { + fwrite(STDERR, $e->getMessage()."\n"); + exit(1); +} + +fwrite(STDOUT, base64_encode(serialize($out))); diff --git a/composer.json b/composer.json index 924fc5b..1983ebd 100644 --- a/composer.json +++ b/composer.json @@ -35,6 +35,7 @@ "symfony/intl": "7.1.*", "symfony/property-access": "7.1.*", "symfony/property-info": "7.1.*", + "symfony/process": "7.1.*", "symfony/runtime": "7.1.*", "symfony/security-bundle": "7.1.*", "symfony/serializer": "7.1.*", diff --git a/config/services.yaml b/config/services.yaml index 5678955..0eb5071 100644 --- a/config/services.yaml +++ b/config/services.yaml @@ -36,6 +36,7 @@ services: $defaultRelayUrl: '%default_relay%' $articleRelayUrls: '%article_relays%' $profileRelayUrls: '%profile_relays%' + $projectDir: '%kernel.project_dir%' App\Twig\FooterLinksExtension: arguments: $footerLinksPath: '%footer_links%' diff --git a/src/Command/PrewarmCommand.php b/src/Command/PrewarmCommand.php index 23a144c..62f8e2b 100644 --- a/src/Command/PrewarmCommand.php +++ b/src/Command/PrewarmCommand.php @@ -8,6 +8,7 @@ use App\Entity\Article; use App\Repository\ArticleRepository; use App\Service\ArticleCommentThreadLoader; use App\Service\CacheService; +use App\Service\MagazineContentService; use App\Service\MagazineRefresher; use App\Service\Nip09DeletionApplier; use 
App\Service\NostrClient; @@ -38,6 +39,7 @@ final class PrewarmCommand extends Command private readonly CacheService $cacheService, private readonly NostrClient $nostrClient, private readonly ArticleRepository $articleRepository, + private readonly MagazineContentService $magazineContent, private readonly ArticleCommentThreadLoader $commentThreadLoader, private readonly ParameterBagInterface $params, private readonly LoggerInterface $logger, @@ -56,7 +58,7 @@ final class PrewarmCommand extends Command ->addOption('magazine-budget', null, InputOption::VALUE_REQUIRED, 'Seconds wall time for magazine relay refresh', '30') ->addOption('metadata-limit', null, InputOption::VALUE_REQUIRED, 'Max distinct author pubkeys to warm (0 = all)', '0') ->addOption('metadata-batch', null, InputOption::VALUE_REQUIRED, 'Kind-0 metadata: pubkeys per Nostr REQ (batched)', '50') - ->addOption('comments-max', null, InputOption::VALUE_REQUIRED, 'Newest N articles to warm comment cache for (0 = all, order: createdAt DESC)', '10') + ->addOption('comments-max', null, InputOption::VALUE_REQUIRED, 'Newest N magazine category articles to warm comment cache for (0 = all, order: createdAt DESC; excludes generic /articles feed-only rows)', '10') ->addOption('comments-budget', null, InputOption::VALUE_REQUIRED, 'Wall-clock seconds for the whole comments phase (Nostr fetches are slow; a single long thread can exceed a short budget; use 1200+ if prewarming many articles)', '600'); } @@ -257,16 +259,11 @@ final class PrewarmCommand extends Command $commentBudgetSeconds = max(1, (int) $input->getOption('comments-budget')); $commentPhaseStart = microtime(true); $deadline = $commentPhaseStart + $commentBudgetSeconds; - $qb = $this->articleRepository->createQueryBuilder('a') - ->where('a.slug IS NOT NULL') - ->andWhere("a.slug != ''") - ->andWhere('a.pubkey IS NOT NULL') - ->andWhere("a.pubkey != ''") - ->orderBy('a.createdAt', 'DESC'); + $magazineList = 
$this->magazineContent->getAllMagazineCategoryArticlesForSyndication(); if ($maxArticles > 0) { - $qb->setMaxResults($maxArticles); + $magazineList = \array_slice($magazineList, 0, $maxArticles); } - $articles = $qb->getQuery()->getResult(); + $articles = $magazineList; $articleCount = \count($articles); $w = 0; if ($articleCount === 0) { diff --git a/src/Controller/ArticleController.php b/src/Controller/ArticleController.php index 9bc7599..0b63952 100644 --- a/src/Controller/ArticleController.php +++ b/src/Controller/ArticleController.php @@ -33,8 +33,10 @@ class ArticleController extends AbstractController #[Route('/fragment/comments', name: 'article_comments_fragment', methods: ['GET'])] public function commentsFragment(Request $request, ArticleCommentThreadLoader $loader, LoggerInterface $logger): Response { - // Article body may raise the global limit; keep this sub-request bounded so relay I/O cannot hit max_execution_time (500). - set_time_limit(45); + // {@see NostrClient::getArticleDiscussion} runs per-relay work in parallel CLI workers; allow headroom + // for all processes + Symfony (45s was too low and caused an uncatchable max-execution fatal → HTTP 500). 
+ @set_time_limit(120); + @ini_set('max_execution_time', '120'); $t0 = microtime(true); $coordinate = $request->query->getString('coordinate'); diff --git a/src/Controller/SeoController.php b/src/Controller/SeoController.php index b525c00..658b287 100644 --- a/src/Controller/SeoController.php +++ b/src/Controller/SeoController.php @@ -94,24 +94,7 @@ final class SeoController extends AbstractController public function feedMagazine(Request $request): Response { $site = (string) $this->params->get('name'); - $articles = $this->articleRepository->findPublishedForSyndication(8000); - $bySlug = $this->dedupeArticlesByLatestRevision($articles); - $list = \array_values($bySlug); - usort($list, static function (Article $a, Article $b): int { - $ca = $a->getCreatedAt(); - $cb = $b->getCreatedAt(); - if ($ca === null && $cb === null) { - return 0; - } - if ($ca === null) { - return 1; - } - if ($cb === null) { - return -1; - } - - return $cb <=> $ca; - }); + $list = $this->magazineContent->getAllMagazineCategoryArticlesForSyndication(); $list = \array_slice($list, 0, self::FEED_MAX_ITEMS); $feedUrl = $this->absoluteUrlForRoute('feed_magazine'); $homeUrl = $this->absoluteUrlForRoute('home'); @@ -119,7 +102,7 @@ final class SeoController extends AbstractController $updated = $this->newestArticleUpdate($list); $body = $this->buildAtomFeed( - $site.': all articles', + $site.': all categories', (string) $this->params->get('description'), $selfId, $feedUrl, diff --git a/src/Service/CommentReplyService.php b/src/Service/CommentReplyService.php index 9867fac..96fab56 100644 --- a/src/Service/CommentReplyService.php +++ b/src/Service/CommentReplyService.php @@ -92,7 +92,27 @@ final readonly class CommentReplyService return ['ok' => false, 'error' => 'Reply must start with a quote line (>) linking the parent via nostr:nevent1 / naddr1 (reply blurb)', 'code' => 400]; } - $relays = $this->nostrClient->getArticleWriteRelayUrls(); + $rawParentAuthor = 
isset($payload['parent_author_pubkey']) && \is_string($payload['parent_author_pubkey']) + ? strtolower(trim($payload['parent_author_pubkey'])) + : ''; + $clientParentOk = 64 === \strlen($rawParentAuthor) && ctype_xdigit($rawParentAuthor); + $coordBits = explode(':', $expectedCoordinate, 3); + $articleAuthor = \count($coordBits) >= 2 ? strtolower((string) $coordBits[1]) : ''; + $articleAuthorOk = 64 === \strlen($articleAuthor) && ctype_xdigit($articleAuthor); + + if ((int) $parentKind === KindsEnum::COMMENTS->value) { + if (!$clientParentOk) { + return ['ok' => false, 'error' => 'parent_author_pubkey (64 hex) is required when replying to a comment', 'code' => 400]; + } + $parentAuthorHex = $rawParentAuthor; + } else { + $parentAuthorHex = $clientParentOk ? $rawParentAuthor : $articleAuthor; + if (!$clientParentOk && !$articleAuthorOk) { + return ['ok' => false, 'error' => 'Invalid article coordinate; cannot determine author relays', 'code' => 400]; + } + } + + $relays = $this->nostrClient->getRelayUrlsForCommentPublish($expectedCoordinate, $parentAuthorHex); $result = $this->nostrClient->publishEvent($wire, $relays); $this->logger->info('comment_reply.published', [ 'id' => $wire->getId(), diff --git a/src/Service/MagazineContentService.php b/src/Service/MagazineContentService.php index 2a9daf4..d7f160c 100644 --- a/src/Service/MagazineContentService.php +++ b/src/Service/MagazineContentService.php @@ -6,6 +6,7 @@ namespace App\Service; use App\Entity\Article; use App\Entity\Event; +use App\Enum\EventStatusEnum; use App\Repository\ArticleRepository; use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface; @@ -222,4 +223,58 @@ final class MagazineContentService 'category' => $category, ]; } + + /** + * Union of every article referenced by a category index (root 30040). Use this for magazine-wide + * Atom and comment prewarm so "newest" tracks the magazine, not the generic community list. 
+ * + * Dedupes by slug (newest {@see Article::getCreatedAt} wins). Only PUBLISHED/ARCHIVED rows. + * + * @return list<Article>
Newest first + */ + public function getAllMagazineCategoryArticlesForSyndication(): array + { + $bySlug = []; + foreach ($this->getCategorySlugsFromStore() as $catSlug) { + $data = $this->getCategoryPageData($catSlug); + foreach ($data['list'] as $article) { + $s = $article->getEventStatus(); + if ($s === null || ($s !== EventStatusEnum::PUBLISHED && $s !== EventStatusEnum::ARCHIVED)) { + continue; + } + $slug = \trim((string) $article->getSlug()); + if ($slug === '') { + continue; + } + $c = $article->getCreatedAt(); + if (!isset($bySlug[$slug])) { + $bySlug[$slug] = $article; + + continue; + } + $prev = $bySlug[$slug]->getCreatedAt(); + if ($c !== null && (null === $prev || $c > $prev)) { + $bySlug[$slug] = $article; + } + } + } + $list = \array_values($bySlug); + usort($list, static function (Article $a, Article $b): int { + $ca = $a->getCreatedAt(); + $cb = $b->getCreatedAt(); + if ($ca === null && $cb === null) { + return 0; + } + if ($ca === null) { + return 1; + } + if ($cb === null) { + return -1; + } + + return $cb <=> $ca; + }); + + return $list; + } } diff --git a/src/Service/NostrClient.php b/src/Service/NostrClient.php index 6f88c23..4467d52 100644 --- a/src/Service/NostrClient.php +++ b/src/Service/NostrClient.php @@ -19,6 +19,8 @@ use swentel\nostr\Relay\Relay; use swentel\nostr\Relay\RelaySet; use swentel\nostr\Request\Request; use swentel\nostr\Subscription\Subscription; +use Symfony\Component\Process\PhpExecutableFinder; +use Symfony\Component\Process\Process; use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface; use Symfony\Contracts\Cache\CacheInterface; use Symfony\Contracts\Cache\ItemInterface; @@ -28,6 +30,9 @@ class NostrClient /** Per-relay WebSocket I/O cap (seconds), applied on each relay’s {@see \WebSocket\Client}. */ private const RELAY_REQUEST_TIMEOUT_SEC = 15; + /** Extra wall time for {@see bin/nostr_relay_request_worker.php} process vs. WebSocket timeout. 
*/ + private const DISCUSSION_WORKER_GRACE_SEC = 5.0; + /** When a logged-in user lists this relay, also use {@see self::AGGR_NOSTR_LAND} for comment + profile reads. */ private const NOSTR_LAND = 'wss://nostr.land'; @@ -53,6 +58,7 @@ class NostrClient private readonly array $articleRelayUrls, private readonly array $profileRelayUrls, private readonly CacheInterface $relayQueryCache, + private readonly string $projectDir, ) { $this->defaultRelaySet = $this->buildArticleRelaySet(); } @@ -67,6 +73,51 @@ class NostrClient return $this->configuredArticleRelayUrlList(); } + /** + * Relays to publish a kind-1111 reply: site defaults plus NIP-65 (kind-10002) for the + * article author and, when the direct parent is another pubkey (nested comment), that + * author’s relays as well. + * + * @param string $articleCoordinate kind:pubkey:identifier + * @param string $parentEventAuthorHex 64-char hex of the event being replied to + * + * @return list + */ + public function getRelayUrlsForCommentPublish(string $articleCoordinate, string $parentEventAuthorHex): array + { + $base = $this->configuredArticleRelayUrlList(); + $parts = explode(':', $articleCoordinate, 3); + $articlePk = \count($parts) >= 2 ? 
strtolower((string) $parts[1]) : ''; + if (64 !== \strlen($articlePk) || !ctype_xdigit($articlePk)) { + $articlePk = ''; + } + $parentPk = strtolower(trim($parentEventAuthorHex)); + if (64 !== \strlen($parentPk) || !ctype_xdigit($parentPk)) { + $parentPk = ''; + } + $pubkeys = []; + if ($articlePk !== '') { + $pubkeys[] = $articlePk; + } + if ($parentPk !== '' && $parentPk !== $articlePk) { + $pubkeys[] = $parentPk; + } + + $seen = array_fill_keys($base, true); + $out = $base; + foreach ($pubkeys as $pk) { + foreach ($this->getAuthorNip65RelaysList($pk) as $wss) { + if (!\is_string($wss) || $wss === '' || isset($seen[$wss])) { + continue; + } + $seen[$wss] = true; + $out[] = $wss; + } + } + + return $out; + } + /** * default_relay + article_relays from config, in order, deduplicated. Used for the static * default set and as the base when merging author/extra relay URLs in {@see createRelaySet()}. @@ -265,40 +316,67 @@ class NostrClient } /** - * Get top 3 reputable relays from an author's relay list (cached; avoids a kind-10002 round trip per page view). + * Full NIP-65 (kind-10002) wss:// list for a hex pubkey, cached. Used for comment fetches; prefer + * {@see getTopReputableRelaysForAuthor} when you only need a few relays. 
+ * + * @return list */ - private function getTopReputableRelaysForAuthor(string $pubkey, int $limit = 3): array + private function getAuthorNip65RelaysList(string $pubkey): array { - $cacheKey = 'nostr_author_relays_'.hash('sha256', $pubkey); + $cacheKey = 'nostr_kind10002_relays_v1_'.hash('sha256', $pubkey); - return $this->relayQueryCache->get($cacheKey, function (ItemInterface $item) use ($pubkey, $limit): array { + return $this->relayQueryCache->get($cacheKey, function (ItemInterface $item) use ($pubkey): array { $item->expiresAfter(3600); try { $authorRelays = $this->getNpubRelays($pubkey); } catch (\Exception $e) { - $this->logger->error('Error getting author relays', [ + $this->logger->error('Error getting author NIP-65 relay list', [ 'pubkey' => $pubkey, 'error' => $e->getMessage(), ]); $authorRelays = []; } + $authorRelays = array_values(array_filter( + is_array($authorRelays) ? $authorRelays : [], + static function ($relay): bool { + return \is_string($relay) + && str_starts_with($relay, 'wss:') + && !str_contains($relay, 'localhost'); + } + )); if ($authorRelays === []) { - return [$this->defaultRelayUrl]; + return []; } - - $authorRelays = array_filter($authorRelays, static function ($relay): bool { - return \is_string($relay) - && str_starts_with($relay, 'wss:') - && !str_contains($relay, 'localhost'); - }); - if ($authorRelays === []) { - return [$this->defaultRelayUrl]; + $seen = []; + $out = []; + foreach ($authorRelays as $u) { + if (isset($seen[$u])) { + continue; + } + $seen[$u] = true; + $out[] = $u; } - return array_values(array_slice($authorRelays, 0, $limit)); + return $out; }); } + /** + * A short prefix of the author NIP-65 list (or default relay) for queries that do not need every home relay. 
+ */ + private function getTopReputableRelaysForAuthor(string $pubkey, int $limit = 3): array + { + $all = $this->getAuthorNip65RelaysList($pubkey); + if ($all === []) { + return [$this->defaultRelayUrl]; + } + if ($limit < 1) { + $limit = 1; + } + + return \array_values(\array_slice($all, 0, $limit)); + } + /** * @return list Deduplicated profile relay URLs from config */ @@ -883,23 +961,22 @@ class NostrClient $pubkey = $parts[1]; $tRelays = microtime(true); - $authorRelays = $this->getTopReputableRelaysForAuthor($pubkey, 1); + $authorRelays = $this->getAuthorNip65RelaysList($pubkey); $this->logger->info('nostr.article_discussion.author_relays_ready', [ 'elapsed_ms' => (int) round((microtime(true) - $tRelays) * 1000), - 'author_relays' => $authorRelays, + 'author_relay_count' => \count($authorRelays), ]); + $baseForDiscussion = $this->configuredArticleRelayUrlList(); $mergedForDiscussion = $this->withAggrNostrLandIfUserSubscribesNostrLand( - array_merge($this->configuredArticleRelayUrlList(), $authorRelays) + array_merge($baseForDiscussion, $authorRelays) ); - $relaySet = $this->relaySetFromDistinctUrlList($mergedForDiscussion); - $plannedRelayUrls = $mergedForDiscussion; + $plannedRelayUrls = array_values(array_unique($mergedForDiscussion, \SORT_REGULAR)); $filters = $this->createArticleDiscussionFilters($coordinate, $rootEventHexId); $subscription = new Subscription(); $subscriptionId = $subscription->setId(); $requestMessage = new RequestMessage($subscriptionId, $filters); - $request = $this->newTimedRequest($relaySet, $requestMessage); $this->logger->info('nostr.article_discussion.req_sending', [ 'subscription_id' => $subscriptionId, @@ -911,7 +988,20 @@ class NostrClient $byId = []; try { $tSend = microtime(true); - $response = $request->send(); + $workerPath = $this->projectDir.'/bin/nostr_relay_request_worker.php'; + if (!\is_file($workerPath) || \count($plannedRelayUrls) <= 1) { + $response = 
$this->sendArticleDiscussionToRelaysSequential($plannedRelayUrls, $requestMessage); + } else { + try { + $response = $this->sendArticleDiscussionToRelaysParallel($plannedRelayUrls, $requestMessage); + } catch (\Throwable $e) { + $this->logger->warning('nostr.article_discussion.parallel_failed', [ + 'message' => $e->getMessage(), + 'exception_class' => \get_class($e), + ]); + $response = $this->sendArticleDiscussionToRelaysSequential($plannedRelayUrls, $requestMessage); + } + } $sendMs = (int) round((microtime(true) - $tSend) * 1000); $this->logger->info('nostr.article_discussion.req_response_envelope', [ 'elapsed_ms' => $sendMs, @@ -994,6 +1084,105 @@ class NostrClient return ['thread' => $thread, 'quotes' => $quotes]; } + /** + * One {@see Request} over all relays (library visits each wss:// in series). + * + * @param list $relayUrls + * + * @return array Same shape as {@see Request::send()} (relay url → message list) + */ + private function sendArticleDiscussionToRelaysSequential(array $relayUrls, RequestMessage $requestMessage): array + { + $relaySet = $this->relaySetFromDistinctUrlList($relayUrls); + $request = $this->newTimedRequest($relaySet, $requestMessage); + + return $request->send(); + } + + /** + * One short-lived CLI worker per relay so WebSocket I/O is parallel (vs. a single in-process + * {@see Request::send() loop). 
+ * + * @param list $relayUrls + * + * @return array Same shape as {@see Request::send()} + */ + private function sendArticleDiscussionToRelaysParallel(array $relayUrls, RequestMessage $requestMessage): array + { + $worker = $this->projectDir.'/bin/nostr_relay_request_worker.php'; + $phpBinary = (new PhpExecutableFinder())->find() ?: 'php'; + $timeout = self::RELAY_REQUEST_TIMEOUT_SEC + (int) self::DISCUSSION_WORKER_GRACE_SEC; + + $rawPayload = serialize($requestMessage); + $tmp = tempnam(sys_get_temp_dir(), 'nrq_'); + if ($tmp === false) { + throw new \RuntimeException('tempnam failed for Nostr discussion payload'); + } + try { + if (file_put_contents($tmp, $rawPayload) === false) { + throw new \RuntimeException('Could not write Nostr discussion temp payload'); + } + + /** @var array $procs */ + $procs = []; + foreach ($relayUrls as $wss) { + if (!\is_string($wss) || $wss === '') { + continue; + } + $p = new Process( + [$phpBinary, $worker, $wss, $tmp], + $this->projectDir, + null, + null, + (float) $timeout + ); + $p->start(); + $procs[$wss] = $p; + } + + $merged = []; + foreach ($procs as $wss => $p) { + $p->wait(); + if (!$p->isSuccessful()) { + $err = $p->getErrorOutput(); + $this->logger->warning('nostr.article_discussion.relay_worker_failed', [ + 'relay' => $wss, + 'exit_code' => $p->getExitCode(), + 'stderr' => $err !== '' ? 
$err : null, + ]); + $merged[$wss] = []; + + continue; + } + $out = trim($p->getOutput()); + if ($out === '') { + $merged[$wss] = []; + + continue; + } + $decoded = base64_decode($out, true); + if ($decoded === false || $decoded === '') { + $merged[$wss] = []; + + continue; + } + $chunk = unserialize($decoded, ['allowed_classes' => true]); + if (!\is_array($chunk)) { + $merged[$wss] = []; + + continue; + } + $merged = array_replace($merged, $chunk); + } + + return $merged; + } finally { + if (\is_file($tmp)) { + @unlink($tmp); + } + } + } + /** * Same merge/dedupe rules as {@see createRelaySet()} — used only for logging planned relay URLs. *