-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtest_set_property.py
62 lines (49 loc) · 1.88 KB
/
test_set_property.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#
# Copyright © 2025 Agora
# This file is part of TEN Framework, an open source project.
# Licensed under the Apache License, Version 2.0, with certain conditions.
# Refer to the "LICENSE" file in the root directory for more information.
#
from pathlib import Path
from ten import (
ExtensionTester,
TenEnvTester,
Cmd,
CmdResult,
StatusCode,
)
import httpx
import threading
class ExtensionTesterSetProperty(ExtensionTester):
    """Tester that sends one JSON command to the HTTP server under test.

    On start it spawns a worker thread which POSTs a small JSON payload to
    ``http://127.0.0.1:8899/cmd/abc``. The outcome is stored in
    ``self.failure`` (``None`` on success, otherwise a description) and the
    test is always stopped afterwards, so a failed request makes the test
    fail instead of leaving it running.
    """

    def __init__(self):
        super().__init__()
        self.thread = None
        # Pessimistic default: only cleared once the server answers 200 OK,
        # so any path that skips the request is still reported as a failure.
        self.failure = "HTTP request never completed"

    def on_cmd(self, ten_env: TenEnvTester, cmd: Cmd) -> None:
        """Log the incoming command name and answer it with an OK result."""
        ten_env.log_debug(f"on_cmd name {cmd.get_name()}")
        ten_env.return_result(CmdResult.create(StatusCode.OK), cmd)

    def on_start(self, ten_env: TenEnvTester) -> None:
        """Start the HTTP request on a worker thread, then signal start done."""
        # The request runs off-thread so that on_start_done() is not delayed
        # by the blocking httpx call.
        self.thread = threading.Thread(target=self._async_test, args=[ten_env])
        self.thread.start()
        ten_env.on_start_done()

    def _async_test(self, ten_env: TenEnvTester) -> None:
        """POST the test payload, record the outcome and stop the test.

        ``self.failure`` is set to ``None`` only when the response status is
        200; transport errors, timeouts and other statuses are recorded as a
        message. ``stop_test()`` is called on every path.
        """
        property_json = {"num": 1, "str": "111"}
        try:
            r = httpx.post(
                "http://127.0.0.1:8899/cmd/abc", json=property_json, timeout=5
            )
            ten_env.log_debug(f"{r}")
            if r.status_code == httpx.codes.OK:
                self.failure = None
            else:
                self.failure = f"unexpected HTTP status {r.status_code}"
        except httpx.HTTPError as exc:
            # Connection refused, timeout, protocol error, ...
            self.failure = f"HTTP request failed: {exc!r}"
        finally:
            # Always stop the test; otherwise a failed request would leave
            # the tester running with nobody left to stop it.
            ten_env.stop_test()


def _run_set_property_case(property_json: str) -> None:
    """Run one tester against ``http_server_python`` and assert it succeeded.

    Args:
        property_json: JSON string passed to ``set_test_mode_single`` as the
            property of the extension under test.

    Raises:
        AssertionError: if the tester recorded a failed HTTP request.
    """
    tester = ExtensionTesterSetProperty()
    tester.set_test_mode_single("http_server_python", property_json)
    tester.run()
    if tester.thread is not None:
        # Bounded wait so a stuck worker cannot hang the suite; the default
        # failure message is still in place if the worker has not finished.
        tester.thread.join(timeout=10)
    assert tester.failure is None, f"{property_json}: {tester.failure}"


def test_set_property():
    """Check the HTTP server answers on port 8899 for several listen settings."""
    # change port
    _run_set_property_case('{"listen_port":8899}')
    # change port with localhost
    _run_set_property_case('{"listen_addr":"127.0.0.1","listen_port":8899}')
    # change port with any addr
    _run_set_property_case('{"listen_addr":"0.0.0.0","listen_port":8899}')