Skip to content

Commit a479d87

Browse files
committed
[PROGRESS]
1 parent d851fda commit a479d87

File tree

5 files changed

+75
-19
lines changed

5 files changed

+75
-19
lines changed

tests/unit/test_subject.py

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import unittest
22
from unittest import mock
33

4-
from yamq import subject, observer
4+
from yamq import subject, observer, message
55

66

77
class TestSubjectSTOMP(unittest.TestCase):
@@ -60,9 +60,73 @@ def test_subscribers_are_preserved(self):
6060

6161
self.assertDictEqual(obj_2.observers, {observer_1: "auto"})
6262

63-
import pdb; pdb.set_trace() # XXX BREAKPOINT
6463
obj_3 = subject.SubjectSTOMP(name="test_queue_1", loop=self.loop)
6564
self.assertDictEqual(obj_3.observers, {
6665
observer_1: "auto",
6766
observer_2: "auto"
6867
})
68+
69+
obj_1.delete()
70+
obj_2.delete()
71+
obj_3.delete()
72+
73+
def test_get(self):
74+
obj_1 = subject.SubjectSTOMP(name="test_queue_1", loop=self.loop)
75+
returned_obj = subject.SubjectSTOMP.get(obj_1.name)
76+
self.assertIs(obj_1, returned_obj)
77+
78+
returned_obj = subject.SubjectSTOMP.get("test_queue_2")
79+
self.assertIsNone(returned_obj)
80+
81+
obj_1.delete()
82+
83+
def test_delete(self):
84+
obj_1 = subject.SubjectSTOMP(name="test_queue_1", loop=self.loop)
85+
returned_obj = subject.SubjectSTOMP.get("test_queue_1")
86+
87+
self.assertIs(obj_1, returned_obj)
88+
89+
obj_1.delete()
90+
returned_obj = subject.SubjectSTOMP.get("test_queue_1")
91+
92+
self.assertIsNone(returned_obj)
93+
94+
def test_unsubscribe(self):
95+
obj_1 = subject.SubjectSTOMP(name="test_queue_1", loop=self.loop)
96+
97+
observer_1 = observer.ObserverSTOMP(self.loop, self.transport)
98+
observer_2 = observer.ObserverSTOMP(self.loop, self.transport)
99+
100+
obj_1.subscribe(observer_1, "auto")
101+
obj_1.subscribe(observer_2, "auto")
102+
103+
self.assertDictEqual(obj_1.observers, {
104+
observer_1: "auto",
105+
observer_2: "auto"
106+
})
107+
108+
obj_1.unsubscribe(observer_1)
109+
110+
self.assertDictEqual(obj_1.observers, {observer_2: "auto"})
111+
112+
obj_1.unsubscribe(observer_2)
113+
self.assertDictEqual(obj_1.observers, {})
114+
115+
# It must call the subject_obj.delete() if there are no observers.
116+
self.assertIsNone(subject.SubjectSTOMP.get('test_queue_1'))
117+
obj_1.delete()
118+
119+
def test_z_notify(self):
120+
obj_1 = subject.SubjectSTOMP(name="test_queue_1", loop=self.loop)
121+
122+
observer_1 = observer.ObserverSTOMP(self.loop, self.transport)
123+
observer_2 = observer.ObserverSTOMP(self.loop, self.transport)
124+
125+
message_obj = message.Message("Test Message")
126+
127+
observer_1.subscribe(obj_1, "auto", "1")
128+
observer_2.subscribe(obj_1, "auto", "2")
129+
130+
obj_1.notify(message_obj)
131+
132+
obj_1.delete()

yamq/observer.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from bidict import bidict
22

3-
from yamq import stomp
3+
import stomp
44

55

66
class Observer:
@@ -21,7 +21,6 @@ def __init__(self, loop, transport):
2121
self.subscriptions = bidict() # subscription_id: subject_obj
2222

2323
def subscribe(self, subject, ack_type, subscription_id):
24-
import pdb; pdb.set_trace() # XXX BREAKPOINT
2524
subject.subscribe(self, ack_type)
2625
self.subscriptions[subscription_id] = subject
2726

@@ -30,7 +29,6 @@ def unsubscribe(self, subscription_id):
3029
subject.unsubscribe(self)
3130

3231
def update_auto(self, message_frame):
33-
import pdb; pdb.set_trace() # XXX BREAKPOINT
3432
self.transport(stomp.dumps(message_frame))
3533

3634
def update_client(self, message_frame):
@@ -44,8 +42,7 @@ def delete(self):
4442
subject.unsubscribe(self)
4543

4644
def update(self, subject, message, ack):
47-
import pdb; pdb.set_trace() # XXX BREAKPOINT
48-
subscrption_id = self.subscriptions.inv[subject]
45+
subscription_id = self.subscriptions.inv[subject]
4946
frame = stomp.MessageFrame(
5047
message=message,
5148
destination=subject.name,

yamq/server.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,11 @@ def connect(self):
5252
pass
5353

5454
def send(self, destination, raw_message, **headers):
55-
import pdb; pdb.set_trace() # XXX BREAKPOINT
5655
user_subject = subject.SubjectSTOMP(name=destination, loop=event_loop)
5756
message_obj = message.Message(raw_message)
5857
user_subject.notify(message_obj)
5958

6059
def subscribe(self, subscription_id, destination, ack="auto", **headers):
61-
import pdb; pdb.set_trace() # XXX BREAKPOINT
6260
user_subject = subject.SubjectSTOMP(destination, loop=event_loop)
6361
self.observer.subscribe(user_subject, ack, subscription_id)
6462

yamq/stomp/parse.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def loads(raw_frame):
2222
return Frame(
2323
command=lines[0],
2424
headers=headers,
25-
body=lines[body_start:]
25+
body='\r\n'.join(lines[body_start:])
2626
)
2727

2828

yamq/subject.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,12 @@ def __new__(cls, name, loop):
3333
obj = cls._objects[name]
3434
except KeyError as e:
3535
obj = super().__new__(cls)
36+
obj.observers = {}
37+
obj.name = name
38+
obj.loop = loop
3639
cls._objects[name] = obj
3740
return obj
3841

39-
def __init__(self, name, loop):
40-
print("Init called for : {}".format(name))
41-
self.observers = {} # observer_object: ack_type
42-
self.name = name
43-
self.loop = loop
44-
4542
def __repr__(self):
4643
return "<Subject object: %s>" % (self.name,)
4744

@@ -65,9 +62,9 @@ def unsubscribe(self, observer):
6562
# TODO: I am not sure you should silently pass
6663
pass
6764

68-
if not self.objservers:
65+
if not self.observers:
6966
self.delete()
7067

7168
def notify(self, message):
72-
for observer, ack_type in self.observers:
73-
observer.update(message, ack_type, subject=self)
69+
for observer, ack_type in self.observers.items():
70+
observer.update(self, message, ack_type)

0 commit comments

Comments
 (0)