You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

268 lines
6.7 KiB

// Package processing provides event processing services for the ORLY relay.
// It handles event persistence, delivery to subscribers, and post-save hooks.
package processing
import (
"context"
"strings"
"time"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/kind"
)
// Result contains the outcome of event processing.
type Result struct {
Saved bool
Duplicate bool
Blocked bool
BlockMsg string
Error error
}
// OK returns a successful processing result.
func OK() Result {
return Result{Saved: true}
}
// Blocked returns a blocked processing result.
func Blocked(msg string) Result {
return Result{Blocked: true, BlockMsg: msg}
}
// Failed returns an error processing result.
func Failed(err error) Result {
return Result{Error: err}
}
// Database abstracts database operations for event processing.
type Database interface {
// SaveEvent saves an event to the database.
SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error)
// CheckForDeleted checks if an event has been deleted.
CheckForDeleted(ev *event.E, adminOwners [][]byte) error
}
// Publisher abstracts event delivery to subscribers.
type Publisher interface {
// Deliver sends an event to all matching subscribers.
Deliver(ev *event.E)
}
// RateLimiter abstracts rate limiting for write operations.
type RateLimiter interface {
// IsEnabled returns whether rate limiting is enabled.
IsEnabled() bool
// Wait blocks until the rate limit allows the operation.
Wait(ctx context.Context, opType int) error
}
// SyncManager abstracts sync manager for serial updates.
type SyncManager interface {
// UpdateSerial updates the serial number after saving an event.
UpdateSerial()
}
// ACLRegistry abstracts ACL registry for reconfiguration.
type ACLRegistry interface {
// Configure reconfigures the ACL system.
Configure(cfg ...any) error
// Active returns the active ACL mode.
Active() string
}
// RelayGroupManager handles relay group configuration events.
type RelayGroupManager interface {
// ValidateRelayGroupEvent validates a relay group config event.
ValidateRelayGroupEvent(ev *event.E) error
// HandleRelayGroupEvent processes a relay group event.
HandleRelayGroupEvent(ev *event.E, syncMgr any)
}
// ClusterManager handles cluster membership events.
type ClusterManager interface {
// HandleMembershipEvent processes a cluster membership event.
HandleMembershipEvent(ev *event.E) error
}
// Config holds configuration for the processing service.
type Config struct {
Admins [][]byte
Owners [][]byte
WriteTimeout time.Duration
}
// DefaultConfig returns the default processing configuration.
func DefaultConfig() *Config {
return &Config{
WriteTimeout: 30 * time.Second,
}
}
// Service implements event processing.
type Service struct {
cfg *Config
db Database
publisher Publisher
rateLimiter RateLimiter
syncManager SyncManager
aclRegistry ACLRegistry
relayGroupMgr RelayGroupManager
clusterManager ClusterManager
}
// New creates a new processing service.
func New(cfg *Config, db Database, publisher Publisher) *Service {
if cfg == nil {
cfg = DefaultConfig()
}
return &Service{
cfg: cfg,
db: db,
publisher: publisher,
}
}
// SetRateLimiter sets the rate limiter.
func (s *Service) SetRateLimiter(rl RateLimiter) {
s.rateLimiter = rl
}
// SetSyncManager sets the sync manager.
func (s *Service) SetSyncManager(sm SyncManager) {
s.syncManager = sm
}
// SetACLRegistry sets the ACL registry.
func (s *Service) SetACLRegistry(acl ACLRegistry) {
s.aclRegistry = acl
}
// SetRelayGroupManager sets the relay group manager.
func (s *Service) SetRelayGroupManager(rgm RelayGroupManager) {
s.relayGroupMgr = rgm
}
// SetClusterManager sets the cluster manager.
func (s *Service) SetClusterManager(cm ClusterManager) {
s.clusterManager = cm
}
// Process saves an event and triggers delivery.
func (s *Service) Process(ctx context.Context, ev *event.E) Result {
// Check if event was previously deleted (skip for "none" ACL mode and delete events)
// Delete events (kind 5) shouldn't be blocked by existing deletes
if ev.Kind != kind.EventDeletion.K && s.aclRegistry != nil && s.aclRegistry.Active() != "none" {
adminOwners := append(s.cfg.Admins, s.cfg.Owners...)
if err := s.db.CheckForDeleted(ev, adminOwners); err != nil {
if strings.HasPrefix(err.Error(), "blocked:") {
errStr := err.Error()[len("blocked: "):]
return Blocked(errStr)
}
}
}
// Save the event
result := s.saveEvent(ctx, ev)
if !result.Saved {
return result
}
// Run post-save hooks
s.runPostSaveHooks(ev)
// Deliver the event to subscribers
s.deliver(ev)
return OK()
}
// saveEvent handles rate limiting and database persistence.
func (s *Service) saveEvent(ctx context.Context, ev *event.E) Result {
// Create timeout context
saveCtx, cancel := context.WithTimeout(ctx, s.cfg.WriteTimeout)
defer cancel()
// Apply rate limiting
if s.rateLimiter != nil && s.rateLimiter.IsEnabled() {
const writeOpType = 1 // ratelimit.Write
s.rateLimiter.Wait(saveCtx, writeOpType)
}
// Save to database
_, err := s.db.SaveEvent(saveCtx, ev)
if err != nil {
if strings.HasPrefix(err.Error(), "blocked:") {
errStr := err.Error()[len("blocked: "):]
return Blocked(errStr)
}
return Failed(err)
}
return OK()
}
// deliver sends event to subscribers.
func (s *Service) deliver(ev *event.E) {
cloned := ev.Clone()
go s.publisher.Deliver(cloned)
}
// runPostSaveHooks handles side effects after event persistence.
func (s *Service) runPostSaveHooks(ev *event.E) {
// Handle relay group configuration events
if s.relayGroupMgr != nil {
if err := s.relayGroupMgr.ValidateRelayGroupEvent(ev); err == nil {
if s.syncManager != nil {
s.relayGroupMgr.HandleRelayGroupEvent(ev, s.syncManager)
}
}
}
// Handle cluster membership events (Kind 39108)
if ev.Kind == 39108 && s.clusterManager != nil {
s.clusterManager.HandleMembershipEvent(ev)
}
// Update serial for distributed synchronization
if s.syncManager != nil {
s.syncManager.UpdateSerial()
}
// ACL reconfiguration for admin events
if s.isAdminEvent(ev) {
if ev.Kind == kind.FollowList.K || ev.Kind == kind.RelayListMetadata.K {
if s.aclRegistry != nil {
go s.aclRegistry.Configure()
}
}
}
}
// isAdminEvent checks if event is from admin or owner.
func (s *Service) isAdminEvent(ev *event.E) bool {
for _, admin := range s.cfg.Admins {
if fastEqual(admin, ev.Pubkey) {
return true
}
}
for _, owner := range s.cfg.Owners {
if fastEqual(owner, ev.Pubkey) {
return true
}
}
return false
}
// fastEqual compares two byte slices for equality.
func fastEqual(a, b []byte) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}