forked from django/channels
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_layers.py
65 lines (53 loc) · 2 KB
/
test_layers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import unittest
import pytest
from django.test import override_settings
from channels import DEFAULT_CHANNEL_LAYER
from channels.exceptions import InvalidChannelLayerError
from channels.layers import InMemoryChannelLayer, channel_layers, get_channel_layer
class TestChannelLayerManager(unittest.TestCase):
@override_settings(
CHANNEL_LAYERS={"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}}
)
def test_config_error(self):
"""
If channel layer doesn't specify TEST_CONFIG, `make_test_backend`
should result into error.
"""
with self.assertRaises(InvalidChannelLayerError):
channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER)
@override_settings(
CHANNEL_LAYERS={
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
"TEST_CONFIG": {"expiry": 100500},
}
}
)
def test_config_instance(self):
"""
If channel layer provides TEST_CONFIG, `make_test_backend` should
return channel layer instance appropriate for testing.
"""
layer = channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER)
self.assertEqual(layer.expiry, 100500)
def test_override_settings(self):
"""
The channel layers cache is reset when the CHANNEL_LAYERS setting
changes.
"""
with override_settings(
CHANNEL_LAYERS={
"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}
}
):
self.assertEqual(channel_layers.backends, {})
get_channel_layer()
self.assertNotEqual(channel_layers.backends, {})
self.assertEqual(channel_layers.backends, {})
### In-memory layer tests
@pytest.mark.asyncio
async def test_send_receive():
layer = InMemoryChannelLayer()
message = {"type": "test.message"}
await layer.send("test.channel", message)
assert message == await layer.receive("test.channel")