Browse Source

bug-fixes

outboxes
create parallel relay subprocesses
imwald
Silberengel 1 week ago
parent
commit
2d011a0553
  1. 1
      assets/controllers/comment_reply_controller.js
  2. 60
      bin/nostr_relay_request_worker.php
  3. 1
      composer.json
  4. 1
      config/services.yaml
  5. 15
      src/Command/PrewarmCommand.php
  6. 6
      src/Controller/ArticleController.php
  7. 21
      src/Controller/SeoController.php
  8. 22
      src/Service/CommentReplyService.php
  9. 55
      src/Service/MagazineContentService.php
  10. 229
      src/Service/NostrClient.php

1
assets/controllers/comment_reply_controller.js

@ -74,6 +74,7 @@ export default class extends Controller { @@ -74,6 +74,7 @@ export default class extends Controller {
expected_coordinate: this.expectedCoordinateValue,
parent_kind: parseInt(String(this.parentKindValue), 10),
parent_id: this.parentIdValue,
parent_author_pubkey: this.authorPubkeyValue,
article_event_id: this.articleEventIdValue || null,
csrf: this.csrfValue,
};

60
bin/nostr_relay_request_worker.php

@ -0,0 +1,60 @@ @@ -0,0 +1,60 @@
<?php
/**
 * One isolated Nostr REQ/EOSE over a single relay (used for parallel article discussion).
 *
 * Invoked as: php bin/nostr_relay_request_worker.php <wss-url> <file-with-serialized-RequestMessage>
 *
 * Contract with the parent process:
 *  - success: STDOUT contains exactly base64(serialize(<result of Request::send()>)), exit code 0;
 *  - failure: a one-line reason is written to STDERR, exit code 1.
 */
declare(strict_types=1);

if (PHP_SAPI !== 'cli') {
    fwrite(STDERR, "CLI only\n");
    exit(1);
}

// STDOUT must carry nothing but the base64 payload: the parent decodes it in strict mode, so a
// single PHP warning/notice printed there would make the whole relay result unusable.
ini_set('display_errors', 'stderr');

if ($argc < 3) {
    fwrite(STDERR, "Usage: nostr_relay_request_worker.php <wss-url> <serialized-request-file>\n");
    exit(1);
}

$relayUrl = $argv[1];
$path = $argv[2];

if ($relayUrl === '') {
    fwrite(STDERR, "Relay URL must not be empty\n");
    exit(1);
}

if (!\is_file($path) || !\is_readable($path)) {
    fwrite(STDERR, "Payload file missing or unreadable\n");
    exit(1);
}

$root = \dirname(__DIR__);
require $root.'/vendor/autoload.php';

$raw = file_get_contents($path);
if ($raw === false || $raw === '') {
    // Previously a silent exit; the parent logs our STDERR, so say why we gave up.
    fwrite(STDERR, "Payload file is empty or could not be read\n");
    exit(1);
}

try {
    // NOTE(review): 'allowed_classes' => true instantiates any class named in the payload. The file
    // is expected to be written by the parent application only; consider restricting this to the
    // request message class and the classes it nests once that list is confirmed.
    /** @var mixed $msg */
    $msg = unserialize($raw, ['allowed_classes' => true]);
} catch (\Throwable $e) {
    fwrite(STDERR, 'unserialize: '.$e->getMessage()."\n");
    exit(1);
}

// Also covers the `false` that unserialize() returns for a corrupt payload.
if (!$msg instanceof \swentel\nostr\Message\RequestMessage) {
    fwrite(STDERR, "Invalid request message\n");
    exit(1);
}

$relaySet = new \swentel\nostr\Relay\RelaySet();
$relaySet->addRelay(new \swentel\nostr\Relay\Relay($relayUrl));
$request = new \swentel\nostr\Request\Request($relaySet, $msg);

// Only applied when the installed library version exposes a timeout setter.
if (method_exists($request, 'setTimeout')) {
    $request->setTimeout(15);
}

try {
    // Encode inside the try block so a serialization failure is reported like any other error
    // instead of surfacing as an uncaught exception.
    $encoded = base64_encode(serialize($request->send()));
} catch (\Throwable $e) {
    fwrite(STDERR, $e->getMessage()."\n");
    exit(1);
}

fwrite(STDOUT, $encoded);
exit(0);

1
composer.json

@ -35,6 +35,7 @@ @@ -35,6 +35,7 @@
"symfony/intl": "7.1.*",
"symfony/property-access": "7.1.*",
"symfony/property-info": "7.1.*",
"symfony/process": "7.1.*",
"symfony/runtime": "7.1.*",
"symfony/security-bundle": "7.1.*",
"symfony/serializer": "7.1.*",

1
config/services.yaml

@ -36,6 +36,7 @@ services: @@ -36,6 +36,7 @@ services:
$defaultRelayUrl: '%default_relay%'
$articleRelayUrls: '%article_relays%'
$profileRelayUrls: '%profile_relays%'
$projectDir: '%kernel.project_dir%'
App\Twig\FooterLinksExtension:
arguments:
$footerLinksPath: '%footer_links%'

15
src/Command/PrewarmCommand.php

@ -8,6 +8,7 @@ use App\Entity\Article; @@ -8,6 +8,7 @@ use App\Entity\Article;
use App\Repository\ArticleRepository;
use App\Service\ArticleCommentThreadLoader;
use App\Service\CacheService;
use App\Service\MagazineContentService;
use App\Service\MagazineRefresher;
use App\Service\Nip09DeletionApplier;
use App\Service\NostrClient;
@ -38,6 +39,7 @@ final class PrewarmCommand extends Command @@ -38,6 +39,7 @@ final class PrewarmCommand extends Command
private readonly CacheService $cacheService,
private readonly NostrClient $nostrClient,
private readonly ArticleRepository $articleRepository,
private readonly MagazineContentService $magazineContent,
private readonly ArticleCommentThreadLoader $commentThreadLoader,
private readonly ParameterBagInterface $params,
private readonly LoggerInterface $logger,
@ -56,7 +58,7 @@ final class PrewarmCommand extends Command @@ -56,7 +58,7 @@ final class PrewarmCommand extends Command
->addOption('magazine-budget', null, InputOption::VALUE_REQUIRED, 'Seconds wall time for magazine relay refresh', '30')
->addOption('metadata-limit', null, InputOption::VALUE_REQUIRED, 'Max distinct author pubkeys to warm (0 = all)', '0')
->addOption('metadata-batch', null, InputOption::VALUE_REQUIRED, 'Kind-0 metadata: pubkeys per Nostr REQ (batched)', '50')
->addOption('comments-max', null, InputOption::VALUE_REQUIRED, 'Newest N articles to warm comment cache for (0 = all, order: createdAt DESC)', '10')
->addOption('comments-max', null, InputOption::VALUE_REQUIRED, 'Newest N magazine category articles to warm comment cache for (0 = all, order: createdAt DESC; excludes generic /articles feed-only rows)', '10')
->addOption('comments-budget', null, InputOption::VALUE_REQUIRED, 'Wall-clock seconds for the whole comments phase (Nostr fetches are slow; a single long thread can exceed a short budget; use 1200+ if prewarming many articles)', '600');
}
@ -257,16 +259,11 @@ final class PrewarmCommand extends Command @@ -257,16 +259,11 @@ final class PrewarmCommand extends Command
$commentBudgetSeconds = max(1, (int) $input->getOption('comments-budget'));
$commentPhaseStart = microtime(true);
$deadline = $commentPhaseStart + $commentBudgetSeconds;
$qb = $this->articleRepository->createQueryBuilder('a')
->where('a.slug IS NOT NULL')
->andWhere("a.slug != ''")
->andWhere('a.pubkey IS NOT NULL')
->andWhere("a.pubkey != ''")
->orderBy('a.createdAt', 'DESC');
$magazineList = $this->magazineContent->getAllMagazineCategoryArticlesForSyndication();
if ($maxArticles > 0) {
$qb->setMaxResults($maxArticles);
$magazineList = \array_slice($magazineList, 0, $maxArticles);
}
$articles = $qb->getQuery()->getResult();
$articles = $magazineList;
$articleCount = \count($articles);
$w = 0;
if ($articleCount === 0) {

6
src/Controller/ArticleController.php

@ -33,8 +33,10 @@ class ArticleController extends AbstractController @@ -33,8 +33,10 @@ class ArticleController extends AbstractController
#[Route('/fragment/comments', name: 'article_comments_fragment', methods: ['GET'])]
public function commentsFragment(Request $request, ArticleCommentThreadLoader $loader, LoggerInterface $logger): Response
{
// Article body may raise the global limit; keep this sub-request bounded so relay I/O cannot hit max_execution_time (500).
set_time_limit(45);
// {@see NostrClient::getArticleDiscussion} runs per-relay work in parallel CLI workers; allow headroom
// for all processes + Symfony (45s was too low and caused an uncatchable max-execution fatal → HTTP 500).
@set_time_limit(120);
@ini_set('max_execution_time', '120');
$t0 = microtime(true);
$coordinate = $request->query->getString('coordinate');

21
src/Controller/SeoController.php

@ -94,24 +94,7 @@ final class SeoController extends AbstractController @@ -94,24 +94,7 @@ final class SeoController extends AbstractController
public function feedMagazine(Request $request): Response
{
$site = (string) $this->params->get('name');
$articles = $this->articleRepository->findPublishedForSyndication(8000);
$bySlug = $this->dedupeArticlesByLatestRevision($articles);
$list = \array_values($bySlug);
usort($list, static function (Article $a, Article $b): int {
$ca = $a->getCreatedAt();
$cb = $b->getCreatedAt();
if ($ca === null && $cb === null) {
return 0;
}
if ($ca === null) {
return 1;
}
if ($cb === null) {
return -1;
}
return $cb <=> $ca;
});
$list = $this->magazineContent->getAllMagazineCategoryArticlesForSyndication();
$list = \array_slice($list, 0, self::FEED_MAX_ITEMS);
$feedUrl = $this->absoluteUrlForRoute('feed_magazine');
$homeUrl = $this->absoluteUrlForRoute('home');
@ -119,7 +102,7 @@ final class SeoController extends AbstractController @@ -119,7 +102,7 @@ final class SeoController extends AbstractController
$updated = $this->newestArticleUpdate($list);
$body = $this->buildAtomFeed(
$site.': all articles',
$site.': all categories',
(string) $this->params->get('description'),
$selfId,
$feedUrl,

22
src/Service/CommentReplyService.php

@ -92,7 +92,27 @@ final readonly class CommentReplyService @@ -92,7 +92,27 @@ final readonly class CommentReplyService
return ['ok' => false, 'error' => 'Reply must start with a quote line (>) linking the parent via nostr:nevent1 / naddr1 (reply blurb)', 'code' => 400];
}
$relays = $this->nostrClient->getArticleWriteRelayUrls();
$rawParentAuthor = isset($payload['parent_author_pubkey']) && \is_string($payload['parent_author_pubkey'])
? strtolower(trim($payload['parent_author_pubkey']))
: '';
$clientParentOk = 64 === \strlen($rawParentAuthor) && ctype_xdigit($rawParentAuthor);
$coordBits = explode(':', $expectedCoordinate, 3);
$articleAuthor = \count($coordBits) >= 2 ? strtolower((string) $coordBits[1]) : '';
$articleAuthorOk = 64 === \strlen($articleAuthor) && ctype_xdigit($articleAuthor);
if ((int) $parentKind === KindsEnum::COMMENTS->value) {
if (!$clientParentOk) {
return ['ok' => false, 'error' => 'parent_author_pubkey (64 hex) is required when replying to a comment', 'code' => 400];
}
$parentAuthorHex = $rawParentAuthor;
} else {
$parentAuthorHex = $clientParentOk ? $rawParentAuthor : $articleAuthor;
if (!$clientParentOk && !$articleAuthorOk) {
return ['ok' => false, 'error' => 'Invalid article coordinate; cannot determine author relays', 'code' => 400];
}
}
$relays = $this->nostrClient->getRelayUrlsForCommentPublish($expectedCoordinate, $parentAuthorHex);
$result = $this->nostrClient->publishEvent($wire, $relays);
$this->logger->info('comment_reply.published', [
'id' => $wire->getId(),

55
src/Service/MagazineContentService.php

@ -6,6 +6,7 @@ namespace App\Service; @@ -6,6 +6,7 @@ namespace App\Service;
use App\Entity\Article;
use App\Entity\Event;
use App\Enum\EventStatusEnum;
use App\Repository\ArticleRepository;
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface;
@ -222,4 +223,58 @@ final class MagazineContentService @@ -222,4 +223,58 @@ final class MagazineContentService
'category' => $category,
];
}
/**
 * Collects the articles listed on every category page of the magazine into one list, for the
 * magazine-wide Atom feed and the comment prewarm (so "newest" follows the magazine rather than
 * the generic community list).
 *
 * Rows whose status is neither PUBLISHED nor ARCHIVED, or whose slug is blank, are ignored.
 * When a slug occurs more than once, the row with the latest {@see Article::getCreatedAt} is kept.
 *
 * @return list<Article> Newest first; rows without a creation date sort last
 */
public function getAllMagazineCategoryArticlesForSyndication(): array
{
    $syndicatedStatuses = [EventStatusEnum::PUBLISHED, EventStatusEnum::ARCHIVED];
    $newestBySlug = [];

    foreach ($this->getCategorySlugsFromStore() as $categorySlug) {
        $pageData = $this->getCategoryPageData($categorySlug);

        foreach ($pageData['list'] as $candidate) {
            if (!\in_array($candidate->getEventStatus(), $syndicatedStatuses, true)) {
                continue;
            }

            $slugKey = \trim((string) $candidate->getSlug());
            if ($slugKey === '') {
                continue;
            }

            $kept = $newestBySlug[$slugKey] ?? null;
            if ($kept === null) {
                // First time this slug is seen.
                $newestBySlug[$slugKey] = $candidate;
                continue;
            }

            // A dated candidate replaces an undated or older row; an undated candidate never does.
            $candidateDate = $candidate->getCreatedAt();
            $keptDate = $kept->getCreatedAt();
            if ($candidateDate === null) {
                continue;
            }
            if ($keptDate === null || $candidateDate > $keptDate) {
                $newestBySlug[$slugKey] = $candidate;
            }
        }
    }

    $ordered = \array_values($newestBySlug);
    usort($ordered, static function (Article $left, Article $right): int {
        $leftDate = $left->getCreatedAt();
        $rightDate = $right->getCreatedAt();

        return match (true) {
            $leftDate === null && $rightDate === null => 0,
            $leftDate === null => 1,
            $rightDate === null => -1,
            default => $rightDate <=> $leftDate,
        };
    });

    return $ordered;
}
}

229
src/Service/NostrClient.php

@ -19,6 +19,8 @@ use swentel\nostr\Relay\Relay; @@ -19,6 +19,8 @@ use swentel\nostr\Relay\Relay;
use swentel\nostr\Relay\RelaySet;
use swentel\nostr\Request\Request;
use swentel\nostr\Subscription\Subscription;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;
use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface;
use Symfony\Contracts\Cache\CacheInterface;
use Symfony\Contracts\Cache\ItemInterface;
@ -28,6 +30,9 @@ class NostrClient @@ -28,6 +30,9 @@ class NostrClient
/** Per-relay WebSocket I/O cap (seconds), applied on each relay’s {@see \WebSocket\Client}. */
private const RELAY_REQUEST_TIMEOUT_SEC = 15;
/** Extra wall time for {@see bin/nostr_relay_request_worker.php} process vs. WebSocket timeout. */
private const DISCUSSION_WORKER_GRACE_SEC = 5.0;
/** When a logged-in user lists this relay, also use {@see self::AGGR_NOSTR_LAND} for comment + profile reads. */
private const NOSTR_LAND = 'wss://nostr.land';
@ -53,6 +58,7 @@ class NostrClient @@ -53,6 +58,7 @@ class NostrClient
private readonly array $articleRelayUrls,
private readonly array $profileRelayUrls,
private readonly CacheInterface $relayQueryCache,
private readonly string $projectDir,
) {
$this->defaultRelaySet = $this->buildArticleRelaySet();
}
@ -67,6 +73,51 @@ class NostrClient @@ -67,6 +73,51 @@ class NostrClient
return $this->configuredArticleRelayUrlList();
}
/**
 * Builds the relay list a kind-1111 reply is published to: the configured site relays first,
 * followed by the NIP-65 (kind-10002) relays of the article author and, when the event being
 * replied to was written by a different pubkey (nested comment), that author's relays too.
 *
 * A pubkey that is not exactly 64 hex characters is ignored. Every URL appears at most once,
 * at the position where it was first seen.
 *
 * @param string $articleCoordinate    kind:pubkey:identifier
 * @param string $parentEventAuthorHex 64-char hex of the event being replied to
 *
 * @return list<string>
 */
