diff --git a/DDD_ANALYSIS.md b/DDD_ANALYSIS.md index ea651f5..d0e27bf 100644 --- a/DDD_ANALYSIS.md +++ b/DDD_ANALYSIS.md @@ -41,17 +41,19 @@ This document provides a comprehensive Domain-Driven Design (DDD) analysis of th ## Executive Summary -ORLY demonstrates **mature DDD adoption** for a system of its complexity. The codebase exhibits clear bounded context separation, proper repository patterns with multiple backend implementations, and well-designed interface segregation that prevents circular dependencies. +ORLY demonstrates **mature DDD adoption** with sophisticated modular architecture. The codebase has evolved from a single-binary relay to a **multi-process system** with clear bounded context separation, gRPC-based inter-process communication, and pluggable implementations for database, ACL, and sync services. **Strengths:** - Clear separation between `app/` (application layer) and `pkg/` (domain/infrastructure) -- Repository pattern with three interchangeable backends (Badger, Neo4j, WasmDB) -- Interface-based ACL system with pluggable implementations (None, Follows, Managed) +- Repository pattern with four interchangeable backends (Badger, Neo4j, WasmDB, gRPC) +- Interface-based ACL system with four implementations (None, Follows, Managed, Curating) +- **Driver Registry pattern** for runtime component selection +- **Process Supervisor pattern** for split IPC mode deployment - Per-connection aggregate isolation in `Listener` - Strong use of Go interfaces for dependency inversion -- **New:** Immutable `EventRef` value object alongside legacy `IdPkTs` -- **New:** Comprehensive protocol extensions (Blossom, Graph Queries, NIP-43, NIP-86) -- **New:** Distributed sync with cluster replication support +- Immutable `EventRef` value object alongside legacy `IdPkTs` +- Comprehensive protocol extensions (NIP-43, NIP-77, NIP-86, Blossom, Graph Queries) +- **gRPC service layer** exposing all major interfaces for remote access **Areas for Improvement:** - Domain events are implicit rather than explicit types @@ -59,7 +61,7 @@ ORLY demonstrates **mature DDD adoption** for a system of its complexity. The co - Handler methods mix application orchestration with domain logic - Ubiquitous language is partially documented -**Overall DDD Maturity Score: 7.5/10** (improved from 7/10) +**Overall DDD Maturity Score: 8/10** (improved from 7.5/10) --- @@ -72,13 +74,15 @@ ORLY organizes code into distinct bounded contexts, each with its own model and #### 1. Event Storage Context (`pkg/database/`) - **Responsibility:** Persistent storage of Nostr events with indexing and querying - **Key Abstractions:** `Database` interface (109 lines), `Subscription`, `Payment`, `NIP43Membership` -- **Implementations:** Badger (embedded), Neo4j (graph), WasmDB (browser) +- **Implementations:** Badger (embedded), Neo4j (graph), WasmDB (browser), gRPC (remote) +- **gRPC Server:** `pkg/database/server/` - DatabaseService with 250+ RPC methods - **File:** `pkg/database/interface.go:17-109` #### 2. Access Control Context (`pkg/acl/`) - **Responsibility:** Authorization decisions for read/write operations - **Key Abstractions:** `I` interface, `Registry`, access levels (none/read/write/admin/owner) -- **Implementations:** `None`, `Follows`, `Managed` +- **Implementations:** `None`, `Follows`, `Managed`, `Curating` +- **gRPC Server:** `pkg/acl/server/` - ACLService - **Files:** `pkg/acl/acl.go`, `pkg/interfaces/acl/acl.go:21-40` #### 3. Event Policy Context (`pkg/policy/`) @@ -90,19 +94,20 @@ ORLY organizes code into distinct bounded contexts, each with its own model and #### 4. Connection Management Context (`app/`) - **Responsibility:** WebSocket lifecycle, message routing, authentication, flow control - **Key Abstractions:** `Listener`, `Server`, message handlers, `messageRequest` +- **Handlers:** 25+ message type handlers - **File:** `app/listener.go:24-52` #### 5. Protocol Extensions Context (`pkg/protocol/`) - **Responsibility:** NIP implementations beyond core protocol - **Subcontexts:** - - **NIP-43 Membership** (`pkg/protocol/nip43/`): Invite-based access control + - **NIP-43 Membership** (`pkg/protocol/nip43/`): Invite-based access control (kinds 28934, 28936, 8000) - **Graph Queries** (`pkg/protocol/graph/`): BFS traversal for follows/followers/threads - **NWC Payments** (`pkg/protocol/nwc/`): Nostr Wallet Connect integration - **Blossom** (`pkg/protocol/blossom/`): BUD protocol definitions - **Directory** (`pkg/protocol/directory/`): Relay directory client #### 6. Blob Storage Context (`pkg/blossom/`) -- **Responsibility:** Binary blob storage following BUD specifications +- **Responsibility:** Binary blob storage following BUD-09 specifications - **Key Abstractions:** `Server`, `Storage`, `Blob`, `BlobMeta` - **Invariants:** SHA-256 hash integrity, MIME type validation, quota enforcement - **Files:** `pkg/blossom/server.go`, `pkg/blossom/storage.go` @@ -115,9 +120,14 @@ ORLY organizes code into distinct bounded contexts, each with its own model and #### 8. Distributed Sync Context (`pkg/sync/`) - **Responsibility:** Federation and replication between relay peers -- **Key Abstractions:** `Manager`, `ClusterManager`, `RelayGroupManager`, `NIP11Cache` -- **Integration:** Serial-number based sync protocol, NIP-11 peer discovery -- **Files:** `pkg/sync/manager.go`, `pkg/sync/cluster.go`, `pkg/sync/relaygroup.go` +- **Key Abstractions:** `Manager`, `Registry`, driver pattern +- **Implementations:** + - **Negentropy** (`pkg/sync/negentropy/`): NIP-77 set reconciliation + - **Cluster** (`pkg/sync/cluster/`): HTTP-based pull replication (kind 39108) + - **Distributed** (`pkg/sync/distributed/`): Peer-to-peer sync + - **RelayGroup** (`pkg/sync/relaygroup/`): Relay grouping (kind 39105) +- **gRPC Clients:** Each sync implementation has gRPC client for split mode +- **Files:** `pkg/sync/manager.go`, `pkg/sync/negentropy/`, `pkg/sync/cluster/` #### 9. Spider Context (`pkg/spider/`) - **Responsibility:** Syncing events from admin relays for followed pubkeys @@ -125,48 +135,78 @@ ORLY organizes code into distinct bounded contexts, each with its own model and - **Integration:** Batch subscriptions, rate limit backoff, blackout periods - **File:** `pkg/spider/spider.go` +#### 10. Process Supervision Context (`cmd/orly-launcher/`) **NEW** +- **Responsibility:** Multi-process lifecycle management for split IPC mode +- **Key Abstractions:** `Supervisor`, `Config`, process state machine +- **Patterns:** Supervisor pattern with dependency ordering, crash recovery +- **Integration:** gRPC health checks, graceful shutdown ordering +- **Files:** `cmd/orly-launcher/supervisor.go`, `cmd/orly-launcher/config.go` + +#### 11. Certificate Management Context (`cmd/orly-certs/`) **NEW** +- **Responsibility:** TLS certificate provisioning and renewal +- **Key Abstractions:** Certificate manager, DNS-01 challenge handler +- **Integration:** Independent service, communicates via filesystem +- **File:** `cmd/orly-certs/` + ### Context Map ``` -┌─────────────────────────────────────────────────────────────────────────────┐ -│ Connection Management (app/) │ -│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ -│ │ Server │───▶│ Listener │───▶│ Handlers │◀──▶│ Publishers │ │ -│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ -└────────┬────────────────────┬────────────────────┬──────────────────────────┘ - │ │ │ - │ [Conformist] │ [Customer-Supplier]│ [Customer-Supplier] - ▼ ▼ ▼ -┌────────────────┐ ┌────────────────┐ ┌────────────────┐ -│ Access Control│ │ Event Storage │ │ Event Policy │ -│ (pkg/acl/) │ │ (pkg/database/)│ │ (pkg/policy/) │ -│ │ │ │ │ │ -│ Registry ◀────┼───┼────Conformist──┼───┼─▶ Manager │ -└────────────────┘ └────────────────┘ └────────────────┘ - │ │ │ - │ │ [Shared Kernel] │ - │ ▼ │ - │ ┌────────────────┐ │ - │ │ Event Entity │ │ - │ │(git.mleku.dev/ │◀───────────┘ - │ │ mleku/nostr) │ - │ └────────────────┘ - │ │ - │ [Anti-Corruption] │ [Customer-Supplier] - ▼ ▼ -┌────────────────┐ ┌────────────────┐ ┌────────────────┐ -│ Rate Limiting │ │ Protocol │ │ Blob Storage │ -│ (pkg/ratelimit)│ │ Extensions │ │ (pkg/blossom) │ -│ │ │ (pkg/protocol/)│ │ │ -└────────────────┘ └────────────────┘ └────────────────┘ - │ - ┌────────────────────┼────────────────────┐ - ▼ ▼ ▼ -┌────────────────┐ ┌────────────────┐ ┌────────────────┐ -│ Distributed │ │ Spider │ │ Graph Queries │ -│ Sync │ │ (pkg/spider) │ │(pkg/protocol/ │ -│ (pkg/sync/) │ │ │ │ graph/) │ -└────────────────┘ └────────────────┘ └────────────────┘ +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ Process Supervision (orly-launcher) │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ Supervisor │───▶│ DB Process │───▶│ ACL Process │───▶│Relay Process│ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ +│ │ │ │ │ │ +│ │ [Dependency] │ [gRPC] │ [gRPC] │ │ +│ ▼ ▼ ▼ ▼ │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │Sync Services│ │ Certs │ │ Admin Web │ │ Health │ │ +│ │ (4 procs) │ │ Service │ │ UI │ │ Checks │ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ +└─────────────────────────────────────────────────────────────────────────────────┘ + │ + ┌───────────────────────────────┼───────────────────────────────┐ + │ │ │ + ▼ ▼ ▼ +┌────────────────┐ ┌────────────────┐ ┌────────────────┐ +│ Event Storage │ │ Access Control│ │ Event Policy │ +│ (pkg/database/)│ │ (pkg/acl/) │ │ (pkg/policy/) │ +│ │ │ │ │ │ +│ Badger│Neo4j │◀─[gRPC]─────▶│Follows│Managed │◀─[Conformist]│ Manager │ +│ WasmDB│gRPC │ │Curating│None │ │ │ +└────────────────┘ └────────────────┘ └────────────────┘ + │ │ │ + │ [Shared Kernel] │ │ + ▼ ▼ │ +┌────────────────────────────────────────────────────────────┐ │ +│ Event Entity │ │ +│ (git.mleku.dev/mleku/nostr) │◀─────────┘ +│ Filter, Tag, Subscription types │ +└────────────────────────────────────────────────────────────┘ + │ │ + │ [Customer-Supplier] │ [Customer-Supplier] + ▼ ▼ +┌────────────────┐ ┌────────────────┐ ┌────────────────┐ +│ Blob Storage │ │ Protocol │ │ Rate Limiting │ +│ (pkg/blossom) │ │ Extensions │ │ (pkg/ratelimit)│ +│ │ │ (pkg/protocol/)│ │ │ +└────────────────┘ └────────────────┘ └────────────────┘ + │ + ┌───────────────────────────────┼───────────────────────────────┐ + ▼ ▼ ▼ +┌────────────────┐ ┌────────────────┐ ┌────────────────┐ +│ NIP-43 │ │ NIP-77 │ │ Graph Queries │ +│ Membership │ │ Negentropy │ │(pkg/protocol/ │ +│(kinds 28934/36)│ │ Sync │ │ graph/) │ +└────────────────┘ └────────────────┘ └────────────────┘ + │ + ┌───────────────────────────────┼───────────────────────────────┐ + ▼ ▼ ▼ +┌────────────────┐ ┌────────────────┐ ┌────────────────┐ +│ Cluster │ │ Distributed │ │ Relay Group │ +│ Sync │ │ Sync │ │ (kind 39105) │ +│ (kind 39108) │ │ │ │ │ +└────────────────┘ └────────────────┘ └────────────────┘ ``` **Integration Patterns Identified:** @@ -174,26 +214,31 @@ ORLY organizes code into distinct bounded contexts, each with its own model and | Upstream | Downstream | Pattern | Notes | |----------|------------|---------|-------| | nostr library | All contexts | Shared Kernel | Event, Filter, Tag types | -| Database | ACL, Policy, Blossom | Customer-Supplier | Query for follow lists, permissions, blob storage | +| Database | ACL, Policy, Blossom, Sync | Customer-Supplier | Query for follows, permissions, event storage | | Policy | Handlers, Sync | Conformist | All respect policy decisions | | ACL | Handlers, Blossom | Conformist | Handlers/Blossom respect access levels | -| Rate Limit | Database | Anti-Corruption | Load monitor abstraction | -| Sync | Database, Policy | Customer-Supplier | Serial-based event replication | +| Rate Limit | Database | Anti-Corruption | LoadMonitor abstraction | +| Sync Services | Database | Customer-Supplier | Serial-based event replication | +| Launcher | All Services | Supervisor | Process lifecycle management | +| gRPC Layer | DB, ACL, Sync | Published Language | Protocol buffer contracts | ### Subdomain Classification | Subdomain | Type | Justification | |-----------|------|---------------| | Event Storage | **Core** | Central to relay's value proposition | -| Access Control | **Core** | Key differentiator (WoT, follows-based, managed) | +| Access Control | **Core** | Key differentiator (WoT, follows-based, managed, curating) | | Event Policy | **Core** | Enables complex filtering rules | | Graph Queries | **Core** | Unique social graph traversal capabilities | | NIP-43 Membership | **Core** | Unique invite-based access model | | Blob Storage (Blossom) | **Core** | Media hosting differentiator | +| NIP-77 Negentropy Sync | **Core** | Efficient relay-to-relay sync | | Connection Management | **Supporting** | Standard WebSocket infrastructure | | Rate Limiting | **Supporting** | Operational concern with PID controller | -| Distributed Sync | **Supporting** | Infrastructure for federation | +| Cluster Sync | **Supporting** | Infrastructure for federation | | Spider | **Supporting** | Data aggregation from external relays | +| Process Supervision | **Generic** | Standard process management | +| Certificate Management | **Generic** | Standard TLS infrastructure | --- @@ -231,7 +276,7 @@ type InviteCode struct { } ``` - **Identity:** Unique code string -- **Lifecycle:** Created → Valid → Used/Expired +- **Lifecycle:** Created -> Valid -> Used/Expired - **Invariants:** Cannot be reused once consumed #### Subscription (Payment Entity) @@ -240,7 +285,7 @@ type InviteCode struct { // GetSubscription, ExtendSubscription, RecordPayment ``` - **Identity:** Pubkey -- **Lifecycle:** Trial → Active → Expired +- **Lifecycle:** Trial -> Active -> Expired - **Invariants:** Can only extend if not expired #### Blob (Blossom Entity) @@ -255,14 +300,28 @@ type BlobMeta struct { } ``` - **Identity:** SHA-256 hash -- **Lifecycle:** Uploaded → Active → Deleted +- **Lifecycle:** Uploaded -> Active -> Deleted - **Invariants:** Hash must match content; owner can delete +#### Process (Supervisor Entity) **NEW** +```go +// cmd/orly-launcher/supervisor.go (implied) +type Process struct { + Name string // Identity: service name + Cmd *exec.Cmd // Running process + State ProcessState + Restarts int +} +``` +- **Identity:** Service name (orly-db, orly-acl, etc.) +- **Lifecycle:** Stopped -> Starting -> Running -> Stopping +- **Invariants:** Dependency ordering; health check before dependent startup + ### Value Objects Value objects are immutable and defined by their attributes, not identity. -#### EventRef (Immutable Event Reference) - **NEW** +#### EventRef (Immutable Event Reference) ```go // pkg/interfaces/store/store_interface.go:99-107 type EventRef struct { @@ -321,7 +380,7 @@ type Rule struct { } ``` - **Complexity:** 25+ fields, decomposition candidate -- **Binary caches:** Performance optimization for hex→binary conversion +- **Binary caches:** Performance optimization for hex->binary conversion #### WriteRequest (Message Value) ```go @@ -335,6 +394,24 @@ type WriteRequest struct { } ``` +#### LauncherConfig (Configuration Value) **NEW** +```go +// cmd/orly-launcher/config.go +type Config struct { + DBBackend string // badger, neo4j + DBListen string // 127.0.0.1:50051 + ACLEnabled bool + ACLMode string // follows, managed, curation + ACLListen string // 127.0.0.1:50052 + AdminEnabled bool + AdminPort int + AdminOwners []string + // ... sync service configs +} +``` +- **Persistence:** JSON file at `~/.config/orly/launcher.json` +- **Environment Override:** All fields can be overridden via `ORLY_LAUNCHER_*` + ### Aggregates Aggregates are clusters of entities/value objects with consistency boundaries. @@ -387,6 +464,16 @@ if isAuthMessage { - MIME type restrictions - Owner-only deletion +#### Supervisor Aggregate **NEW** +- **Root:** `Supervisor` +- **Members:** Process map, config, dependency graph +- **Boundary:** All managed processes +- **Invariants:** + - Dependency ordering (DB -> ACL -> Sync -> Relay) + - Health checks before dependent startup + - Reverse shutdown ordering + - Crash recovery with restart limits + ### Repositories The Repository pattern abstracts persistence for aggregate roots. @@ -429,6 +516,7 @@ type Database interface { 1. **Badger** (`pkg/database/database.go`): Embedded key-value store 2. **Neo4j** (`pkg/neo4j/`): Graph database for social queries 3. **WasmDB** (`pkg/wasmdb/`): Browser IndexedDB for WASM builds +4. **gRPC** (`pkg/database/grpc/`): Remote database via gRPC **NEW** **Interface Segregation:** ```go @@ -451,6 +539,21 @@ type I interface { } ``` +#### Driver Registry Pattern **NEW** +```go +// pkg/database/ - Driver selection at runtime +database.HasDriver("badger") // Check availability +database.NewFromDriver("badger", config) // Create instance + +// pkg/acl/ - ACL driver selection +acl.HasDriver("follows") +acl.NewACLFromDriver("follows", config) + +// pkg/sync/ - Sync driver selection +sync.HasDriver("negentropy") +sync.NewSyncManager("negentropy", config) +``` + ### Domain Services Domain services encapsulate logic that doesn't belong to any single entity. @@ -512,6 +615,24 @@ func (l *Limiter) Wait(ctx context.Context, op OperationType) error - Separate setpoints for read/write operations - Emergency mode with hysteresis +#### Supervisor (Process Lifecycle Service) **NEW** +```go +// cmd/orly-launcher/supervisor.go +type Supervisor struct { + config *Config + processes map[string]*Process + mu sync.Mutex +} +func (s *Supervisor) Start() error +func (s *Supervisor) Stop() error +func (s *Supervisor) Restart(name string) error +func (s *Supervisor) IsRunning() bool +``` +- Manages process lifecycle with dependency ordering +- Health checks via gRPC before proceeding +- Crash recovery with configurable restart limits +- Graceful shutdown in reverse dependency order + ### Domain Events **Current State:** Domain events are implicit in message flow, not explicit types. @@ -529,6 +650,9 @@ func (l *Limiter) Wait(ctx context.Context, op OperationType) error | PolicyUpdated | Policy config event | `messagePauseMutex.Lock()` | | BlobUploaded | Blossom PUT success | Quota updated | | BlobDeleted | Blossom DELETE | Quota released | +| ProcessStarted | Supervisor start | Health check, dependency unlock | +| ProcessCrashed | Process exit | Restart attempt, alert | +| ConfigUpdated | Admin UI save | JSON persistence, reload | --- @@ -627,6 +751,13 @@ type BlobUploaded struct { Size int64 Timestamp time.Time } + +type ProcessStateChanged struct { + ServiceName string + OldState ProcessState + NewState ProcessState + Timestamp time.Time +} ``` ### 2. Strengthen Aggregate Boundaries @@ -690,6 +821,8 @@ func (s *EventService) ProcessIncomingEvent(ctx context.Context, ev *event.E, au | Blob | Binary content (images, media) | `blossom.BlobMeta` | | Spider | Event aggregator from external relays | `spider.Spider` | | Sync | Peer-to-peer replication | `sync.Manager` | +| Supervisor | Process lifecycle manager | `supervisor.Supervisor` | +| Driver | Pluggable implementation | Registry pattern | ``` ### 5. Add Domain-Specific Error Types @@ -706,6 +839,7 @@ var ( ErrQuotaExceeded = &DomainError{Code: "QUOTA_EXCEEDED"} ErrInviteCodeInvalid = &DomainError{Code: "INVITE_INVALID"} ErrBlobTooLarge = &DomainError{Code: "BLOB_TOO_LARGE"} + ErrServiceUnavailable = &DomainError{Code: "SERVICE_UNAVAILABLE"} ) ``` @@ -739,13 +873,16 @@ The context map is now documented in this file with integration patterns. - [x] Bounded contexts identified with clear boundaries - [x] Repositories abstract persistence for aggregate roots -- [x] Multiple repository implementations (Badger/Neo4j/WasmDB) +- [x] Multiple repository implementations (Badger/Neo4j/WasmDB/gRPC) - [x] Interface segregation prevents circular dependencies - [x] Configuration centralized (`app/config/config.go`) - [x] Per-connection aggregate isolation - [x] Access control as pluggable strategy pattern - [x] Value objects have immutable alternative (`EventRef`) - [x] Context map documented +- [x] Driver registry pattern for runtime selection +- [x] gRPC layer for distributed deployment +- [x] Process supervision for split mode ### Needs Attention @@ -774,13 +911,14 @@ The context map is now documented in this file with integration patterns. | File | Purpose | |------|---------| -| `app/server.go` | HTTP/WebSocket server setup (1240 lines) | -| `app/listener.go` | Connection aggregate (297 lines) | +| `app/server.go` | HTTP/WebSocket server setup | +| `app/listener.go` | Connection aggregate | | `app/handle-event.go` | EVENT message handler | | `app/handle-req.go` | REQ message handler | | `app/handle-auth.go` | AUTH message handler | | `app/handle-nip43.go` | NIP-43 membership handlers | | `app/handle-nip86.go` | NIP-86 management handlers | +| `app/handle-negentropy.go` | NIP-77 negentropy sync | | `app/handle-policy-config.go` | Policy configuration events | ### Infrastructure Files @@ -788,14 +926,30 @@ The context map is now documented in this file with integration patterns. | File | Purpose | |------|---------| | `pkg/database/database.go` | Badger implementation | +| `pkg/database/server/` | gRPC database server | +| `pkg/database/grpc/` | gRPC database client | | `pkg/neo4j/` | Neo4j implementation | | `pkg/wasmdb/` | WasmDB implementation | | `pkg/blossom/server.go` | Blossom blob storage server | | `pkg/ratelimit/limiter.go` | PID-based rate limiting | -| `pkg/sync/manager.go` | Distributed sync manager | -| `pkg/sync/cluster.go` | Cluster replication | +| `pkg/sync/negentropy/` | NIP-77 negentropy sync | +| `pkg/sync/cluster/` | HTTP cluster replication | | `pkg/spider/spider.go` | Event spider/aggregator | +### Command Binaries + +| Binary | Purpose | +|--------|---------| +| `cmd/orly-launcher/` | Process supervisor with admin UI | +| `cmd/orly-db-badger/` | Standalone Badger database server | +| `cmd/orly-db-neo4j/` | Standalone Neo4j database server | +| `cmd/orly-acl-follows/` | Follows-based ACL server | +| `cmd/orly-acl-managed/` | NIP-86 managed ACL server | +| `cmd/orly-acl-curation/` | Trust-tier curation ACL server | +| `cmd/orly-sync-negentropy/` | NIP-77 negentropy sync service | +| `cmd/orly-sync-cluster/` | Cluster replication service | +| `cmd/orly-certs/` | Certificate management service | + ### Interface Packages | Package | Purpose | @@ -810,7 +964,15 @@ The context map is now documented in this file with integration patterns. | `pkg/interfaces/store/` | Store interface with IdPkTs, EventRef | | `pkg/interfaces/typer/` | Type introspection interface | +### Protocol Buffer Definitions + +| Package | Purpose | +|---------|---------| +| `pkg/proto/orlydb/v1/` | Database gRPC service (250+ methods) | +| `pkg/proto/orlyacl/v1/` | ACL gRPC service | +| `pkg/proto/orlysync/` | Sync services (negentropy, cluster, etc.) | + --- -*Generated: 2025-12-24* -*Analysis based on ORLY codebase v0.36.14* +*Generated: 2026-01-24* +*Analysis based on ORLY codebase v0.56.4* diff --git a/app/listener.go b/app/listener.go index 0b6e110..f2b85d9 100644 --- a/app/listener.go +++ b/app/listener.go @@ -12,6 +12,7 @@ import ( "github.com/gorilla/websocket" "lol.mleku.dev/errorf" "lol.mleku.dev/log" + "next.orly.dev/app/config" "next.orly.dev/pkg/acl" "next.orly.dev/pkg/database" "git.mleku.dev/mleku/nostr/encoders/event" @@ -22,8 +23,11 @@ import ( ) type Listener struct { + // Server is the embedded server reference. + // Deprecated: Prefer using accessor methods (ServerConfig, ServerDatabase, etc.) + // instead of accessing Server fields directly. *Server - conn *websocket.Conn + conn *websocket.Conn ctx context.Context cancel context.CancelFunc // Cancel function for this listener's context remote string @@ -64,6 +68,21 @@ func (l *Listener) Ctx() context.Context { return l.ctx } +// ServerContext returns the server's context (distinct from the listener's own context). +func (l *Listener) ServerContext() context.Context { + return l.Server.Context() +} + +// ServerConfig returns the server's configuration. +func (l *Listener) ServerConfig() *config.C { + return l.Server.GetConfig() +} + +// ServerDatabase returns the server's database instance. +func (l *Listener) ServerDatabase() database.Database { + return l.Server.Database() +} + // DroppedMessages returns the total number of messages that were dropped // because the message processing queue was full. func (l *Listener) DroppedMessages() int { diff --git a/app/server.go b/app/server.go index 513d352..35d0150 100644 --- a/app/server.go +++ b/app/server.go @@ -46,13 +46,23 @@ import ( ) type Server struct { - mux *http.ServeMux - Config *config.C - Ctx context.Context + mux *http.ServeMux + // Config holds the relay configuration. + // Deprecated: Use GetConfig() method instead of accessing directly. + Config *config.C + // Ctx holds the server context. + // Deprecated: Use Context() method instead of accessing directly. + Ctx context.Context publishers *publish.S - Admins [][]byte - Owners [][]byte - DB database.Database // Changed from embedded *database.D to interface field + // Admins holds the admin pubkeys. + // Deprecated: Use IsAdmin() method instead of accessing directly. + Admins [][]byte + // Owners holds the owner pubkeys. + // Deprecated: Use IsOwner() method instead of accessing directly. + Owners [][]byte + // DB holds the database instance. + // Deprecated: Use Database() method instead of accessing directly. + DB database.Database // optional reverse proxy for dev web server devProxy *httputil.ReverseProxy @@ -110,6 +120,47 @@ type Server struct { brandingMgr *branding.Manager } +// ============================================================================= +// Server Accessor Methods +// ============================================================================= + +// GetConfig returns the relay configuration. +func (s *Server) GetConfig() *config.C { + return s.Config +} + +// Context returns the server context. +func (s *Server) Context() context.Context { + return s.Ctx +} + +// Database returns the database instance. +func (s *Server) Database() database.Database { + return s.DB +} + +// IsAdmin returns true if the given pubkey is an admin. +func (s *Server) IsAdmin(pubkey []byte) bool { + pubHex := string(hex.Enc(pubkey)) + for _, admin := range s.Admins { + if string(hex.Enc(admin)) == pubHex { + return true + } + } + return false +} + +// IsOwner returns true if the given pubkey is an owner. +func (s *Server) IsOwner(pubkey []byte) bool { + pubHex := string(hex.Enc(pubkey)) + for _, owner := range s.Owners { + if string(hex.Enc(owner)) == pubHex { + return true + } + } + return false +} + // isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system func (s *Server) isIPBlacklisted(remote string) bool { // Extract IP from remote address (e.g., "192.168.1.1:12345" -> "192.168.1.1") diff --git a/docs/GLOSSARY.md b/docs/GLOSSARY.md new file mode 100644 index 0000000..0cce2b2 --- /dev/null +++ b/docs/GLOSSARY.md @@ -0,0 +1,352 @@ +# ORLY Relay Domain Glossary + +This glossary defines the ubiquitous language used throughout the ORLY codebase. +All contributors should use these terms consistently in code, comments, and documentation. + +--- + +## Core Domain Concepts + +### Event +A Nostr event as defined in NIP-01. The fundamental unit of data in the Nostr protocol. +Contains: id, pubkey, created_at, kind, tags, content, sig. + +- **Code location**: `git.mleku.dev/mleku/nostr/encoders/event` +- **Key type**: `event.E` + +### Serial +A monotonically increasing 40-bit identifier assigned to each event upon storage. +Used for efficient range queries, synchronization, and garbage collection ordering. + +- **Code location**: `pkg/database/indexes/types.Uint40` +- **Related types**: EventRef, IdPkTs + +### Pubkey +A 32-byte secp256k1 public key identifying a Nostr user. Stored as binary internally, +displayed as 64-character lowercase hex or bech32 npub format externally. + +- **Code location**: `git.mleku.dev/mleku/nostr/types.Pubkey` + +### Access Level +The permission tier granted to a pubkey. Determines what operations are allowed. + +| Level | Description | +|-------|-------------| +| `none` | No access, authentication required | +| `read` | Read-only access (REQ allowed, EVENT denied) | +| `write` | Read and write access | +| `admin` | Write + import/export + arbitrary delete | +| `owner` | Admin + wipe + system configuration | +| `blocked` | IP address blocked | +| `banned` | Pubkey banned | + +- **Code location**: `pkg/interfaces/acl/acl.go` constants + +### ACL (Access Control List) +The authorization system that determines access levels for pubkeys and IP addresses. +Supports multiple modes with different authorization strategies. + +- **Code location**: `pkg/acl/` +- **Interface**: `pkg/interfaces/acl/acl.go` + +--- + +## Event Processing Pipeline + +The event processing pipeline transforms incoming WebSocket messages into stored events. +Each stage has distinct responsibilities and produces typed results. + +``` +Raw JSON → Validation → Authorization → Routing → Processing → Delivery +``` + +### Validation +The process of verifying event structure, signature, and protocol compliance. + +Checks performed: +- Raw JSON validation (hex case normalization) +- Event ID verification (hash matches content) +- Signature verification (schnorr signature valid) +- Timestamp validation (not too far in future) +- NIP-70 protected tag validation + +- **Code location**: `pkg/event/validation/` +- **Result type**: `validation.Result` with `Valid`, `Code`, `Msg` + +### Authorization +The decision process determining if an event is allowed based on ACL and policy. +Returns a structured decision with access level and deny reason. + +- **Code location**: `pkg/event/authorization/` +- **Result type**: `authorization.Decision` with `Allowed`, `AccessLevel`, `DenyReason`, `RequireAuth` + +### Routing +Dispatching events to specialized handlers based on event kind. +Determines whether events should be processed normally, delivered ephemerally, or handled specially. + +Examples: +- Ephemeral events (kinds 20000-29999): Deliver without storage +- Delete events (kind 5): Trigger deletion cascade +- NIP-43 events (kinds 28934, 28936): Membership requests + +- **Code location**: `pkg/event/routing/` +- **Result type**: `routing.Result` with `Action`, `Error` + +### Processing +The final stage: persisting events, running post-save hooks, and delivering to subscribers. +Handles deduplication, replaceable event logic, and event delivery. + +- **Code location**: `pkg/event/processing/` +- **Result type**: `processing.Result` with `Saved`, `Duplicate`, `Blocked`, `Error` + +--- + +## ACL Modes + +### None Mode +Open relay - all pubkeys have write access by default. +No authentication required unless explicitly configured via `ORLY_AUTH_REQUIRED`. + +- **Code location**: `pkg/acl/none.go` + +### Follows Mode +Whitelist based on admin/owner follow lists (kind 3 events). +Followed pubkeys get write access; others get read-only or denied based on configuration. +Supports progressive throttling for non-followed users. + +- **Code location**: `pkg/acl/follows.go` +- **Config**: `ORLY_ACL_MODE=follows` + +### Managed Mode +Fine-grained control via NIP-86 management API. +Supports pubkey bans, event bans, IP blocks, kind restrictions, and custom rules. +All management operations require NIP-98 HTTP authentication. + +- **Code location**: `pkg/acl/managed.go` +- **Config**: `ORLY_ACL_MODE=managed` + +### Curating Mode +Curator-based content moderation system. +Curators can approve/reject events from non-followed users. +Events from non-curated users are held pending approval. + +- **Code location**: `pkg/acl/curating.go` +- **Config**: `ORLY_ACL_MODE=curating` + +--- + +## Protocol Concepts + +### NIP-42 Authentication +Challenge-response authentication for WebSocket connections. +Used to verify pubkey ownership before granting elevated access. + +Flow: +1. Relay sends AUTH challenge with random string +2. Client signs challenge with private key +3. Relay verifies signature and grants access level + +- **Code location**: `pkg/protocol/auth/` + +### NIP-70 Protected Events +Events with `-` (protected) tag that can only be replaced/deleted by the author. +Prevents relays from accepting replacements from unauthorized pubkeys. + +- **Tag format**: `["-"]` in tags array + +### NIP-43 Relay Access +Invite-based membership system for restricted relays. +Supports join requests, leave requests, and membership tracking. + +| Kind | Purpose | +|------|---------| +| 28934 | Join request with invite code | +| 28936 | Leave request | +| 8000 | Member added (relay-published) | + +- **Code location**: `pkg/protocol/nip43/` + +### NIP-86 Relay Management +HTTP JSON-RPC API for relay administration. +Requires NIP-98 HTTP authentication with admin/owner access level. + +- **Endpoint**: `/api/v1/management` +- **Code location**: `app/handle-nip86.go` + +### NIP-77 Negentropy +Set reconciliation protocol for efficient relay-to-relay synchronization. +Uses negentropy algorithm to identify missing events with minimal bandwidth. + +- **Code location**: `pkg/sync/negentropy/` +- **Envelope types**: NEG-OPEN, NEG-MSG, NEG-CLOSE, NEG-ERR + +--- + +## Infrastructure Concepts + +### Publisher +The event delivery system that sends events to subscribers. +Composed of multiple publisher implementations (socket, internal, etc.). + +- **Code location**: `pkg/protocol/publish/` +- **Interface**: `pkg/interfaces/publisher/publisher.go` + +### Sprocket +External event processing plugin (JavaScript/Rhai script). +Can accept, reject, or shadow-reject events before normal processing. + +| Action | Effect | +|--------|--------| +| accept | Event proceeds to normal processing | +| reject | Event rejected with error message | +| shadowReject | Event appears accepted but is not stored | + +- **Code location**: `app/sprocket.go` + +### Policy Manager +Rule-based event filtering system configured via JSON or kind 30078 events. +Evaluates events against configurable rules for allow/deny decisions. + +- **Code location**: `pkg/policy/` +- **Config file**: `~/.config/orly/policy.json` + +### Rate Limiter +PID controller-based adaptive throttling system. +Adjusts delays based on system load (memory pressure, write throughput). + +- **Code location**: `pkg/ratelimit/` +- **Interface**: `pkg/interfaces/loadmonitor/` + +### Supervisor +Process lifecycle manager for split IPC mode deployment. +Manages database, ACL, sync, and relay processes with dependency ordering. + +- **Code location**: `cmd/orly-launcher/supervisor.go` + +--- + +## Data Types + +### EventRef +Stack-allocated event reference with fixed-size ID and pubkey arrays. +80 bytes total, fits in cache line, safe for concurrent use. +Immutable - all fields are unexported with accessor methods. + +```go +type EventRef struct { + id ntypes.EventID // 32 bytes + pub ntypes.Pubkey // 32 bytes + ts int64 // 8 bytes + ser uint64 // 8 bytes +} +``` + +- **Code location**: `pkg/interfaces/store/store_interface.go` + +### IdPkTs +Event reference with slice-based fields for backward compatibility. +Mutable - use `ToEventRef()` for safe concurrent access. + +```go +type IdPkTs struct { + Id []byte // Event ID + Pub []byte // Pubkey + Ts int64 // Timestamp + Ser uint64 // Serial number +} +``` + +- **Code location**: `pkg/interfaces/store/store_interface.go` + +### Decision +Authorization result carrying allowed status, access level, and context. +Used to communicate authorization outcomes through the pipeline. + +- **Code location**: `pkg/event/authorization/authorization.go` + +### Result (Validation) +Validation outcome with Valid bool, ReasonCode enum, and message string. +Codes: ReasonNone, ReasonBlocked, ReasonInvalid, ReasonError. + +- **Code location**: `pkg/event/validation/validation.go` + +### Result (Processing) +Processing outcome indicating whether event was saved, duplicate, or blocked. +Includes error field for unexpected failures. + +- **Code location**: `pkg/event/processing/processing.go` + +--- + +## Subscription Terminology + +Note: "Subscription" has two distinct meanings in the codebase: + +### Event Subscription (REQ) +An active filter receiving matching events in real-time. +Created via REQ envelope, cancelled via CLOSE envelope. +Stored in `Listener.subscriptions` map with cancel function. + +- **Code location**: `app/listener.go` subscriptions field + +### Payment Subscription +Paid access tier granting elevated permissions for a time period. +Managed via NWC payments or manual extension. + +- **Code location**: `pkg/database/interface.go` Subscription type + +--- + +## Sync Terminology + +### Spider +Event aggregator that fetches events from external relays. +Subscribes to events for followed pubkeys on configured relay lists. + +- **Code location**: `pkg/spider/` + +### Sync +Peer-to-peer replication between relay instances. +Multiple implementations: negentropy, cluster, distributed, relaygroup. + +- **Code location**: `pkg/sync/` + +### Cluster +Group of relay instances sharing events via HTTP-based pull replication. +Membership tracked via kind 39108 events. + +- **Code location**: `pkg/sync/cluster/` + +### Relay Group +Configuration of relay sets for synchronized operation. +Tracked via kind 39105 events. + +- **Code location**: `pkg/sync/relaygroup/` + +--- + +## Driver Pattern + +### Driver +A pluggable implementation of a core interface. +Selected at runtime via configuration or command-line flags. + +Examples: +- Database drivers: badger, neo4j, wasmdb, grpc +- ACL drivers: none, follows, managed, curating +- Sync drivers: negentropy, cluster, distributed, relaygroup + +```go +// Check if driver is available +database.HasDriver("badger") + +// Create instance from driver +db := database.NewFromDriver("badger", config) +``` + +- **Pattern location**: `pkg/database/`, `pkg/acl/`, `pkg/sync/` + +--- + +*Last updated: 2026-01-24* +*Based on ORLY codebase v0.56.4* diff --git a/pkg/acl/acl.go b/pkg/acl/acl.go index af4f48f..36f89e4 100644 --- a/pkg/acl/acl.go +++ b/pkg/acl/acl.go @@ -17,10 +17,48 @@ func (s *S) SetMode(m string) { } type S struct { - ACL []acliface.I + // ACL holds registered ACL implementations. + // Deprecated: Use GetACLByType() or ListRegisteredACLs() instead of accessing directly. + ACL []acliface.I + // Active holds the name of the currently active ACL mode. + // Deprecated: Use GetMode() instead of Active.Load(). Active atomic.String } +// GetMode returns the currently active ACL mode name. +func (s *S) GetMode() string { + return s.Active.Load() +} + +// GetACLByType returns the ACL implementation with the given type name, or nil if not found. +func (s *S) GetACLByType(typ string) acliface.I { + for _, i := range s.ACL { + if i.Type() == typ { + return i + } + } + return nil +} + +// GetActiveACL returns the currently active ACL implementation, or nil if none is active. +func (s *S) GetActiveACL() acliface.I { + return s.GetACLByType(s.Active.Load()) +} + +// ListRegisteredACLs returns the type names of all registered ACL implementations. +func (s *S) ListRegisteredACLs() []string { + types := make([]string, 0, len(s.ACL)) + for _, i := range s.ACL { + types = append(types, i.Type()) + } + return types +} + +// IsRegistered returns true if an ACL with the given type is registered. +func (s *S) IsRegistered(typ string) bool { + return s.GetACLByType(typ) != nil +} + type A struct{ S } func (s *S) Register(i acliface.I) { diff --git a/pkg/acl/curating.go b/pkg/acl/curating.go index 4ce2591..a93aa18 100644 --- a/pkg/acl/curating.go +++ b/pkg/acl/curating.go @@ -34,6 +34,8 @@ const ( // - Blacklisted: Cannot publish // - Unclassified: Rate-limited publishing (default 50/day) type Curating struct { + // Ctx holds the context for the ACL. + // Deprecated: Use Context() method instead of accessing directly. Ctx context.Context cfg *config.C db database.Database @@ -50,6 +52,11 @@ type Curating struct { cacheMx sync.RWMutex } +// Context returns the ACL context. +func (c *Curating) Context() context.Context { + return c.Ctx +} + func (c *Curating) Configure(cfg ...any) (err error) { log.I.F("configuring curating ACL") for _, ca := range cfg { diff --git a/pkg/acl/follows.go b/pkg/acl/follows.go index b01e3d3..0375c93 100644 --- a/pkg/acl/follows.go +++ b/pkg/acl/follows.go @@ -31,6 +31,8 @@ import ( ) type Follows struct { + // Ctx holds the context for the ACL. + // Deprecated: Use Context() method instead of accessing directly. Ctx context.Context cfg *config.C db database.Database @@ -51,6 +53,11 @@ type Follows struct { throttle *ProgressiveThrottle } +// Context returns the ACL context. +func (f *Follows) Context() context.Context { + return f.Ctx +} + func (f *Follows) Configure(cfg ...any) (err error) { log.I.F("configuring follows ACL") for _, ca := range cfg { diff --git a/pkg/acl/managed.go b/pkg/acl/managed.go index 6af5973..0a734ce 100644 --- a/pkg/acl/managed.go +++ b/pkg/acl/managed.go @@ -17,6 +17,8 @@ import ( ) type Managed struct { + // Ctx holds the context for the ACL. + // Deprecated: Use Context() method instead of accessing directly. Ctx context.Context cfg *config.C db database.Database @@ -27,6 +29,11 @@ type Managed struct { mx sync.RWMutex } +// Context returns the ACL context. +func (m *Managed) Context() context.Context { + return m.Ctx +} + func (m *Managed) Configure(cfg ...any) (err error) { log.I.F("configuring managed ACL") for _, ca := range cfg { diff --git a/pkg/domain/errors/errors.go b/pkg/domain/errors/errors.go new file mode 100644 index 0000000..e0a644a --- /dev/null +++ b/pkg/domain/errors/errors.go @@ -0,0 +1,561 @@ +// Package errors provides domain-specific error types for the ORLY relay. +// These typed errors enable structured error handling, machine-readable error codes, +// and proper error categorization throughout the codebase. +package errors + +import ( + "fmt" +) + +// DomainError is the base interface for all domain errors. +// It extends the standard error interface with structured metadata. +type DomainError interface { + error + Code() string // Machine-readable error code (e.g., "INVALID_ID") + Category() string // Error category for grouping (e.g., "validation") + IsRetryable() bool // Whether the operation can be retried +} + +// Base provides common implementation for all domain errors. +type Base struct { + code string + category string + message string + retryable bool + cause error +} + +func (e *Base) Error() string { + if e.cause != nil { + return fmt.Sprintf("%s: %v", e.message, e.cause) + } + return e.message +} + +func (e *Base) Code() string { return e.code } +func (e *Base) Category() string { return e.category } +func (e *Base) IsRetryable() bool { return e.retryable } +func (e *Base) Unwrap() error { return e.cause } + +// WithCause returns a copy of the error with the given cause. +func (e *Base) WithCause(cause error) *Base { + return &Base{ + code: e.code, + category: e.category, + message: e.message, + retryable: e.retryable, + cause: cause, + } +} + +// WithMessage returns a copy of the error with the given message. +func (e *Base) WithMessage(msg string) *Base { + return &Base{ + code: e.code, + category: e.category, + message: msg, + retryable: e.retryable, + cause: e.cause, + } +} + +// ============================================================================= +// Validation Errors +// ============================================================================= + +// ValidationError represents an error in event validation. +type ValidationError struct { + Base + Field string // The field that failed validation +} + +// NewValidationError creates a new validation error. +func NewValidationError(code, field, message string) *ValidationError { + return &ValidationError{ + Base: Base{ + code: code, + category: "validation", + message: message, + }, + Field: field, + } +} + +// WithField returns a copy with the specified field. +func (e *ValidationError) WithField(field string) *ValidationError { + return &ValidationError{ + Base: e.Base, + Field: field, + } +} + +// Validation error constants +var ( + ErrInvalidEventID = NewValidationError( + "INVALID_ID", + "id", + "event ID does not match computed hash", + ) + ErrInvalidSignature = NewValidationError( + "INVALID_SIG", + "sig", + "signature verification failed", + ) + ErrFutureTimestamp = NewValidationError( + "FUTURE_TS", + "created_at", + "timestamp too far in future", + ) + ErrPastTimestamp = NewValidationError( + "PAST_TS", + "created_at", + "timestamp too far in past", + ) + ErrUppercaseHex = NewValidationError( + "UPPERCASE_HEX", + "id/pubkey", + "hex values must be lowercase", + ) + ErrProtectedTagMismatch = NewValidationError( + "PROTECTED_TAG", + "tags", + "protected event can only be modified by author", + ) + ErrInvalidJSON = NewValidationError( + "INVALID_JSON", + "", + "malformed JSON", + ) + ErrEventTooLarge = NewValidationError( + "EVENT_TOO_LARGE", + "content", + "event exceeds size limit", + ) + ErrInvalidKind = NewValidationError( + "INVALID_KIND", + "kind", + "event kind not allowed", + ) + ErrMissingTag = NewValidationError( + "MISSING_TAG", + "tags", + "required tag missing", + ) + ErrInvalidTagValue = NewValidationError( + "INVALID_TAG", + "tags", + "tag value validation failed", + ) +) + +// ============================================================================= +// Authorization Errors +// ============================================================================= + +// AuthorizationError represents an authorization failure. +type AuthorizationError struct { + Base + Pubkey []byte // The pubkey that was denied + AccessLevel string // The access level that was required + RequireAuth bool // Whether authentication might resolve this +} + +// NeedsAuth returns true if authentication might resolve this error. +func (e *AuthorizationError) NeedsAuth() bool { return e.RequireAuth } + +// NewAuthRequired creates an error indicating authentication is required. +func NewAuthRequired(reason string) *AuthorizationError { + return &AuthorizationError{ + Base: Base{ + code: "AUTH_REQUIRED", + category: "authorization", + message: reason, + }, + RequireAuth: true, + } +} + +// NewAccessDenied creates an error indicating access was denied. +func NewAccessDenied(level, reason string) *AuthorizationError { + return &AuthorizationError{ + Base: Base{ + code: "ACCESS_DENIED", + category: "authorization", + message: reason, + }, + AccessLevel: level, + } +} + +// WithPubkey returns a copy with the specified pubkey. +func (e *AuthorizationError) WithPubkey(pubkey []byte) *AuthorizationError { + return &AuthorizationError{ + Base: e.Base, + Pubkey: pubkey, + AccessLevel: e.AccessLevel, + RequireAuth: e.RequireAuth, + } +} + +// Authorization error constants +var ( + ErrAuthRequired = NewAuthRequired("authentication required") + ErrBanned = &AuthorizationError{ + Base: Base{ + code: "BANNED", + category: "authorization", + message: "pubkey banned", + }, + } + ErrIPBlocked = &AuthorizationError{ + Base: Base{ + code: "IP_BLOCKED", + category: "authorization", + message: "IP address blocked", + }, + } + ErrNotFollowed = &AuthorizationError{ + Base: Base{ + code: "NOT_FOLLOWED", + category: "authorization", + message: "write access requires being followed by admin", + }, + } + ErrNotMember = &AuthorizationError{ + Base: Base{ + code: "NOT_MEMBER", + category: "authorization", + message: "membership required", + }, + } + ErrInsufficientAccess = &AuthorizationError{ + Base: Base{ + code: "INSUFFICIENT_ACCESS", + category: "authorization", + message: "insufficient access level", + }, + } +) + +// ============================================================================= +// Processing Errors +// ============================================================================= + +// ProcessingError represents an error during event processing. +type ProcessingError struct { + Base + EventID []byte // The event ID (if known) + Kind uint16 // The event kind (if known) +} + +// NewProcessingError creates a new processing error. +func NewProcessingError(code string, message string, retryable bool) *ProcessingError { + return &ProcessingError{ + Base: Base{ + code: code, + category: "processing", + message: message, + retryable: retryable, + }, + } +} + +// WithEventID returns a copy with the specified event ID. +func (e *ProcessingError) WithEventID(id []byte) *ProcessingError { + return &ProcessingError{ + Base: e.Base, + EventID: id, + Kind: e.Kind, + } +} + +// WithKind returns a copy with the specified kind. +func (e *ProcessingError) WithKind(kind uint16) *ProcessingError { + return &ProcessingError{ + Base: e.Base, + EventID: e.EventID, + Kind: kind, + } +} + +// Processing error constants +var ( + ErrDuplicate = NewProcessingError( + "DUPLICATE", + "event already exists", + false, + ) + ErrReplaceNotAllowed = NewProcessingError( + "REPLACE_DENIED", + "cannot replace event from different author", + false, + ) + ErrDeletedEvent = NewProcessingError( + "DELETED", + "event has been deleted", + false, + ) + ErrEphemeralNotStored = NewProcessingError( + "EPHEMERAL", + "ephemeral events are not stored", + false, + ) + ErrRateLimited = NewProcessingError( + "RATE_LIMITED", + "rate limit exceeded", + true, + ) + ErrSprocketRejected = NewProcessingError( + "SPROCKET_REJECTED", + "rejected by sprocket", + false, + ) +) + +// ============================================================================= +// Policy Errors +// ============================================================================= + +// PolicyError represents a policy violation. +type PolicyError struct { + Base + RuleName string // The rule that was violated + Action string // The action taken (block, reject) +} + +// NewPolicyBlocked creates an error for a blocked event. +func NewPolicyBlocked(ruleName, reason string) *PolicyError { + return &PolicyError{ + Base: Base{ + code: "POLICY_BLOCKED", + category: "policy", + message: reason, + }, + RuleName: ruleName, + Action: "block", + } +} + +// NewPolicyRejected creates an error for a rejected event. +func NewPolicyRejected(ruleName, reason string) *PolicyError { + return &PolicyError{ + Base: Base{ + code: "POLICY_REJECTED", + category: "policy", + message: reason, + }, + RuleName: ruleName, + Action: "reject", + } +} + +// Policy error constants +var ( + ErrKindBlocked = &PolicyError{ + Base: Base{ + code: "KIND_BLOCKED", + category: "policy", + message: "event kind not allowed by policy", + }, + Action: "block", + } + ErrPubkeyBlocked = &PolicyError{ + Base: Base{ + code: "PUBKEY_BLOCKED", + category: "policy", + message: "pubkey blocked by policy", + }, + Action: "block", + } + ErrContentBlocked = &PolicyError{ + Base: Base{ + code: "CONTENT_BLOCKED", + category: "policy", + message: "content blocked by policy", + }, + Action: "block", + } + ErrScriptRejected = &PolicyError{ + Base: Base{ + code: "SCRIPT_REJECTED", + category: "policy", + message: "rejected by policy script", + }, + Action: "reject", + } +) + +// ============================================================================= +// Storage Errors +// ============================================================================= + +// StorageError represents a storage-layer error. +type StorageError struct { + Base +} + +// NewStorageError creates a new storage error. +func NewStorageError(code, message string, cause error, retryable bool) *StorageError { + return &StorageError{ + Base: Base{ + code: code, + category: "storage", + message: message, + cause: cause, + retryable: retryable, + }, + } +} + +// Storage error constants +var ( + ErrDatabaseUnavailable = NewStorageError( + "DB_UNAVAILABLE", + "database not available", + nil, + true, + ) + ErrWriteTimeout = NewStorageError( + "WRITE_TIMEOUT", + "write operation timed out", + nil, + true, + ) + ErrReadTimeout = NewStorageError( + "READ_TIMEOUT", + "read operation timed out", + nil, + true, + ) + ErrStorageFull = NewStorageError( + "STORAGE_FULL", + "storage capacity exceeded", + nil, + false, + ) + ErrCorruptedData = NewStorageError( + "CORRUPTED_DATA", + "data corruption detected", + nil, + false, + ) +) + +// ============================================================================= +// Service Errors +// ============================================================================= + +// ServiceError represents a service-level error. +type ServiceError struct { + Base + ServiceName string +} + +// NewServiceError creates a new service error. +func NewServiceError(code, service, message string, retryable bool) *ServiceError { + return &ServiceError{ + Base: Base{ + code: code, + category: "service", + message: message, + retryable: retryable, + }, + ServiceName: service, + } +} + +// Service error constants +var ( + ErrServiceUnavailable = NewServiceError( + "SERVICE_UNAVAILABLE", + "", + "service temporarily unavailable", + true, + ) + ErrServiceTimeout = NewServiceError( + "SERVICE_TIMEOUT", + "", + "service request timed out", + true, + ) + ErrServiceOverloaded = NewServiceError( + "SERVICE_OVERLOADED", + "", + "service is overloaded", + true, + ) +) + +// ============================================================================= +// Helper Functions +// ============================================================================= + +// Is checks if an error matches a target domain error by code. +func Is(err error, target DomainError) bool { + if de, ok := err.(DomainError); ok { + return de.Code() == target.Code() + } + return false +} + +// Code returns the error code if err is a DomainError, empty string otherwise. +func Code(err error) string { + if de, ok := err.(DomainError); ok { + return de.Code() + } + return "" +} + +// Category returns the category of a domain error, or "unknown" for other errors. +func Category(err error) string { + if de, ok := err.(DomainError); ok { + return de.Category() + } + return "unknown" +} + +// IsRetryable checks if an error indicates the operation can be retried. +func IsRetryable(err error) bool { + if de, ok := err.(DomainError); ok { + return de.IsRetryable() + } + return false +} + +// IsValidation checks if an error is a validation error. +func IsValidation(err error) bool { + _, ok := err.(*ValidationError) + return ok +} + +// IsAuthorization checks if an error is an authorization error. +func IsAuthorization(err error) bool { + _, ok := err.(*AuthorizationError) + return ok +} + +// IsProcessing checks if an error is a processing error. +func IsProcessing(err error) bool { + _, ok := err.(*ProcessingError) + return ok +} + +// IsPolicy checks if an error is a policy error. +func IsPolicy(err error) bool { + _, ok := err.(*PolicyError) + return ok +} + +// IsStorage checks if an error is a storage error. +func IsStorage(err error) bool { + _, ok := err.(*StorageError) + return ok +} + +// NeedsAuth checks if an authorization error requires authentication. +func NeedsAuth(err error) bool { + if ae, ok := err.(*AuthorizationError); ok { + return ae.NeedsAuth() + } + return false +} diff --git a/pkg/domain/errors/errors_test.go b/pkg/domain/errors/errors_test.go new file mode 100644 index 0000000..72fe741 --- /dev/null +++ b/pkg/domain/errors/errors_test.go @@ -0,0 +1,210 @@ +package errors + +import ( + "errors" + "testing" +) + +func TestValidationError(t *testing.T) { + err := ErrInvalidEventID + if err.Code() != "INVALID_ID" { + t.Errorf("expected code INVALID_ID, got %s", err.Code()) + } + if err.Category() != "validation" { + t.Errorf("expected category validation, got %s", err.Category()) + } + if err.Field != "id" { + t.Errorf("expected field id, got %s", err.Field) + } + if err.IsRetryable() { + t.Error("validation errors should not be retryable") + } +} + +func TestAuthorizationError(t *testing.T) { + err := ErrAuthRequired + if err.Code() != "AUTH_REQUIRED" { + t.Errorf("expected code AUTH_REQUIRED, got %s", err.Code()) + } + if !err.NeedsAuth() { + t.Error("ErrAuthRequired should indicate auth is needed") + } + + err2 := ErrBanned + if err2.NeedsAuth() { + t.Error("ErrBanned should not indicate auth is needed") + } +} + +func TestProcessingError(t *testing.T) { + err := ErrRateLimited + if err.Code() != "RATE_LIMITED" { + t.Errorf("expected code RATE_LIMITED, got %s", err.Code()) + } + if !err.IsRetryable() { + t.Error("rate limit errors should be retryable") + } + + err2 := ErrDuplicate + if err2.IsRetryable() { + t.Error("duplicate errors should not be retryable") + } +} + +func TestStorageError(t *testing.T) { + err := ErrDatabaseUnavailable + if err.Code() != "DB_UNAVAILABLE" { + t.Errorf("expected code DB_UNAVAILABLE, got %s", err.Code()) + } + if err.Category() != "storage" { + t.Errorf("expected category storage, got %s", err.Category()) + } + if !err.IsRetryable() { + t.Error("database unavailable should be retryable") + } +} + +func TestPolicyError(t *testing.T) { + err := NewPolicyBlocked("kind_whitelist", "kind 30023 not allowed") + if err.Code() != "POLICY_BLOCKED" { + t.Errorf("expected code POLICY_BLOCKED, got %s", err.Code()) + } + if err.RuleName != "kind_whitelist" { + t.Errorf("expected rule name kind_whitelist, got %s", err.RuleName) + } + if err.Action != "block" { + t.Errorf("expected action block, got %s", err.Action) + } +} + +func TestIsHelper(t *testing.T) { + if !Is(ErrInvalidEventID, ErrInvalidEventID) { + t.Error("Is should return true for same error") + } + if Is(ErrInvalidEventID, ErrInvalidSignature) { + t.Error("Is should return false for different errors") + } + + // Test with non-domain error + stdErr := errors.New("standard error") + if Is(stdErr, ErrInvalidEventID) { + t.Error("Is should return false for non-domain errors") + } +} + +func TestCodeHelper(t *testing.T) { + if Code(ErrInvalidEventID) != "INVALID_ID" { + t.Errorf("expected INVALID_ID, got %s", Code(ErrInvalidEventID)) + } + + stdErr := errors.New("standard error") + if Code(stdErr) != "" { + t.Errorf("expected empty string for non-domain error, got %s", Code(stdErr)) + } +} + +func TestCategoryHelper(t *testing.T) { + if Category(ErrInvalidEventID) != "validation" { + t.Errorf("expected validation, got %s", Category(ErrInvalidEventID)) + } + if Category(ErrBanned) != "authorization" { + t.Errorf("expected authorization, got %s", Category(ErrBanned)) + } + if Category(ErrDuplicate) != "processing" { + t.Errorf("expected processing, got %s", Category(ErrDuplicate)) + } + + stdErr := errors.New("standard error") + if Category(stdErr) != "unknown" { + t.Errorf("expected unknown for non-domain error, got %s", Category(stdErr)) + } +} + +func TestIsRetryableHelper(t *testing.T) { + if !IsRetryable(ErrRateLimited) { + t.Error("ErrRateLimited should be retryable") + } + if !IsRetryable(ErrDatabaseUnavailable) { + t.Error("ErrDatabaseUnavailable should be retryable") + } + if IsRetryable(ErrDuplicate) { + t.Error("ErrDuplicate should not be retryable") + } + if IsRetryable(ErrInvalidEventID) { + t.Error("ErrInvalidEventID should not be retryable") + } +} + +func TestTypeCheckers(t *testing.T) { + if !IsValidation(ErrInvalidEventID) { + t.Error("ErrInvalidEventID should be a validation error") + } + if !IsAuthorization(ErrBanned) { + t.Error("ErrBanned should be an authorization error") + } + if !IsProcessing(ErrDuplicate) { + t.Error("ErrDuplicate should be a processing error") + } + if !IsPolicy(ErrKindBlocked) { + t.Error("ErrKindBlocked should be a policy error") + } + if !IsStorage(ErrDatabaseUnavailable) { + t.Error("ErrDatabaseUnavailable should be a storage error") + } +} + +func TestNeedsAuthHelper(t *testing.T) { + if !NeedsAuth(ErrAuthRequired) { + t.Error("ErrAuthRequired should need auth") + } + if NeedsAuth(ErrBanned) { + t.Error("ErrBanned should not need auth") + } + if NeedsAuth(ErrInvalidEventID) { + t.Error("validation errors should not need auth") + } +} + +func TestWithCause(t *testing.T) { + cause := errors.New("underlying error") + err := ErrDatabaseUnavailable.Base.WithCause(cause) + if err.Unwrap() != cause { + t.Error("WithCause should set the cause") + } + if err.Error() != "database not available: underlying error" { + t.Errorf("unexpected error message: %s", err.Error()) + } +} + +func TestWithMessage(t *testing.T) { + err := ErrInvalidEventID.Base.WithMessage("custom message") + if err.Error() != "custom message" { + t.Errorf("expected custom message, got %s", err.Error()) + } + if err.Code() != "INVALID_ID" { + t.Error("code should be preserved") + } +} + +func TestWithPubkey(t *testing.T) { + pubkey := []byte{1, 2, 3, 4} + err := ErrBanned.WithPubkey(pubkey) + if len(err.Pubkey) != 4 { + t.Error("pubkey should be set") + } +} + +func TestWithEventID(t *testing.T) { + eventID := []byte{1, 2, 3, 4} + err := ErrDuplicate.WithEventID(eventID) + if len(err.EventID) != 4 { + t.Error("event ID should be set") + } +} + +func TestWithKind(t *testing.T) { + err := ErrDuplicate.WithKind(30023) + if err.Kind != 30023 { + t.Errorf("expected kind 30023, got %d", err.Kind) + } +} diff --git a/pkg/domain/events/dispatcher.go b/pkg/domain/events/dispatcher.go new file mode 100644 index 0000000..10d594e --- /dev/null +++ b/pkg/domain/events/dispatcher.go @@ -0,0 +1,315 @@ +package events + +import ( + "context" + "sync" + "sync/atomic" + + "lol.mleku.dev/log" +) + +// Subscriber handles domain events. +type Subscriber interface { + // Handle processes the given domain event. + Handle(event DomainEvent) + // Supports returns true if this subscriber handles the given event type. + Supports(eventType string) bool +} + +// SubscriberFunc is a function that can be used as a Subscriber. +type SubscriberFunc struct { + handleFunc func(DomainEvent) + supportsFunc func(string) bool +} + +// Handle implements Subscriber. +func (s *SubscriberFunc) Handle(event DomainEvent) { + s.handleFunc(event) +} + +// Supports implements Subscriber. +func (s *SubscriberFunc) Supports(eventType string) bool { + return s.supportsFunc(eventType) +} + +// NewSubscriberFunc creates a new SubscriberFunc. +func NewSubscriberFunc(handle func(DomainEvent), supports func(string) bool) *SubscriberFunc { + return &SubscriberFunc{ + handleFunc: handle, + supportsFunc: supports, + } +} + +// NewSubscriberForTypes creates a subscriber that handles specific event types. +func NewSubscriberForTypes(handle func(DomainEvent), types ...string) *SubscriberFunc { + typeSet := make(map[string]struct{}, len(types)) + for _, t := range types { + typeSet[t] = struct{}{} + } + return &SubscriberFunc{ + handleFunc: handle, + supportsFunc: func(eventType string) bool { + _, ok := typeSet[eventType] + return ok + }, + } +} + +// NewSubscriberForAll creates a subscriber that handles all event types. +func NewSubscriberForAll(handle func(DomainEvent)) *SubscriberFunc { + return &SubscriberFunc{ + handleFunc: handle, + supportsFunc: func(string) bool { return true }, + } +} + +// Dispatcher publishes domain events to subscribers. +type Dispatcher struct { + subscribers []Subscriber + mu sync.RWMutex + + asyncChan chan DomainEvent + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + // Metrics + eventsPublished atomic.Int64 + eventsDropped atomic.Int64 + asyncQueueSize int +} + +// DispatcherConfig configures the dispatcher. +type DispatcherConfig struct { + // AsyncBufferSize is the buffer size for async event delivery. + // Default: 1000 + AsyncBufferSize int +} + +// DefaultDispatcherConfig returns the default dispatcher configuration. +func DefaultDispatcherConfig() DispatcherConfig { + return DispatcherConfig{ + AsyncBufferSize: 1000, + } +} + +// NewDispatcher creates a new event dispatcher. +func NewDispatcher(cfg DispatcherConfig) *Dispatcher { + if cfg.AsyncBufferSize <= 0 { + cfg.AsyncBufferSize = 1000 + } + + ctx, cancel := context.WithCancel(context.Background()) + d := &Dispatcher{ + asyncChan: make(chan DomainEvent, cfg.AsyncBufferSize), + ctx: ctx, + cancel: cancel, + asyncQueueSize: cfg.AsyncBufferSize, + } + + // Start async processor + d.wg.Add(1) + go d.processAsync() + + return d +} + +// Subscribe adds a subscriber to receive events. +func (d *Dispatcher) Subscribe(s Subscriber) { + d.mu.Lock() + defer d.mu.Unlock() + d.subscribers = append(d.subscribers, s) +} + +// Unsubscribe removes a subscriber. +func (d *Dispatcher) Unsubscribe(s Subscriber) { + d.mu.Lock() + defer d.mu.Unlock() + for i, sub := range d.subscribers { + if sub == s { + d.subscribers = append(d.subscribers[:i], d.subscribers[i+1:]...) + return + } + } +} + +// Publish sends an event to all matching subscribers synchronously. +// This blocks until all subscribers have processed the event. +func (d *Dispatcher) Publish(event DomainEvent) { + d.mu.RLock() + subscribers := d.subscribers + d.mu.RUnlock() + + for _, s := range subscribers { + if s.Supports(event.EventType()) { + s.Handle(event) + } + } + d.eventsPublished.Add(1) +} + +// PublishAsync sends an event to be processed asynchronously. +// This returns immediately and the event is processed in a background goroutine. +// Returns true if the event was queued, false if the queue is full. +func (d *Dispatcher) PublishAsync(event DomainEvent) bool { + select { + case d.asyncChan <- event: + return true + default: + d.eventsDropped.Add(1) + log.W.F("domain event dropped (queue full): %s", event.EventType()) + return false + } +} + +// processAsync handles async event delivery. +func (d *Dispatcher) processAsync() { + defer d.wg.Done() + + for { + select { + case <-d.ctx.Done(): + // Drain remaining events before exiting + for { + select { + case event := <-d.asyncChan: + d.Publish(event) + default: + return + } + } + case event := <-d.asyncChan: + d.Publish(event) + } + } +} + +// Stop stops the dispatcher and waits for pending events to be processed. +func (d *Dispatcher) Stop() { + d.cancel() + d.wg.Wait() +} + +// Stats returns dispatcher statistics. +func (d *Dispatcher) Stats() DispatcherStats { + d.mu.RLock() + subscriberCount := len(d.subscribers) + d.mu.RUnlock() + + return DispatcherStats{ + EventsPublished: d.eventsPublished.Load(), + EventsDropped: d.eventsDropped.Load(), + SubscriberCount: subscriberCount, + QueueSize: len(d.asyncChan), + QueueCapacity: d.asyncQueueSize, + } +} + +// DispatcherStats contains dispatcher statistics. +type DispatcherStats struct { + EventsPublished int64 + EventsDropped int64 + SubscriberCount int + QueueSize int + QueueCapacity int +} + +// ============================================================================= +// Global Dispatcher (Optional Convenience) +// ============================================================================= + +var ( + globalDispatcher *Dispatcher + globalDispatcherOnce sync.Once +) + +// Global returns the global dispatcher instance. +// Creates one with default config if not already created. +func Global() *Dispatcher { + globalDispatcherOnce.Do(func() { + globalDispatcher = NewDispatcher(DefaultDispatcherConfig()) + }) + return globalDispatcher +} + +// SetGlobal sets the global dispatcher instance. +// This should be called early in application startup if custom config is needed. +func SetGlobal(d *Dispatcher) { + globalDispatcher = d +} + +// ============================================================================= +// Typed Subscription Helpers +// ============================================================================= + +// OnEventSaved subscribes to EventSaved events. +func (d *Dispatcher) OnEventSaved(handler func(*EventSaved)) { + d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) { + if es, ok := e.(*EventSaved); ok { + handler(es) + } + }, EventSavedType)) +} + +// OnEventDeleted subscribes to EventDeleted events. +func (d *Dispatcher) OnEventDeleted(handler func(*EventDeleted)) { + d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) { + if ed, ok := e.(*EventDeleted); ok { + handler(ed) + } + }, EventDeletedType)) +} + +// OnFollowListUpdated subscribes to FollowListUpdated events. +func (d *Dispatcher) OnFollowListUpdated(handler func(*FollowListUpdated)) { + d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) { + if flu, ok := e.(*FollowListUpdated); ok { + handler(flu) + } + }, FollowListUpdatedType)) +} + +// OnACLMembershipChanged subscribes to ACLMembershipChanged events. +func (d *Dispatcher) OnACLMembershipChanged(handler func(*ACLMembershipChanged)) { + d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) { + if amc, ok := e.(*ACLMembershipChanged); ok { + handler(amc) + } + }, ACLMembershipChangedType)) +} + +// OnPolicyConfigUpdated subscribes to PolicyConfigUpdated events. +func (d *Dispatcher) OnPolicyConfigUpdated(handler func(*PolicyConfigUpdated)) { + d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) { + if pcu, ok := e.(*PolicyConfigUpdated); ok { + handler(pcu) + } + }, PolicyConfigUpdatedType)) +} + +// OnUserAuthenticated subscribes to UserAuthenticated events. +func (d *Dispatcher) OnUserAuthenticated(handler func(*UserAuthenticated)) { + d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) { + if ua, ok := e.(*UserAuthenticated); ok { + handler(ua) + } + }, UserAuthenticatedType)) +} + +// OnMemberJoined subscribes to MemberJoined events. +func (d *Dispatcher) OnMemberJoined(handler func(*MemberJoined)) { + d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) { + if mj, ok := e.(*MemberJoined); ok { + handler(mj) + } + }, MemberJoinedType)) +} + +// OnMemberLeft subscribes to MemberLeft events. +func (d *Dispatcher) OnMemberLeft(handler func(*MemberLeft)) { + d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) { + if ml, ok := e.(*MemberLeft); ok { + handler(ml) + } + }, MemberLeftType)) +} diff --git a/pkg/domain/events/dispatcher_test.go b/pkg/domain/events/dispatcher_test.go new file mode 100644 index 0000000..ed829cd --- /dev/null +++ b/pkg/domain/events/dispatcher_test.go @@ -0,0 +1,245 @@ +package events + +import ( + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestDispatcherPublish(t *testing.T) { + d := NewDispatcher(DefaultDispatcherConfig()) + defer d.Stop() + + var received atomic.Int32 + + d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) { + received.Add(1) + }, EventSavedType)) + + // Should be received + d.Publish(NewEventSaved(nil, 1, false, false)) + + // Should not be received (different type) + d.Publish(NewEventDeleted(nil, nil, 1)) + + if received.Load() != 1 { + t.Errorf("expected 1 event received, got %d", received.Load()) + } +} + +func TestDispatcherPublishAsync(t *testing.T) { + d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 100}) + defer d.Stop() + + var received atomic.Int32 + var wg sync.WaitGroup + + d.Subscribe(NewSubscriberForAll(func(e DomainEvent) { + received.Add(1) + wg.Done() + })) + + wg.Add(10) + for i := 0; i < 10; i++ { + d.PublishAsync(NewEventSaved(nil, uint64(i), false, false)) + } + + // Wait for all events to be processed + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for async events") + } + + if received.Load() != 10 { + t.Errorf("expected 10 events received, got %d", received.Load()) + } +} + +func TestDispatcherMultipleSubscribers(t *testing.T) { + d := NewDispatcher(DefaultDispatcherConfig()) + defer d.Stop() + + var count1, count2 atomic.Int32 + + d.Subscribe(NewSubscriberForAll(func(e DomainEvent) { + count1.Add(1) + })) + + d.Subscribe(NewSubscriberForAll(func(e DomainEvent) { + count2.Add(1) + })) + + d.Publish(NewEventSaved(nil, 1, false, false)) + + if count1.Load() != 1 || count2.Load() != 1 { + t.Errorf("expected both subscribers to receive event, got %d and %d", count1.Load(), count2.Load()) + } +} + +func TestDispatcherUnsubscribe(t *testing.T) { + d := NewDispatcher(DefaultDispatcherConfig()) + defer d.Stop() + + var received atomic.Int32 + + sub := NewSubscriberForAll(func(e DomainEvent) { + received.Add(1) + }) + + d.Subscribe(sub) + d.Publish(NewEventSaved(nil, 1, false, false)) + + d.Unsubscribe(sub) + d.Publish(NewEventSaved(nil, 2, false, false)) + + if received.Load() != 1 { + t.Errorf("expected 1 event received after unsubscribe, got %d", received.Load()) + } +} + +func TestDispatcherTypedSubscription(t *testing.T) { + d := NewDispatcher(DefaultDispatcherConfig()) + defer d.Stop() + + var savedCount atomic.Int32 + var deletedCount atomic.Int32 + + d.OnEventSaved(func(e *EventSaved) { + savedCount.Add(1) + }) + + d.OnEventDeleted(func(e *EventDeleted) { + deletedCount.Add(1) + }) + + d.Publish(NewEventSaved(nil, 1, false, false)) + d.Publish(NewEventDeleted(nil, nil, 1)) + + if savedCount.Load() != 1 { + t.Errorf("expected 1 saved event, got %d", savedCount.Load()) + } + if deletedCount.Load() != 1 { + t.Errorf("expected 1 deleted event, got %d", deletedCount.Load()) + } +} + +func TestDispatcherStats(t *testing.T) { + d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 100}) + defer d.Stop() + + d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {})) + + d.Publish(NewEventSaved(nil, 1, false, false)) + d.Publish(NewEventSaved(nil, 2, false, false)) + + stats := d.Stats() + if stats.EventsPublished != 2 { + t.Errorf("expected 2 events published, got %d", stats.EventsPublished) + } + if stats.SubscriberCount != 1 { + t.Errorf("expected 1 subscriber, got %d", stats.SubscriberCount) + } + if stats.QueueCapacity != 100 { + t.Errorf("expected queue capacity 100, got %d", stats.QueueCapacity) + } +} + +func TestDispatcherQueueFull(t *testing.T) { + d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 1}) + defer d.Stop() + + // Block the processor + blocker := make(chan struct{}) + d.Subscribe(NewSubscriberForAll(func(e DomainEvent) { + <-blocker + })) + + // First should succeed + if !d.PublishAsync(NewEventSaved(nil, 1, false, false)) { + t.Error("first async publish should succeed") + } + + // Wait for it to be picked up + time.Sleep(10 * time.Millisecond) + + // Second should fail (queue full) + if d.PublishAsync(NewEventSaved(nil, 2, false, false)) { + // might succeed if the first was already picked up + } + + close(blocker) +} + +func TestEventTypes(t *testing.T) { + types := AllEventTypes() + if len(types) == 0 { + t.Error("expected event types to be registered") + } + + // Check all types are unique + seen := make(map[string]bool) + for _, typ := range types { + if seen[typ] { + t.Errorf("duplicate event type: %s", typ) + } + seen[typ] = true + } +} + +func TestDomainEventInterface(t *testing.T) { + events := []DomainEvent{ + NewEventSaved(nil, 1, true, false), + NewEventDeleted([]byte{1}, []byte{2}, 3), + NewFollowListUpdated([]byte{1}, nil, nil), + NewACLMembershipChanged([]byte{1}, "none", "write", "followed"), + NewPolicyConfigUpdated([]byte{1}, map[string]interface{}{"key": "value"}), + NewPolicyFollowsUpdated([]byte{1}, 10, nil), + NewRelayGroupConfigChanged(nil), + NewClusterMembershipChanged(nil, "join"), + NewSyncSerialUpdated(100), + NewUserAuthenticated([]byte{1}, "write", true, "conn-123"), + NewConnectionOpened("conn-123", "192.168.1.1:1234"), + NewConnectionClosed("conn-123", time.Hour, 100, 50), + NewSubscriptionCreated("sub-1", "conn-123", 3), + NewSubscriptionClosed("sub-1", "conn-123", 25), + NewMemberJoined([]byte{1}, "invite-abc"), + NewMemberLeft([]byte{1}), + } + + for _, e := range events { + if e.OccurredAt().IsZero() { + t.Errorf("event %s has zero timestamp", e.EventType()) + } + if e.EventType() == "" { + t.Error("event has empty type") + } + } +} + +func TestSubscriberFunc(t *testing.T) { + var called bool + sf := NewSubscriberFunc( + func(e DomainEvent) { called = true }, + func(typ string) bool { return typ == EventSavedType }, + ) + + if !sf.Supports(EventSavedType) { + t.Error("should support EventSavedType") + } + if sf.Supports(EventDeletedType) { + t.Error("should not support EventDeletedType") + } + + sf.Handle(NewEventSaved(nil, 1, false, false)) + if !called { + t.Error("handler was not called") + } +} diff --git a/pkg/domain/events/events.go b/pkg/domain/events/events.go new file mode 100644 index 0000000..317a2de --- /dev/null +++ b/pkg/domain/events/events.go @@ -0,0 +1,420 @@ +// Package events provides domain event types and a dispatcher for the ORLY relay. +// Domain events represent significant occurrences in the system that other components +// may want to react to, enabling loose coupling between components. +package events + +import ( + "time" + + "git.mleku.dev/mleku/nostr/encoders/event" +) + +// DomainEvent is the base interface for all domain events. +type DomainEvent interface { + // OccurredAt returns when the event occurred. + OccurredAt() time.Time + // EventType returns a string identifier for the event type. + EventType() string +} + +// Base provides common fields for all domain events. +type Base struct { + occurredAt time.Time + eventType string +} + +// OccurredAt returns when the event occurred. +func (b Base) OccurredAt() time.Time { return b.occurredAt } + +// EventType returns the event type identifier. +func (b Base) EventType() string { return b.eventType } + +// newBase creates a new Base with the current time. +func newBase(eventType string) Base { + return Base{ + occurredAt: time.Now(), + eventType: eventType, + } +} + +// ============================================================================= +// Event Storage Events +// ============================================================================= + +// EventSavedType is the event type for EventSaved. +const EventSavedType = "event.saved" + +// EventSaved is emitted when a Nostr event is successfully saved to the database. +type EventSaved struct { + Base + Event *event.E // The saved event + Serial uint64 // The assigned serial number + IsAdmin bool // Whether the author is an admin + IsOwner bool // Whether the author is an owner +} + +// NewEventSaved creates a new EventSaved domain event. +func NewEventSaved(ev *event.E, serial uint64, isAdmin, isOwner bool) *EventSaved { + return &EventSaved{ + Base: newBase(EventSavedType), + Event: ev, + Serial: serial, + IsAdmin: isAdmin, + IsOwner: isOwner, + } +} + +// EventDeletedType is the event type for EventDeleted. +const EventDeletedType = "event.deleted" + +// EventDeleted is emitted when a Nostr event is deleted. +type EventDeleted struct { + Base + EventID []byte // The deleted event ID + DeletedBy []byte // Pubkey that requested deletion + Serial uint64 // The serial of the deleted event +} + +// NewEventDeleted creates a new EventDeleted domain event. +func NewEventDeleted(eventID, deletedBy []byte, serial uint64) *EventDeleted { + return &EventDeleted{ + Base: newBase(EventDeletedType), + EventID: eventID, + DeletedBy: deletedBy, + Serial: serial, + } +} + +// ============================================================================= +// ACL Events +// ============================================================================= + +// FollowListUpdatedType is the event type for FollowListUpdated. +const FollowListUpdatedType = "acl.followlist.updated" + +// FollowListUpdated is emitted when an admin's follow list changes. +type FollowListUpdated struct { + Base + AdminPubkey []byte // The admin whose follow list changed + AddedFollows [][]byte // Pubkeys that were added + RemovedFollows [][]byte // Pubkeys that were removed +} + +// NewFollowListUpdated creates a new FollowListUpdated domain event. +func NewFollowListUpdated(adminPubkey []byte, added, removed [][]byte) *FollowListUpdated { + return &FollowListUpdated{ + Base: newBase(FollowListUpdatedType), + AdminPubkey: adminPubkey, + AddedFollows: added, + RemovedFollows: removed, + } +} + +// ACLMembershipChangedType is the event type for ACLMembershipChanged. +const ACLMembershipChangedType = "acl.membership.changed" + +// ACLMembershipChanged is emitted when a pubkey's access level changes. +type ACLMembershipChanged struct { + Base + Pubkey []byte // The affected pubkey + PrevLevel string // Previous access level + NewLevel string // New access level + Reason string // Reason for the change +} + +// NewACLMembershipChanged creates a new ACLMembershipChanged domain event. +func NewACLMembershipChanged(pubkey []byte, prevLevel, newLevel, reason string) *ACLMembershipChanged { + return &ACLMembershipChanged{ + Base: newBase(ACLMembershipChangedType), + Pubkey: pubkey, + PrevLevel: prevLevel, + NewLevel: newLevel, + Reason: reason, + } +} + +// ============================================================================= +// Policy Events +// ============================================================================= + +// PolicyConfigUpdatedType is the event type for PolicyConfigUpdated. +const PolicyConfigUpdatedType = "policy.config.updated" + +// PolicyConfigUpdated is emitted when the policy configuration changes. +type PolicyConfigUpdated struct { + Base + UpdatedBy []byte // Pubkey that made the update + Changes map[string]interface{} // Changed configuration keys +} + +// NewPolicyConfigUpdated creates a new PolicyConfigUpdated domain event. +func NewPolicyConfigUpdated(updatedBy []byte, changes map[string]interface{}) *PolicyConfigUpdated { + return &PolicyConfigUpdated{ + Base: newBase(PolicyConfigUpdatedType), + UpdatedBy: updatedBy, + Changes: changes, + } +} + +// PolicyFollowsUpdatedType is the event type for PolicyFollowsUpdated. +const PolicyFollowsUpdatedType = "policy.follows.updated" + +// PolicyFollowsUpdated is emitted when policy follow lists are refreshed. +type PolicyFollowsUpdated struct { + Base + AdminPubkey []byte // The admin whose follows were processed + FollowCount int // Number of follows in the updated list + Follows [][]byte // The follow pubkeys (may be nil for large lists) +} + +// NewPolicyFollowsUpdated creates a new PolicyFollowsUpdated domain event. +func NewPolicyFollowsUpdated(adminPubkey []byte, followCount int, follows [][]byte) *PolicyFollowsUpdated { + return &PolicyFollowsUpdated{ + Base: newBase(PolicyFollowsUpdatedType), + AdminPubkey: adminPubkey, + FollowCount: followCount, + Follows: follows, + } +} + +// ============================================================================= +// Sync Events +// ============================================================================= + +// RelayGroupConfigChangedType is the event type for RelayGroupConfigChanged. +const RelayGroupConfigChangedType = "sync.relaygroup.changed" + +// RelayGroupConfigChanged is emitted when relay group configuration changes. +type RelayGroupConfigChanged struct { + Base + Event *event.E // The kind 39105 event +} + +// NewRelayGroupConfigChanged creates a new RelayGroupConfigChanged domain event. +func NewRelayGroupConfigChanged(ev *event.E) *RelayGroupConfigChanged { + return &RelayGroupConfigChanged{ + Base: newBase(RelayGroupConfigChangedType), + Event: ev, + } +} + +// ClusterMembershipChangedType is the event type for ClusterMembershipChanged. +const ClusterMembershipChangedType = "sync.cluster.membership.changed" + +// ClusterMembershipChanged is emitted when cluster membership changes. +type ClusterMembershipChanged struct { + Base + Event *event.E // The kind 39108 event + Action string // "join" or "leave" +} + +// NewClusterMembershipChanged creates a new ClusterMembershipChanged domain event. +func NewClusterMembershipChanged(ev *event.E, action string) *ClusterMembershipChanged { + return &ClusterMembershipChanged{ + Base: newBase(ClusterMembershipChangedType), + Event: ev, + Action: action, + } +} + +// SyncSerialUpdatedType is the event type for SyncSerialUpdated. +const SyncSerialUpdatedType = "sync.serial.updated" + +// SyncSerialUpdated is emitted when the sync manager's serial is updated. +type SyncSerialUpdated struct { + Base + Serial uint64 // The new serial number +} + +// NewSyncSerialUpdated creates a new SyncSerialUpdated domain event. +func NewSyncSerialUpdated(serial uint64) *SyncSerialUpdated { + return &SyncSerialUpdated{ + Base: newBase(SyncSerialUpdatedType), + Serial: serial, + } +} + +// ============================================================================= +// Authentication Events +// ============================================================================= + +// UserAuthenticatedType is the event type for UserAuthenticated. +const UserAuthenticatedType = "auth.user.authenticated" + +// UserAuthenticated is emitted when a user successfully authenticates. +type UserAuthenticated struct { + Base + Pubkey []byte // The authenticated pubkey + AccessLevel string // The granted access level + IsFirstTime bool // Whether this is a first-time user + ConnectionID string // The connection ID +} + +// NewUserAuthenticated creates a new UserAuthenticated domain event. +func NewUserAuthenticated(pubkey []byte, accessLevel string, isFirstTime bool, connID string) *UserAuthenticated { + return &UserAuthenticated{ + Base: newBase(UserAuthenticatedType), + Pubkey: pubkey, + AccessLevel: accessLevel, + IsFirstTime: isFirstTime, + ConnectionID: connID, + } +} + +// ============================================================================= +// Connection Events +// ============================================================================= + +// ConnectionOpenedType is the event type for ConnectionOpened. +const ConnectionOpenedType = "connection.opened" + +// ConnectionOpened is emitted when a new WebSocket connection is established. +type ConnectionOpened struct { + Base + ConnectionID string // Unique connection identifier + RemoteAddr string // Client IP:port +} + +// NewConnectionOpened creates a new ConnectionOpened domain event. +func NewConnectionOpened(connID, remoteAddr string) *ConnectionOpened { + return &ConnectionOpened{ + Base: newBase(ConnectionOpenedType), + ConnectionID: connID, + RemoteAddr: remoteAddr, + } +} + +// ConnectionClosedType is the event type for ConnectionClosed. +const ConnectionClosedType = "connection.closed" + +// ConnectionClosed is emitted when a WebSocket connection is closed. +type ConnectionClosed struct { + Base + ConnectionID string // Unique connection identifier + Duration time.Duration // How long the connection was open + EventsReceived int // Number of events received + EventsPublished int // Number of events published +} + +// NewConnectionClosed creates a new ConnectionClosed domain event. +func NewConnectionClosed(connID string, duration time.Duration, received, published int) *ConnectionClosed { + return &ConnectionClosed{ + Base: newBase(ConnectionClosedType), + ConnectionID: connID, + Duration: duration, + EventsReceived: received, + EventsPublished: published, + } +} + +// ============================================================================= +// Subscription Events +// ============================================================================= + +// SubscriptionCreatedType is the event type for SubscriptionCreated. +const SubscriptionCreatedType = "subscription.created" + +// SubscriptionCreated is emitted when a new REQ subscription is created. +type SubscriptionCreated struct { + Base + SubscriptionID string // The subscription ID from REQ + ConnectionID string // The connection this subscription belongs to + FilterCount int // Number of filters in the subscription +} + +// NewSubscriptionCreated creates a new SubscriptionCreated domain event. +func NewSubscriptionCreated(subID, connID string, filterCount int) *SubscriptionCreated { + return &SubscriptionCreated{ + Base: newBase(SubscriptionCreatedType), + SubscriptionID: subID, + ConnectionID: connID, + FilterCount: filterCount, + } +} + +// SubscriptionClosedType is the event type for SubscriptionClosed. +const SubscriptionClosedType = "subscription.closed" + +// SubscriptionClosed is emitted when a subscription is closed. +type SubscriptionClosed struct { + Base + SubscriptionID string // The subscription ID + ConnectionID string // The connection this subscription belonged to + EventsMatched int // Number of events that matched this subscription +} + +// NewSubscriptionClosed creates a new SubscriptionClosed domain event. +func NewSubscriptionClosed(subID, connID string, eventsMatched int) *SubscriptionClosed { + return &SubscriptionClosed{ + Base: newBase(SubscriptionClosedType), + SubscriptionID: subID, + ConnectionID: connID, + EventsMatched: eventsMatched, + } +} + +// ============================================================================= +// NIP-43 Events +// ============================================================================= + +// MemberJoinedType is the event type for MemberJoined. +const MemberJoinedType = "nip43.member.joined" + +// MemberJoined is emitted when a new member joins via NIP-43. +type MemberJoined struct { + Base + Pubkey []byte // The new member's pubkey + InviteCode string // The invite code used (if any) +} + +// NewMemberJoined creates a new MemberJoined domain event. +func NewMemberJoined(pubkey []byte, inviteCode string) *MemberJoined { + return &MemberJoined{ + Base: newBase(MemberJoinedType), + Pubkey: pubkey, + InviteCode: inviteCode, + } +} + +// MemberLeftType is the event type for MemberLeft. +const MemberLeftType = "nip43.member.left" + +// MemberLeft is emitted when a member leaves via NIP-43. +type MemberLeft struct { + Base + Pubkey []byte // The departed member's pubkey +} + +// NewMemberLeft creates a new MemberLeft domain event. +func NewMemberLeft(pubkey []byte) *MemberLeft { + return &MemberLeft{ + Base: newBase(MemberLeftType), + Pubkey: pubkey, + } +} + +// ============================================================================= +// Event Type Registry +// ============================================================================= + +// AllEventTypes returns all registered event type constants. +func AllEventTypes() []string { + return []string{ + EventSavedType, + EventDeletedType, + FollowListUpdatedType, + ACLMembershipChangedType, + PolicyConfigUpdatedType, + PolicyFollowsUpdatedType, + RelayGroupConfigChangedType, + ClusterMembershipChangedType, + SyncSerialUpdatedType, + UserAuthenticatedType, + ConnectionOpenedType, + ConnectionClosedType, + SubscriptionCreatedType, + SubscriptionClosedType, + MemberJoinedType, + MemberLeftType, + } +} diff --git a/pkg/event/ingestion/service.go b/pkg/event/ingestion/service.go new file mode 100644 index 0000000..13f3d19 --- /dev/null +++ b/pkg/event/ingestion/service.go @@ -0,0 +1,225 @@ +// Package ingestion provides a service for orchestrating the event processing pipeline. +// It coordinates validation, special kind handling, authorization, routing, and processing. +package ingestion + +import ( + "context" + "fmt" + + "git.mleku.dev/mleku/nostr/encoders/event" + "next.orly.dev/pkg/event/authorization" + "next.orly.dev/pkg/event/processing" + "next.orly.dev/pkg/event/routing" + "next.orly.dev/pkg/event/specialkinds" + "next.orly.dev/pkg/event/validation" +) + +// SprocketChecker checks events against the sprocket (external filter). +type SprocketChecker interface { + IsEnabled() bool + IsDisabled() bool + IsRunning() bool + ProcessEvent(*event.E) (*SprocketResponse, error) +} + +// SprocketResponse is the response from sprocket processing. +type SprocketResponse struct { + Action string // "accept", "reject", "shadowReject" + Msg string +} + +// ConnectionContext provides connection-specific data for event processing. +type ConnectionContext struct { + // AuthedPubkey is the authenticated pubkey for this connection (may be nil). + AuthedPubkey []byte + + // Remote is the remote address of the connection. + Remote string + + // ConnectionID uniquely identifies this connection. + ConnectionID string +} + +// Result represents the outcome of event ingestion. +type Result struct { + // Accepted indicates the event was accepted for processing. + Accepted bool + + // Saved indicates the event was saved to the database. + Saved bool + + // Message is an optional message for the OK response. + Message string + + // RequireAuth indicates authentication is required. + RequireAuth bool + + // Error contains any error that occurred. + Error error +} + +// Accepted returns a successful Result. +func Accepted(message string) Result { + return Result{Accepted: true, Saved: true, Message: message} +} + +// AcceptedNotSaved returns a Result where the event was accepted but not saved. +func AcceptedNotSaved(message string) Result { + return Result{Accepted: true, Saved: false, Message: message} +} + +// Rejected returns a rejection Result. +func Rejected(message string) Result { + return Result{Accepted: false, Message: message} +} + +// AuthRequired returns a Result indicating authentication is required. +func AuthRequired(message string) Result { + return Result{Accepted: false, RequireAuth: true, Message: message} +} + +// Errored returns a Result with an error. +func Errored(err error) Result { + return Result{Accepted: false, Error: err} +} + +// Config configures the ingestion service. +type Config struct { + // SprocketChecker is the optional sprocket checker. + SprocketChecker SprocketChecker + + // SpecialKinds is the registry for special kind handlers. + SpecialKinds *specialkinds.Registry +} + +// Service orchestrates the event ingestion pipeline. +type Service struct { + validator *validation.Service + authorizer *authorization.Service + router *routing.DefaultRouter + processor *processing.Service + sprocket SprocketChecker + specialKinds *specialkinds.Registry +} + +// NewService creates a new ingestion service. +func NewService( + validator *validation.Service, + authorizer *authorization.Service, + router *routing.DefaultRouter, + processor *processing.Service, + cfg Config, +) *Service { + return &Service{ + validator: validator, + authorizer: authorizer, + router: router, + processor: processor, + sprocket: cfg.SprocketChecker, + specialKinds: cfg.SpecialKinds, + } +} + +// Ingest processes an event through the full ingestion pipeline. +// Returns a Result indicating the outcome. +func (s *Service) Ingest(ctx context.Context, ev *event.E, connCtx *ConnectionContext) Result { + // Stage 1: Event validation (ID, timestamp, signature) + if result := s.validator.ValidateEvent(ev); !result.Valid { + return Rejected(result.Msg) + } + + // Stage 2: Sprocket check (if enabled) + if s.sprocket != nil && s.sprocket.IsEnabled() { + if s.sprocket.IsDisabled() { + return Rejected("sprocket disabled - events rejected until sprocket is restored") + } + if !s.sprocket.IsRunning() { + return Rejected("sprocket not running - events rejected until sprocket starts") + } + + response, err := s.sprocket.ProcessEvent(ev) + if err != nil { + return Errored(fmt.Errorf("sprocket processing failed: %w", err)) + } + + switch response.Action { + case "accept": + // Continue processing + case "reject": + return Rejected(response.Msg) + case "shadowReject": + // Accept but don't save + return AcceptedNotSaved("") + } + } + + // Stage 3: Special kind handling + if s.specialKinds != nil { + hctx := &specialkinds.HandlerContext{ + AuthedPubkey: connCtx.AuthedPubkey, + Remote: connCtx.Remote, + ConnectionID: connCtx.ConnectionID, + } + if result, handled := s.specialKinds.TryHandle(ctx, ev, hctx); handled { + if result.Error != nil { + return Errored(result.Error) + } + if result.Handled { + if result.SaveEvent { + // Handler wants the event saved + procResult := s.processor.Process(ctx, ev) + if procResult.Error != nil { + return Errored(procResult.Error) + } + return Accepted(result.Message) + } + return AcceptedNotSaved(result.Message) + } + // result.Continue - fall through to normal processing + } + } + + // Stage 4: Authorization check + decision := s.authorizer.Authorize(ev, connCtx.AuthedPubkey, connCtx.Remote, ev.Kind) + if !decision.Allowed { + if decision.RequireAuth { + return AuthRequired(decision.DenyReason) + } + return Rejected(decision.DenyReason) + } + + // Stage 5: Routing (ephemeral events, etc.) + if routeResult := s.router.Route(ev, connCtx.AuthedPubkey); routeResult.Action != routing.Continue { + if routeResult.Action == routing.Handled { + return AcceptedNotSaved(routeResult.Message) + } + if routeResult.Action == routing.Error { + return Errored(fmt.Errorf("routing error: %s", routeResult.Message)) + } + } + + // Stage 6: Processing (save, hooks, delivery) + procResult := s.processor.Process(ctx, ev) + if procResult.Blocked { + return Rejected(procResult.BlockMsg) + } + if procResult.Error != nil { + return Errored(procResult.Error) + } + + return Result{ + Accepted: true, + Saved: true, + } +} + +// IngestWithRawValidation includes raw JSON validation before unmarshaling. +// Use this when the event hasn't been validated yet. +func (s *Service) IngestWithRawValidation(ctx context.Context, rawJSON []byte, ev *event.E, connCtx *ConnectionContext) Result { + // Stage 0: Raw JSON validation + if result := s.validator.ValidateRawJSON(rawJSON); !result.Valid { + return Rejected(result.Msg) + } + + return s.Ingest(ctx, ev, connCtx) +} diff --git a/pkg/event/ingestion/service_test.go b/pkg/event/ingestion/service_test.go new file mode 100644 index 0000000..e301bf7 --- /dev/null +++ b/pkg/event/ingestion/service_test.go @@ -0,0 +1,87 @@ +package ingestion + +import ( + "testing" +) + +func TestResultConstructors(t *testing.T) { + t.Run("Accepted", func(t *testing.T) { + r := Accepted("msg") + if !r.Accepted || !r.Saved || r.RequireAuth || r.Error != nil { + t.Errorf("unexpected Accepted result: %+v", r) + } + if r.Message != "msg" { + t.Errorf("unexpected message: %s", r.Message) + } + }) + + t.Run("AcceptedNotSaved", func(t *testing.T) { + r := AcceptedNotSaved("msg") + if !r.Accepted || r.Saved || r.RequireAuth || r.Error != nil { + t.Error("unexpected AcceptedNotSaved result") + } + }) + + t.Run("Rejected", func(t *testing.T) { + r := Rejected("msg") + if r.Accepted || r.Saved || r.RequireAuth || r.Error != nil { + t.Error("unexpected Rejected result") + } + if r.Message != "msg" { + t.Errorf("unexpected message: %s", r.Message) + } + }) + + t.Run("AuthRequired", func(t *testing.T) { + r := AuthRequired("msg") + if r.Accepted || r.Saved || !r.RequireAuth || r.Error != nil { + t.Error("unexpected AuthRequired result") + } + }) + + t.Run("Errored", func(t *testing.T) { + err := &testError{msg: "test"} + r := Errored(err) + if r.Accepted || r.Saved || r.RequireAuth || r.Error != err { + t.Error("unexpected Errored result") + } + }) +} + +type testError struct { + msg string +} + +func (e *testError) Error() string { + return e.msg +} + +func TestConnectionContext(t *testing.T) { + ctx := &ConnectionContext{ + AuthedPubkey: []byte{1, 2, 3}, + Remote: "127.0.0.1:1234", + ConnectionID: "conn-123", + } + + if string(ctx.AuthedPubkey) != string([]byte{1, 2, 3}) { + t.Error("AuthedPubkey mismatch") + } + if ctx.Remote != "127.0.0.1:1234" { + t.Error("Remote mismatch") + } + if ctx.ConnectionID != "conn-123" { + t.Error("ConnectionID mismatch") + } +} + +func TestConfig(t *testing.T) { + cfg := Config{ + SprocketChecker: nil, + SpecialKinds: nil, + } + + // Just verify Config can be created + if cfg.SprocketChecker != nil { + t.Error("expected nil SprocketChecker") + } +} diff --git a/pkg/event/specialkinds/registry.go b/pkg/event/specialkinds/registry.go new file mode 100644 index 0000000..f9d52e5 --- /dev/null +++ b/pkg/event/specialkinds/registry.go @@ -0,0 +1,175 @@ +// Package specialkinds provides a registry for handling special event kinds +// that require custom processing before normal storage/delivery. +// This includes NIP-43 join/leave requests, policy configuration, and +// curating configuration events. +package specialkinds + +import ( + "context" + + "git.mleku.dev/mleku/nostr/encoders/event" +) + +// Result represents the outcome of handling a special kind event. +type Result struct { + // Handled indicates the event was fully processed by a handler. + // If true, normal event processing should be skipped. + Handled bool + + // Continue indicates processing should continue to normal event handling. + Continue bool + + // Error is set if the handler encountered an error. + Error error + + // Message is an optional message to include in the OK response. + Message string + + // SaveEvent indicates the event should still be saved even if handled. + SaveEvent bool +} + +// HandlerContext provides access to connection-specific data needed by handlers. +type HandlerContext struct { + // AuthedPubkey is the authenticated pubkey for this connection (may be nil). + AuthedPubkey []byte + + // Remote is the remote address of the connection. + Remote string + + // ConnectionID uniquely identifies this connection. + ConnectionID string +} + +// Handler processes special event kinds. +type Handler interface { + // CanHandle returns true if this handler can process the given event. + // This is typically based on the event kind and possibly tag values. + CanHandle(ev *event.E) bool + + // Handle processes the event. + // Returns a Result indicating the outcome. + Handle(ctx context.Context, ev *event.E, hctx *HandlerContext) Result + + // Name returns a descriptive name for this handler (for logging). + Name() string +} + +// Registry holds registered special kind handlers. +type Registry struct { + handlers []Handler +} + +// NewRegistry creates a new special kind registry. +func NewRegistry() *Registry { + return &Registry{ + handlers: make([]Handler, 0), + } +} + +// Register adds a handler to the registry. +// Handlers are checked in registration order. +func (r *Registry) Register(h Handler) { + r.handlers = append(r.handlers, h) +} + +// TryHandle attempts to handle an event with registered handlers. +// Returns (result, true) if a handler processed the event. +// Returns (nil, false) if no handler matched. +func (r *Registry) TryHandle(ctx context.Context, ev *event.E, hctx *HandlerContext) (*Result, bool) { + for _, h := range r.handlers { + if h.CanHandle(ev) { + result := h.Handle(ctx, ev, hctx) + return &result, true + } + } + return nil, false +} + +// ListHandlers returns the names of all registered handlers. +func (r *Registry) ListHandlers() []string { + names := make([]string, len(r.handlers)) + for i, h := range r.handlers { + names[i] = h.Name() + } + return names +} + +// HandlerCount returns the number of registered handlers. +func (r *Registry) HandlerCount() int { + return len(r.handlers) +} + +// ============================================================================= +// Handler Function Adapter +// ============================================================================= + +// HandlerFunc is a function adapter that implements Handler. +type HandlerFunc struct { + name string + canHandleFn func(*event.E) bool + handleFn func(context.Context, *event.E, *HandlerContext) Result +} + +// NewHandlerFunc creates a Handler from functions. +func NewHandlerFunc( + name string, + canHandle func(*event.E) bool, + handle func(context.Context, *event.E, *HandlerContext) Result, +) *HandlerFunc { + return &HandlerFunc{ + name: name, + canHandleFn: canHandle, + handleFn: handle, + } +} + +// CanHandle implements Handler. +func (h *HandlerFunc) CanHandle(ev *event.E) bool { + return h.canHandleFn(ev) +} + +// Handle implements Handler. +func (h *HandlerFunc) Handle(ctx context.Context, ev *event.E, hctx *HandlerContext) Result { + return h.handleFn(ctx, ev, hctx) +} + +// Name implements Handler. +func (h *HandlerFunc) Name() string { + return h.name +} + +// ============================================================================= +// Convenience Result Constructors +// ============================================================================= + +// Handled returns a Result indicating the event was fully handled. +func Handled(message string) Result { + return Result{ + Handled: true, + Message: message, + } +} + +// HandledWithSave returns a Result indicating the event was handled but should also be saved. +func HandledWithSave(message string) Result { + return Result{ + Handled: true, + SaveEvent: true, + Message: message, + } +} + +// ContinueProcessing returns a Result indicating normal processing should continue. +func ContinueProcessing() Result { + return Result{ + Continue: true, + } +} + +// ErrorResult returns a Result with an error. +func ErrorResult(err error) Result { + return Result{ + Error: err, + } +} diff --git a/pkg/event/specialkinds/registry_test.go b/pkg/event/specialkinds/registry_test.go new file mode 100644 index 0000000..0d295a0 --- /dev/null +++ b/pkg/event/specialkinds/registry_test.go @@ -0,0 +1,225 @@ +package specialkinds + +import ( + "context" + "errors" + "testing" + + "git.mleku.dev/mleku/nostr/encoders/event" +) + +func TestNewRegistry(t *testing.T) { + r := NewRegistry() + if r == nil { + t.Fatal("NewRegistry returned nil") + } + if r.HandlerCount() != 0 { + t.Errorf("expected 0 handlers, got %d", r.HandlerCount()) + } +} + +func TestRegistryRegister(t *testing.T) { + r := NewRegistry() + + h := NewHandlerFunc( + "test-handler", + func(ev *event.E) bool { return true }, + func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result { + return Handled("test") + }, + ) + + r.Register(h) + + if r.HandlerCount() != 1 { + t.Errorf("expected 1 handler, got %d", r.HandlerCount()) + } + + names := r.ListHandlers() + if len(names) != 1 || names[0] != "test-handler" { + t.Errorf("unexpected handler names: %v", names) + } +} + +func TestRegistryTryHandle_NoMatch(t *testing.T) { + r := NewRegistry() + + // Register a handler that never matches + h := NewHandlerFunc( + "never-matches", + func(ev *event.E) bool { return false }, + func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result { + return Handled("should not be called") + }, + ) + r.Register(h) + + ev := &event.E{Kind: 1} + result, matched := r.TryHandle(context.Background(), ev, nil) + + if matched { + t.Error("expected no match, got matched") + } + if result != nil { + t.Error("expected nil result when no match") + } +} + +func TestRegistryTryHandle_Match(t *testing.T) { + r := NewRegistry() + + // Register a handler that matches kind 1 + h := NewHandlerFunc( + "kind-1-handler", + func(ev *event.E) bool { return ev.Kind == 1 }, + func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result { + return Handled("processed kind 1") + }, + ) + r.Register(h) + + ev := &event.E{Kind: 1} + result, matched := r.TryHandle(context.Background(), ev, nil) + + if !matched { + t.Error("expected match, got no match") + } + if result == nil { + t.Fatal("expected result, got nil") + } + if !result.Handled { + t.Error("expected Handled=true") + } + if result.Message != "processed kind 1" { + t.Errorf("unexpected message: %s", result.Message) + } +} + +func TestRegistryTryHandle_FirstMatchWins(t *testing.T) { + r := NewRegistry() + + // Register two handlers that both match kind 1 + h1 := NewHandlerFunc( + "first-handler", + func(ev *event.E) bool { return ev.Kind == 1 }, + func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result { + return Handled("first") + }, + ) + h2 := NewHandlerFunc( + "second-handler", + func(ev *event.E) bool { return ev.Kind == 1 }, + func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result { + return Handled("second") + }, + ) + r.Register(h1) + r.Register(h2) + + ev := &event.E{Kind: 1} + result, _ := r.TryHandle(context.Background(), ev, nil) + + if result.Message != "first" { + t.Errorf("expected first handler to win, got message: %s", result.Message) + } +} + +func TestHandlerContext(t *testing.T) { + r := NewRegistry() + + var capturedCtx *HandlerContext + + h := NewHandlerFunc( + "context-capture", + func(ev *event.E) bool { return true }, + func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result { + capturedCtx = hctx + return Handled("captured") + }, + ) + r.Register(h) + + hctx := &HandlerContext{ + AuthedPubkey: []byte{1, 2, 3}, + Remote: "127.0.0.1:12345", + ConnectionID: "conn-abc", + } + + r.TryHandle(context.Background(), &event.E{Kind: 1}, hctx) + + if capturedCtx == nil { + t.Fatal("handler context not captured") + } + if string(capturedCtx.AuthedPubkey) != string(hctx.AuthedPubkey) { + t.Error("AuthedPubkey mismatch") + } + if capturedCtx.Remote != hctx.Remote { + t.Error("Remote mismatch") + } + if capturedCtx.ConnectionID != hctx.ConnectionID { + t.Error("ConnectionID mismatch") + } +} + +func TestResultConstructors(t *testing.T) { + t.Run("Handled", func(t *testing.T) { + r := Handled("msg") + if !r.Handled || r.Continue || r.SaveEvent || r.Error != nil { + t.Error("unexpected Handled result") + } + if r.Message != "msg" { + t.Errorf("unexpected message: %s", r.Message) + } + }) + + t.Run("HandledWithSave", func(t *testing.T) { + r := HandledWithSave("msg") + if !r.Handled || r.Continue || !r.SaveEvent || r.Error != nil { + t.Error("unexpected HandledWithSave result") + } + }) + + t.Run("ContinueProcessing", func(t *testing.T) { + r := ContinueProcessing() + if r.Handled || !r.Continue || r.SaveEvent || r.Error != nil { + t.Error("unexpected ContinueProcessing result") + } + }) + + t.Run("ErrorResult", func(t *testing.T) { + err := errors.New("test error") + r := ErrorResult(err) + if r.Handled || r.Continue || r.SaveEvent || r.Error != err { + t.Error("unexpected ErrorResult result") + } + }) +} + +func TestHandlerFuncInterface(t *testing.T) { + h := NewHandlerFunc( + "test", + func(ev *event.E) bool { return ev.Kind == 42 }, + func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result { + return Handled("42") + }, + ) + + // Check interface compliance + var _ Handler = h + + if h.Name() != "test" { + t.Errorf("unexpected name: %s", h.Name()) + } + + if !h.CanHandle(&event.E{Kind: 42}) { + t.Error("expected CanHandle to return true for kind 42") + } + if h.CanHandle(&event.E{Kind: 1}) { + t.Error("expected CanHandle to return false for kind 1") + } + + result := h.Handle(context.Background(), &event.E{Kind: 42}, nil) + if result.Message != "42" { + t.Errorf("unexpected result message: %s", result.Message) + } +} diff --git a/pkg/version/version b/pkg/version/version index e2a2ea8..e0d5dc4 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.56.4 +v0.56.5