Browse Source

fix prewarm bug and add progress bars

imwald
Silberengel 1 week ago
parent
commit
4cf93e60a0
  1. 163
      src/Command/PrewarmCommand.php
  2. 14
      src/Service/MagazineIndexStore.php
  3. 21
      src/Service/MagazineRefresher.php
  4. 9
      src/Service/NostrClient.php

163
src/Command/PrewarmCommand.php

@ -15,6 +15,7 @@ use Psr\Log\LoggerInterface;
use swentel\nostr\Key\Key; use swentel\nostr\Key\Key;
use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Output\OutputInterface;
@ -70,11 +71,45 @@ final class PrewarmCommand extends Command
$budget = max(1, (int) $input->getOption('magazine-budget')); $budget = max(1, (int) $input->getOption('magazine-budget'));
$io->section('Magazine index (kinds 30040)'); $io->section('Magazine index (kinds 30040)');
try { try {
$this->magazineRefresher->refreshFromRelays($budget, []); $hb = (object) ['silent' => false];
$bar = null;
$this->runWithPcntlHeartbeat($io, 5, function () use ($io, $budget, &$bar, $hb): void {
$this->magazineRefresher->refreshFromRelays($budget, [], function (string $phase, array $p) use ($io, &$bar, $hb): void {
if ($phase === 'before_root') {
$io->writeln(' <comment>Fetching magazine root index (30040) from relays…</comment>');
} elseif ($phase === 'aborted') {
$hb->silent = true;
$this->cancelPcntlAlarm();
} elseif ($phase === 'after_root') {
$hb->silent = true;
$this->cancelPcntlAlarm();
$bar = $this->createPrewarmProgressBar(
$io,
max(1, (int) ($p['total_steps'] ?? 1)),
'Magazine index',
);
$bar->start();
$bar->setProgress(1);
} elseif ($phase === 'category_fetched' && $bar !== null) {
$bar->advance(1);
$slug = (string) ($p['slug'] ?? '');
if (strlen($slug) > 70) {
$slug = substr($slug, 0, 67).'…';
}
$bar->setMessage($slug !== '' ? 'Category: '.$slug : 'Category');
}
});
}, $hb);
if ($bar !== null) {
$bar->finish();
$io->newLine(2);
}
$io->success('Magazine indices refreshed (within budget).'); $io->success('Magazine indices refreshed (within budget).');
} catch (\Throwable $e) { } catch (\Throwable $e) {
$this->logger->error('app:prewarm magazine failed', ['e' => $e]); $this->logger->error('app:prewarm magazine failed', ['e' => $e]);
$io->warning('Magazine refresh failed: '.$e->getMessage()); $io->warning('Magazine refresh failed: '.$e->getMessage());
} finally {
$this->cancelPcntlAlarm();
} }
} else { } else {
$io->note('Skipping magazine (--no-magazine).'); $io->note('Skipping magazine (--no-magazine).');
@ -110,13 +145,26 @@ final class PrewarmCommand extends Command
if ($deletionPubkeys === []) { if ($deletionPubkeys === []) {
$io->note('No author pubkeys; skipping kind 5 deletion fetch.'); $io->note('No author pubkeys; skipping kind 5 deletion fetch.');
} else { } else {
$chunks = array_chunk($deletionPubkeys, 40);
$nChunks = \count($chunks);
$nipBar = $this->createPrewarmProgressBar($io, max(1, $nChunks), 'NIP-09 kind 5 (by author batch)');
$nipBar->start();
try { try {
$kind5 = $this->nostrClient->fetchKind5DeletionEventsForAuthors( $kind5 = $this->nostrClient->fetchKind5DeletionEventsForAuthors(
$deletionPubkeys, $deletionPubkeys,
$since, $since,
$until, $until,
40 40,
function (int $index, int $totalChunks, int $pubkeyCount) use ($nipBar): void {
$nipBar->advance(1);
$nipBar->setMessage(sprintf('Batch %d/%d · %d pubkeys', $index, $totalChunks, $pubkeyCount));
}
); );
} finally {
$nipBar->finish();
$io->newLine(2);
}
try {
$st = $this->nip09DeletionApplier->apply($kind5); $st = $this->nip09DeletionApplier->apply($kind5);
$io->writeln(sprintf( $io->writeln(sprintf(
'Kind 5 events: <info>%d</info> (deduped). Articles removed: <info>%d</info>; magazine root/category cache entries removed: <info>%d</info> / <info>%d</info>.', 'Kind 5 events: <info>%d</info> (deduped). Articles removed: <info>%d</info>; magazine root/category cache entries removed: <info>%d</info> / <info>%d</info>.',
@ -171,13 +219,15 @@ final class PrewarmCommand extends Command
$total, $total,
$batchSize $batchSize
)); ));
$bar = $io->createProgressBar($total); $bar = $this->createPrewarmProgressBar($io, $total, 'Kind-0 metadata');
$bar->start(); $bar->start();
try { try {
foreach (array_chunk($toWarm, $batchSize) as $chunk) { foreach (array_chunk($toWarm, $batchSize) as $chunk) {
$fetched = $this->nostrClient->fetchKind0MetadataForAuthors($chunk, $batchSize); $fetched = $this->nostrClient->fetchKind0MetadataForAuthors($chunk, $batchSize);
$n += $this->cacheService->putPrewarmMetadataBatch($chunk, $fetched, $keys); $n += $this->cacheService->putPrewarmMetadataBatch($chunk, $fetched, $keys);
$bar->advance(\count($chunk)); $bar->advance(\count($chunk));
$p0 = (string) ($chunk[0] ?? '');
$bar->setMessage('Batch up to · '.substr($p0, 0, 8).'…');
} }
} catch (\Throwable $e) { } catch (\Throwable $e) {
$this->logger->error('app:prewarm metadata batch failed', ['exception' => $e]); $this->logger->error('app:prewarm metadata batch failed', ['exception' => $e]);
@ -215,33 +265,102 @@ final class PrewarmCommand extends Command
$qb->setMaxResults($maxArticles); $qb->setMaxResults($maxArticles);
} }
$articles = $qb->getQuery()->getResult(); $articles = $qb->getQuery()->getResult();
$articleCount = \count($articles);
$w = 0; $w = 0;
/** @var Article $article */ if ($articleCount === 0) {
foreach ($articles as $article) { $io->note('No articles in DB to scan for comment cache.');
if (microtime(true) >= $deadline) { } else {
$io->warning('Comment phase stopped: comments-budget reached.'); $cBar = $this->createPrewarmProgressBar($io, $articleCount, 'Comment threads');
break; $cBar->start();
}
$slug = trim((string) $article->getSlug());
$pubkey = (string) $article->getPubkey();
if ($slug === '' || strlen($pubkey) !== 64) {
continue;
}
$kind = $article->getKind()?->value ?? 30023;
$coordinate = $kind.':'.$pubkey.':'.$slug;
$eventHex = (string) ($article->getEventId() ?? '');
try { try {
$this->commentThreadLoader->load($coordinate, $eventHex !== '' ? $eventHex : null); /** @var Article $article */
++$w; foreach ($articles as $article) {
} catch (\Throwable $e) { if (microtime(true) >= $deadline) {
$this->logger->warning('app:prewarm comment load', ['coord' => $coordinate, 'error' => $e->getMessage()]); $io->warning('Comment phase stopped: comments-budget reached.');
break;
}
$slug = trim((string) $article->getSlug());
$pubkey = (string) $article->getPubkey();
if ($slug === '' || strlen($pubkey) !== 64) {
$cBar->advance(1);
$cBar->setMessage('skip · invalid row');
continue;
}
$kind = $article->getKind()?->value ?? 30023;
$coordinate = $kind.':'.$pubkey.':'.$slug;
$msg = $slug;
if (strlen($msg) > 56) {
$msg = substr($msg, 0, 53).'…';
}
$cBar->setMessage($msg);
$eventHex = (string) ($article->getEventId() ?? '');
try {
$this->commentThreadLoader->load($coordinate, $eventHex !== '' ? $eventHex : null);
++$w;
} catch (\Throwable $e) {
$this->logger->warning('app:prewarm comment load', ['coord' => $coordinate, 'error' => $e->getMessage()]);
}
$cBar->advance(1);
}
} finally {
$cBar->finish();
$io->newLine(2);
} }
} }
$io->success(sprintf('Warmed comment cache for %d of %d article(s).', $w, \count($articles))); $io->success(sprintf('Warmed comment cache for %d of %d article(s).', $w, $articleCount));
return Command::SUCCESS; return Command::SUCCESS;
} }
private function createPrewarmProgressBar(SymfonyStyle $io, int $max, string $message = ''): ProgressBar
{
$bar = $io->createProgressBar($max);
if (method_exists($bar, 'setMinSecondsBetweenRedraws')) {
$bar->setMinSecondsBetweenRedraws(5.0);
}
$bar->setFormat(' %current%/%max% [%bar%] %percent:3s%%'."\n".' <comment>%message%</comment> <info>%elapsed:6s%</info> ');
$bar->setMessage($message);
return $bar;
}
/**
* @param object{silent?: bool} $controller
*/
private function runWithPcntlHeartbeat(SymfonyStyle $io, int $sec, callable $work, ?object $controller = null): void
{
if (!\function_exists('pcntl_async_signals') || !\function_exists('pcntl_signal') || !\function_exists('pcntl_alarm')) {
$work();
return;
}
$t0 = time();
$handler = function () use ($io, $t0, $sec, $controller): void {
if ($controller !== null && !empty($controller->silent)) {
return;
}
$io->writeln(sprintf(' <comment>… %ds elapsed</comment>', time() - $t0));
\pcntl_alarm((int) ceil($sec));
};
\pcntl_async_signals(true);
\pcntl_signal(\SIGALRM, $handler);
\pcntl_alarm((int) ceil($sec));
try {
$work();
} finally {
\pcntl_alarm(0);
\pcntl_signal(\SIGALRM, \SIG_DFL);
}
}
private function cancelPcntlAlarm(): void
{
if (\function_exists('pcntl_alarm')) {
@\pcntl_alarm(0);
}
}
private function disableCliExecutionTimeLimit(): void private function disableCliExecutionTimeLimit(): void
{ {
@set_time_limit(0); @set_time_limit(0);

14
src/Service/MagazineIndexStore.php

@ -40,7 +40,7 @@ final class MagazineIndexStore
if ($slug === '') { if ($slug === '') {
return null; return null;
} }
$item = $this->pool->getItem(self::CAT_PREFIX.$slug); $item = $this->pool->getItem($this->categoryKey($slug));
if (!$item->isHit()) { if (!$item->isHit()) {
return null; return null;
} }
@ -67,7 +67,7 @@ final class MagazineIndexStore
if ($slug === '') { if ($slug === '') {
return; return;
} }
$item = $this->pool->getItem(self::CAT_PREFIX.$slug); $item = $this->pool->getItem($this->categoryKey($slug));
$item->set(serialize($event)); $item->set(serialize($event));
$item->expiresAfter(self::PERSIST_TTL); $item->expiresAfter(self::PERSIST_TTL);
$this->pool->save($item); $this->pool->save($item);
@ -83,7 +83,7 @@ final class MagazineIndexStore
if ($slug === '') { if ($slug === '') {
return; return;
} }
$this->pool->deleteItem(self::CAT_PREFIX.$slug); $this->pool->deleteItem($this->categoryKey($slug));
} }
/** /**
@ -101,6 +101,14 @@ final class MagazineIndexStore
return self::ROOT_PREFIX.hash('sha256', $npub."\0".$dTag); return self::ROOT_PREFIX.hash('sha256', $npub."\0".$dTag);
} }
/**
* Category `d` / slug strings may contain colons (NIP-33 `a` segments); PSR-6 keys must not use `{}()/\@:`.
*/
private function categoryKey(string $slug): string
{
return self::CAT_PREFIX.hash('sha256', $slug);
}
private function unwrap(mixed $value): ?Event private function unwrap(mixed $value): ?Event
{ {
if (!\is_string($value) || $value === '') { if (!\is_string($value) || $value === '') {

21
src/Service/MagazineRefresher.php

@ -29,8 +29,11 @@ final class MagazineRefresher
/** /**
* Fetches the root index then each category index until $budgetSeconds elapses. $preferSlugs * Fetches the root index then each category index until $budgetSeconds elapses. $preferSlugs
* are requested first (e.g. current /cat route) so they are less likely to miss the budget. * are requested first (e.g. current /cat route) so they are less likely to miss the budget.
*
* @param (callable(string, array<string, int|string|bool|null>): void)|null $onProgress
* Phases: `before_root`, `after_root` (total_steps, step, slug_count), `category_fetched` (step, total_steps, slug)
*/ */
public function refreshFromRelays(int $budgetSeconds = 8, array $preferSlugs = []): void public function refreshFromRelays(int $budgetSeconds = 8, array $preferSlugs = [], ?callable $onProgress = null): void
{ {
$budgetSeconds = max(1, min(30, $budgetSeconds)); $budgetSeconds = max(1, min(30, $budgetSeconds));
$deadline = microtime(true) + $budgetSeconds; $deadline = microtime(true) + $budgetSeconds;
@ -45,8 +48,10 @@ final class MagazineRefresher
$defaultRelay = (string) $this->params->get('default_relay'); $defaultRelay = (string) $this->params->get('default_relay');
$relayLabel = (string) (parse_url($defaultRelay, \PHP_URL_HOST) ?: $defaultRelay); $relayLabel = (string) (parse_url($defaultRelay, \PHP_URL_HOST) ?: $defaultRelay);
$onProgress?->__invoke('before_root', []);
$root = $this->nostrClient->getMagazineIndex($npub, $dTag); $root = $this->nostrClient->getMagazineIndex($npub, $dTag);
if ($root === null) { if ($root === null) {
$onProgress?->__invoke('aborted', ['reason' => 'no_root']);
$this->logger->warning(sprintf( $this->logger->warning(sprintf(
'MagazineRefresher: root index not returned (tried from %s)', 'MagazineRefresher: root index not returned (tried from %s)',
$relayLabel $relayLabel
@ -61,6 +66,13 @@ final class MagazineRefresher
$this->store->putRoot($npub, $dTag, $root); $this->store->putRoot($npub, $dTag, $root);
$slugs = $this->orderedCategorySlugs($this->categorySlugsFromRoot($root), $preferSlugs); $slugs = $this->orderedCategorySlugs($this->categorySlugsFromRoot($root), $preferSlugs);
$totalSteps = 1 + \count($slugs);
$onProgress?->__invoke('after_root', [
'total_steps' => $totalSteps,
'step' => 1,
'slug_count' => \count($slugs),
]);
$step = 1;
foreach ($slugs as $slug) { foreach ($slugs as $slug) {
if (microtime(true) >= $deadline) { if (microtime(true) >= $deadline) {
$this->logger->notice('MagazineRefresher: stopped at time budget; some categories not fetched', [ $this->logger->notice('MagazineRefresher: stopped at time budget; some categories not fetched', [
@ -83,6 +95,13 @@ final class MagazineRefresher
'message' => $e->getMessage(), 'message' => $e->getMessage(),
'relay' => $defaultRelay, 'relay' => $defaultRelay,
]); ]);
} finally {
++$step;
$onProgress?->__invoke('category_fetched', [
'step' => $step,
'total_steps' => $totalSteps,
'slug' => $slug,
]);
} }
} }

9
src/Service/NostrClient.php

@ -431,6 +431,7 @@ class NostrClient
/** /**
* NIP-09 kind 5 deletion requests in $since..$until (unix), batched by author pubkey (hex). * NIP-09 kind 5 deletion requests in $since..$until (unix), batched by author pubkey (hex).
* *
* @param (callable(int, int, int): void)|null $afterChunk 1-based index, total chunks, pubkeys in chunk
* @param list<string> $authorPubkeyHex * @param list<string> $authorPubkeyHex
* @return list<stdClass> Deduplicated by event `id` (highest {@see created_at} kept) * @return list<stdClass> Deduplicated by event `id` (highest {@see created_at} kept)
*/ */
@ -439,6 +440,7 @@ class NostrClient
int $since, int $since,
int $until, int $until,
int $authorsPerRequest = 40, int $authorsPerRequest = 40,
?callable $afterChunk = null,
): array { ): array {
$authorPubkeyHex = \array_values(\array_unique(\array_filter( $authorPubkeyHex = \array_values(\array_unique(\array_filter(
$authorPubkeyHex, $authorPubkeyHex,
@ -449,7 +451,9 @@ class NostrClient
} }
$authorsPerRequest = max(1, min(100, $authorsPerRequest)); $authorsPerRequest = max(1, min(100, $authorsPerRequest));
$byId = []; $byId = [];
foreach (array_chunk($authorPubkeyHex, $authorsPerRequest) as $chunk) { $chunks = array_chunk($authorPubkeyHex, $authorsPerRequest);
$numChunks = \count($chunks);
foreach ($chunks as $i => $chunk) {
$request = $this->createNostrRequest( $request = $this->createNostrRequest(
kinds: [KindsEnum::DELETION_REQUEST], kinds: [KindsEnum::DELETION_REQUEST],
filters: [ filters: [
@ -468,6 +472,9 @@ class NostrClient
'raw_events' => \count($events), 'raw_events' => \count($events),
'ms' => (int) round((microtime(true) - $t0) * 1000), 'ms' => (int) round((microtime(true) - $t0) * 1000),
]); ]);
if ($afterChunk !== null) {
$afterChunk(1 + (int) $i, $numChunks, \count($chunk));
}
foreach ($events as $ev) { foreach ($events as $ev) {
if (!\is_object($ev) || (int) ($ev->kind ?? 0) !== KindsEnum::DELETION_REQUEST->value) { if (!\is_object($ev) || (int) ($ev->kind ?? 0) !== KindsEnum::DELETION_REQUEST->value) {
continue; continue;

Loading…
Cancel
Save