Skip to content

Commit

Permalink
tests: lwm2m: Refactor to use module scoped DUT
Browse files Browse the repository at this point in the history
When testcases share one DUT per module, we save the time
from running bootstrap on each testcase.
Each testcase start with DUT that is registered.

Signed-off-by: Seppo Takalo <[email protected]>
  • Loading branch information
SeppoTakalo authored and carlescufi committed Nov 7, 2023
1 parent 944ad18 commit 86efc9f
Show file tree
Hide file tree
Showing 6 changed files with 421 additions and 308 deletions.
148 changes: 148 additions & 0 deletions tests/net/lib/lwm2m/interop/pytest/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
"""
Common test fixtures
####################
Copyright (c) 2023 Nordic Semiconductor ASA
SPDX-License-Identifier: Apache-2.0
"""

import time
import logging
import os
import binascii
import random
import string
import pytest
from leshan import Leshan

from twister_harness import Shell
from twister_harness import DeviceAdapter

LESHAN_IP: str = '192.0.2.2'
COAP_PORT: int = 5683
COAPS_PORT: int = 5684
BOOTSTRAP_COAPS_PORT: int = 5784

logger = logging.getLogger(__name__)

@pytest.fixture(scope='session')
def leshan() -> Leshan:
"""
Fixture that returns a Leshan object for interacting with the Leshan server.
:return: The Leshan object.
:rtype: Leshan
"""
try:
return Leshan("http://localhost:8080/api")
except RuntimeError:
pytest.skip('Leshan server not available')

@pytest.fixture(scope='session')
def leshan_bootstrap() -> Leshan:
"""
Fixture that returns a Leshan object for interacting with the Bootstrap Leshan server.
:return: The Leshan object.
:rtype: Leshan
"""
try:
return Leshan("http://localhost:8081/api")
except RuntimeError:
pytest.skip('Leshan Bootstrap server not available')

@pytest.fixture(scope='module')
def helperclient() -> object:
"""
Fixture that returns a helper client object for testing.
:return: The helper client object.
:rtype: object
"""
try:
from coapthon.client.helperclient import HelperClient
except ModuleNotFoundError:
pytest.skip('CoAPthon3 package not installed')
return HelperClient(server=('127.0.0.1', COAP_PORT))


@pytest.fixture(scope='module')
def endpoint_nosec(shell: Shell, dut: DeviceAdapter, leshan: Leshan) -> str:
"""Fixture that returns an endpoint that starts on no-secure mode"""
# Allow engine to start & stop once.
time.sleep(2)

# Generate randon device id and password (PSK key)
ep = 'client_' + binascii.b2a_hex(os.urandom(1)).decode()

#
# Registration Interface test cases (using Non-secure mode)
#
shell.exec_command(f'lwm2m write 0/0/0 -s coap://{LESHAN_IP}:{COAP_PORT}')
shell.exec_command('lwm2m write 0/0/1 -b 0')
shell.exec_command('lwm2m write 0/0/2 -u8 3')
shell.exec_command(f'lwm2m write 0/0/3 -s {ep}')
shell.exec_command('lwm2m create 1/0')
shell.exec_command('lwm2m write 0/0/10 -u16 1')
shell.exec_command('lwm2m write 1/0/0 -u16 1')
shell.exec_command('lwm2m write 1/0/1 -u32 86400')
shell.exec_command(f'lwm2m start {ep} -b 0')
dut.readlines_until(regex=f"RD Client started with endpoint '{ep}'", timeout=10.0)

yield ep

# All done
shell.exec_command('lwm2m stop')
dut.readlines_until(regex=r'.*Deregistration success', timeout=10.0)

@pytest.fixture(scope='module')
def endpoint_bootstrap(shell: Shell, dut: DeviceAdapter, leshan: Leshan, leshan_bootstrap: Leshan) -> str:
"""Fixture that returns an endpoint that starts the bootstrap."""
try:
# Generate randon device id and password (PSK key)
ep = 'client_' + binascii.b2a_hex(os.urandom(1)).decode()
bs_passwd = ''.join(random.choice(string.ascii_lowercase) for i in range(16))
passwd = ''.join(random.choice(string.ascii_lowercase) for i in range(16))

logger.debug('Endpoint: %s', ep)
logger.debug('Boostrap PSK: %s', binascii.b2a_hex(bs_passwd.encode()).decode())
logger.debug('PSK: %s', binascii.b2a_hex(passwd.encode()).decode())

# Create device entries in Leshan and Bootstrap server
leshan_bootstrap.create_bs_device(ep, f'coaps://{LESHAN_IP}:{COAPS_PORT}', bs_passwd, passwd)
leshan.create_psk_device(ep, passwd)

