You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

245 lines
5.7 KiB

package events
import (
"sync"
"sync/atomic"
"testing"
"time"
)
func TestDispatcherPublish(t *testing.T) {
d := NewDispatcher(DefaultDispatcherConfig())
defer d.Stop()
var received atomic.Int32
d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) {
received.Add(1)
}, EventSavedType))
// Should be received
d.Publish(NewEventSaved(nil, 1, false, false))
// Should not be received (different type)
d.Publish(NewEventDeleted(nil, nil, 1))
if received.Load() != 1 {
t.Errorf("expected 1 event received, got %d", received.Load())
}
}
func TestDispatcherPublishAsync(t *testing.T) {
d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 100})
defer d.Stop()
var received atomic.Int32
var wg sync.WaitGroup
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
received.Add(1)
wg.Done()
}))
wg.Add(10)
for i := 0; i < 10; i++ {
d.PublishAsync(NewEventSaved(nil, uint64(i), false, false))
}
// Wait for all events to be processed
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("timeout waiting for async events")
}
if received.Load() != 10 {
t.Errorf("expected 10 events received, got %d", received.Load())
}
}
func TestDispatcherMultipleSubscribers(t *testing.T) {
d := NewDispatcher(DefaultDispatcherConfig())
defer d.Stop()
var count1, count2 atomic.Int32
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
count1.Add(1)
}))
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
count2.Add(1)
}))
d.Publish(NewEventSaved(nil, 1, false, false))
if count1.Load() != 1 || count2.Load() != 1 {
t.Errorf("expected both subscribers to receive event, got %d and %d", count1.Load(), count2.Load())
}
}
func TestDispatcherUnsubscribe(t *testing.T) {
d := NewDispatcher(DefaultDispatcherConfig())
defer d.Stop()
var received atomic.Int32
sub := NewSubscriberForAll(func(e DomainEvent) {
received.Add(1)
})
d.Subscribe(sub)
d.Publish(NewEventSaved(nil, 1, false, false))
d.Unsubscribe(sub)
d.Publish(NewEventSaved(nil, 2, false, false))
if received.Load() != 1 {
t.Errorf("expected 1 event received after unsubscribe, got %d", received.Load())
}
}
func TestDispatcherTypedSubscription(t *testing.T) {
d := NewDispatcher(DefaultDispatcherConfig())
defer d.Stop()
var savedCount atomic.Int32
var deletedCount atomic.Int32
d.OnEventSaved(func(e *EventSaved) {
savedCount.Add(1)
})
d.OnEventDeleted(func(e *EventDeleted) {
deletedCount.Add(1)
})
d.Publish(NewEventSaved(nil, 1, false, false))
d.Publish(NewEventDeleted(nil, nil, 1))
if savedCount.Load() != 1 {
t.Errorf("expected 1 saved event, got %d", savedCount.Load())
}
if deletedCount.Load() != 1 {
t.Errorf("expected 1 deleted event, got %d", deletedCount.Load())
}
}
func TestDispatcherStats(t *testing.T) {
d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 100})
defer d.Stop()
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {}))
d.Publish(NewEventSaved(nil, 1, false, false))
d.Publish(NewEventSaved(nil, 2, false, false))
stats := d.Stats()
if stats.EventsPublished != 2 {
t.Errorf("expected 2 events published, got %d", stats.EventsPublished)
}
if stats.SubscriberCount != 1 {
t.Errorf("expected 1 subscriber, got %d", stats.SubscriberCount)
}
if stats.QueueCapacity != 100 {
t.Errorf("expected queue capacity 100, got %d", stats.QueueCapacity)
}
}
func TestDispatcherQueueFull(t *testing.T) {
d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 1})
defer d.Stop()
// Block the processor
blocker := make(chan struct{})
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {
<-blocker
}))
// First should succeed
if !d.PublishAsync(NewEventSaved(nil, 1, false, false)) {
t.Error("first async publish should succeed")
}
// Wait for it to be picked up
time.Sleep(10 * time.Millisecond)
// Second should fail (queue full)
if d.PublishAsync(NewEventSaved(nil, 2, false, false)) {
// might succeed if the first was already picked up
}
close(blocker)
}
func TestEventTypes(t *testing.T) {
types := AllEventTypes()
if len(types) == 0 {
t.Error("expected event types to be registered")
}
// Check all types are unique
seen := make(map[string]bool)
for _, typ := range types {
if seen[typ] {
t.Errorf("duplicate event type: %s", typ)
}
seen[typ] = true
}
}
func TestDomainEventInterface(t *testing.T) {
events := []DomainEvent{
NewEventSaved(nil, 1, true, false),
NewEventDeleted([]byte{1}, []byte{2}, 3),
NewFollowListUpdated([]byte{1}, nil, nil),
NewACLMembershipChanged([]byte{1}, "none", "write", "followed"),
NewPolicyConfigUpdated([]byte{1}, map[string]interface{}{"key": "value"}),
NewPolicyFollowsUpdated([]byte{1}, 10, nil),
NewRelayGroupConfigChanged(nil),
NewClusterMembershipChanged(nil, "join"),
NewSyncSerialUpdated(100),
NewUserAuthenticated([]byte{1}, "write", true, "conn-123"),
NewConnectionOpened("conn-123", "192.168.1.1:1234"),
NewConnectionClosed("conn-123", time.Hour, 100, 50),
NewSubscriptionCreated("sub-1", "conn-123", 3),
NewSubscriptionClosed("sub-1", "conn-123", 25),
NewMemberJoined([]byte{1}, "invite-abc"),
NewMemberLeft([]byte{1}),
}
for _, e := range events {
if e.OccurredAt().IsZero() {
t.Errorf("event %s has zero timestamp", e.EventType())
}
if e.EventType() == "" {
t.Error("event has empty type")
}
}
}
func TestSubscriberFunc(t *testing.T) {
var called bool
sf := NewSubscriberFunc(
func(e DomainEvent) { called = true },
func(typ string) bool { return typ == EventSavedType },
)
if !sf.Supports(EventSavedType) {
t.Error("should support EventSavedType")
}
if sf.Supports(EventDeletedType) {
t.Error("should not support EventDeletedType")
}
sf.Handle(NewEventSaved(nil, 1, false, false))
if !called {
t.Error("handler was not called")
}
}