Browse Source

Implement DDD recommendations: domain errors, events, and services (v0.56.5)

- Add docs/GLOSSARY.md with 50+ ubiquitous language definitions
- Add pkg/domain/errors/ with structured domain error types (validation,
  authorization, processing, policy, storage) and helper functions
- Add pkg/domain/events/ with domain event dispatcher supporting sync/async
  publishing and 16 event types for system-wide notifications
- Add pkg/event/specialkinds/ registry for handling special event kinds
- Add pkg/event/ingestion/ service for orchestrating event processing pipeline
- Strengthen aggregate boundaries with accessor methods and deprecation comments
  for ACL Registry, Server, and Listener
- Update DDD_ANALYSIS.md to reflect current architecture (v0.56.4)

Files modified:
- docs/GLOSSARY.md: NEW - ubiquitous language glossary
- pkg/domain/errors/errors.go: NEW - domain error types with 16 tests
- pkg/domain/events/events.go: NEW - 16 domain event types
- pkg/domain/events/dispatcher.go: NEW - event dispatcher with 10 tests
- pkg/event/specialkinds/registry.go: NEW - special kind handler registry
- pkg/event/ingestion/service.go: NEW - event processing orchestration
- pkg/acl/acl.go: Add accessor methods (GetMode, GetACLByType, etc.)
- pkg/acl/follows.go, managed.go, curating.go: Add Context() accessor
- app/server.go: Add accessor methods (GetConfig, Database, IsAdmin, IsOwner)
- app/listener.go: Add ServerContext, ServerConfig, ServerDatabase accessors
- DDD_ANALYSIS.md: Update analysis for current architecture

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
main v0.56.5
woikos 4 months ago
parent
commit
6523f0cd67
No known key found for this signature in database
  1. 268
      DDD_ANALYSIS.md
  2. 19
      app/listener.go
  3. 53
      app/server.go
  4. 352
      docs/GLOSSARY.md
  5. 38
      pkg/acl/acl.go
  6. 7
      pkg/acl/curating.go
  7. 7
      pkg/acl/follows.go
  8. 7
      pkg/acl/managed.go
  9. 561
      pkg/domain/errors/errors.go
  10. 210
      pkg/domain/errors/errors_test.go
  11. 315
      pkg/domain/events/dispatcher.go
  12. 245
      pkg/domain/events/dispatcher_test.go
  13. 420
      pkg/domain/events/events.go
  14. 225
      pkg/event/ingestion/service.go
  15. 87
      pkg/event/ingestion/service_test.go
  16. 175
      pkg/event/specialkinds/registry.go
  17. 225
      pkg/event/specialkinds/registry_test.go
  18. 2
      pkg/version/version

268
DDD_ANALYSIS.md