# Allow engine to start & stop once.
time.sleep(2)

# Write bootsrap server information and PSK keys
shell.exec_command(f'lwm2m write 0/0/0 -s coaps://{LESHAN_IP}:{BOOTSTRAP_COAPS_PORT}')
shell.exec_command('lwm2m write 0/0/1 -b 1')
shell.exec_command('lwm2m write 0/0/2 -u8 0')
shell.exec_command(f'lwm2m write 0/0/3 -s {ep}')
shell.exec_command(f'lwm2m write 0/0/5 -s {bs_passwd}')
shell.exec_command(f'lwm2m start {ep} -b 1')

yield ep

shell.exec_command('lwm2m stop')
dut.readlines_until(regex=r'.*Deregistration success', timeout=10.0)

finally:
# Remove device and bootstrap information
# Leshan does not accept non-secure connection if device information is provided with PSK
leshan.delete_device(ep)
leshan_bootstrap.delete_bs_device(ep)

@pytest.fixture(scope='module')
def endpoint_registered(endpoint_bootstrap, shell: Shell, dut: DeviceAdapter) -> str:
"""Fixture that returns an endpoint that is registered."""
dut.readlines_until(regex='.*Registration Done', timeout=5.0)
return endpoint_bootstrap

@pytest.fixture(scope='module')
def endpoint(endpoint_registered) -> str:
"""Fixture that returns an endpoint that is registered."""
return endpoint_registered
63 changes: 50 additions & 13 deletions tests/net/lib/lwm2m/interop/pytest/leshan.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Copyright (c) 2023 Nordic Semiconductor ASA
#
# SPDX-License-Identifier: Apache-2.0
"""
REST client for Leshan demo server
##################################
Copyright (c) 2023 Nordic Semiconductor ASA
SPDX-License-Identifier: Apache-2.0
"""

from __future__ import annotations

Expand All @@ -12,7 +18,9 @@
from contextlib import contextmanager

class Leshan:
"""This class represents a Leshan client that interacts with demo server's REAT API"""
def __init__(self, url: str):
"""Initialize Leshan client and check if server is available"""
self.api_url = url
self.timeout = 10
#self.format = 'TLV'
Expand All @@ -22,8 +30,8 @@ def __init__(self, url: str):
resp = self.get('/security/clients')
if not isinstance(resp, list):
raise RuntimeError('Did not receive list of endpoints')
except requests.exceptions.ConnectionError:
raise RuntimeError('Leshan not responding')
except requests.exceptions.ConnectionError as exc:
raise RuntimeError('Leshan not responding') from exc

@staticmethod
def handle_response(resp: requests.models.Response):
Expand All @@ -47,23 +55,26 @@ def handle_response(resp: requests.models.Response):
return None

def get(self, path: str):
"""Send HTTP GET query"""
"""Send HTTP GET query with typical parameters"""
params = {'timeout': self.timeout}
if self.format is not None:
params['format'] = self.format
resp = self._s.get(f'{self.api_url}{path}', params=params, timeout=self.timeout)
return Leshan.handle_response(resp)

def put_raw(self, path: str, data: str | dict | None = None, headers: dict | None = None):
"""Send HTTP PUT query without any default parameters"""
resp = self._s.put(f'{self.api_url}{path}', data=data, headers=headers, timeout=self.timeout)
return Leshan.handle_response(resp)

def put(self, path: str, data: str | dict, uri_options: str = ''):
"""Send HTTP PUT query with typical parameters"""
if isinstance(data, dict):
data = json.dumps(data)
return self.put_raw(f'{path}?timeout={self.timeout}&format={self.format}' + uri_options, data=data, headers={'content-type': 'application/json'})

def post(self, path: str, data: str | dict | None = None):
"""Send HTTP POST query"""
if isinstance(data, dict):
data = json.dumps(data)
if data is not None:
Expand All @@ -76,13 +87,16 @@ def post(self, path: str, data: str | dict | None = None):
return Leshan.handle_response(resp)

def delete(self, path: str):
"""Send HTTP DELETE query"""
resp = self._s.delete(f'{self.api_url}{path}', timeout=self.timeout)
return Leshan.handle_response(resp)

def execute(self, endpoint: str, path: str):
"""Send LwM2M EXECUTE command"""
return self.post(f'/clients/{endpoint}/{path}')

def write(self, endpoint: str, path: str, value: bool | int | str):
"""Send LwM2M WRITE command to a single resource or resource instance"""
if len(path.split('/')) == 3:
kind = 'singleResource'
else:
Expand All @@ -91,14 +105,17 @@ def write(self, endpoint: str, path: str, value: bool | int | str):
return self.put(f'/clients/{endpoint}/{path}', self._define_resource(rid, value, kind))