public function getRelayUrlsForCommentPublish(string $articleCoordinate, string $parentEventAuthorHex): array
{
    $relayUrls = $this->configuredArticleRelayUrlList();

    $isHexPubkey = static fn (string $candidate): bool => 64 === \strlen($candidate) && ctype_xdigit($candidate);

    // Second segment of the coordinate is the article author's pubkey.
    $coordinateParts = explode(':', $articleCoordinate, 3);
    $articleAuthor = isset($coordinateParts[1]) ? strtolower((string) $coordinateParts[1]) : '';
    if (!$isHexPubkey($articleAuthor)) {
        $articleAuthor = '';
    }

    $parentAuthor = strtolower(trim($parentEventAuthorHex));
    if (!$isHexPubkey($parentAuthor)) {
        $parentAuthor = '';
    }

    $authors = [];
    if ($articleAuthor !== '') {
        $authors[] = $articleAuthor;
    }
    if ($parentAuthor !== '' && $parentAuthor !== $articleAuthor) {
        $authors[] = $parentAuthor;
    }

    // URL => true lookup of everything already present in $relayUrls.
    $known = array_fill_keys($relayUrls, true);
    foreach ($authors as $author) {
        foreach ($this->getAuthorNip65RelaysList($author) as $candidateUrl) {
            if (\is_string($candidateUrl) && $candidateUrl !== '' && !isset($known[$candidateUrl])) {
                $known[$candidateUrl] = true;
                $relayUrls[] = $candidateUrl;
            }
        }
    }

    return $relayUrls;
}
/**
* default_relay + article_relays from config, in order, deduplicated. Used for the static
* default set and as the base when merging author/extra relay URLs in {@see createRelaySet()}.
@ -265,40 +316,67 @@ class NostrClient @@ -265,40 +316,67 @@ class NostrClient
}
/**
* Get top 3 reputable relays from an author's relay list (cached; avoids a kind-10002 round trip per page view).
* Full NIP-65 (kind-10002) wss:// list for a hex pubkey, cached. Used for comment fetches; prefer
* {@see getTopReputableRelaysForAuthor} when you only need a few relays.
*
* @return list<string>
*/
private function getTopReputableRelaysForAuthor(string $pubkey, int $limit = 3): array
private function getAuthorNip65RelaysList(string $pubkey): array
{
$cacheKey = 'nostr_author_relays_'.hash('sha256', $pubkey);
$cacheKey = 'nostr_kind10002_relays_v1_'.hash('sha256', $pubkey);
return $this->relayQueryCache->get($cacheKey, function (ItemInterface $item) use ($pubkey, $limit): array {
return $this->relayQueryCache->get($cacheKey, function (ItemInterface $item) use ($pubkey): array {
$item->expiresAfter(3600);
try {
$authorRelays = $this->getNpubRelays($pubkey);
} catch (\Exception $e) {
$this->logger->error('Error getting author relays', [
$this->logger->error('Error getting author NIP-65 relay list', [
'pubkey' => $pubkey,
'error' => $e->getMessage(),
]);
$authorRelays = [];
}
if ($authorRelays === []) {
return [$this->defaultRelayUrl];
}
$authorRelays = array_filter($authorRelays, static function ($relay): bool {
$authorRelays = array_values(array_filter(
is_array($authorRelays) ? $authorRelays : [],
static function ($relay): bool {
return \is_string($relay)
&& str_starts_with($relay, 'wss:')
&& !str_contains($relay, 'localhost');
});
}
));
if ($authorRelays === []) {
return [$this->defaultRelayUrl];
return [];
}
$seen = [];
$out = [];
foreach ($authorRelays as $u) {
if (isset($seen[$u])) {
continue;
}
$seen[$u] = true;
$out[] = $u;
}
return array_values(array_slice($authorRelays, 0, $limit));
return $out;
});
}
/**
 * Returns the leading entries of the author's NIP-65 relay list, for queries that do not need
 * every home relay. Falls back to the default relay when the author has no usable list.
 *
 * @param string $pubkey hex pubkey of the author
 * @param int    $limit  maximum number of relays to return; values below 1 are treated as 1
 *
 * @return list<string>
 */
private function getTopReputableRelaysForAuthor(string $pubkey, int $limit = 3): array
{
    $relayList = $this->getAuthorNip65RelaysList($pubkey);

    return $relayList === []
        ? [$this->defaultRelayUrl]
        : \array_values(\array_slice($relayList, 0, max(1, $limit)));
}
/**
* @return list<string> Deduplicated profile relay URLs from config
*/
@ -883,23 +961,22 @@ class NostrClient @@ -883,23 +961,22 @@ class NostrClient
$pubkey = $parts[1];
$tRelays = microtime(true);
$authorRelays = $this->getTopReputableRelaysForAuthor($pubkey, 1);
$authorRelays = $this->getAuthorNip65RelaysList($pubkey);
$this->logger->info('nostr.article_discussion.author_relays_ready', [
'elapsed_ms' => (int) round((microtime(true) - $tRelays) * 1000),
'author_relays' => $authorRelays,
'author_relay_count' => \count($authorRelays),
]);
$baseForDiscussion = $this->configuredArticleRelayUrlList();
$mergedForDiscussion = $this->withAggrNostrLandIfUserSubscribesNostrLand(
array_merge($this->configuredArticleRelayUrlList(), $authorRelays)
array_merge($baseForDiscussion, $authorRelays)
);
$relaySet = $this->relaySetFromDistinctUrlList($mergedForDiscussion);
$plannedRelayUrls = $mergedForDiscussion;
$plannedRelayUrls = array_values(array_unique($mergedForDiscussion, \SORT_REGULAR));
$filters = $this->createArticleDiscussionFilters($coordinate, $rootEventHexId);
$subscription = new Subscription();
$subscriptionId = $subscription->setId();
$requestMessage = new RequestMessage($subscriptionId, $filters);
$request = $this->newTimedRequest($relaySet, $requestMessage);
$this->logger->info('nostr.article_discussion.req_sending', [
'subscription_id' => $subscriptionId,
@ -911,7 +988,20 @@ class NostrClient @@ -911,7 +988,20 @@ class NostrClient
$byId = [];
try {
$tSend = microtime(true);
$response = $request->send();
$workerPath = $this->projectDir.'/bin/nostr_relay_request_worker.php';
if (!\is_file($workerPath) || \count($plannedRelayUrls) <= 1) {
$response = $this->sendArticleDiscussionToRelaysSequential($plannedRelayUrls, $requestMessage);
} else {
try {
$response = $this->sendArticleDiscussionToRelaysParallel($plannedRelayUrls, $requestMessage);
} catch (\Throwable $e) {
$this->logger->warning('nostr.article_discussion.parallel_failed', [
'message' => $e->getMessage(),
'exception_class' => \get_class($e),
]);
$response = $this->sendArticleDiscussionToRelaysSequential($plannedRelayUrls, $requestMessage);
}
}
$sendMs = (int) round((microtime(true) - $tSend) * 1000);
$this->logger->info('nostr.article_discussion.req_response_envelope', [
'elapsed_ms' => $sendMs,
@ -994,6 +1084,105 @@ class NostrClient @@ -994,6 +1084,105 @@ class NostrClient
return ['thread' => $thread, 'quotes' => $quotes];
}
/**
 * Sends the request through a single {@see Request} that covers all given relays
 * (the library visits each wss:// in series).
 *
 * @param list<string> $relayUrls
 *
 * @return array<string, mixed> Same shape as {@see Request::send()} (relay url → message list)
 */
private function sendArticleDiscussionToRelaysSequential(array $relayUrls, RequestMessage $requestMessage): array
{
    return $this
        ->newTimedRequest($this->relaySetFromDistinctUrlList($relayUrls), $requestMessage)
        ->send();
}
/**
 * One short-lived CLI worker per relay so WebSocket I/O is parallel (vs. a single in-process
 * {@see Request::send()} loop).
 *
 * The serialized request is written once to a temp file that every worker reads; the file is
 * removed before this method returns. A relay whose worker exits non-zero, exceeds its timeout,
 * is terminated by a signal, or prints an unusable payload contributes an empty list and never
 * discards the results of the other relays.
 *
 * @param list<string> $relayUrls
 *
 * @return array<string, mixed> Same shape as {@see Request::send()}
 *
 * @throws \RuntimeException when the temp payload file cannot be created or written
 */
private function sendArticleDiscussionToRelaysParallel(array $relayUrls, RequestMessage $requestMessage): array
{
    $worker = $this->projectDir.'/bin/nostr_relay_request_worker.php';
    $phpBinary = (new PhpExecutableFinder())->find() ?: 'php';
    // Process-level cap: WebSocket timeout plus grace for PHP start-up and result encoding.
    $timeout = self::RELAY_REQUEST_TIMEOUT_SEC + (int) self::DISCUSSION_WORKER_GRACE_SEC;
    $rawPayload = serialize($requestMessage);

    $tmp = tempnam(sys_get_temp_dir(), 'nrq_');
    if ($tmp === false) {
        throw new \RuntimeException('tempnam failed for Nostr discussion payload');
    }

    try {
        if (file_put_contents($tmp, $rawPayload) === false) {
            throw new \RuntimeException('Could not write Nostr discussion temp payload');
        }

        /** @var array<string, Process> $procs */
        $procs = [];
        foreach ($relayUrls as $wss) {
            // Skip duplicates too: overwriting $procs[$wss] would orphan the earlier worker.
            if (!\is_string($wss) || $wss === '' || isset($procs[$wss])) {
                continue;
            }
            $p = new Process(
                [$phpBinary, $worker, $wss, $tmp],
                $this->projectDir,
                null,
                null,
                (float) $timeout
            );
            // A start failure is deliberately not caught here: it propagates so the caller can
            // fall back to the sequential path.
            $p->start();
            $procs[$wss] = $p;
        }

        $merged = [];
        foreach ($procs as $wss => $p) {
            try {
                $p->wait();
            } catch (\Symfony\Component\Process\Exception\RuntimeException $e) {
                // Timeout or signal for this relay only; without this catch a single slow relay
                // aborted the whole fan-out and threw away every other relay's result.
                $this->logger->warning('nostr.article_discussion.relay_worker_failed', [
                    'relay' => $wss,
                    'exit_code' => $p->getExitCode(),
                    'stderr' => $e->getMessage(),
                ]);
                $merged[$wss] = [];
                continue;
            }

            if (!$p->isSuccessful()) {
                $err = $p->getErrorOutput();
                $this->logger->warning('nostr.article_discussion.relay_worker_failed', [
                    'relay' => $wss,
                    'exit_code' => $p->getExitCode(),
                    'stderr' => $err !== '' ? $err : null,
                ]);
                $merged[$wss] = [];
                continue;
            }

            $out = trim($p->getOutput());
            if ($out === '') {
                $merged[$wss] = [];
                continue;
            }

            $decoded = base64_decode($out, true);
            if ($decoded === false || $decoded === '') {
                $merged[$wss] = [];
                continue;
            }

            // NOTE(review): 'allowed_classes' => true trusts the worker's output completely. The
            // payload is produced by our own worker script, but consider restricting the class
            // list once the response classes are confirmed.
            $chunk = unserialize($decoded, ['allowed_classes' => true]);
            if (!\is_array($chunk)) {
                $merged[$wss] = [];
                continue;
            }

            // Worker output is keyed by relay URL, like Request::send(); later keys win.
            $merged = array_replace($merged, $chunk);
        }

        return $merged;
    } finally {
        if (\is_file($tmp)) {
            @unlink($tmp);
        }
    }
}
/**
* Same merge/dedupe rules as {@see createRelaySet()} — used only for logging planned relay URLs.
*

Loading…
Cancel
Save