@ -41,17 +41,19 @@ This document provides a comprehensive Domain-Driven Design (DDD) analysis of th @@ -41,17 +41,19 @@ This document provides a comprehensive Domain-Driven Design (DDD) analysis of th
## Executive Summary
ORLY demonstrates **mature DDD adoption** for a system of its complexity. The codebase exhibits clear bounded context separation, proper repository patterns with multiple backend implementations, and well-designed interface segregation that prevents circular dependencies.
ORLY demonstrates **mature DDD adoption** with sophisticated modular architecture. The codebase has evolved from a single-binary relay to a **multi-process system** with clear bounded context separation, gRPC-based inter-process communication, and pluggable implementations for database, ACL, and sync services.
**Strengths:**
- Clear separation between `app/` (application layer) and `pkg/` (domain/infrastructure)
- Repository pattern with three interchangeable backends (Badger, Neo4j, WasmDB)
- Interface-based ACL system with pluggable implementations (None, Follows, Managed)
- Repository pattern with four interchangeable backends (Badger, Neo4j, WasmDB, gRPC)
- Interface-based ACL system with four implementations (None, Follows, Managed, Curating)
- **Driver Registry pattern** for runtime component selection
- **Process Supervisor pattern** for split IPC mode deployment
- Per-connection aggregate isolation in `Listener`
- Strong use of Go interfaces for dependency inversion
- **New:** Immutable `EventRef` value object alongside legacy `IdPkTs`
- **New:** Comprehensive protocol extensions (Blossom, Graph Queries, NIP-43, NIP-86)
- **New:** Distributed sync with cluster replication support
- Immutable `EventRef` value object alongside legacy `IdPkTs`
- Comprehensive protocol extensions (NIP-43, NIP-77, NIP-86, Blossom, Graph Queries)
- **gRPC service layer** exposing all major interfaces for remote access
**Areas for Improvement:**
- Domain events are implicit rather than explicit types
@ -59,7 +61,7 @@ ORLY demonstrates **mature DDD adoption** for a system of its complexity. The co @@ -59,7 +61,7 @@ ORLY demonstrates **mature DDD adoption** for a system of its complexity. The co
- Handler methods mix application orchestration with domain logic
- Ubiquitous language is partially documented
**Overall DDD Maturity Score: 7.5/10** (improved from 7/10)
**Overall DDD Maturity Score: 8/10** (improved from 7.5/10)
---
@ -72,13 +74,15 @@ ORLY organizes code into distinct bounded contexts, each with its own model and @@ -72,13 +74,15 @@ ORLY organizes code into distinct bounded contexts, each with its own model and
#### 1. Event Storage Context (`pkg/database/`)
- **Responsibility:** Persistent storage of Nostr events with indexing and querying
- **Key Abstractions:** `Database` interface (109 lines), `Subscription`, `Payment`, `NIP43Membership`
- **Implementations:** Badger (embedded), Neo4j (graph), WasmDB (browser)
- **Implementations:** Badger (embedded), Neo4j (graph), WasmDB (browser), gRPC (remote)
- **gRPC Server:** `pkg/database/server/` - DatabaseService with 250+ RPC methods
- **File:** `pkg/database/interface.go:17-109`
#### 2. Access Control Context (`pkg/acl/`)
- **Responsibility:** Authorization decisions for read/write operations
- **Key Abstractions:** `I` interface, `Registry`, access levels (none/read/write/admin/owner)
- **Implementations:** `None`, `Follows`, `Managed`
- **Implementations:** `None`, `Follows`, `Managed`, `Curating`
- **gRPC Server:** `pkg/acl/server/` - ACLService
- **Files:** `pkg/acl/acl.go`, `pkg/interfaces/acl/acl.go:21-40`
#### 3. Event Policy Context (`pkg/policy/`)
@ -90,19 +94,20 @@ ORLY organizes code into distinct bounded contexts, each with its own model and @@ -90,19 +94,20 @@ ORLY organizes code into distinct bounded contexts, each with its own model and
#### 4. Connection Management Context (`app/`)
- **Responsibility:** WebSocket lifecycle, message routing, authentication, flow control
- **Key Abstractions:** `Listener`, `Server`, message handlers, `messageRequest`
- **Handlers:** 25+ message type handlers
- **File:** `app/listener.go:24-52`
#### 5. Protocol Extensions Context (`pkg/protocol/`)
- **Responsibility:** NIP implementations beyond core protocol
- **Subcontexts:**
- **NIP-43 Membership** (`pkg/protocol/nip43/`): Invite-based access control
- **NIP-43 Membership** (`pkg/protocol/nip43/`): Invite-based access control (kinds 28934, 28936, 8000)
- **Graph Queries** (`pkg/protocol/graph/`): BFS traversal for follows/followers/threads
- **NWC Payments** (`pkg/protocol/nwc/`): Nostr Wallet Connect integration
- **Blossom** (`pkg/protocol/blossom/`): BUD protocol definitions
- **Directory** (`pkg/protocol/directory/`): Relay directory client
#### 6. Blob Storage Context (`pkg/blossom/`)
- **Responsibility:** Binary blob storage following BUD specifications
- **Responsibility:** Binary blob storage following BUD-09 specifications
- **Key Abstractions:** `Server`, `Storage`, `Blob`, `BlobMeta`
- **Invariants:** SHA-256 hash integrity, MIME type validation, quota enforcement
- **Files:** `pkg/blossom/server.go`, `pkg/blossom/storage.go`
@ -115,9 +120,14 @@ ORLY organizes code into distinct bounded contexts, each with its own model and @@ -115,9 +120,14 @@ ORLY organizes code into distinct bounded contexts, each with its own model and
#### 8. Distributed Sync Context (`pkg/sync/`)
- **Responsibility:** Federation and replication between relay peers
- **Key Abstractions:** `Manager`, `ClusterManager`, `RelayGroupManager`, `NIP11Cache`
- **Integration:** Serial-number based sync protocol, NIP-11 peer discovery
- **Files:** `pkg/sync/manager.go`, `pkg/sync/cluster.go`, `pkg/sync/relaygroup.go`
- **Key Abstractions:** `Manager`, `Registry`, driver pattern
- **Implementations:**
- **Negentropy** (`pkg/sync/negentropy/`): NIP-77 set reconciliation
- **Cluster** (`pkg/sync/cluster/`): HTTP-based pull replication (kind 39108)
- **Distributed** (`pkg/sync/distributed/`): Peer-to-peer sync
- **RelayGroup** (`pkg/sync/relaygroup/`): Relay grouping (kind 39105)
- **gRPC Clients:** Each sync implementation has gRPC client for split mode
- **Files:** `pkg/sync/manager.go`, `pkg/sync/negentropy/`, `pkg/sync/cluster/`
#### 9. Spider Context (`pkg/spider/`)
- **Responsibility:** Syncing events from admin relays for followed pubkeys
@ -125,47 +135,77 @@ ORLY organizes code into distinct bounded contexts, each with its own model and @@ -125,47 +135,77 @@ ORLY organizes code into distinct bounded contexts, each with its own model and
- **Integration:** Batch subscriptions, rate limit backoff, blackout periods
- **File:** `pkg/spider/spider.go`
#### 10. Process Supervision Context (`cmd/orly-launcher/`) **NEW**
- **Responsibility:** Multi-process lifecycle management for split IPC mode
- **Key Abstractions:** `Supervisor`, `Config`, process state machine
- **Patterns:** Supervisor pattern with dependency ordering, crash recovery
- **Integration:** gRPC health checks, graceful shutdown ordering
- **Files:** `cmd/orly-launcher/supervisor.go`, `cmd/orly-launcher/config.go`
#### 11. Certificate Management Context (`cmd/orly-certs/`) **NEW**
- **Responsibility:** TLS certificate provisioning and renewal
- **Key Abstractions:** Certificate manager, DNS-01 challenge handler
- **Integration:** Independent service, communicates via filesystem
- **File:** `cmd/orly-certs/`
### Context Map
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ Connection Management (app/) │
┌─────────────────────────────────────────────────────────────────────────────────┐
│ Process Supervision (orly-launcher) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Supervisor │───▶│ DB Process │───▶│ ACL Process │───▶│Relay Process│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │ │
│ │ [Dependency] │ [gRPC] │ [gRPC] │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Server │───▶│ Listener │───▶│ Handlers │◀──▶│ Publishers │ │
│ │Sync Services│ │ Certs │ │ Admin Web │ │ Health │ │
│ │ (4 procs) │ │ Service │ │ UI │ │ Checks │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└────────┬────────────────────┬────────────────────┬──────────────────────────┘
└─────────────────────────────────────────────────────────────────────────────────┘
┌───────────────────────────────┼───────────────────────────────┐
│ │ │
│ [Conformist] │ [Customer-Supplier]│ [Customer-Supplier]
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
Access Control│ │ Event Storage │ │ Event Policy │
(pkg/acl/) │ │ (pkg/database/)│ │ (pkg/policy/) │
Event Storage │ │ Access Control│ │ Event Policy │
(pkg/database/)│ │ (pkg/acl/) │ │ (pkg/policy/) │
│ │ │ │ │ │
│ Registry ◀────┼───┼────Conformist──┼───┼─▶ Manager │
│ Badger│Neo4j │◀─[gRPC]─────▶│Follows│Managed │◀─[Conformist]│ Manager │
│ WasmDB│gRPC │ │Curating│None │ │ │
└────────────────┘ └────────────────┘ └────────────────┘
│ │ │
│ [Shared Kernel]
│ ▼
┌────────────────┐
│ │ Event Entity │
│ │(git.mleku.dev/ │◀───────────┘
│ │ mleku/nostr)
└────────────────┘
[Shared Kernel] │
▼ ▼
────────────────────────────────────────────────────────────┐ │
│ Event Entity │
│ (git.mleku.dev/mleku/nostr) │◀─────────┘
│ Filter, Tag, Subscription types
────────────────────────────────────────────────────────────┘
│ │
│ [Anti-Corruption] │ [Customer-Supplier]
│ [Customer-Supplier] │ [Customer-Supplier]
▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
Rate Limiting │ │ Protocol │ │ Blob Storage
(pkg/ratelimit)│ │ Extensions │ │ (pkg/blossom)
Blob Storage │ │ Protocol │ │ Rate Limiting
(pkg/blossom) │ │ Extensions │ │ (pkg/ratelimit)
│ │ │ (pkg/protocol/)│ │ │
└────────────────┘ └────────────────┘ └────────────────┘
┌────────────────────┼────────────────────┐
┌───────────────────────────────┼───────────────────────────────┐
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ NIP-43 │ │ NIP-77 │ │ Graph Queries │
│ Membership │ │ Negentropy │ │(pkg/protocol/ │
│(kinds 28934/36)│ │ Sync │ │ graph/) │
└────────────────┘ └────────────────┘ └────────────────┘
┌───────────────────────────────┼───────────────────────────────┐
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ Distributed │ │ Spider │ │ Graph Queries │
│ Sync │ │ (pkg/spider) │ │(pkg/protocol/ │
│ (pkg/sync/) │ │ │ │ graph/) │
Cluster │ │ Distributed │ │ Relay Group
Sync │ │ Sync │ │ (kind 39105)
(kind 39108) │ │ │ │
└────────────────┘ └────────────────┘ └────────────────┘
```
@ -174,26 +214,31 @@ ORLY organizes code into distinct bounded contexts, each with its own model and @@ -174,26 +214,31 @@ ORLY organizes code into distinct bounded contexts, each with its own model and
| Upstream | Downstream | Pattern | Notes |
|----------|------------|---------|-------|
| nostr library | All contexts | Shared Kernel | Event, Filter, Tag types |
| Database | ACL, Policy, Blossom | Customer-Supplier | Query for follow lists, permissions, blob storage |
| Database | ACL, Policy, Blossom, Sync | Customer-Supplier | Query for follows, permissions, event storage |
| Policy | Handlers, Sync | Conformist | All respect policy decisions |
| ACL | Handlers, Blossom | Conformist | Handlers/Blossom respect access levels |
| Rate Limit | Database | Anti-Corruption | Load monitor abstraction |
| Sync | Database, Policy | Customer-Supplier | Serial-based event replication |
| Rate Limit | Database | Anti-Corruption | LoadMonitor abstraction |
| Sync Services | Database | Customer-Supplier | Serial-based event replication |
| Launcher | All Services | Supervisor | Process lifecycle management |
| gRPC Layer | DB, ACL, Sync | Published Language | Protocol buffer contracts |
### Subdomain Classification
| Subdomain | Type | Justification |
|-----------|------|---------------|
| Event Storage | **Core** | Central to relay's value proposition |
| Access Control | **Core** | Key differentiator (WoT, follows-based, managed) |
| Access Control | **Core** | Key differentiator (WoT, follows-based, managed, curating) |
| Event Policy | **Core** | Enables complex filtering rules |
| Graph Queries | **Core** | Unique social graph traversal capabilities |
| NIP-43 Membership | **Core** | Unique invite-based access model |
| Blob Storage (Blossom) | **Core** | Media hosting differentiator |
| NIP-77 Negentropy Sync | **Core** | Efficient relay-to-relay sync |
| Connection Management | **Supporting** | Standard WebSocket infrastructure |
| Rate Limiting | **Supporting** | Operational concern with PID controller |
| Distributed Sync | **Supporting** | Infrastructure for federation |
| Cluster Sync | **Supporting** | Infrastructure for federation |
| Spider | **Supporting** | Data aggregation from external relays |
| Process Supervision | **Generic** | Standard process management |
| Certificate Management | **Generic** | Standard TLS infrastructure |
---
@ -231,7 +276,7 @@ type InviteCode struct { @@ -231,7 +276,7 @@ type InviteCode struct {
}
```
- **Identity:** Unique code string
- **Lifecycle:** Created → Valid → Used/Expired
- **Lifecycle:** Created -> Valid -> Used/Expired
- **Invariants:** Cannot be reused once consumed
#### Subscription (Payment Entity)
@ -240,7 +285,7 @@ type InviteCode struct { @@ -240,7 +285,7 @@ type InviteCode struct {
// GetSubscription, ExtendSubscription, RecordPayment
```
- **Identity:** Pubkey
- **Lifecycle:** Trial → Active → Expired
- **Lifecycle:** Trial -> Active -> Expired
- **Invariants:** Can only extend if not expired
#### Blob (Blossom Entity)
@ -255,14 +300,28 @@ type BlobMeta struct { @@ -255,14 +300,28 @@ type BlobMeta struct {
}
```
- **Identity:** SHA-256 hash
- **Lifecycle:** Uploaded → Active → Deleted
- **Lifecycle:** Uploaded -> Active -> Deleted
- **Invariants:** Hash must match content; owner can delete
#### Process (Supervisor Entity) **NEW**
```go
// cmd/orly-launcher/supervisor.go (implied)
type Process struct {
Name string // Identity: service name
Cmd *exec.Cmd // Running process
State ProcessState
Restarts int
}
```
- **Identity:** Service name (orly-db, orly-acl, etc.)
- **Lifecycle:** Stopped -> Starting -> Running -> Stopping
- **Invariants:** Dependency ordering; health check before dependent startup
### Value Objects
Value objects are immutable and defined by their attributes, not identity.
#### EventRef (Immutable Event Reference) - **NEW**
#### EventRef (Immutable Event Reference)
```go
// pkg/interfaces/store/store_interface.go:99-107
type EventRef struct {
@ -321,7 +380,7 @@ type Rule struct { @@ -321,7 +380,7 @@ type Rule struct {
}
```
- **Complexity:** 25+ fields, decomposition candidate
- **Binary caches:** Performance optimization for hexbinary conversion
- **Binary caches:** Performance optimization for hex->binary conversion
#### WriteRequest (Message Value)
```go
@ -335,6 +394,24 @@ type WriteRequest struct { @@ -335,6 +394,24 @@ type WriteRequest struct {
}
```
#### LauncherConfig (Configuration Value) **NEW**
```go
// cmd/orly-launcher/config.go
type Config struct {
DBBackend string // badger, neo4j
DBListen string // 127.0.0.1:50051
ACLEnabled bool
ACLMode string // follows, managed, curation
ACLListen string // 127.0.0.1:50052
AdminEnabled bool
AdminPort int
AdminOwners []string
// ... sync service configs
}
```
- **Persistence:** JSON file at `~/.config/orly/launcher.json`
- **Environment Override:** All fields can be overridden via `ORLY_LAUNCHER_*`
### Aggregates
Aggregates are clusters of entities/value objects with consistency boundaries.
@ -387,6 +464,16 @@ if isAuthMessage { @@ -387,6 +464,16 @@ if isAuthMessage {
- MIME type restrictions
- Owner-only deletion
#### Supervisor Aggregate **NEW**
- **Root:** `Supervisor`
- **Members:** Process map, config, dependency graph
- **Boundary:** All managed processes
- **Invariants:**
- Dependency ordering (DB -> ACL -> Sync -> Relay)
- Health checks before dependent startup
- Reverse shutdown ordering
- Crash recovery with restart limits
### Repositories
The Repository pattern abstracts persistence for aggregate roots.
@ -429,6 +516,7 @@ type Database interface { @@ -429,6 +516,7 @@ type Database interface {
1. **Badger** (`pkg/database/database.go`): Embedded key-value store
2. **Neo4j** (`pkg/neo4j/`): Graph database for social queries
3. **WasmDB** (`pkg/wasmdb/`): Browser IndexedDB for WASM builds
4. **gRPC** (`pkg/database/grpc/`): Remote database via gRPC **NEW**
**Interface Segregation:**
```go
@ -451,6 +539,21 @@ type I interface { @@ -451,6 +539,21 @@ type I interface {
}
```
#### Driver Registry Pattern **NEW**
```go
// pkg/database/ - Driver selection at runtime
database.HasDriver("badger") // Check availability
database.NewFromDriver("badger", config) // Create instance
// pkg/acl/ - ACL driver selection
acl.HasDriver("follows")
acl.NewACLFromDriver("follows", config)
// pkg/sync/ - Sync driver selection
sync.HasDriver("negentropy")
sync.NewSyncManager("negentropy", config)
```
### Domain Services
Domain services encapsulate logic that doesn't belong to any single entity.
@ -512,6 +615,24 @@ func (l *Limiter) Wait(ctx context.Context, op OperationType) error @@ -512,6 +615,24 @@ func (l *Limiter) Wait(ctx context.Context, op OperationType) error
- Separate setpoints for read/write operations
- Emergency mode with hysteresis
#### Supervisor (Process Lifecycle Service) **NEW**
```go
// cmd/orly-launcher/supervisor.go
type Supervisor struct {
config *Config
processes map[string]*Process
mu sync.Mutex
}
func (s *Supervisor) Start() error
func (s *Supervisor) Stop() error
func (s *Supervisor) Restart(name string) error
func (s *Supervisor) IsRunning() bool
```
- Manages process lifecycle with dependency ordering
- Health checks via gRPC before proceeding
- Crash recovery with configurable restart limits
- Graceful shutdown in reverse dependency order
### Domain Events
**Current State:** Domain events are implicit in message flow, not explicit types.
@ -529,6 +650,9 @@ func (l *Limiter) Wait(ctx context.Context, op OperationType) error @@ -529,6 +650,9 @@ func (l *Limiter) Wait(ctx context.Context, op OperationType) error
| PolicyUpdated | Policy config event | `messagePauseMutex.Lock()` |
| BlobUploaded | Blossom PUT success | Quota updated |
| BlobDeleted | Blossom DELETE | Quota released |
| ProcessStarted | Supervisor start | Health check, dependency unlock |
| ProcessCrashed | Process exit | Restart attempt, alert |
| ConfigUpdated | Admin UI save | JSON persistence, reload |
---
@ -627,6 +751,13 @@ type BlobUploaded struct { @@ -627,6 +751,13 @@ type BlobUploaded struct {
Size int64
Timestamp time.Time
}
type ProcessStateChanged struct {
ServiceName string
OldState ProcessState
NewState ProcessState
Timestamp time.Time
}
```
### 2. Strengthen Aggregate Boundaries
@ -690,6 +821,8 @@ func (s *EventService) ProcessIncomingEvent(ctx context.Context, ev *event.E, au @@ -690,6 +821,8 @@ func (s *EventService) ProcessIncomingEvent(ctx context.Context, ev *event.E, au
| Blob | Binary content (images, media) | `blossom.BlobMeta` |
| Spider | Event aggregator from external relays | `spider.Spider` |
| Sync | Peer-to-peer replication | `sync.Manager` |
| Supervisor | Process lifecycle manager | `supervisor.Supervisor` |
| Driver | Pluggable implementation | Registry pattern |
```
### 5. Add Domain-Specific Error Types
@ -706,6 +839,7 @@ var ( @@ -706,6 +839,7 @@ var (
ErrQuotaExceeded = &DomainError{Code: "QUOTA_EXCEEDED"}
ErrInviteCodeInvalid = &DomainError{Code: "INVITE_INVALID"}
ErrBlobTooLarge = &DomainError{Code: "BLOB_TOO_LARGE"}
ErrServiceUnavailable = &DomainError{Code: "SERVICE_UNAVAILABLE"}
)
```
@ -739,13 +873,16 @@ The context map is now documented in this file with integration patterns. @@ -739,13 +873,16 @@ The context map is now documented in this file with integration patterns.
- [x] Bounded contexts identified with clear boundaries
- [x] Repositories abstract persistence for aggregate roots
- [x] Multiple repository implementations (Badger/Neo4j/WasmDB)
- [x] Multiple repository implementations (Badger/Neo4j/WasmDB/gRPC)
- [x] Interface segregation prevents circular dependencies
- [x] Configuration centralized (`app/config/config.go`)
- [x] Per-connection aggregate isolation
- [x] Access control as pluggable strategy pattern
- [x] Value objects have immutable alternative (`EventRef`)
- [x] Context map documented
- [x] Driver registry pattern for runtime selection
- [x] gRPC layer for distributed deployment
- [x] Process supervision for split mode
### Needs Attention
@ -774,13 +911,14 @@ The context map is now documented in this file with integration patterns. @@ -774,13 +911,14 @@ The context map is now documented in this file with integration patterns.
| File | Purpose |
|------|---------|
| `app/server.go` | HTTP/WebSocket server setup (1240 lines) |
| `app/listener.go` | Connection aggregate (297 lines) |
| `app/server.go` | HTTP/WebSocket server setup |
| `app/listener.go` | Connection aggregate |
| `app/handle-event.go` | EVENT message handler |
| `app/handle-req.go` | REQ message handler |
| `app/handle-auth.go` | AUTH message handler |
| `app/handle-nip43.go` | NIP-43 membership handlers |
| `app/handle-nip86.go` | NIP-86 management handlers |
| `app/handle-negentropy.go` | NIP-77 negentropy sync |
| `app/handle-policy-config.go` | Policy configuration events |
### Infrastructure Files
@ -788,14 +926,30 @@ The context map is now documented in this file with integration patterns. @@ -788,14 +926,30 @@ The context map is now documented in this file with integration patterns.
| File | Purpose |
|------|---------|
| `pkg/database/database.go` | Badger implementation |
| `pkg/database/server/` | gRPC database server |
| `pkg/database/grpc/` | gRPC database client |
| `pkg/neo4j/` | Neo4j implementation |
| `pkg/wasmdb/` | WasmDB implementation |
| `pkg/blossom/server.go` | Blossom blob storage server |
| `pkg/ratelimit/limiter.go` | PID-based rate limiting |
| `pkg/sync/manager.go` | Distributed sync manager |
| `pkg/sync/cluster.go` | Cluster replication |
| `pkg/sync/negentropy/` | NIP-77 negentropy sync |
| `pkg/sync/cluster/` | HTTP cluster replication |
| `pkg/spider/spider.go` | Event spider/aggregator |
### Command Binaries
| Binary | Purpose |
|--------|---------|
| `cmd/orly-launcher/` | Process supervisor with admin UI |
| `cmd/orly-db-badger/` | Standalone Badger database server |
| `cmd/orly-db-neo4j/` | Standalone Neo4j database server |
| `cmd/orly-acl-follows/` | Follows-based ACL server |
| `cmd/orly-acl-managed/` | NIP-86 managed ACL server |
| `cmd/orly-acl-curation/` | Trust-tier curation ACL server |
| `cmd/orly-sync-negentropy/` | NIP-77 negentropy sync service |
| `cmd/orly-sync-cluster/` | Cluster replication service |
| `cmd/orly-certs/` | Certificate management service |
### Interface Packages
| Package | Purpose |
@ -810,7 +964,15 @@ The context map is now documented in this file with integration patterns. @@ -810,7 +964,15 @@ The context map is now documented in this file with integration patterns.
| `pkg/interfaces/store/` | Store interface with IdPkTs, EventRef |
| `pkg/interfaces/typer/` | Type introspection interface |
### Protocol Buffer Definitions
| Package | Purpose |
|---------|---------|
| `pkg/proto/orlydb/v1/` | Database gRPC service (250+ methods) |
| `pkg/proto/orlyacl/v1/` | ACL gRPC service |
| `pkg/proto/orlysync/` | Sync services (negentropy, cluster, etc.) |
---
*Generated: 2025-12-24*
*Analysis based on ORLY codebase v0.36.14*
*Generated: 2026-01-24*
*Analysis based on ORLY codebase v0.56.4*

19
app/listener.go

@ -12,6 +12,7 @@ import ( @@ -12,6 +12,7 @@ import (
"github.com/gorilla/websocket"
"lol.mleku.dev/errorf"
"lol.mleku.dev/log"
"next.orly.dev/app/config"
"next.orly.dev/pkg/acl"
"next.orly.dev/pkg/database"
"git.mleku.dev/mleku/nostr/encoders/event"
@ -22,6 +23,9 @@ import ( @@ -22,6 +23,9 @@ import (
)
type Listener struct {
// Server is the embedded server reference.
// Deprecated: Prefer using accessor methods (ServerConfig, ServerDatabase, etc.)
// instead of accessing Server fields directly.
*Server
conn *websocket.Conn
ctx context.Context
@ -64,6 +68,21 @@ func (l *Listener) Ctx() context.Context { @@ -64,6 +68,21 @@ func (l *Listener) Ctx() context.Context {
return l.ctx
}
// ServerContext returns the server's context (distinct from the listener's own context).
func (l *Listener) ServerContext() context.Context {
return l.Server.Context()
}
// ServerConfig returns the server's configuration.
func (l *Listener) ServerConfig() *config.C {
return l.Server.GetConfig()
}
// ServerDatabase returns the server's database instance.
func (l *Listener) ServerDatabase() database.Database {
return l.Server.Database()
}
// DroppedMessages returns the total number of messages that were dropped
// because the message processing queue was full.
func (l *Listener) DroppedMessages() int {

53
app/server.go

@ -47,12 +47,22 @@ import ( @@ -47,12 +47,22 @@ import (
type Server struct {
mux *http.ServeMux
// Config holds the relay configuration.
// Deprecated: Use GetConfig() method instead of accessing directly.
Config *config.C
// Ctx holds the server context.
// Deprecated: Use Context() method instead of accessing directly.
Ctx context.Context
publishers *publish.S
// Admins holds the admin pubkeys.
// Deprecated: Use IsAdmin() method instead of accessing directly.
Admins [][]byte
// Owners holds the owner pubkeys.
// Deprecated: Use IsOwner() method instead of accessing directly.
Owners [][]byte
DB database.Database // Changed from embedded *database.D to interface field
// DB holds the database instance.
// Deprecated: Use Database() method instead of accessing directly.
DB database.Database
// optional reverse proxy for dev web server
devProxy *httputil.ReverseProxy
@ -110,6 +120,47 @@ type Server struct { @@ -110,6 +120,47 @@ type Server struct {
brandingMgr *branding.Manager
}
// =============================================================================
// Server Accessor Methods
// =============================================================================
// GetConfig returns the relay configuration.
func (s *Server) GetConfig() *config.C {
return s.Config
}
// Context returns the server context.
func (s *Server) Context() context.Context {
return s.Ctx
}
// Database returns the database instance.
func (s *Server) Database() database.Database {
return s.DB
}
// IsAdmin returns true if the given pubkey is an admin.
func (s *Server) IsAdmin(pubkey []byte) bool {
pubHex := string(hex.Enc(pubkey))
for _, admin := range s.Admins {
if string(hex.Enc(admin)) == pubHex {
return true
}
}
return false
}
// IsOwner returns true if the given pubkey is an owner.
func (s *Server) IsOwner(pubkey []byte) bool {
pubHex := string(hex.Enc(pubkey))
for _, owner := range s.Owners {
if string(hex.Enc(owner)) == pubHex {
return true
}
}
return false
}
// isIPBlacklisted checks if an IP address is blacklisted using the managed ACL system
func (s *Server) isIPBlacklisted(remote string) bool {
// Extract IP from remote address (e.g., "192.168.1.1:12345" -> "192.168.1.1")

352
docs/GLOSSARY.md

@ -0,0 +1,352 @@ @@ -0,0 +1,352 @@
# ORLY Relay Domain Glossary
This glossary defines the ubiquitous language used throughout the ORLY codebase.
All contributors should use these terms consistently in code, comments, and documentation.
---
## Core Domain Concepts
### Event
A Nostr event as defined in NIP-01. The fundamental unit of data in the Nostr protocol.
Contains: id, pubkey, created_at, kind, tags, content, sig.
- **Code location**: `git.mleku.dev/mleku/nostr/encoders/event`
- **Key type**: `event.E`
### Serial
A monotonically increasing 40-bit identifier assigned to each event upon storage.
Used for efficient range queries, synchronization, and garbage collection ordering.
- **Code location**: `pkg/database/indexes/types.Uint40`
- **Related types**: EventRef, IdPkTs
### Pubkey
A 32-byte secp256k1 public key identifying a Nostr user. Stored as binary internally,
displayed as 64-character lowercase hex or bech32 npub format externally.
- **Code location**: `git.mleku.dev/mleku/nostr/types.Pubkey`
### Access Level
The permission tier granted to a pubkey. Determines what operations are allowed.
| Level | Description |
|-------|-------------|
| `none` | No access, authentication required |
| `read` | Read-only access (REQ allowed, EVENT denied) |
| `write` | Read and write access |
| `admin` | Write + import/export + arbitrary delete |
| `owner` | Admin + wipe + system configuration |
| `blocked` | IP address blocked |
| `banned` | Pubkey banned |
- **Code location**: `pkg/interfaces/acl/acl.go` constants
### ACL (Access Control List)
The authorization system that determines access levels for pubkeys and IP addresses.
Supports multiple modes with different authorization strategies.
- **Code location**: `pkg/acl/`
- **Interface**: `pkg/interfaces/acl/acl.go`
---
## Event Processing Pipeline
The event processing pipeline transforms incoming WebSocket messages into stored events.
Each stage has distinct responsibilities and produces typed results.
```
Raw JSON → Validation → Authorization → Routing → Processing → Delivery
```
### Validation
The process of verifying event structure, signature, and protocol compliance.
Checks performed:
- Raw JSON validation (hex case normalization)
- Event ID verification (hash matches content)
- Signature verification (schnorr signature valid)
- Timestamp validation (not too far in future)
- NIP-70 protected tag validation
- **Code location**: `pkg/event/validation/`
- **Result type**: `validation.Result` with `Valid`, `Code`, `Msg`
### Authorization
The decision process determining if an event is allowed based on ACL and policy.
Returns a structured decision with access level and deny reason.
- **Code location**: `pkg/event/authorization/`
- **Result type**: `authorization.Decision` with `Allowed`, `AccessLevel`, `DenyReason`, `RequireAuth`
### Routing
Dispatching events to specialized handlers based on event kind.
Determines whether events should be processed normally, delivered ephemerally, or handled specially.
Examples:
- Ephemeral events (kinds 20000-29999): Deliver without storage
- Delete events (kind 5): Trigger deletion cascade
- NIP-43 events (kinds 28934, 28936): Membership requests
- **Code location**: `pkg/event/routing/`
- **Result type**: `routing.Result` with `Action`, `Error`
### Processing
The final stage: persisting events, running post-save hooks, and delivering to subscribers.
Handles deduplication, replaceable event logic, and event delivery.
- **Code location**: `pkg/event/processing/`
- **Result type**: `processing.Result` with `Saved`, `Duplicate`, `Blocked`, `Error`
---
## ACL Modes
### None Mode
Open relay - all pubkeys have write access by default.
No authentication required unless explicitly configured via `ORLY_AUTH_REQUIRED`.
- **Code location**: `pkg/acl/none.go`
### Follows Mode
Whitelist based on admin/owner follow lists (kind 3 events).
Followed pubkeys get write access; others get read-only or denied based on configuration.
Supports progressive throttling for non-followed users.
- **Code location**: `pkg/acl/follows.go`
- **Config**: `ORLY_ACL_MODE=follows`
### Managed Mode
Fine-grained control via NIP-86 management API.
Supports pubkey bans, event bans, IP blocks, kind restrictions, and custom rules.
All management operations require NIP-98 HTTP authentication.
- **Code location**: `pkg/acl/managed.go`
- **Config**: `ORLY_ACL_MODE=managed`
### Curating Mode
Curator-based content moderation system.
Curators can approve/reject events from non-followed users.
Events from non-curated users are held pending approval.
- **Code location**: `pkg/acl/curating.go`
- **Config**: `ORLY_ACL_MODE=curating`
---
## Protocol Concepts
### NIP-42 Authentication
Challenge-response authentication for WebSocket connections.
Used to verify pubkey ownership before granting elevated access.
Flow:
1. Relay sends AUTH challenge with random string
2. Client signs challenge with private key
3. Relay verifies signature and grants access level
- **Code location**: `pkg/protocol/auth/`
### NIP-70 Protected Events
Events with `-` (protected) tag that can only be replaced/deleted by the author.
Prevents relays from accepting replacements from unauthorized pubkeys.
- **Tag format**: `["-"]` in tags array
### NIP-43 Relay Access
Invite-based membership system for restricted relays.
Supports join requests, leave requests, and membership tracking.
| Kind | Purpose |
|------|---------|
| 28934 | Join request with invite code |
| 28936 | Leave request |
| 8000 | Member added (relay-published) |
- **Code location**: `pkg/protocol/nip43/`
### NIP-86 Relay Management
HTTP JSON-RPC API for relay administration.
Requires NIP-98 HTTP authentication with admin/owner access level.
- **Endpoint**: `/api/v1/management`
- **Code location**: `app/handle-nip86.go`
### NIP-77 Negentropy
Set reconciliation protocol for efficient relay-to-relay synchronization.
Uses negentropy algorithm to identify missing events with minimal bandwidth.
- **Code location**: `pkg/sync/negentropy/`
- **Envelope types**: NEG-OPEN, NEG-MSG, NEG-CLOSE, NEG-ERR
---
## Infrastructure Concepts
### Publisher
The event delivery system that sends events to subscribers.
Composed of multiple publisher implementations (socket, internal, etc.).
- **Code location**: `pkg/protocol/publish/`
- **Interface**: `pkg/interfaces/publisher/publisher.go`
### Sprocket
External event processing plugin (JavaScript/Rhai script).
Can accept, reject, or shadow-reject events before normal processing.
| Action | Effect |
|--------|--------|
| accept | Event proceeds to normal processing |
| reject | Event rejected with error message |
| shadowReject | Event appears accepted but is not stored |
- **Code location**: `app/sprocket.go`
### Policy Manager
Rule-based event filtering system configured via JSON or kind 30078 events.
Evaluates events against configurable rules for allow/deny decisions.
- **Code location**: `pkg/policy/`
- **Config file**: `~/.config/orly/policy.json`
### Rate Limiter
PID controller-based adaptive throttling system.
Adjusts delays based on system load (memory pressure, write throughput).
- **Code location**: `pkg/ratelimit/`
- **Interface**: `pkg/interfaces/loadmonitor/`
### Supervisor
Process lifecycle manager for split IPC mode deployment.
Manages database, ACL, sync, and relay processes with dependency ordering.
- **Code location**: `cmd/orly-launcher/supervisor.go`
---
## Data Types
### EventRef
Stack-allocated event reference with fixed-size ID and pubkey arrays.
80 bytes total, fits in cache line, safe for concurrent use.
Immutable - all fields are unexported with accessor methods.
```go
type EventRef struct {
id ntypes.EventID // 32 bytes
pub ntypes.Pubkey // 32 bytes
ts int64 // 8 bytes
ser uint64 // 8 bytes
}
```
- **Code location**: `pkg/interfaces/store/store_interface.go`
### IdPkTs
Event reference with slice-based fields for backward compatibility.
Mutable - use `ToEventRef()` for safe concurrent access.
```go
type IdPkTs struct {
Id []byte // Event ID
Pub []byte // Pubkey
Ts int64 // Timestamp
Ser uint64 // Serial number
}
```
- **Code location**: `pkg/interfaces/store/store_interface.go`
### Decision
Authorization result carrying allowed status, access level, and context.
Used to communicate authorization outcomes through the pipeline.
- **Code location**: `pkg/event/authorization/authorization.go`
### Result (Validation)
Validation outcome with Valid bool, ReasonCode enum, and message string.
Codes: ReasonNone, ReasonBlocked, ReasonInvalid, ReasonError.
- **Code location**: `pkg/event/validation/validation.go`
### Result (Processing)
Processing outcome indicating whether event was saved, duplicate, or blocked.
Includes error field for unexpected failures.
- **Code location**: `pkg/event/processing/processing.go`
---
## Subscription Terminology
Note: "Subscription" has two distinct meanings in the codebase:
### Event Subscription (REQ)
An active filter receiving matching events in real-time.
Created via REQ envelope, cancelled via CLOSE envelope.
Stored in `Listener.subscriptions` map with cancel function.
- **Code location**: `app/listener.go` subscriptions field
### Payment Subscription
Paid access tier granting elevated permissions for a time period.
Managed via NWC payments or manual extension.
- **Code location**: `pkg/database/interface.go` Subscription type
---
## Sync Terminology
### Spider
Event aggregator that fetches events from external relays.
Subscribes to events for followed pubkeys on configured relay lists.
- **Code location**: `pkg/spider/`
### Sync
Peer-to-peer replication between relay instances.
Multiple implementations: negentropy, cluster, distributed, relaygroup.
- **Code location**: `pkg/sync/`
### Cluster
Group of relay instances sharing events via HTTP-based pull replication.
Membership tracked via kind 39108 events.
- **Code location**: `pkg/sync/cluster/`
### Relay Group
Configuration of relay sets for synchronized operation.
Tracked via kind 39105 events.
- **Code location**: `pkg/sync/relaygroup/`
---
## Driver Pattern
### Driver
A pluggable implementation of a core interface.
Selected at runtime via configuration or command-line flags.
Examples:
- Database drivers: badger, neo4j, wasmdb, grpc
- ACL drivers: none, follows, managed, curating
- Sync drivers: negentropy, cluster, distributed, relaygroup
```go
// Check if driver is available
database.HasDriver("badger")
// Create instance from driver
db := database.NewFromDriver("badger", config)
```
- **Pattern location**: `pkg/database/`, `pkg/acl/`, `pkg/sync/`
---
*Last updated: 2026-01-24*
*Based on ORLY codebase v0.56.4*

38
pkg/acl/acl.go

@ -17,10 +17,48 @@ func (s *S) SetMode(m string) { @@ -17,10 +17,48 @@ func (s *S) SetMode(m string) {
}
type S struct {
// ACL holds registered ACL implementations.
// Deprecated: Use GetACLByType() or ListRegisteredACLs() instead of accessing directly.
ACL []acliface.I
// Active holds the name of the currently active ACL mode.
// Deprecated: Use GetMode() instead of Active.Load().
Active atomic.String
}
// GetMode returns the currently active ACL mode name.
func (s *S) GetMode() string {
return s.Active.Load()
}
// GetACLByType returns the ACL implementation with the given type name, or nil if not found.
func (s *S) GetACLByType(typ string) acliface.I {
for _, i := range s.ACL {
if i.Type() == typ {
return i
}
}
return nil
}
// GetActiveACL returns the currently active ACL implementation, or nil if none is active.
func (s *S) GetActiveACL() acliface.I {
return s.GetACLByType(s.Active.Load())
}
// ListRegisteredACLs returns the type names of all registered ACL implementations.
func (s *S) ListRegisteredACLs() []string {
types := make([]string, 0, len(s.ACL))
for _, i := range s.ACL {
types = append(types, i.Type())
}
return types
}
// IsRegistered returns true if an ACL with the given type is registered.
func (s *S) IsRegistered(typ string) bool {
return s.GetACLByType(typ) != nil
}
type A struct{ S }
func (s *S) Register(i acliface.I) {

7
pkg/acl/curating.go

@ -34,6 +34,8 @@ const ( @@ -34,6 +34,8 @@ const (
// - Blacklisted: Cannot publish
// - Unclassified: Rate-limited publishing (default 50/day)
type Curating struct {
// Ctx holds the context for the ACL.
// Deprecated: Use Context() method instead of accessing directly.
Ctx context.Context
cfg *config.C
db database.Database
@ -50,6 +52,11 @@ type Curating struct { @@ -50,6 +52,11 @@ type Curating struct {
cacheMx sync.RWMutex
}
// Context returns the ACL context.
func (c *Curating) Context() context.Context {
return c.Ctx
}
func (c *Curating) Configure(cfg ...any) (err error) {
log.I.F("configuring curating ACL")
for _, ca := range cfg {

7
pkg/acl/follows.go

@ -31,6 +31,8 @@ import ( @@ -31,6 +31,8 @@ import (
)
type Follows struct {
// Ctx holds the context for the ACL.
// Deprecated: Use Context() method instead of accessing directly.
Ctx context.Context
cfg *config.C
db database.Database
@ -51,6 +53,11 @@ type Follows struct { @@ -51,6 +53,11 @@ type Follows struct {
throttle *ProgressiveThrottle
}
// Context returns the ACL context.
func (f *Follows) Context() context.Context {
return f.Ctx
}
func (f *Follows) Configure(cfg ...any) (err error) {
log.I.F("configuring follows ACL")
for _, ca := range cfg {

7
pkg/acl/managed.go

@ -17,6 +17,8 @@ import ( @@ -17,6 +17,8 @@ import (
)
type Managed struct {
// Ctx holds the context for the ACL.
// Deprecated: Use Context() method instead of accessing directly.
Ctx context.Context
cfg *config.C
db database.Database
@ -27,6 +29,11 @@ type Managed struct { @@ -27,6 +29,11 @@ type Managed struct {
mx sync.RWMutex
}
// Context returns the ACL context.
func (m *Managed) Context() context.Context {
return m.Ctx
}
func (m *Managed) Configure(cfg ...any) (err error) {
log.I.F("configuring managed ACL")
for _, ca := range cfg {

561
pkg/domain/errors/errors.go

@ -0,0 +1,561 @@ @@ -0,0 +1,561 @@
// Package errors provides domain-specific error types for the ORLY relay.
// These typed errors enable structured error handling, machine-readable error codes,
// and proper error categorization throughout the codebase.
package errors
import (
"fmt"
)
// DomainError is the base interface for all domain errors.
// It extends the standard error interface with structured metadata.
type DomainError interface {
error
Code() string // Machine-readable error code (e.g., "INVALID_ID")
Category() string // Error category for grouping (e.g., "validation")
IsRetryable() bool // Whether the operation can be retried
}
// Base provides common implementation for all domain errors.
type Base struct {
code string
category string
message string
retryable bool
cause error
}
func (e *Base) Error() string {
if e.cause != nil {
return fmt.Sprintf("%s: %v", e.message, e.cause)
}
return e.message
}
func (e *Base) Code() string { return e.code }
func (e *Base) Category() string { return e.category }
func (e *Base) IsRetryable() bool { return e.retryable }
func (e *Base) Unwrap() error { return e.cause }
// WithCause returns a copy of the error with the given cause.
func (e *Base) WithCause(cause error) *Base {
return &Base{
code: e.code,
category: e.category,
message: e.message,
retryable: e.retryable,
cause: cause,
}
}
// WithMessage returns a copy of the error with the given message.
func (e *Base) WithMessage(msg string) *Base {
return &Base{
code: e.code,
category: e.category,
message: msg,
retryable: e.retryable,
cause: e.cause,
}
}
// =============================================================================
// Validation Errors
// =============================================================================
// ValidationError represents an error in event validation.
type ValidationError struct {
Base
Field string // The field that failed validation
}
// NewValidationError creates a new validation error.
func NewValidationError(code, field, message string) *ValidationError {
return &ValidationError{
Base: Base{
code: code,
category: "validation",
message: message,
},
Field: field,
}
}
// WithField returns a copy with the specified field.
func (e *ValidationError) WithField(field string) *ValidationError {
return &ValidationError{
Base: e.Base,
Field: field,
}
}
// Validation error constants
var (
ErrInvalidEventID = NewValidationError(
"INVALID_ID",
"id",
"event ID does not match computed hash",
)
ErrInvalidSignature = NewValidationError(
"INVALID_SIG",
"sig",
"signature verification failed",
)
ErrFutureTimestamp = NewValidationError(
"FUTURE_TS",
"created_at",
"timestamp too far in future",
)
ErrPastTimestamp = NewValidationError(
"PAST_TS",
"created_at",
"timestamp too far in past",
)
ErrUppercaseHex = NewValidationError(
"UPPERCASE_HEX",
"id/pubkey",
"hex values must be lowercase",
)
ErrProtectedTagMismatch = NewValidationError(
"PROTECTED_TAG",
"tags",
"protected event can only be modified by author",
)
ErrInvalidJSON = NewValidationError(
"INVALID_JSON",
"",
"malformed JSON",
)
ErrEventTooLarge = NewValidationError(
"EVENT_TOO_LARGE",
"content",
"event exceeds size limit",
)
ErrInvalidKind = NewValidationError(
"INVALID_KIND",
"kind",
"event kind not allowed",
)
ErrMissingTag = NewValidationError(
"MISSING_TAG",
"tags",
"required tag missing",
)
ErrInvalidTagValue = NewValidationError(
"INVALID_TAG",
"tags",
"tag value validation failed",
)
)
// =============================================================================
// Authorization Errors
// =============================================================================
// AuthorizationError represents an authorization failure.
type AuthorizationError struct {
Base
Pubkey []byte // The pubkey that was denied
AccessLevel string // The access level that was required
RequireAuth bool // Whether authentication might resolve this
}
// NeedsAuth returns true if authentication might resolve this error.
func (e *AuthorizationError) NeedsAuth() bool { return e.RequireAuth }
// NewAuthRequired creates an error indicating authentication is required.
func NewAuthRequired(reason string) *AuthorizationError {
return &AuthorizationError{
Base: Base{
code: "AUTH_REQUIRED",
category: "authorization",
message: reason,
},
RequireAuth: true,
}
}
// NewAccessDenied creates an error indicating access was denied.
func NewAccessDenied(level, reason string) *AuthorizationError {
return &AuthorizationError{
Base: Base{
code: "ACCESS_DENIED",
category: "authorization",
message: reason,
},
AccessLevel: level,
}
}
// WithPubkey returns a copy with the specified pubkey.
func (e *AuthorizationError) WithPubkey(pubkey []byte) *AuthorizationError {
return &AuthorizationError{
Base: e.Base,
Pubkey: pubkey,
AccessLevel: e.AccessLevel,
RequireAuth: e.RequireAuth,
}
}
// Authorization error constants
var (
ErrAuthRequired = NewAuthRequired("authentication required")
ErrBanned = &AuthorizationError{
Base: Base{
code: "BANNED",
category: "authorization",
message: "pubkey banned",
},
}
ErrIPBlocked = &AuthorizationError{
Base: Base{
code: "IP_BLOCKED",
category: "authorization",
message: "IP address blocked",
},
}
ErrNotFollowed = &AuthorizationError{
Base: Base{
code: "NOT_FOLLOWED",
category: "authorization",
message: "write access requires being followed by admin",
},
}
ErrNotMember = &AuthorizationError{
Base: Base{
code: "NOT_MEMBER",
category: "authorization",
message: "membership required",
},
}
ErrInsufficientAccess = &AuthorizationError{
Base: Base{
code: "INSUFFICIENT_ACCESS",
category: "authorization",
message: "insufficient access level",
},
}
)
// =============================================================================
// Processing Errors
// =============================================================================
// ProcessingError represents an error during event processing.
type ProcessingError struct {
Base
EventID []byte // The event ID (if known)
Kind uint16 // The event kind (if known)
}
// NewProcessingError creates a new processing error.
func NewProcessingError(code string, message string, retryable bool) *ProcessingError {
return &ProcessingError{
Base: Base{
code: code,
category: "processing",
message: message,
retryable: retryable,
},
}
}
// WithEventID returns a copy with the specified event ID.
func (e *ProcessingError) WithEventID(id []byte) *ProcessingError {
return &ProcessingError{
Base: e.Base,
EventID: id,
Kind: e.Kind,
}
}
// WithKind returns a copy with the specified kind.
func (e *ProcessingError) WithKind(kind uint16) *ProcessingError {
return &ProcessingError{
Base: e.Base,
EventID: e.EventID,
Kind: kind,
}
}
// Processing error constants
var (
ErrDuplicate = NewProcessingError(
"DUPLICATE",
"event already exists",
false,
)
ErrReplaceNotAllowed = NewProcessingError(
"REPLACE_DENIED",
"cannot replace event from different author",
false,
)
ErrDeletedEvent = NewProcessingError(
"DELETED",
"event has been deleted",
false,
)
ErrEphemeralNotStored = NewProcessingError(
"EPHEMERAL",
"ephemeral events are not stored",
false,
)
ErrRateLimited = NewProcessingError(
"RATE_LIMITED",
"rate limit exceeded",
true,
)
ErrSprocketRejected = NewProcessingError(
"SPROCKET_REJECTED",
"rejected by sprocket",
false,
)
)
// =============================================================================
// Policy Errors
// =============================================================================
// PolicyError represents a policy violation.
type PolicyError struct {
Base
RuleName string // The rule that was violated
Action string // The action taken (block, reject)
}
// NewPolicyBlocked creates an error for a blocked event.
func NewPolicyBlocked(ruleName, reason string) *PolicyError {
return &PolicyError{
Base: Base{
code: "POLICY_BLOCKED",
category: "policy",
message: reason,
},
RuleName: ruleName,
Action: "block",
}
}
// NewPolicyRejected creates an error for a rejected event.
func NewPolicyRejected(ruleName, reason string) *PolicyError {
return &PolicyError{
Base: Base{
code: "POLICY_REJECTED",
category: "policy",
message: reason,
},
RuleName: ruleName,
Action: "reject",
}
}
// Policy error constants
var (
ErrKindBlocked = &PolicyError{
Base: Base{
code: "KIND_BLOCKED",
category: "policy",
message: "event kind not allowed by policy",
},
Action: "block",
}
ErrPubkeyBlocked = &PolicyError{
Base: Base{
code: "PUBKEY_BLOCKED",
category: "policy",
message: "pubkey blocked by policy",
},
Action: "block",
}
ErrContentBlocked = &PolicyError{
Base: Base{
code: "CONTENT_BLOCKED",
category: "policy",
message: "content blocked by policy",
},
Action: "block",
}
ErrScriptRejected = &PolicyError{
Base: Base{
code: "SCRIPT_REJECTED",
category: "policy",
message: "rejected by policy script",
},
Action: "reject",
}
)
// =============================================================================
// Storage Errors
// =============================================================================
// StorageError represents a storage-layer error.
type StorageError struct {
Base
}
// NewStorageError creates a new storage error.
func NewStorageError(code, message string, cause error, retryable bool) *StorageError {
return &StorageError{
Base: Base{
code: code,
category: "storage",
message: message,
cause: cause,
retryable: retryable,
},
}
}
// Storage error constants
var (
ErrDatabaseUnavailable = NewStorageError(
"DB_UNAVAILABLE",
"database not available",
nil,
true,
)
ErrWriteTimeout = NewStorageError(
"WRITE_TIMEOUT",
"write operation timed out",
nil,
true,
)
ErrReadTimeout = NewStorageError(
"READ_TIMEOUT",
"read operation timed out",
nil,
true,
)
ErrStorageFull = NewStorageError(
"STORAGE_FULL",
"storage capacity exceeded",
nil,
false,
)
ErrCorruptedData = NewStorageError(
"CORRUPTED_DATA",
"data corruption detected",
nil,
false,
)
)
// =============================================================================
// Service Errors
// =============================================================================
// ServiceError represents a service-level error.
type ServiceError struct {
Base
ServiceName string
}
// NewServiceError creates a new service error.
func NewServiceError(code, service, message string, retryable bool) *ServiceError {
return &ServiceError{
Base: Base{
code: code,
category: "service",
message: message,
retryable: retryable,
},
ServiceName: service,
}
}
// Service error constants
var (
ErrServiceUnavailable = NewServiceError(
"SERVICE_UNAVAILABLE",
"",
"service temporarily unavailable",
true,
)
ErrServiceTimeout = NewServiceError(
"SERVICE_TIMEOUT",
"",
"service request timed out",
true,
)
ErrServiceOverloaded = NewServiceError(
"SERVICE_OVERLOADED",
"",
"service is overloaded",
true,
)
)
// =============================================================================
// Helper Functions
// =============================================================================
// Is checks if an error matches a target domain error by code.
func Is(err error, target DomainError) bool {
if de, ok := err.(DomainError); ok {
return de.Code() == target.Code()
}
return false
}
// Code returns the error code if err is a DomainError, empty string otherwise.
func Code(err error) string {
if de, ok := err.(DomainError); ok {
return de.Code()
}
return ""
}
// Category returns the category of a domain error, or "unknown" for other errors.
func Category(err error) string {
if de, ok := err.(DomainError); ok {
return de.Category()
}
return "unknown"
}
// IsRetryable checks if an error indicates the operation can be retried.
func IsRetryable(err error) bool {
if de, ok := err.(DomainError); ok {
return de.IsRetryable()
}
return false
}
// IsValidation checks if an error is a validation error.
func IsValidation(err error) bool {
_, ok := err.(*ValidationError)
return ok
}
// IsAuthorization checks if an error is an authorization error.
func IsAuthorization(err error) bool {
_, ok := err.(*AuthorizationError)
return ok
}
// IsProcessing checks if an error is a processing error.
func IsProcessing(err error) bool {
_, ok := err.(*ProcessingError)
return ok
}
// IsPolicy checks if an error is a policy error.
func IsPolicy(err error) bool {
_, ok := err.(*PolicyError)
return ok
}
// IsStorage checks if an error is a storage error.
func IsStorage(err error) bool {
_, ok := err.(*StorageError)
return ok
}
// NeedsAuth checks if an authorization error requires authentication.
func NeedsAuth(err error) bool {
if ae, ok := err.(*AuthorizationError); ok {
return ae.NeedsAuth()
}
return false
}

210
pkg/domain/errors/errors_test.go

@ -0,0 +1,210 @@ @@ -0,0 +1,210 @@
package errors
import (
"errors"
"testing"
)
func TestValidationError(t *testing.T) {
err := ErrInvalidEventID
if err.Code() != "INVALID_ID" {
t.Errorf("expected code INVALID_ID, got %s", err.Code())
}
if err.Category() != "validation" {
t.Errorf("expected category validation, got %s", err.Category())
}
if err.Field != "id" {
t.Errorf("expected field id, got %s", err.Field)
}
if err.IsRetryable() {
t.Error("validation errors should not be retryable")
}
}
func TestAuthorizationError(t *testing.T) {
err := ErrAuthRequired
if err.Code() != "AUTH_REQUIRED" {
t.Errorf("expected code AUTH_REQUIRED, got %s", err.Code())
}
if !err.NeedsAuth() {
t.Error("ErrAuthRequired should indicate auth is needed")
}
err2 := ErrBanned
if err2.NeedsAuth() {
t.Error("ErrBanned should not indicate auth is needed")
}
}
func TestProcessingError(t *testing.T) {
err := ErrRateLimited
if err.Code() != "RATE_LIMITED" {
t.Errorf("expected code RATE_LIMITED, got %s", err.Code())
}
if !err.IsRetryable() {
t.Error("rate limit errors should be retryable")
}
err2 := ErrDuplicate
if err2.IsRetryable() {
t.Error("duplicate errors should not be retryable")
}
}
func TestStorageError(t *testing.T) {
err := ErrDatabaseUnavailable
if err.Code() != "DB_UNAVAILABLE" {
t.Errorf("expected code DB_UNAVAILABLE, got %s", err.Code())
}
if err.Category() != "storage" {
t.Errorf("expected category storage, got %s", err.Category())
}
if !err.IsRetryable() {
t.Error("database unavailable should be retryable")
}
}
func TestPolicyError(t *testing.T) {
err := NewPolicyBlocked("kind_whitelist", "kind 30023 not allowed")
if err.Code() != "POLICY_BLOCKED" {
t.Errorf("expected code POLICY_BLOCKED, got %s", err.Code())
}
if err.RuleName != "kind_whitelist" {
t.Errorf("expected rule name kind_whitelist, got %s", err.RuleName)
}
if err.Action != "block" {
t.Errorf("expected action block, got %s", err.Action)
}
}
func TestIsHelper(t *testing.T) {
if !Is(ErrInvalidEventID, ErrInvalidEventID) {
t.Error("Is should return true for same error")
}
if Is(ErrInvalidEventID, ErrInvalidSignature) {
t.Error("Is should return false for different errors")
}
// Test with non-domain error
stdErr := errors.New("standard error")
if Is(stdErr, ErrInvalidEventID) {
t.Error("Is should return false for non-domain errors")
}
}
func TestCodeHelper(t *testing.T) {
if Code(ErrInvalidEventID) != "INVALID_ID" {
t.Errorf("expected INVALID_ID, got %s", Code(ErrInvalidEventID))
}
stdErr := errors.New("standard error")
if Code(stdErr) != "" {
t.Errorf("expected empty string for non-domain error, got %s", Code(stdErr))
}
}
func TestCategoryHelper(t *testing.T) {
if Category(ErrInvalidEventID) != "validation" {
t.Errorf("expected validation, got %s", Category(ErrInvalidEventID))
}
if Category(ErrBanned) != "authorization" {
t.Errorf("expected authorization, got %s", Category(ErrBanned))
}
if Category(ErrDuplicate) != "processing" {
t.Errorf("expected processing, got %s", Category(ErrDuplicate))
}
stdErr := errors.New("standard error")
if Category(stdErr) != "unknown" {
t.Errorf("expected unknown for non-domain error, got %s", Category(stdErr))
}
}
func TestIsRetryableHelper(t *testing.T) {
if !IsRetryable(ErrRateLimited) {
t.Error("ErrRateLimited should be retryable")
}
if !IsRetryable(ErrDatabaseUnavailable) {
t.Error("ErrDatabaseUnavailable should be retryable")
}
if IsRetryable(ErrDuplicate) {
t.Error("ErrDuplicate should not be retryable")
}
if IsRetryable(ErrInvalidEventID) {
t.Error("ErrInvalidEventID should not be retryable")
}
}
func TestTypeCheckers(t *testing.T) {
if !IsValidation(ErrInvalidEventID) {
t.Error("ErrInvalidEventID should be a validation error")
}
if !IsAuthorization(ErrBanned) {
t.Error("ErrBanned should be an authorization error")
}
if !IsProcessing(ErrDuplicate) {
t.Error("ErrDuplicate should be a processing error")
}
if !IsPolicy(ErrKindBlocked) {
t.Error("ErrKindBlocked should be a policy error")
}
if !IsStorage(ErrDatabaseUnavailable) {
t.Error("ErrDatabaseUnavailable should be a storage error")
}
}
func TestNeedsAuthHelper(t *testing.T) {
if !NeedsAuth(ErrAuthRequired) {
t.Error("ErrAuthRequired should need auth")
}
if NeedsAuth(ErrBanned) {
t.Error("ErrBanned should not need auth")
}
if NeedsAuth(ErrInvalidEventID) {
t.Error("validation errors should not need auth")
}
}
func TestWithCause(t *testing.T) {
cause := errors.New("underlying error")
err := ErrDatabaseUnavailable.Base.WithCause(cause)
if err.Unwrap() != cause {
t.Error("WithCause should set the cause")
}
if err.Error() != "database not available: underlying error" {
t.Errorf("unexpected error message: %s", err.Error())
}
}
func TestWithMessage(t *testing.T) {
err := ErrInvalidEventID.Base.WithMessage("custom message")
if err.Error() != "custom message" {
t.Errorf("expected custom message, got %s", err.Error())
}
if err.Code() != "INVALID_ID" {
t.Error("code should be preserved")
}
}
func TestWithPubkey(t *testing.T) {
pubkey := []byte{1, 2, 3, 4}
err := ErrBanned.WithPubkey(pubkey)
if len(err.Pubkey) != 4 {
t.Error("pubkey should be set")
}
}
func TestWithEventID(t *testing.T) {
eventID := []byte{1, 2, 3, 4}
err := ErrDuplicate.WithEventID(eventID)
if len(err.EventID) != 4 {
t.Error("event ID should be set")
}
}
func TestWithKind(t *testing.T) {
err := ErrDuplicate.WithKind(30023)
if err.Kind != 30023 {
t.Errorf("expected kind 30023, got %d", err.Kind)
}
}

315
pkg/domain/events/dispatcher.go

@ -0,0 +1,315 @@ @@ -0,0 +1,315 @@
package events
import (
"context"
"sync"
"sync/atomic"
"lol.mleku.dev/log"
)
// Subscriber handles domain events.
type Subscriber interface {
// Handle processes the given domain event.
Handle(event DomainEvent)
// Supports returns true if this subscriber handles the given event type.
Supports(eventType string) bool
}
// SubscriberFunc is a function that can be used as a Subscriber.
type SubscriberFunc struct {
handleFunc func(DomainEvent)
supportsFunc func(string) bool
}
// Handle implements Subscriber.
func (s *SubscriberFunc) Handle(event DomainEvent) {
s.handleFunc(event)
}
// Supports implements Subscriber.
func (s *SubscriberFunc) Supports(eventType string) bool {
return s.supportsFunc(eventType)
}
// NewSubscriberFunc creates a new SubscriberFunc.
func NewSubscriberFunc(handle func(DomainEvent), supports func(string) bool) *SubscriberFunc {
return &SubscriberFunc{
handleFunc: handle,
supportsFunc: supports,
}
}
// NewSubscriberForTypes creates a subscriber that handles specific event types.
func NewSubscriberForTypes(handle func(DomainEvent), types ...string) *SubscriberFunc {
typeSet := make(map[string]struct{}, len(types))
for _, t := range types {
typeSet[t] = struct{}{}
}
return &SubscriberFunc{
handleFunc: handle,
supportsFunc: func(eventType string) bool {
_, ok := typeSet[eventType]
return ok
},
}
}
// NewSubscriberForAll creates a subscriber that handles all event types.
func NewSubscriberForAll(handle func(DomainEvent)) *SubscriberFunc {
return &SubscriberFunc{
handleFunc: handle,
supportsFunc: func(string) bool { return true },
}
}
// Dispatcher publishes domain events to subscribers.
type Dispatcher struct {
subscribers []Subscriber
mu sync.RWMutex
asyncChan chan DomainEvent
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// Metrics
eventsPublished atomic.Int64
eventsDropped atomic.Int64
asyncQueueSize int
}
// DispatcherConfig configures the dispatcher.
type DispatcherConfig struct {
// AsyncBufferSize is the buffer size for async event delivery.
// Default: 1000
AsyncBufferSize int
}
// DefaultDispatcherConfig returns the default dispatcher configuration.
func DefaultDispatcherConfig() DispatcherConfig {
return DispatcherConfig{
AsyncBufferSize: 1000,
}
}
// NewDispatcher creates a new event dispatcher.
func NewDispatcher(cfg DispatcherConfig) *Dispatcher {
if cfg.AsyncBufferSize <= 0 {
cfg.AsyncBufferSize = 1000
}
ctx, cancel := context.WithCancel(context.Background())
d := &Dispatcher{
asyncChan: make(chan DomainEvent, cfg.AsyncBufferSize),
ctx: ctx,
cancel: cancel,
asyncQueueSize: cfg.AsyncBufferSize,
}
// Start async processor
d.wg.Add(1)
go d.processAsync()
return d
}
// Subscribe adds a subscriber to receive events.
func (d *Dispatcher) Subscribe(s Subscriber) {
d.mu.Lock()
defer d.mu.Unlock()
d.subscribers = append(d.subscribers, s)
}
// Unsubscribe removes a subscriber.
func (d *Dispatcher) Unsubscribe(s Subscriber) {
d.mu.Lock()
defer d.mu.Unlock()
for i, sub := range d.subscribers {
if sub == s {
d.subscribers = append(d.subscribers[:i], d.subscribers[i+1:]...)
return
}
}
}
// Publish sends an event to all matching subscribers synchronously.
// This blocks until all subscribers have processed the event.
func (d *Dispatcher) Publish(event DomainEvent) {
d.mu.RLock()
subscribers := d.subscribers
d.mu.RUnlock()
for _, s := range subscribers {
if s.Supports(event.EventType()) {
s.Handle(event)
}
}
d.eventsPublished.Add(1)
}
// PublishAsync sends an event to be processed asynchronously.
// This returns immediately and the event is processed in a background goroutine.
// Returns true if the event was queued, false if the queue is full.
func (d *Dispatcher) PublishAsync(event DomainEvent) bool {
select {
case d.asyncChan <- event:
return true
default:
d.eventsDropped.Add(1)
log.W.F("domain event dropped (queue full): %s", event.EventType())
return false
}
}
// processAsync handles async event delivery.
func (d *Dispatcher) processAsync() {
defer d.wg.Done()
for {
select {
case <-d.ctx.Done():
// Drain remaining events before exiting
for {
select {
case event := <-d.asyncChan:
d.Publish(event)
default:
return
}
}
case event := <-d.asyncChan:
d.Publish(event)
}
}
}
// Stop stops the dispatcher and waits for pending events to be processed.
func (d *Dispatcher) Stop() {
d.cancel()
d.wg.Wait()
}
// Stats returns dispatcher statistics.
func (d *Dispatcher) Stats() DispatcherStats {
d.mu.RLock()
subscriberCount := len(d.subscribers)
d.mu.RUnlock()
return DispatcherStats{
EventsPublished: d.eventsPublished.Load(),
EventsDropped: d.eventsDropped.Load(),
SubscriberCount: subscriberCount,
QueueSize: len(d.asyncChan),
QueueCapacity: d.asyncQueueSize,
}
}
// DispatcherStats contains dispatcher statistics.
type DispatcherStats struct {
EventsPublished int64
EventsDropped int64
SubscriberCount int
QueueSize int
QueueCapacity int
}
// =============================================================================
// Global Dispatcher (Optional Convenience)
// =============================================================================
var (
globalDispatcher *Dispatcher
globalDispatcherOnce sync.Once
)
// Global returns the global dispatcher instance.
// Creates one with default config if not already created.
func Global() *Dispatcher {
globalDispatcherOnce.Do(func() {
globalDispatcher = NewDispatcher(DefaultDispatcherConfig())
})
return globalDispatcher
}
// SetGlobal sets the global dispatcher instance.
// This should be called early in application startup if custom config is needed.
func SetGlobal(d *Dispatcher) {
globalDispatcher = d
}
// =============================================================================
// Typed Subscription Helpers
// =============================================================================
// OnEventSaved subscribes to EventSaved events.
func (d *Dispatcher) OnEventSaved(handler func(*EventSaved)) {
d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
if es, ok := e.(*EventSaved); ok {
handler(es)
}
}, EventSavedType))
}
// OnEventDeleted subscribes to EventDeleted events.
func (d *Dispatcher) OnEventDeleted(handler func(*EventDeleted)) {
d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
if ed, ok := e.(*EventDeleted); ok {
handler(ed)
}
}, EventDeletedType))
}
// OnFollowListUpdated subscribes to FollowListUpdated events.
func (d *Dispatcher) OnFollowListUpdated(handler func(*FollowListUpdated)) {
d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
if flu, ok := e.(*FollowListUpdated); ok {
handler(flu)
}
}, FollowListUpdatedType))
}
// OnACLMembershipChanged subscribes to ACLMembershipChanged events.
func (d *Dispatcher) OnACLMembershipChanged(handler func(*ACLMembershipChanged)) {
d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
if amc, ok := e.(*ACLMembershipChanged); ok {
handler(amc)
}
}, ACLMembershipChangedType))
}
// OnPolicyConfigUpdated subscribes to PolicyConfigUpdated events.
func (d *Dispatcher) OnPolicyConfigUpdated(handler func(*PolicyConfigUpdated)) {
d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
if pcu, ok := e.(*PolicyConfigUpdated); ok {
handler(pcu)
}
}, PolicyConfigUpdatedType))
}
// OnUserAuthenticated subscribes to UserAuthenticated events.
func (d *Dispatcher) OnUserAuthenticated(handler func(*UserAuthenticated)) {
d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
if ua, ok := e.(*UserAuthenticated); ok {
handler(ua)
}
}, UserAuthenticatedType))
}
// OnMemberJoined subscribes to MemberJoined events.
func (d *Dispatcher) OnMemberJoined(handler func(*MemberJoined)) {
d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
if mj, ok := e.(*MemberJoined); ok {
handler(mj)
}
}, MemberJoinedType))
}
// OnMemberLeft subscribes to MemberLeft events.
func (d *Dispatcher) OnMemberLeft(handler func(*MemberLeft)) {
d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
if ml, ok := e.(*MemberLeft); ok {
handler(ml)
}
}, MemberLeftType))
}

245
pkg/domain/events/dispatcher_test.go

@ -0,0 +1,245 @@ @@ -0,0 +1,245 @@
package events
import (
"sync"
"sync/atomic"
"testing"
"time"
)
func TestDispatcherPublish(t *testing.T) {
d := NewDispatcher(DefaultDispatcherConfig())
defer d.Stop()
var received atomic.Int32
d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
received.Add(1)
}, EventSavedType))
// Should be received
d.Publish(NewEventSaved(nil, 1, false, false))
// Should not be received (different type)
d.Publish(NewEventDeleted(nil, nil, 1))
if received.Load() != 1 {
t.Errorf("expected 1 event received, got %d", received.Load())
}
}
func TestDispatcherPublishAsync(t *testing.T) {
d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 100})
defer d.Stop()
var received atomic.Int32
var wg sync.WaitGroup
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
received.Add(1)
wg.Done()
}))
wg.Add(10)
for i := 0; i < 10; i++ {
d.PublishAsync(NewEventSaved(nil, uint64(i), false, false))
}
// Wait for all events to be processed
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("timeout waiting for async events")
}
if received.Load() != 10 {
t.Errorf("expected 10 events received, got %d", received.Load())
}
}
func TestDispatcherMultipleSubscribers(t *testing.T) {
d := NewDispatcher(DefaultDispatcherConfig())
defer d.Stop()
var count1, count2 atomic.Int32
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
count1.Add(1)
}))
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
count2.Add(1)
}))
d.Publish(NewEventSaved(nil, 1, false, false))
if count1.Load() != 1 || count2.Load() != 1 {
t.Errorf("expected both subscribers to receive event, got %d and %d", count1.Load(), count2.Load())
}
}
func TestDispatcherUnsubscribe(t *testing.T) {
d := NewDispatcher(DefaultDispatcherConfig())
defer d.Stop()
var received atomic.Int32
sub := NewSubscriberForAll(func(e DomainEvent) {
received.Add(1)
})
d.Subscribe(sub)
d.Publish(NewEventSaved(nil, 1, false, false))
d.Unsubscribe(sub)
d.Publish(NewEventSaved(nil, 2, false, false))
if received.Load() != 1 {
t.Errorf("expected 1 event received after unsubscribe, got %d", received.Load())
}
}
func TestDispatcherTypedSubscription(t *testing.T) {
d := NewDispatcher(DefaultDispatcherConfig())
defer d.Stop()
var savedCount atomic.Int32
var deletedCount atomic.Int32
d.OnEventSaved(func(e *EventSaved) {
savedCount.Add(1)
})
d.OnEventDeleted(func(e *EventDeleted) {
deletedCount.Add(1)
})
d.Publish(NewEventSaved(nil, 1, false, false))
d.Publish(NewEventDeleted(nil, nil, 1))
if savedCount.Load() != 1 {
t.Errorf("expected 1 saved event, got %d", savedCount.Load())
}
if deletedCount.Load() != 1 {
t.Errorf("expected 1 deleted event, got %d", deletedCount.Load())
}
}
func TestDispatcherStats(t *testing.T) {
d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 100})
defer d.Stop()
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {}))
d.Publish(NewEventSaved(nil, 1, false, false))
d.Publish(NewEventSaved(nil, 2, false, false))
stats := d.Stats()
if stats.EventsPublished != 2 {
t.Errorf("expected 2 events published, got %d", stats.EventsPublished)
}
if stats.SubscriberCount != 1 {
t.Errorf("expected 1 subscriber, got %d", stats.SubscriberCount)
}
if stats.QueueCapacity != 100 {
t.Errorf("expected queue capacity 100, got %d", stats.QueueCapacity)
}
}
func TestDispatcherQueueFull(t *testing.T) {
d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 1})
defer d.Stop()
// Block the processor
blocker := make(chan struct{})
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
<-blocker
}))
// First should succeed
if !d.PublishAsync(NewEventSaved(nil, 1, false, false)) {
t.Error("first async publish should succeed")
}
// Wait for it to be picked up
time.Sleep(10 * time.Millisecond)
// Second should fail (queue full)
if d.PublishAsync(NewEventSaved(nil, 2, false, false)) {
// might succeed if the first was already picked up
}
close(blocker)
}
func TestEventTypes(t *testing.T) {
types := AllEventTypes()
if len(types) == 0 {
t.Error("expected event types to be registered")
}
// Check all types are unique
seen := make(map[string]bool)
for _, typ := range types {
if seen[typ] {
t.Errorf("duplicate event type: %s", typ)
}
seen[typ] = true
}
}
func TestDomainEventInterface(t *testing.T) {
events := []DomainEvent{
NewEventSaved(nil, 1, true, false),
NewEventDeleted([]byte{1}, []byte{2}, 3),
NewFollowListUpdated([]byte{1}, nil, nil),
NewACLMembershipChanged([]byte{1}, "none", "write", "followed"),
NewPolicyConfigUpdated([]byte{1}, map[string]interface{}{"key": "value"}),
NewPolicyFollowsUpdated([]byte{1}, 10, nil),
NewRelayGroupConfigChanged(nil),
NewClusterMembershipChanged(nil, "join"),
NewSyncSerialUpdated(100),
NewUserAuthenticated([]byte{1}, "write", true, "conn-123"),
NewConnectionOpened("conn-123", "192.168.1.1:1234"),
NewConnectionClosed("conn-123", time.Hour, 100, 50),
NewSubscriptionCreated("sub-1", "conn-123", 3),
NewSubscriptionClosed("sub-1", "conn-123", 25),
NewMemberJoined([]byte{1}, "invite-abc"),
NewMemberLeft([]byte{1}),
}
for _, e := range events {
if e.OccurredAt().IsZero() {
t.Errorf("event %s has zero timestamp", e.EventType())
}
if e.EventType() == "" {
t.Error("event has empty type")
}
}
}
func TestSubscriberFunc(t *testing.T) {
var called bool
sf := NewSubscriberFunc(
func(e DomainEvent) { called = true },
func(typ string) bool { return typ == EventSavedType },
)
if !sf.Supports(EventSavedType) {
t.Error("should support EventSavedType")
}
if sf.Supports(EventDeletedType) {
t.Error("should not support EventDeletedType")
}
sf.Handle(NewEventSaved(nil, 1, false, false))
if !called {
t.Error("handler was not called")
}
}

420
pkg/domain/events/events.go

@ -0,0 +1,420 @@ @@ -0,0 +1,420 @@
// Package events provides domain event types and a dispatcher for the ORLY relay.
// Domain events represent significant occurrences in the system that other components
// may want to react to, enabling loose coupling between components.
package events
import (
"time"
"git.mleku.dev/mleku/nostr/encoders/event"
)
// DomainEvent is the base interface for all domain events.
type DomainEvent interface {
// OccurredAt returns when the event occurred.
OccurredAt() time.Time
// EventType returns a string identifier for the event type.
EventType() string
}
// Base provides common fields for all domain events.
type Base struct {
occurredAt time.Time
eventType string
}
// OccurredAt returns when the event occurred.
func (b Base) OccurredAt() time.Time { return b.occurredAt }
// EventType returns the event type identifier.
func (b Base) EventType() string { return b.eventType }
// newBase creates a new Base with the current time.
func newBase(eventType string) Base {
return Base{
occurredAt: time.Now(),
eventType: eventType,
}
}
// =============================================================================
// Event Storage Events
// =============================================================================
// EventSavedType is the event type for EventSaved.
const EventSavedType = "event.saved"
// EventSaved is emitted when a Nostr event is successfully saved to the database.
type EventSaved struct {
Base
Event *event.E // The saved event
Serial uint64 // The assigned serial number
IsAdmin bool // Whether the author is an admin
IsOwner bool // Whether the author is an owner
}
// NewEventSaved creates a new EventSaved domain event.
func NewEventSaved(ev *event.E, serial uint64, isAdmin, isOwner bool) *EventSaved {
return &EventSaved{
Base: newBase(EventSavedType),
Event: ev,
Serial: serial,
IsAdmin: isAdmin,
IsOwner: isOwner,
}
}
// EventDeletedType is the event type for EventDeleted.
const EventDeletedType = "event.deleted"
// EventDeleted is emitted when a Nostr event is deleted.
type EventDeleted struct {
Base
EventID []byte // The deleted event ID
DeletedBy []byte // Pubkey that requested deletion
Serial uint64 // The serial of the deleted event
}
// NewEventDeleted creates a new EventDeleted domain event.
func NewEventDeleted(eventID, deletedBy []byte, serial uint64) *EventDeleted {
return &EventDeleted{
Base: newBase(EventDeletedType),
EventID: eventID,
DeletedBy: deletedBy,
Serial: serial,
}
}
// =============================================================================
// ACL Events
// =============================================================================
// FollowListUpdatedType is the event type for FollowListUpdated.
const FollowListUpdatedType = "acl.followlist.updated"
// FollowListUpdated is emitted when an admin's follow list changes.
type FollowListUpdated struct {
Base
AdminPubkey []byte // The admin whose follow list changed
AddedFollows [][]byte // Pubkeys that were added
RemovedFollows [][]byte // Pubkeys that were removed
}
// NewFollowListUpdated creates a new FollowListUpdated domain event.
func NewFollowListUpdated(adminPubkey []byte, added, removed [][]byte) *FollowListUpdated {
return &FollowListUpdated{
Base: newBase(FollowListUpdatedType),
AdminPubkey: adminPubkey,
AddedFollows: added,
RemovedFollows: removed,
}
}
// ACLMembershipChangedType is the event type for ACLMembershipChanged.
const ACLMembershipChangedType = "acl.membership.changed"
// ACLMembershipChanged is emitted when a pubkey's access level changes.
type ACLMembershipChanged struct {
Base
Pubkey []byte // The affected pubkey
PrevLevel string // Previous access level
NewLevel string // New access level
Reason string // Reason for the change
}
// NewACLMembershipChanged creates a new ACLMembershipChanged domain event.
func NewACLMembershipChanged(pubkey []byte, prevLevel, newLevel, reason string) *ACLMembershipChanged {
return &ACLMembershipChanged{
Base: newBase(ACLMembershipChangedType),
Pubkey: pubkey,
PrevLevel: prevLevel,
NewLevel: newLevel,
Reason: reason,
}
}
// =============================================================================
// Policy Events
// =============================================================================
// PolicyConfigUpdatedType is the event type for PolicyConfigUpdated.
const PolicyConfigUpdatedType = "policy.config.updated"
// PolicyConfigUpdated is emitted when the policy configuration changes.
type PolicyConfigUpdated struct {
Base
UpdatedBy []byte // Pubkey that made the update
Changes map[string]interface{} // Changed configuration keys
}
// NewPolicyConfigUpdated creates a new PolicyConfigUpdated domain event.
func NewPolicyConfigUpdated(updatedBy []byte, changes map[string]interface{}) *PolicyConfigUpdated {
return &PolicyConfigUpdated{
Base: newBase(PolicyConfigUpdatedType),
UpdatedBy: updatedBy,
Changes: changes,
}
}
// PolicyFollowsUpdatedType is the event type for PolicyFollowsUpdated.
const PolicyFollowsUpdatedType = "policy.follows.updated"
// PolicyFollowsUpdated is emitted when policy follow lists are refreshed.
type PolicyFollowsUpdated struct {
Base
AdminPubkey []byte // The admin whose follows were processed
FollowCount int // Number of follows in the updated list
Follows [][]byte // The follow pubkeys (may be nil for large lists)
}
// NewPolicyFollowsUpdated creates a new PolicyFollowsUpdated domain event.
func NewPolicyFollowsUpdated(adminPubkey []byte, followCount int, follows [][]byte) *PolicyFollowsUpdated {
return &PolicyFollowsUpdated{
Base: newBase(PolicyFollowsUpdatedType),
AdminPubkey: adminPubkey,
FollowCount: followCount,
Follows: follows,
}
}
// =============================================================================
// Sync Events
// =============================================================================
// RelayGroupConfigChangedType is the event type for RelayGroupConfigChanged.
const RelayGroupConfigChangedType = "sync.relaygroup.changed"
// RelayGroupConfigChanged is emitted when relay group configuration changes.
type RelayGroupConfigChanged struct {
Base
Event *event.E // The kind 39105 event
}
// NewRelayGroupConfigChanged creates a new RelayGroupConfigChanged domain event.
func NewRelayGroupConfigChanged(ev *event.E) *RelayGroupConfigChanged {
return &RelayGroupConfigChanged{
Base: newBase(RelayGroupConfigChangedType),
Event: ev,
}
}
// ClusterMembershipChangedType is the event type for ClusterMembershipChanged.
const ClusterMembershipChangedType = "sync.cluster.membership.changed"
// ClusterMembershipChanged is emitted when cluster membership changes.
type ClusterMembershipChanged struct {
Base
Event *event.E // The kind 39108 event
Action string // "join" or "leave"
}
// NewClusterMembershipChanged creates a new ClusterMembershipChanged domain event.
func NewClusterMembershipChanged(ev *event.E, action string) *ClusterMembershipChanged {
return &ClusterMembershipChanged{
Base: newBase(ClusterMembershipChangedType),
Event: ev,
Action: action,
}
}
// SyncSerialUpdatedType is the event type for SyncSerialUpdated.
const SyncSerialUpdatedType = "sync.serial.updated"
// SyncSerialUpdated is emitted when the sync manager's serial is updated.
type SyncSerialUpdated struct {
Base
Serial uint64 // The new serial number
}
// NewSyncSerialUpdated creates a new SyncSerialUpdated domain event.
func NewSyncSerialUpdated(serial uint64) *SyncSerialUpdated {
return &SyncSerialUpdated{
Base: newBase(SyncSerialUpdatedType),
Serial: serial,
}
}
// =============================================================================
// Authentication Events
// =============================================================================
// UserAuthenticatedType is the event type for UserAuthenticated.
const UserAuthenticatedType = "auth.user.authenticated"
// UserAuthenticated is emitted when a user successfully authenticates.
type UserAuthenticated struct {
Base
Pubkey []byte // The authenticated pubkey
AccessLevel string // The granted access level
IsFirstTime bool // Whether this is a first-time user
ConnectionID string // The connection ID
}
// NewUserAuthenticated creates a new UserAuthenticated domain event.
func NewUserAuthenticated(pubkey []byte, accessLevel string, isFirstTime bool, connID string) *UserAuthenticated {
return &UserAuthenticated{
Base: newBase(UserAuthenticatedType),
Pubkey: pubkey,
AccessLevel: accessLevel,
IsFirstTime: isFirstTime,
ConnectionID: connID,
}
}
// =============================================================================
// Connection Events
// =============================================================================
// ConnectionOpenedType is the event type for ConnectionOpened.
const ConnectionOpenedType = "connection.opened"
// ConnectionOpened is emitted when a new WebSocket connection is established.
type ConnectionOpened struct {
Base
ConnectionID string // Unique connection identifier
RemoteAddr string // Client IP:port
}
// NewConnectionOpened creates a new ConnectionOpened domain event.
func NewConnectionOpened(connID, remoteAddr string) *ConnectionOpened {
return &ConnectionOpened{
Base: newBase(ConnectionOpenedType),
ConnectionID: connID,
RemoteAddr: remoteAddr,
}
}
// ConnectionClosedType is the event type for ConnectionClosed.
const ConnectionClosedType = "connection.closed"
// ConnectionClosed is emitted when a WebSocket connection is closed.
type ConnectionClosed struct {
Base
ConnectionID string // Unique connection identifier
Duration time.Duration // How long the connection was open
EventsReceived int // Number of events received
EventsPublished int // Number of events published
}
// NewConnectionClosed creates a new ConnectionClosed domain event.
func NewConnectionClosed(connID string, duration time.Duration, received, published int) *ConnectionClosed {
return &ConnectionClosed{
Base: newBase(ConnectionClosedType),
ConnectionID: connID,
Duration: duration,
EventsReceived: received,
EventsPublished: published,
}
}
// =============================================================================
// Subscription Events
// =============================================================================
// SubscriptionCreatedType is the event type for SubscriptionCreated.
const SubscriptionCreatedType = "subscription.created"
// SubscriptionCreated is emitted when a new REQ subscription is created.
type SubscriptionCreated struct {
Base
SubscriptionID string // The subscription ID from REQ
ConnectionID string // The connection this subscription belongs to
FilterCount int // Number of filters in the subscription
}
// NewSubscriptionCreated creates a new SubscriptionCreated domain event.
func NewSubscriptionCreated(subID, connID string, filterCount int) *SubscriptionCreated {
return &SubscriptionCreated{
Base: newBase(SubscriptionCreatedType),
SubscriptionID: subID,
ConnectionID: connID,
FilterCount: filterCount,
}
}
// SubscriptionClosedType is the event type for SubscriptionClosed.
const SubscriptionClosedType = "subscription.closed"
// SubscriptionClosed is emitted when a subscription is closed.
type SubscriptionClosed struct {
Base
SubscriptionID string // The subscription ID
ConnectionID string // The connection this subscription belonged to
EventsMatched int // Number of events that matched this subscription
}
// NewSubscriptionClosed creates a new SubscriptionClosed domain event.
func NewSubscriptionClosed(subID, connID string, eventsMatched int) *SubscriptionClosed {
return &SubscriptionClosed{
Base: newBase(SubscriptionClosedType),
SubscriptionID: subID,
ConnectionID: connID,
EventsMatched: eventsMatched,
}
}
// =============================================================================
// NIP-43 Events
// =============================================================================
// MemberJoinedType is the event type for MemberJoined.
const MemberJoinedType = "nip43.member.joined"
// MemberJoined is emitted when a new member joins via NIP-43.
type MemberJoined struct {
Base
Pubkey []byte // The new member's pubkey
InviteCode string // The invite code used (if any)
}
// NewMemberJoined creates a new MemberJoined domain event.
func NewMemberJoined(pubkey []byte, inviteCode string) *MemberJoined {
return &MemberJoined{
Base: newBase(MemberJoinedType),
Pubkey: pubkey,
InviteCode: inviteCode,
}
}
// MemberLeftType is the event type for MemberLeft.
const MemberLeftType = "nip43.member.left"
// MemberLeft is emitted when a member leaves via NIP-43.
type MemberLeft struct {
Base
Pubkey []byte // The departed member's pubkey
}
// NewMemberLeft creates a new MemberLeft domain event.
func NewMemberLeft(pubkey []byte) *MemberLeft {
return &MemberLeft{
Base: newBase(MemberLeftType),
Pubkey: pubkey,
}
}
// =============================================================================
// Event Type Registry
// =============================================================================
// AllEventTypes returns all registered event type constants.
func AllEventTypes() []string {
return []string{
EventSavedType,
EventDeletedType,
FollowListUpdatedType,
ACLMembershipChangedType,
PolicyConfigUpdatedType,
PolicyFollowsUpdatedType,
RelayGroupConfigChangedType,
ClusterMembershipChangedType,
SyncSerialUpdatedType,
UserAuthenticatedType,
ConnectionOpenedType,
ConnectionClosedType,
SubscriptionCreatedType,
SubscriptionClosedType,
MemberJoinedType,
MemberLeftType,
}
}

225
pkg/event/ingestion/service.go

@ -0,0 +1,225 @@ @@ -0,0 +1,225 @@
// Package ingestion provides a service for orchestrating the event processing pipeline.
// It coordinates validation, special kind handling, authorization, routing, and processing.
package ingestion
import (
"context"
"fmt"
"git.mleku.dev/mleku/nostr/encoders/event"
"next.orly.dev/pkg/event/authorization"
"next.orly.dev/pkg/event/processing"
"next.orly.dev/pkg/event/routing"
"next.orly.dev/pkg/event/specialkinds"
"next.orly.dev/pkg/event/validation"
)
// SprocketChecker checks events against the sprocket (external filter).
type SprocketChecker interface {
IsEnabled() bool
IsDisabled() bool
IsRunning() bool
ProcessEvent(*event.E) (*SprocketResponse, error)
}
// SprocketResponse is the response from sprocket processing.
type SprocketResponse struct {
Action string // "accept", "reject", "shadowReject"
Msg string
}
// ConnectionContext provides connection-specific data for event processing.
type ConnectionContext struct {
// AuthedPubkey is the authenticated pubkey for this connection (may be nil).
AuthedPubkey []byte
// Remote is the remote address of the connection.
Remote string
// ConnectionID uniquely identifies this connection.
ConnectionID string
}
// Result represents the outcome of event ingestion.
type Result struct {
// Accepted indicates the event was accepted for processing.
Accepted bool
// Saved indicates the event was saved to the database.
Saved bool
// Message is an optional message for the OK response.
Message string
// RequireAuth indicates authentication is required.
RequireAuth bool
// Error contains any error that occurred.
Error error
}
// Accepted returns a successful Result.
func Accepted(message string) Result {
return Result{Accepted: true, Saved: true, Message: message}
}
// AcceptedNotSaved returns a Result where the event was accepted but not saved.
func AcceptedNotSaved(message string) Result {
return Result{Accepted: true, Saved: false, Message: message}
}
// Rejected returns a rejection Result.
func Rejected(message string) Result {
return Result{Accepted: false, Message: message}
}
// AuthRequired returns a Result indicating authentication is required.
func AuthRequired(message string) Result {
return Result{Accepted: false, RequireAuth: true, Message: message}
}
// Errored returns a Result with an error.
func Errored(err error) Result {
return Result{Accepted: false, Error: err}
}
// Config configures the ingestion service.
type Config struct {
// SprocketChecker is the optional sprocket checker.
SprocketChecker SprocketChecker
// SpecialKinds is the registry for special kind handlers.
SpecialKinds *specialkinds.Registry
}
// Service orchestrates the event ingestion pipeline.
type Service struct {
validator *validation.Service
authorizer *authorization.Service
router *routing.DefaultRouter
processor *processing.Service
sprocket SprocketChecker
specialKinds *specialkinds.Registry
}
// NewService creates a new ingestion service.
func NewService(
validator *validation.Service,
authorizer *authorization.Service,
router *routing.DefaultRouter,
processor *processing.Service,
cfg Config,
) *Service {
return &Service{
validator: validator,
authorizer: authorizer,
router: router,
processor: processor,
sprocket: cfg.SprocketChecker,
specialKinds: cfg.SpecialKinds,
}
}
// Ingest processes an event through the full ingestion pipeline.
// Returns a Result indicating the outcome.
func (s *Service) Ingest(ctx context.Context, ev *event.E, connCtx *ConnectionContext) Result {
// Stage 1: Event validation (ID, timestamp, signature)
if result := s.validator.ValidateEvent(ev); !result.Valid {
return Rejected(result.Msg)
}
// Stage 2: Sprocket check (if enabled)
if s.sprocket != nil && s.sprocket.IsEnabled() {
if s.sprocket.IsDisabled() {
return Rejected("sprocket disabled - events rejected until sprocket is restored")
}
if !s.sprocket.IsRunning() {
return Rejected("sprocket not running - events rejected until sprocket starts")
}
response, err := s.sprocket.ProcessEvent(ev)
if err != nil {
return Errored(fmt.Errorf("sprocket processing failed: %w", err))
}
switch response.Action {
case "accept":
// Continue processing
case "reject":
return Rejected(response.Msg)
case "shadowReject":
// Accept but don't save
return AcceptedNotSaved("")
}
}
// Stage 3: Special kind handling
if s.specialKinds != nil {
hctx := &specialkinds.HandlerContext{
AuthedPubkey: connCtx.AuthedPubkey,
Remote: connCtx.Remote,
ConnectionID: connCtx.ConnectionID,
}
if result, handled := s.specialKinds.TryHandle(ctx, ev, hctx); handled {
if result.Error != nil {
return Errored(result.Error)
}
if result.Handled {
if result.SaveEvent {
// Handler wants the event saved
procResult := s.processor.Process(ctx, ev)
if procResult.Error != nil {
return Errored(procResult.Error)
}
return Accepted(result.Message)
}
return AcceptedNotSaved(result.Message)
}
// result.Continue - fall through to normal processing
}
}
// Stage 4: Authorization check
decision := s.authorizer.Authorize(ev, connCtx.AuthedPubkey, connCtx.Remote, ev.Kind)
if !decision.Allowed {
if decision.RequireAuth {
return AuthRequired(decision.DenyReason)
}
return Rejected(decision.DenyReason)
}
// Stage 5: Routing (ephemeral events, etc.)
if routeResult := s.router.Route(ev, connCtx.AuthedPubkey); routeResult.Action != routing.Continue {
if routeResult.Action == routing.Handled {
return AcceptedNotSaved(routeResult.Message)
}
if routeResult.Action == routing.Error {
return Errored(fmt.Errorf("routing error: %s", routeResult.Message))
}
}
// Stage 6: Processing (save, hooks, delivery)
procResult := s.processor.Process(ctx, ev)
if procResult.Blocked {
return Rejected(procResult.BlockMsg)
}
if procResult.Error != nil {
return Errored(procResult.Error)
}
return Result{
Accepted: true,
Saved: true,
}
}
// IngestWithRawValidation includes raw JSON validation before unmarshaling.
// Use this when the event hasn't been validated yet.
func (s *Service) IngestWithRawValidation(ctx context.Context, rawJSON []byte, ev *event.E, connCtx *ConnectionContext) Result {
// Stage 0: Raw JSON validation
if result := s.validator.ValidateRawJSON(rawJSON); !result.Valid {
return Rejected(result.Msg)
}
return s.Ingest(ctx, ev, connCtx)
}

87
pkg/event/ingestion/service_test.go

@ -0,0 +1,87 @@ @@ -0,0 +1,87 @@
package ingestion
import (
"testing"
)
func TestResultConstructors(t *testing.T) {
t.Run("Accepted", func(t *testing.T) {
r := Accepted("msg")
if !r.Accepted || !r.Saved || r.RequireAuth || r.Error != nil {
t.Errorf("unexpected Accepted result: %+v", r)
}
if r.Message != "msg" {
t.Errorf("unexpected message: %s", r.Message)
}
})
t.Run("AcceptedNotSaved", func(t *testing.T) {
r := AcceptedNotSaved("msg")
if !r.Accepted || r.Saved || r.RequireAuth || r.Error != nil {
t.Error("unexpected AcceptedNotSaved result")
}
})
t.Run("Rejected", func(t *testing.T) {
r := Rejected("msg")
if r.Accepted || r.Saved || r.RequireAuth || r.Error != nil {
t.Error("unexpected Rejected result")
}
if r.Message != "msg" {
t.Errorf("unexpected message: %s", r.Message)
}
})
t.Run("AuthRequired", func(t *testing.T) {
r := AuthRequired("msg")
if r.Accepted || r.Saved || !r.RequireAuth || r.Error != nil {
t.Error("unexpected AuthRequired result")
}
})
t.Run("Errored", func(t *testing.T) {
err := &testError{msg: "test"}
r := Errored(err)
if r.Accepted || r.Saved || r.RequireAuth || r.Error != err {
t.Error("unexpected Errored result")
}
})
}
type testError struct {
msg string
}
func (e *testError) Error() string {
return e.msg
}
func TestConnectionContext(t *testing.T) {
ctx := &ConnectionContext{
AuthedPubkey: []byte{1, 2, 3},
Remote: "127.0.0.1:1234",
ConnectionID: "conn-123",
}
if string(ctx.AuthedPubkey) != string([]byte{1, 2, 3}) {
t.Error("AuthedPubkey mismatch")
}
if ctx.Remote != "127.0.0.1:1234" {
t.Error("Remote mismatch")
}
if ctx.ConnectionID != "conn-123" {
t.Error("ConnectionID mismatch")
}
}
func TestConfig(t *testing.T) {
cfg := Config{
SprocketChecker: nil,
SpecialKinds: nil,
}
// Just verify Config can be created
if cfg.SprocketChecker != nil {
t.Error("expected nil SprocketChecker")
}
}

175
pkg/event/specialkinds/registry.go

@ -0,0 +1,175 @@ @@ -0,0 +1,175 @@
// Package specialkinds provides a registry for handling special event kinds
// that require custom processing before normal storage/delivery.
// This includes NIP-43 join/leave requests, policy configuration, and
// curating configuration events.
package specialkinds
import (
"context"
"git.mleku.dev/mleku/nostr/encoders/event"
)
// Result represents the outcome of handling a special kind event.
type Result struct {
// Handled indicates the event was fully processed by a handler.
// If true, normal event processing should be skipped.
Handled bool
// Continue indicates processing should continue to normal event handling.
Continue bool
// Error is set if the handler encountered an error.
Error error
// Message is an optional message to include in the OK response.
Message string
// SaveEvent indicates the event should still be saved even if handled.
SaveEvent bool
}
// HandlerContext provides access to connection-specific data needed by handlers.
type HandlerContext struct {
// AuthedPubkey is the authenticated pubkey for this connection (may be nil).
AuthedPubkey []byte
// Remote is the remote address of the connection.
Remote string
// ConnectionID uniquely identifies this connection.
ConnectionID string
}
// Handler processes special event kinds.
type Handler interface {
// CanHandle returns true if this handler can process the given event.
// This is typically based on the event kind and possibly tag values.
CanHandle(ev *event.E) bool
// Handle processes the event.
// Returns a Result indicating the outcome.
Handle(ctx context.Context, ev *event.E, hctx *HandlerContext) Result
// Name returns a descriptive name for this handler (for logging).
Name() string
}
// Registry holds registered special kind handlers.
type Registry struct {
handlers []Handler
}
// NewRegistry creates a new special kind registry.
func NewRegistry() *Registry {
return &Registry{
handlers: make([]Handler, 0),
}
}
// Register adds a handler to the registry.
// Handlers are checked in registration order.
func (r *Registry) Register(h Handler) {
r.handlers = append(r.handlers, h)
}
// TryHandle attempts to handle an event with registered handlers.
// Returns (result, true) if a handler processed the event.
// Returns (nil, false) if no handler matched.
func (r *Registry) TryHandle(ctx context.Context, ev *event.E, hctx *HandlerContext) (*Result, bool) {
for _, h := range r.handlers {
if h.CanHandle(ev) {
result := h.Handle(ctx, ev, hctx)
return &result, true
}
}
return nil, false
}
// ListHandlers returns the names of all registered handlers.
func (r *Registry) ListHandlers() []string {
names := make([]string, len(r.handlers))
for i, h := range r.handlers {
names[i] = h.Name()
}
return names
}
// HandlerCount returns the number of registered handlers.
func (r *Registry) HandlerCount() int {
return len(r.handlers)
}
// =============================================================================
// Handler Function Adapter
// =============================================================================
// HandlerFunc is a function adapter that implements Handler.
type HandlerFunc struct {
name string
canHandleFn func(*event.E) bool
handleFn func(context.Context, *event.E, *HandlerContext) Result
}
// NewHandlerFunc creates a Handler from functions.
func NewHandlerFunc(
name string,
canHandle func(*event.E) bool,
handle func(context.Context, *event.E, *HandlerContext) Result,
) *HandlerFunc {
return &HandlerFunc{
name: name,
canHandleFn: canHandle,
handleFn: handle,
}
}
// CanHandle implements Handler.
func (h *HandlerFunc) CanHandle(ev *event.E) bool {
return h.canHandleFn(ev)
}
// Handle implements Handler.
func (h *HandlerFunc) Handle(ctx context.Context, ev *event.E, hctx *HandlerContext) Result {
return h.handleFn(ctx, ev, hctx)
}
// Name implements Handler.
func (h *HandlerFunc) Name() string {
return h.name
}
// =============================================================================
// Convenience Result Constructors
// =============================================================================
// Handled returns a Result indicating the event was fully handled.
func Handled(message string) Result {
return Result{
Handled: true,
Message: message,
}
}
// HandledWithSave returns a Result indicating the event was handled but should also be saved.
func HandledWithSave(message string) Result {
return Result{
Handled: true,
SaveEvent: true,
Message: message,
}
}
// ContinueProcessing returns a Result indicating normal processing should continue.
func ContinueProcessing() Result {
return Result{
Continue: true,
}
}
// ErrorResult returns a Result with an error.
func ErrorResult(err error) Result {
return Result{
Error: err,
}
}

225
pkg/event/specialkinds/registry_test.go

@ -0,0 +1,225 @@ @@ -0,0 +1,225 @@
package specialkinds
import (
"context"
"errors"
"testing"
"git.mleku.dev/mleku/nostr/encoders/event"
)
func TestNewRegistry(t *testing.T) {
r := NewRegistry()
if r == nil {
t.Fatal("NewRegistry returned nil")
}
if r.HandlerCount() != 0 {
t.Errorf("expected 0 handlers, got %d", r.HandlerCount())
}
}
func TestRegistryRegister(t *testing.T) {
r := NewRegistry()
h := NewHandlerFunc(
"test-handler",
func(ev *event.E) bool { return true },
func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result {
return Handled("test")
},
)
r.Register(h)
if r.HandlerCount() != 1 {
t.Errorf("expected 1 handler, got %d", r.HandlerCount())
}
names := r.ListHandlers()
if len(names) != 1 || names[0] != "test-handler" {
t.Errorf("unexpected handler names: %v", names)
}
}
func TestRegistryTryHandle_NoMatch(t *testing.T) {
r := NewRegistry()
// Register a handler that never matches
h := NewHandlerFunc(
"never-matches",
func(ev *event.E) bool { return false },
func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result {
return Handled("should not be called")
},
)
r.Register(h)
ev := &event.E{Kind: 1}
result, matched := r.TryHandle(context.Background(), ev, nil)
if matched {
t.Error("expected no match, got matched")
}
if result != nil {
t.Error("expected nil result when no match")
}
}
func TestRegistryTryHandle_Match(t *testing.T) {
r := NewRegistry()
// Register a handler that matches kind 1
h := NewHandlerFunc(
"kind-1-handler",
func(ev *event.E) bool { return ev.Kind == 1 },
func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result {
return Handled("processed kind 1")
},
)
r.Register(h)
ev := &event.E{Kind: 1}
result, matched := r.TryHandle(context.Background(), ev, nil)
if !matched {
t.Error("expected match, got no match")
}
if result == nil {
t.Fatal("expected result, got nil")
}
if !result.Handled {
t.Error("expected Handled=true")
}
if result.Message != "processed kind 1" {
t.Errorf("unexpected message: %s", result.Message)
}
}
func TestRegistryTryHandle_FirstMatchWins(t *testing.T) {
r := NewRegistry()
// Register two handlers that both match kind 1
h1 := NewHandlerFunc(
"first-handler",
func(ev *event.E) bool { return ev.Kind == 1 },
func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result {
return Handled("first")
},
)
h2 := NewHandlerFunc(
"second-handler",
func(ev *event.E) bool { return ev.Kind == 1 },
func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result {
return Handled("second")
},
)
r.Register(h1)
r.Register(h2)
ev := &event.E{Kind: 1}
result, _ := r.TryHandle(context.Background(), ev, nil)
if result.Message != "first" {
t.Errorf("expected first handler to win, got message: %s", result.Message)
}
}
func TestHandlerContext(t *testing.T) {
r := NewRegistry()
var capturedCtx *HandlerContext
h := NewHandlerFunc(
"context-capture",
func(ev *event.E) bool { return true },
func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result {
capturedCtx = hctx
return Handled("captured")
},
)
r.Register(h)
hctx := &HandlerContext{
AuthedPubkey: []byte{1, 2, 3},
Remote: "127.0.0.1:12345",
ConnectionID: "conn-abc",
}
r.TryHandle(context.Background(), &event.E{Kind: 1}, hctx)
if capturedCtx == nil {
t.Fatal("handler context not captured")
}
if string(capturedCtx.AuthedPubkey) != string(hctx.AuthedPubkey) {
t.Error("AuthedPubkey mismatch")
}
if capturedCtx.Remote != hctx.Remote {
t.Error("Remote mismatch")
}
if capturedCtx.ConnectionID != hctx.ConnectionID {
t.Error("ConnectionID mismatch")
}
}
func TestResultConstructors(t *testing.T) {
t.Run("Handled", func(t *testing.T) {
r := Handled("msg")
if !r.Handled || r.Continue || r.SaveEvent || r.Error != nil {
t.Error("unexpected Handled result")
}
if r.Message != "msg" {
t.Errorf("unexpected message: %s", r.Message)
}
})
t.Run("HandledWithSave", func(t *testing.T) {
r := HandledWithSave("msg")
if !r.Handled || r.Continue || !r.SaveEvent || r.Error != nil {
t.Error("unexpected HandledWithSave result")
}
})
t.Run("ContinueProcessing", func(t *testing.T) {
r := ContinueProcessing()
if r.Handled || !r.Continue || r.SaveEvent || r.Error != nil {
t.Error("unexpected ContinueProcessing result")
}
})
t.Run("ErrorResult", func(t *testing.T) {
err := errors.New("test error")
r := ErrorResult(err)
if r.Handled || r.Continue || r.SaveEvent || r.Error != err {
t.Error("unexpected ErrorResult result")
}
})
}
func TestHandlerFuncInterface(t *testing.T) {
h := NewHandlerFunc(
"test",
func(ev *event.E) bool { return ev.Kind == 42 },
func(ctx context.Context, ev *event.E, hctx *HandlerContext) Result {
return Handled("42")
},
)
// Check interface compliance
var _ Handler = h
if h.Name() != "test" {
t.Errorf("unexpected name: %s", h.Name())
}
if !h.CanHandle(&event.E{Kind: 42}) {
t.Error("expected CanHandle to return true for kind 42")
}
if h.CanHandle(&event.E{Kind: 1}) {
t.Error("expected CanHandle to return false for kind 1")
}
result := h.Handle(context.Background(), &event.E{Kind: 42}, nil)
if result.Message != "42" {
t.Errorf("unexpected result message: %s", result.Message)
}
}

2
pkg/version/version

@ -1 +1 @@ @@ -1 +1 @@
v0.56.4
v0.56.5

Loading…
Cancel
Save