-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpubsub_test.go
165 lines (136 loc) · 3.39 KB
/
pubsub_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package ibsync
import (
"sync"
"testing"
"time"
)
// Test basic Publish and Subscribe
func TestPublishSubscribe(t *testing.T) {
topic := "test_topic"
msg := "test message"
// Subscribe to a topic
ch, unsubscribe := Subscribe(topic)
defer unsubscribe()
// Publish a message to the topic
Publish(topic, msg)
// Verify that the subscriber receives the message
select {
case received := <-ch:
if received != msg {
t.Errorf("Expected message %s, but got %s", msg, received)
}
case <-time.After(1 * time.Second):
t.Error("Did not receive message on subscribed channel")
}
}
// Test multiple subscribers
func TestMultipleSubscribers(t *testing.T) {
topic := "multi_subscribers_topic"
msg := "hello, subscribers!"
// Subscribe multiple channels to the same topic
ch1, unsubscribeCh1 := Subscribe(topic)
defer unsubscribeCh1()
ch2, unsubscribeCh2 := Subscribe(topic)
defer unsubscribeCh2()
// Publish a message to the topic
Publish(topic, msg)
// Verify that both subscribers receive the message
for _, ch := range []<-chan string{ch1, ch2} {
select {
case received := <-ch:
if received != msg {
t.Errorf("Expected message %s, but got %s", msg, received)
}
case <-time.After(1 * time.Second):
t.Error("Did not receive message on subscribed channel")
}
}
}
// Test Unsubscribe
func TestUnsubscribe(t *testing.T) {
topic := "unsubscribe_test"
ch, _ := Subscribe(topic)
Unsubscribe(topic, ch)
select {
case _, open := <-ch:
if open {
t.Error("Expected channel to be closed after unsubscribe")
}
default:
// Success, channel was properly closed
}
}
// Test UnsubscribeAll
func TestUnsubscribeAll(t *testing.T) {
topic := "unsubscribe_all_test"
ch1, _ := Subscribe(topic)
ch2, _ := Subscribe(topic)
UnsubscribeAll(topic)
// Verify that both channels are closed
for _, ch := range []<-chan string{ch1, ch2} {
select {
case _, open := <-ch:
if open {
t.Error("Expected channel to be closed after UnsubscribeAll")
}
default:
// Success, channel was properly closed
}
}
}
// Test Publish without subscribers
func TestPublishWithoutSubscribers(t *testing.T) {
topic := "no_subscriber_topic"
Publish(topic, "no subscribers") // No channels subscribed, should proceed without errors
}
// Test Publish while unsubscribing in parallel
func TestPublishUnsubscribeParallel(t *testing.T) {
topic := "parallel_publish_unsubscribe"
msg := "parallel message"
var wg sync.WaitGroup
wg.Add(2)
_, unsubscribe := Subscribe(topic)
go func() {
defer wg.Done()
Publish(topic, msg)
}()
go func() {
defer wg.Done()
unsubscribe()
}()
wg.Wait()
}
func BenchmarkPubSub(b *testing.B) {
reqID := 1
eurusd := NewForex("EUR", "IDEALPRO", "USD")
contractDetails := NewContractDetails()
contractDetails.Contract = *eurusd
b.ResetTimer()
for i := 0; i < b.N; i++ {
ch, cancel := Subscribe(reqID)
Publish(reqID, Encode(contractDetails))
msg := <-ch
var cd ContractDetails
if err := Decode(&cd, msg); err != nil {
return
}
cancel()
}
}
func BenchmarkPubSubBuffered(b *testing.B) {
reqID := 1
eurusd := NewForex("EUR", "IDEALPRO", "USD")
contractDetails := NewContractDetails()
contractDetails.Contract = *eurusd
b.ResetTimer()
for i := 0; i < b.N; i++ {
ch, cancel := Subscribe(reqID, 100)
Publish(reqID, Encode(contractDetails))
msg := <-ch
var cd ContractDetails
if err := Decode(&cd, msg); err != nil {
return
}
cancel()
}
}