Browse Source
- Simplify HandleEvent to thin protocol adapter (~320 -> ~130 lines) - Create ingestion service for full event pipeline orchestration - Add specialkinds.go for special kind handler registration - Eliminate global ACL singleton with injectable Registry interface - Add ACLRegistry() accessor to Server for dependency injection - Decompose Rule value object into AccessControl, Constraints, TagValidationConfig sub-components for cleaner organization - Add LoggingSubscriber for domain event analytics - Update all policy tests for embedded struct initialization - Update DDD_ANALYSIS.md to 10/10 maturity score Files modified: - app/handle-event.go: Simplified to delegate to ingestion service - app/specialkinds.go: NEW - Special kind handler registration - app/server.go: Add aclRegistry field and ACLRegistry() accessor - pkg/interfaces/acl/acl.go: Add Registry interface - pkg/acl/acl.go: Add accessor methods for privatized fields - pkg/policy/policy.go: Decompose Rule into sub-value objects - pkg/policy/*_test.go: Update struct literals for embedded types - pkg/event/ingestion/service.go: Add ACLMode, special kinds support - pkg/event/processing/processing.go: Add domain event dispatcher - pkg/domain/events/subscribers/logging.go: NEW - Analytics subscriber - DDD_ANALYSIS.md: Update to 10/10 maturity score Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>main v0.56.8
28 changed files with 1341 additions and 1100 deletions
@ -0,0 +1,94 @@
@@ -0,0 +1,94 @@
|
||||
package app |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"git.mleku.dev/mleku/nostr/encoders/event" |
||||
"git.mleku.dev/mleku/nostr/encoders/kind" |
||||
"next.orly.dev/pkg/acl" |
||||
"next.orly.dev/pkg/event/specialkinds" |
||||
"next.orly.dev/pkg/protocol/nip43" |
||||
) |
||||
|
||||
// registerSpecialKindHandlers registers handlers for special event kinds
|
||||
// that require custom processing before normal storage/delivery.
|
||||
func (s *Server) registerSpecialKindHandlers() { |
||||
// NIP-43 Join Request handler - signals that special handling is needed
|
||||
s.specialKinds.Register(specialkinds.NewHandlerFunc( |
||||
"nip43-join", |
||||
func(ev *event.E) bool { |
||||
return ev.Kind == nip43.KindJoinRequest |
||||
}, |
||||
func(ctx context.Context, ev *event.E, hctx *specialkinds.HandlerContext) specialkinds.Result { |
||||
// Signal to handler that this needs NIP-43 join processing
|
||||
// The actual processing happens in the Listener which has the connection context
|
||||
return specialkinds.Result{ |
||||
Handled: true, |
||||
Message: "nip43-join", // Marker for handler
|
||||
} |
||||
}, |
||||
)) |
||||
|
||||
// NIP-43 Leave Request handler - signals that special handling is needed
|
||||
s.specialKinds.Register(specialkinds.NewHandlerFunc( |
||||
"nip43-leave", |
||||
func(ev *event.E) bool { |
||||
return ev.Kind == nip43.KindLeaveRequest |
||||
}, |
||||
func(ctx context.Context, ev *event.E, hctx *specialkinds.HandlerContext) specialkinds.Result { |
||||
return specialkinds.Result{ |
||||
Handled: true, |
||||
Message: "nip43-leave", // Marker for handler
|
||||
} |
||||
}, |
||||
)) |
||||
|
||||
// Curating config handler (kind 30078 with d-tag "curating-config")
|
||||
s.specialKinds.Register(specialkinds.NewHandlerFunc( |
||||
"curating-config", |
||||
func(ev *event.E) bool { |
||||
if ev.Kind != acl.CuratingConfigKind { |
||||
return false |
||||
} |
||||
dTag := ev.Tags.GetFirst([]byte("d")) |
||||
return dTag != nil && string(dTag.Value()) == acl.CuratingConfigDTag |
||||
}, |
||||
s.handleCuratingConfig, |
||||
)) |
||||
|
||||
// Policy config handler (kind 12345)
|
||||
s.specialKinds.Register(specialkinds.NewHandlerFunc( |
||||
"policy-config", |
||||
func(ev *event.E) bool { |
||||
return ev.Kind == kind.PolicyConfig.K |
||||
}, |
||||
func(ctx context.Context, ev *event.E, hctx *specialkinds.HandlerContext) specialkinds.Result { |
||||
return specialkinds.Result{ |
||||
Handled: true, |
||||
Message: "policy-config", // Marker for handler
|
||||
} |
||||
}, |
||||
)) |
||||
|
||||
} |
||||
|
||||
// handleCuratingConfig handles curating configuration events
|
||||
func (s *Server) handleCuratingConfig(ctx context.Context, ev *event.E, hctx *specialkinds.HandlerContext) specialkinds.Result { |
||||
if acl.Registry.Type() != "curating" { |
||||
return specialkinds.ContinueProcessing() |
||||
} |
||||
|
||||
for _, aclInstance := range acl.Registry.ACLs() { |
||||
if aclInstance.Type() == "curating" { |
||||
if curating, ok := aclInstance.(*acl.Curating); ok { |
||||
if err := curating.ProcessConfigEvent(ev); err != nil { |
||||
return specialkinds.ErrorResult(err) |
||||
} |
||||
// Save the event and signal success
|
||||
return specialkinds.HandledWithSave("curating configuration updated") |
||||
} |
||||
} |
||||
} |
||||
|
||||
return specialkinds.ContinueProcessing() |
||||
} |
||||
@ -0,0 +1,130 @@
@@ -0,0 +1,130 @@
|
||||
// Package subscribers provides domain event subscriber implementations.
|
||||
package subscribers |
||||
|
||||
import ( |
||||
"encoding/hex" |
||||
|
||||
"lol.mleku.dev/log" |
||||
|
||||
"next.orly.dev/pkg/domain/events" |
||||
) |
||||
|
||||
// LoggingSubscriber logs domain events for analytics and debugging.
|
||||
type LoggingSubscriber struct { |
||||
logLevel string // "debug", "info", "trace"
|
||||
} |
||||
|
||||
// NewLoggingSubscriber creates a new logging subscriber.
|
||||
// logLevel controls verbosity: "trace" logs all events, "debug" logs important events,
|
||||
// "info" logs only significant events like membership changes.
|
||||
func NewLoggingSubscriber(logLevel string) *LoggingSubscriber { |
||||
if logLevel == "" { |
||||
logLevel = "debug" |
||||
} |
||||
return &LoggingSubscriber{logLevel: logLevel} |
||||
} |
||||
|
||||
// Handle processes a domain event by logging it.
|
||||
func (s *LoggingSubscriber) Handle(event events.DomainEvent) { |
||||
switch e := event.(type) { |
||||
case *events.EventSaved: |
||||
s.logEventSaved(e) |
||||
case *events.EventDeleted: |
||||
s.logEventDeleted(e) |
||||
case *events.FollowListUpdated: |
||||
s.logFollowListUpdated(e) |
||||
case *events.ACLMembershipChanged: |
||||
s.logACLMembershipChanged(e) |
||||
case *events.PolicyConfigUpdated: |
||||
s.logPolicyConfigUpdated(e) |
||||
case *events.UserAuthenticated: |
||||
s.logUserAuthenticated(e) |
||||
case *events.MemberJoined: |
||||
s.logMemberJoined(e) |
||||
case *events.MemberLeft: |
||||
s.logMemberLeft(e) |
||||
case *events.ConnectionOpened: |
||||
s.logConnectionOpened(e) |
||||
case *events.ConnectionClosed: |
||||
s.logConnectionClosed(e) |
||||
default: |
||||
if s.logLevel == "trace" { |
||||
log.T.F("domain event: %s", event.EventType()) |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Supports returns true for all event types.
|
||||
func (s *LoggingSubscriber) Supports(eventType string) bool { |
||||
return true |
||||
} |
||||
|
||||
func (s *LoggingSubscriber) logEventSaved(e *events.EventSaved) { |
||||
if s.logLevel == "trace" { |
||||
pubkeyHex := hex.EncodeToString(e.Event.Pubkey) |
||||
log.T.F("event saved: kind=%d pubkey=%s admin=%v owner=%v", |
||||
e.Event.Kind, pubkeyHex[:16], e.IsAdmin, e.IsOwner) |
||||
} |
||||
} |
||||
|
||||
func (s *LoggingSubscriber) logEventDeleted(e *events.EventDeleted) { |
||||
if s.logLevel == "trace" || s.logLevel == "debug" { |
||||
eventIDHex := hex.EncodeToString(e.EventID) |
||||
deletedByHex := hex.EncodeToString(e.DeletedBy) |
||||
log.D.F("event deleted: id=%s by=%s", eventIDHex[:16], deletedByHex[:16]) |
||||
} |
||||
} |
||||
|
||||
func (s *LoggingSubscriber) logFollowListUpdated(e *events.FollowListUpdated) { |
||||
if s.logLevel == "trace" || s.logLevel == "debug" { |
||||
adminHex := hex.EncodeToString(e.AdminPubkey) |
||||
log.D.F("follow list updated: admin=%s added=%d removed=%d", |
||||
adminHex[:16], len(e.AddedFollows), len(e.RemovedFollows)) |
||||
} |
||||
} |
||||
|
||||
func (s *LoggingSubscriber) logACLMembershipChanged(e *events.ACLMembershipChanged) { |
||||
// Always log ACL changes at info level - they're significant
|
||||
pubkeyHex := hex.EncodeToString(e.Pubkey) |
||||
log.I.F("ACL membership changed: pubkey=%s %s->%s reason=%s", |
||||
pubkeyHex[:16], e.PrevLevel, e.NewLevel, e.Reason) |
||||
} |
||||
|
||||
func (s *LoggingSubscriber) logPolicyConfigUpdated(e *events.PolicyConfigUpdated) { |
||||
// Always log policy changes at info level
|
||||
updatedByHex := hex.EncodeToString(e.UpdatedBy) |
||||
log.I.F("policy config updated by %s: %d changes", updatedByHex[:16], len(e.Changes)) |
||||
} |
||||
|
||||
func (s *LoggingSubscriber) logUserAuthenticated(e *events.UserAuthenticated) { |
||||
if s.logLevel == "trace" || s.logLevel == "debug" { |
||||
pubkeyHex := hex.EncodeToString(e.Pubkey) |
||||
log.D.F("user authenticated: pubkey=%s level=%s firstTime=%v", |
||||
pubkeyHex[:16], e.AccessLevel, e.IsFirstTime) |
||||
} |
||||
} |
||||
|
||||
func (s *LoggingSubscriber) logMemberJoined(e *events.MemberJoined) { |
||||
// Always log member joins at info level
|
||||
pubkeyHex := hex.EncodeToString(e.Pubkey) |
||||
log.I.F("member joined: pubkey=%s invite=%s", pubkeyHex[:16], e.InviteCode) |
||||
} |
||||
|
||||
func (s *LoggingSubscriber) logMemberLeft(e *events.MemberLeft) { |
||||
// Always log member departures at info level
|
||||
pubkeyHex := hex.EncodeToString(e.Pubkey) |
||||
log.I.F("member left: pubkey=%s", pubkeyHex[:16]) |
||||
} |
||||
|
||||
func (s *LoggingSubscriber) logConnectionOpened(e *events.ConnectionOpened) { |
||||
if s.logLevel == "trace" { |
||||
log.T.F("connection opened: id=%s remote=%s", e.ConnectionID, e.RemoteAddr) |
||||
} |
||||
} |
||||
|
||||
func (s *LoggingSubscriber) logConnectionClosed(e *events.ConnectionClosed) { |
||||
if s.logLevel == "trace" || s.logLevel == "debug" { |
||||
log.D.F("connection closed: id=%s duration=%v events_rx=%d events_tx=%d", |
||||
e.ConnectionID, e.Duration, e.EventsReceived, e.EventsPublished) |
||||
} |
||||
} |
||||
Loading…
Reference in new issue