def update_obj_instance(self, endpoint: str, path: str, resources: dict):
"""Update object instance"""
data = self._define_obj_inst(path, resources)
return self.put(f'/clients/{endpoint}/{path}', data, uri_options='&replace=false')

def replace_obj_instance(self, endpoint: str, path: str, resources: dict):
"""Replace object instance"""
data = self._define_obj_inst(path, resources)
return self.put(f'/clients/{endpoint}/{path}', data, uri_options='&replace=true')

def create_obj_instance(self, endpoint: str, path: str, resources: dict):
"""Send LwM2M CREATE command"""
data = self._define_obj_inst(path, resources)
path = '/'.join(path.split('/')[:-1]) # Create call should not have instance ID in path
return self.post(f'/clients/{endpoint}/{path}', data)
Expand All @@ -124,13 +141,15 @@ def _type_to_string(cls, value):

@classmethod
def _convert_type(cls, value):
"""Wrapper for special types that are not understood by Json"""
if isinstance(value, datetime):
return int(value.timestamp())
else:
return value

@classmethod
def _define_obj_inst(cls, path: str, resources: dict):
"""Define an object instance for Leshan"""
data = {
"kind": "instance",
"id": int(path.split('/')[-1]), # ID is last element of path
Expand All @@ -146,6 +165,7 @@ def _define_obj_inst(cls, path: str, resources: dict):

@classmethod
def _define_resource(cls, rid, value, kind='singleResource'):
"""Define a resource for Leshan"""
if kind in ('singleResource', 'resourceInstance'):
return {
"id": rid,
Expand Down Expand Up @@ -208,6 +228,7 @@ def _decode_obj(cls, content):
return {content['id']: instances}

def read(self, endpoint: str, path: str):
"""Send LwM2M READ command and decode the response to a Python dictionary"""
resp = self.get(f'/clients/{endpoint}/{path}')
if not resp['success']:
return resp
Expand All @@ -223,9 +244,10 @@ def read(self, endpoint: str, path: str):
raise RuntimeError(f'Unhandled type {content["kind"]}')

@classmethod
def parse_composite(cls, content: dict):
def parse_composite(cls, payload: dict):
"""Decode the Leshan's response to composite query back to a Python dictionary"""
data = {}
for path, content in content.items():
for path, content in payload.items():
keys = [int(key) for key in path.lstrip("/").split('/')]
if len(keys) == 1:
data.update(cls._decode_obj(content))
Expand All @@ -251,14 +273,22 @@ def parse_composite(cls, content: dict):
raise RuntimeError(f'Unhandled path {path}')
return data

def composite_read(self, endpoint: str, paths: list[str]):
paths = [path if path.startswith('/') else '/' + path for path in paths]
def _composite_params(self, paths: list[str] | None = None):
"""Common URI parameters for composite query"""
parameters = {
'pathformat': self.format,
'nodeformat': self.format,
'timeout': self.timeout,
'paths': ','.join(paths)
'timeout': self.timeout
}
if paths is not None:
paths = [path if path.startswith('/') else '/' + path for path in paths]
parameters['paths'] = ','.join(paths)

return parameters

def composite_read(self, endpoint: str, paths: list[str]):
"""Send LwM2M Composite-Read command and decode the response to a Python dictionary"""
parameters = self._composite_params(paths)
resp = self._s.get(f'{self.api_url}/clients/{endpoint}/composite', params=parameters, timeout=self.timeout)
payload = Leshan.handle_response(resp)
if not payload['status'] == 'CONTENT(205)':
Expand All @@ -267,7 +297,7 @@ def composite_read(self, endpoint: str, paths: list[str]):

def composite_write(self, endpoint: str, resources: dict):
"""
Do LwM2m composite write operation.
Send LwM2m Composite-Write operation.
Targeted resources are defined as a dictionary with the following structure:
{
Expand Down Expand Up @@ -356,11 +386,18 @@ def get_event_stream(self, endpoint: str):
r.close()

class LeshanEventsIterator:
"""Iterator for Leshan event stream"""
def __init__(self, req: requests.Response, timeout: int):
"""Initialize the iterator in line mode"""
self._it = req.iter_lines(chunk_size=1, decode_unicode=True)
self._timeout = timeout

def next_event(self, event: str):
"""
Finds the next occurrence of a specific event in the stream.
If timeout happens, the returns None.
"""
timeout = time.time() + self._timeout
try:
for line in self._it:
Expand Down
Loading

0 comments on commit 86efc9f

Please sign in to comment.