Linux native client
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

203 lines
8.0 KiB

"""Coordinates relay ingest (signals), ranking, and publish."""
from __future__ import annotations
import asyncio
import json
import logging
import threading
import time
from typing import Any, cast
from PySide6.QtCore import QObject, Signal
from imwald.core.accounts_store import StoredAccount, unlock_secret
from imwald.core.database import Database
from imwald.core.nostr_crypto import build_signed_event, verify_nostr_event
from imwald.core.relay_list import resolve_for_account
from imwald.core.nostr_publish import publish_to_relays_sync
from imwald.core.ranker import Ranker
from imwald.core.relay_manager import RelayManager
from imwald.core.relay_policy import (
AGGR_THREAD_RELAY,
DEFAULT_READ_RELAYS,
DEFAULT_WRITE_RELAYS,
WISP_TRENDING_FEED_KINDS,
default_feed_read_relays,
is_wisp_trending_relay_url,
use_aggr_for_threads,
)
log = logging.getLogger(__name__)
class NostrEngine(QObject):
"""Relay worker thread emits `event_ingested` — connect on UI thread to write SQLite."""
event_ingested = Signal(str, object)
relay_status = Signal(str)
def __init__(self, db: Database) -> None:
super().__init__()
self.db = db
self.ranker = Ranker(db)
self._thread: threading.Thread | None = None
self._loop: asyncio.AbstractEventLoop | None = None
self._manager: RelayManager | None = None
self._app_stop: asyncio.Event | None = None
def start_relays(
self,
read_urls: list[str] | None = None,
*,
user_write_urls: list[str] | None = None,
list30000_owner: str | None = None,
) -> None:
if self._thread and self._thread.is_alive():
self.stop_relays()
urls = list(read_urls or default_feed_read_relays())
aggr_writes = set(user_write_urls or list(DEFAULT_WRITE_RELAYS))
k3000_owner = (list30000_owner or "").strip().lower()
def runner() -> None:
async def on_ev(url: str, ev: dict[str, Any]) -> None:
self.event_ingested.emit(url, ev)
async def on_notice(url: str, text: str) -> None:
self.relay_status.emit(f"{url} NOTICE {text}")
async def amain() -> None:
app_stop = asyncio.Event()
self._app_stop = app_stop
mgr = RelayManager(on_event=on_ev, on_notice=on_notice)
self._manager = mgr
# Kind 0 metadata, 6 reposts, 7 reactions, 9735 zap receipts — for feed UI + engagement counts.
kinds_main = [0, 1, 6, 7, 16, 1111, 1244, 9735, 20, 21, 30023, 9802, 11]
for u in urls:
mgr.register(u)
kinds = list(WISP_TRENDING_FEED_KINDS) if is_wisp_trending_relay_url(u) else kinds_main
mgr.request_subscribe(
u,
f"imwald-{abs(hash(u)) % 10**8}",
[{"kinds": kinds, "limit": 220}],
)
if use_aggr_for_threads(aggr_writes):
mgr.register(AGGR_THREAD_RELAY)
mgr.request_subscribe(
AGGR_THREAD_RELAY,
"imwald-aggr",
[{"kinds": [1, 16, 1111, 1244], "limit": 120}],
)
if len(k3000_owner) == 64 and all(c in "0123456789abcdef" for c in k3000_owner):
for u in urls:
sid = f"imwald-k30000-{abs(hash(f'{u}:{k3000_owner}')) % 10**9}"
mgr.request_subscribe(
u,
sid,
[{"kinds": [30000], "authors": [k3000_owner], "limit": 150}],
)
await mgr.connect_all()
await app_stop.wait()
await mgr.stop()
loop = asyncio.new_event_loop()
self._loop = loop
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(amain())
finally:
self._loop = None
self._manager = None
self._app_stop = None
loop.close()
self._thread = threading.Thread(target=runner, name="nostr-relay", daemon=True)
self._thread.start()
def stop_relays(self) -> None:
if self._loop and self._app_stop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._app_stop.set)
if self._thread:
self._thread.join(timeout=5.0)
self._thread = None
@staticmethod
def apply_ingest_to_db(db: Database, ev: dict[str, Any], source_relay: str | None = None) -> None:
if "id" not in ev:
return
if not verify_nostr_event(ev):
return
if ev.get("kind") == 5:
raw_tags = ev.get("tags")
tag_rows: list[object] = cast(list[object], raw_tags) if isinstance(raw_tags, list) else []
for t_obj in tag_rows:
if not isinstance(t_obj, list):
continue
t = cast(list[object], t_obj)
if t and str(t[0]) == "e" and len(t) > 1:
db.tombstone_event(str(t[1]))
db.upsert_event(ev, source_relay=source_relay)
def publish_kind0_and_lists(
self,
account: StoredAccount,
password: str | None,
*,
username: str,
about: str,
interest_tags: list[str],
languages: list[str],
) -> None:
sec = unlock_secret(account, password)
now = int(time.time())
# NIP-01 metadata: unique handle ``imwald-anon-<created_at>``; friendly phrase in ``username``.
profile: dict[str, object] = {
"name": f"imwald-anon-{now}",
"username": username.strip() or "gentle ferns",
"about": about,
}
tags0: list[list[str]] = [["client", "imwald"]]
try:
from imwald.core.forest_avatar import build_forest_avatar_png
from imwald.core.nostr_nip96_upload import nip94_tags_to_imeta, upload_image_nip96_nostr_build
png = build_forest_avatar_png()
url, nip94_tags = upload_image_nip96_nostr_build(sec, png)
profile["picture"] = url
tags0.append(nip94_tags_to_imeta(nip94_tags))
except Exception as e: # noqa: BLE001
log.warning("Default forest avatar upload skipped: %s", e)
kind0 = build_signed_event(
sec,
created_at=now,
kind=0,
tags=tags0,
content=json.dumps(profile, ensure_ascii=False),
)
tags_10002: list[list[str]] = [["client", "imwald"]]
for r in DEFAULT_READ_RELAYS:
tags_10002.append(["r", r, "read"])
for r in DEFAULT_WRITE_RELAYS:
tags_10002.append(["r", r, "write"])
ev10002 = build_signed_event(sec, created_at=now, kind=10002, tags=tags_10002, content="")
tags_10015 = [["client", "imwald"]]
for t in interest_tags[:50]:
tt = t if t.startswith("#") else f"#{t}"
tags_10015.append(["t", tt[1:] if tt.startswith("#") else tt])
for lang in languages[:3]:
tags_10015.append(["l", lang])
ev10015 = build_signed_event(sec, created_at=now, kind=10015, tags=tags_10015, content="")
write_urls = list(DEFAULT_WRITE_RELAYS)
publish_to_relays_sync(write_urls, kind0)
publish_to_relays_sync(write_urls, ev10002)
publish_to_relays_sync(write_urls, ev10015)
for ev in (kind0, ev10002, ev10015):
self.db.upsert_event(ev)
def publish_nip09_deletion(self, account: StoredAccount, password: str | None, target_event_id: str) -> None:
sec = unlock_secret(account, password)
now = int(time.time())
tags = [["e", target_event_id], ["k", "1"]]
ev = build_signed_event(sec, created_at=now, kind=5, tags=tags, content="")
publish_to_relays_sync(resolve_for_account(self.db, account.pubkey).write_urls, ev)
self.db.upsert_event(ev)