You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
245 lines
5.7 KiB
245 lines
5.7 KiB
package events |
|
|
|
import ( |
|
"sync" |
|
"sync/atomic" |
|
"testing" |
|
"time" |
|
) |
|
|
|
func TestDispatcherPublish(t *testing.T) { |
|
d := NewDispatcher(DefaultDispatcherConfig()) |
|
defer d.Stop() |
|
|
|
var received atomic.Int32 |
|
|
|
d.Subscribe(NewSubscriberForTypes(func(e DomainEvent) { |
|
received.Add(1) |
|
}, EventSavedType)) |
|
|
|
// Should be received |
|
d.Publish(NewEventSaved(nil, 1, false, false)) |
|
|
|
// Should not be received (different type) |
|
d.Publish(NewEventDeleted(nil, nil, 1)) |
|
|
|
if received.Load() != 1 { |
|
t.Errorf("expected 1 event received, got %d", received.Load()) |
|
} |
|
} |
|
|
|
func TestDispatcherPublishAsync(t *testing.T) { |
|
d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 100}) |
|
defer d.Stop() |
|
|
|
var received atomic.Int32 |
|
var wg sync.WaitGroup |
|
|
|
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) { |
|
received.Add(1) |
|
wg.Done() |
|
})) |
|
|
|
wg.Add(10) |
|
for i := 0; i < 10; i++ { |
|
d.PublishAsync(NewEventSaved(nil, uint64(i), false, false)) |
|
} |
|
|
|
// Wait for all events to be processed |
|
done := make(chan struct{}) |
|
go func() { |
|
wg.Wait() |
|
close(done) |
|
}() |
|
|
|
select { |
|
case <-done: |
|
case <-time.After(time.Second): |
|
t.Fatal("timeout waiting for async events") |
|
} |
|
|
|
if received.Load() != 10 { |
|
t.Errorf("expected 10 events received, got %d", received.Load()) |
|
} |
|
} |
|
|
|
func TestDispatcherMultipleSubscribers(t *testing.T) { |
|
d := NewDispatcher(DefaultDispatcherConfig()) |
|
defer d.Stop() |
|
|
|
var count1, count2 atomic.Int32 |
|
|
|
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) { |
|
count1.Add(1) |
|
})) |
|
|
|
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) { |
|
count2.Add(1) |
|
})) |
|
|
|
d.Publish(NewEventSaved(nil, 1, false, false)) |
|
|
|
if count1.Load() != 1 || count2.Load() != 1 { |
|
t.Errorf("expected both subscribers to receive event, got %d and %d", count1.Load(), count2.Load()) |
|
} |
|
} |
|
|
|
func TestDispatcherUnsubscribe(t *testing.T) { |
|
d := NewDispatcher(DefaultDispatcherConfig()) |
|
defer d.Stop() |
|
|
|
var received atomic.Int32 |
|
|
|
sub := NewSubscriberForAll(func(e DomainEvent) { |
|
received.Add(1) |
|
}) |
|
|
|
d.Subscribe(sub) |
|
d.Publish(NewEventSaved(nil, 1, false, false)) |
|
|
|
d.Unsubscribe(sub) |
|
d.Publish(NewEventSaved(nil, 2, false, false)) |
|
|
|
if received.Load() != 1 { |
|
t.Errorf("expected 1 event received after unsubscribe, got %d", received.Load()) |
|
} |
|
} |
|
|
|
func TestDispatcherTypedSubscription(t *testing.T) { |
|
d := NewDispatcher(DefaultDispatcherConfig()) |
|
defer d.Stop() |
|
|
|
var savedCount atomic.Int32 |
|
var deletedCount atomic.Int32 |
|
|
|
d.OnEventSaved(func(e *EventSaved) { |
|
savedCount.Add(1) |
|
}) |
|
|
|
d.OnEventDeleted(func(e *EventDeleted) { |
|
deletedCount.Add(1) |
|
}) |
|
|
|
d.Publish(NewEventSaved(nil, 1, false, false)) |
|
d.Publish(NewEventDeleted(nil, nil, 1)) |
|
|
|
if savedCount.Load() != 1 { |
|
t.Errorf("expected 1 saved event, got %d", savedCount.Load()) |
|
} |
|
if deletedCount.Load() != 1 { |
|
t.Errorf("expected 1 deleted event, got %d", deletedCount.Load()) |
|
} |
|
} |
|
|
|
func TestDispatcherStats(t *testing.T) { |
|
d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 100}) |
|
defer d.Stop() |
|
|
|
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) {})) |
|
|
|
d.Publish(NewEventSaved(nil, 1, false, false)) |
|
d.Publish(NewEventSaved(nil, 2, false, false)) |
|
|
|
stats := d.Stats() |
|
if stats.EventsPublished != 2 { |
|
t.Errorf("expected 2 events published, got %d", stats.EventsPublished) |
|
} |
|
if stats.SubscriberCount != 1 { |
|
t.Errorf("expected 1 subscriber, got %d", stats.SubscriberCount) |
|
} |
|
if stats.QueueCapacity != 100 { |
|
t.Errorf("expected queue capacity 100, got %d", stats.QueueCapacity) |
|
} |
|
} |
|
|
|
func TestDispatcherQueueFull(t *testing.T) { |
|
d := NewDispatcher(DispatcherConfig{AsyncBufferSize: 1}) |
|
defer d.Stop() |
|
|
|
// Block the processor |
|
blocker := make(chan struct{}) |
|
d.Subscribe(NewSubscriberForAll(func(e DomainEvent) { |
|
<-blocker |
|
})) |
|
|
|
// First should succeed |
|
if !d.PublishAsync(NewEventSaved(nil, 1, false, false)) { |
|
t.Error("first async publish should succeed") |
|
} |
|
|
|
// Wait for it to be picked up |
|
time.Sleep(10 * time.Millisecond) |
|
|
|
// Second should fail (queue full) |
|
if d.PublishAsync(NewEventSaved(nil, 2, false, false)) { |
|
// might succeed if the first was already picked up |
|
} |
|
|
|
close(blocker) |
|
} |
|
|
|
func TestEventTypes(t *testing.T) { |
|
types := AllEventTypes() |
|
if len(types) == 0 { |
|
t.Error("expected event types to be registered") |
|
} |
|
|
|
// Check all types are unique |
|
seen := make(map[string]bool) |
|
for _, typ := range types { |
|
if seen[typ] { |
|
t.Errorf("duplicate event type: %s", typ) |
|
} |
|
seen[typ] = true |
|
} |
|
} |
|
|
|
func TestDomainEventInterface(t *testing.T) { |
|
events := []DomainEvent{ |
|
NewEventSaved(nil, 1, true, false), |
|
NewEventDeleted([]byte{1}, []byte{2}, 3), |
|
NewFollowListUpdated([]byte{1}, nil, nil), |
|
NewACLMembershipChanged([]byte{1}, "none", "write", "followed"), |
|
NewPolicyConfigUpdated([]byte{1}, map[string]interface{}{"key": "value"}), |
|
NewPolicyFollowsUpdated([]byte{1}, 10, nil), |
|
NewRelayGroupConfigChanged(nil), |
|
NewClusterMembershipChanged(nil, "join"), |
|
NewSyncSerialUpdated(100), |
|
NewUserAuthenticated([]byte{1}, "write", true, "conn-123"), |
|
NewConnectionOpened("conn-123", "192.168.1.1:1234"), |
|
NewConnectionClosed("conn-123", time.Hour, 100, 50), |
|
NewSubscriptionCreated("sub-1", "conn-123", 3), |
|
NewSubscriptionClosed("sub-1", "conn-123", 25), |
|
NewMemberJoined([]byte{1}, "invite-abc"), |
|
NewMemberLeft([]byte{1}), |
|
} |
|
|
|
for _, e := range events { |
|
if e.OccurredAt().IsZero() { |
|
t.Errorf("event %s has zero timestamp", e.EventType()) |
|
} |
|
if e.EventType() == "" { |
|
t.Error("event has empty type") |
|
} |
|
} |
|
} |
|
|
|
func TestSubscriberFunc(t *testing.T) { |
|
var called bool |
|
sf := NewSubscriberFunc( |
|
func(e DomainEvent) { called = true }, |
|
func(typ string) bool { return typ == EventSavedType }, |
|
) |
|
|
|
if !sf.Supports(EventSavedType) { |
|
t.Error("should support EventSavedType") |
|
} |
|
if sf.Supports(EventDeletedType) { |
|
t.Error("should not support EventDeletedType") |
|
} |
|
|
|
sf.Handle(NewEventSaved(nil, 1, false, false)) |
|
if !called { |
|
t.Error("handler was not called") |
|
} |
|
}
|
|
|