use_cases

Use cases to interact with devices.

Modules:

Name Description
cpe

Use Cases to check the performance of CPE.

device_getters

Device getters use cases.

device_utilities

Miscellaneous Use Cases to interact with devices.

dhcp

Boardfarm LGI DHCP IPv4 Use Cases.

image_comparison

Compare images.

iperf

Common Iperf use cases.

multicast

Multicast Use Cases.

networking

Common Networking use cases.

ripv2

RIPv2 Use Cases library.

voice

Voice use cases library.

wifi

Wi-Fi Use Cases library.

cpe

Use Cases to check the performance of CPE.

Functions:

Name Description
board_reset_via_console

Reset board via console.

create_upnp_rule

Create UPnP rule on the device.

delete_upnp_rule

Delete UPnP rule on the device.

disable_logs

Disable logs for the specified component on the CPE.

enable_logs

Enable logs for the specified component on the CPE.

factory_reset

Perform factory reset CPE via given method.

get_cpe_provisioning_mode

Get the provisioning mode of the board.

get_cpu_usage

Return the current CPU usage of CPE.

get_memory_usage

Return the memory usage of CPE.

get_seconds_uptime

Return board uptime in seconds.

is_ntp_synchronized

Get the NTP synchronization status.

is_tr069_agent_running

Check if TR069 agent is running or not.

read_tcpdump

Read the tcpdump packets and delete the capture file afterwards.

tcpdump

Contextmanager to perform tcpdump on the board.

transfer_file_via_scp

Copy files and directories between the board and the remote host.

upload_file_to_tftp

Transfer file onto tftp server.

board_reset_via_console

board_reset_via_console(board: CPE) -> None

Reset board via console.

.. hint:: This Use Case implements statements from the test suite such as:

- Reboot from console.

Parameters:

Name Type Description Default

board

CPE

The board instance

required
Source code in boardfarm3/use_cases/cpe.py
def board_reset_via_console(board: CPE) -> None:
    """Reset board via console.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Reboot from console.

    :param board: The board instance
    :type board: CPE
    """
    board.sw.reset(method="sw")
    board.sw.wait_for_boot()

create_upnp_rule

create_upnp_rule(
    device: LAN, int_port: str, ext_port: str, protocol: str, url: str | None = None
) -> str

Create UPnP rule on the device.

.. hint:: This Use Case implements statements from the test suite such as:

- Create UPnP rule through cli.

Parameters:

Name Type Description Default

device

LAN

LAN device instance

required

int_port

str

internal port for UPnP

required

ext_port

str

external port for UPnP

required

protocol

str

protocol to be used

required

url

str | None

url to be used

None

Returns:

Type Description
str

output of UPnP add port command

Source code in boardfarm3/use_cases/cpe.py
def create_upnp_rule(
    device: LAN, int_port: str, ext_port: str, protocol: str, url: str | None = None
) -> str:
    """Create UPnP rule on the device.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Create UPnP rule through cli.

    :param device: LAN device instance
    :type device: LAN
    :param int_port: internal port for UPnP
    :type int_port: str
    :param ext_port: external port for UPnP
    :type ext_port: str
    :param protocol: protocol to be used
    :type protocol: str
    :param url: url to be used
    :type url: str | None
    :return: output of UPnP add port command
    :rtype: str
    """
    if url is None:
        url = _UPNP_URL.safe_substitute(IP=device.get_default_gateway())
    return device.create_upnp_rule(int_port, ext_port, protocol, url)
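
When url is None, the use case fills the LAN device's default gateway into a string.Template via safe_substitute. The sketch below illustrates only that substitution step. The template text, port and path are hypothetical stand-ins, because the real value of _UPNP_URL is not shown on this page.

```python
from string import Template

# Hypothetical template; the real _UPNP_URL constant is not shown on this page.
EXAMPLE_UPNP_URL = Template("http://${IP}:5000/rootDesc.xml")


def default_upnp_url(gateway_ip: str) -> str:
    """Fill the gateway address into the example template."""
    # safe_substitute leaves unknown placeholders untouched instead of raising.
    return EXAMPLE_UPNP_URL.safe_substitute(IP=gateway_ip)


print(default_upnp_url("192.168.0.1"))
```

Passing an explicit url skips this step entirely, which is useful when the UPnP description is not served from the default gateway.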

delete_upnp_rule

delete_upnp_rule(device: LAN, ext_port: str, protocol: str, url: str | None) -> str

Delete UPnP rule on the device.

.. hint:: This Use Case implements statements from the test suite such as:

- Delete UPnP rule through cli.

Parameters:

Name Type Description Default

device

LAN

LAN device instance

required

ext_port

str

external port for UPnP

required

protocol

str

protocol to be used

required

url

str | None

url to be used

required

Returns:

Type Description
str

output of UPnP delete port command

Source code in boardfarm3/use_cases/cpe.py
def delete_upnp_rule(device: LAN, ext_port: str, protocol: str, url: str | None) -> str:
    """Delete UPnP rule on the device.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Delete UPnP rule through cli.

    :param device: LAN device instance
    :type device: LAN
    :param ext_port: external port for UPnP
    :type ext_port: str
    :param protocol: protocol to be used
    :type protocol: str
    :param url: url to be used
    :type url: str | None
    :return: output of UPnP delete port command
    :rtype: str
    """
    if url is None:
        url = _UPNP_URL.safe_substitute(IP=device.get_default_gateway())
    return device.delete_upnp_rule(ext_port, protocol, url)

disable_logs

disable_logs(board: CPE, component: str) -> None

Disable logs for the specified component on the CPE.

.. note::

- The component can be one of "voice" and "pacm" for mv2p.
- The component can be one of "voice", "docsis", "common_components", "gw", "vfe", "vendor_cbn", "pacm" for mv1.

Parameters:

Name Type Description Default

board

CPE

The board instance

required

component

str

The component for which logs have to be disabled.

required
Source code in boardfarm3/use_cases/cpe.py
def disable_logs(board: CPE, component: str) -> None:
    """Disable logs for the specified component on the CPE.

    .. note::
        - The component can be one of "voice" and "pacm" for mv2p
        - The component can be one of "voice", "docsis", "common_components",
            "gw", "vfe", "vendor_cbn", "pacm" for mv1

    :param board: The board instance
    :type board: CPE
    :param component: The component for which logs have to be disabled.
    :type component: str
    """
    board.sw.enable_logs(component=component, flag="disable")

enable_logs

enable_logs(board: CPE, component: str) -> None

Enable logs for the specified component on the CPE.

.. note::

- The component can be one of "voice" and "pacm" for mv2p.
- The component can be one of "voice", "docsis", "common_components", "gw", "vfe", "vendor_cbn", "pacm" for mv1.

Parameters:

Name Type Description Default

board

CPE

The board instance

required

component

str

The component for which logs have to be enabled.

required
Source code in boardfarm3/use_cases/cpe.py
def enable_logs(board: CPE, component: str) -> None:
    """Enable logs for the specified component on the CPE.

    .. note::
        - The component can be one of "voice" and "pacm" for mv2p
        - The component can be one of "voice", "docsis", "common_components",
            "gw", "vfe", "vendor_cbn", "pacm" for mv1

    :param board: The board instance
    :type board: CPE
    :param component: The component for which logs have to be enabled.
    :type component: str
    """
    board.sw.enable_logs(component=component, flag="enable")

factory_reset

factory_reset(board: CPE, method: None | str = None) -> bool

Perform factory reset CPE via given method.

Parameters:

Name Type Description Default

board

CPE

The board instance.

required

method

None | str

Factory reset method

None

Returns:

Type Description
bool

True on successful factory reset

Source code in boardfarm3/use_cases/cpe.py
def factory_reset(board: CPE, method: None | str = None) -> bool:
    """Perform factory reset CPE via given method.

    :param board: The board instance.
    :type board: CPE
    :param method: Factory reset method
    :type method: None | str
    :return: True on successful factory reset
    :rtype: bool
    """
    return board.sw.factory_reset(method)

get_cpe_provisioning_mode

get_cpe_provisioning_mode(board: CPE) -> str

Get the provisioning mode of the board.

Parameters:

Name Type Description Default

board

CPE

The board object, from which the provisioning mode is fetched.

required

Returns:

Type Description
str

The provisioning mode of the board.

Source code in boardfarm3/use_cases/cpe.py
def get_cpe_provisioning_mode(board: CPE) -> str:
    """Get the provisioning mode of the board.

    :param board: The board object, from which the provisioning mode is fetched.
    :type board: CPE
    :return: The provisioning mode of the board.
    :rtype: str
    """
    return board.sw.get_provision_mode()

get_cpu_usage

get_cpu_usage(board: CPE) -> float

Return the current CPU usage of CPE.

.. hint:: This Use Case implements statements from the test suite such as:

- Return the current CPU usage of CPE.

Parameters:

Name Type Description Default

board

CPE

CPE device instance

required

Returns:

Type Description
float

current CPU usage of the CPE

Source code in boardfarm3/use_cases/cpe.py
def get_cpu_usage(board: CPE) -> float:
    """Return the current CPU usage of CPE.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Return the current CPU usage of CPE.

    :param board: CPE device instance
    :type board: CPE
    :return: current CPU usage of the CPE
    :rtype: float
    """
    return board.sw.get_load_avg()

get_memory_usage

get_memory_usage(board: CPE) -> dict[str, int]

Return the memory usage of CPE.

.. hint:: This Use Case implements statements from the test suite such as:

- Return the memory usage of CPE.

Parameters:

Name Type Description Default

board

CPE

CPE device instance

required

Returns:

Type Description
dict[str, int]

current memory utilization of the CPE

Source code in boardfarm3/use_cases/cpe.py
def get_memory_usage(board: CPE) -> dict[str, int]:
    """Return the memory usage of CPE.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Return the memory usage of CPE.

    :param board: CPE device instance
    :type board: CPE
    :return: current memory utilization of the CPE
    :rtype: dict[str, int]
    """
    return board.sw.get_memory_utilization()

get_seconds_uptime

get_seconds_uptime(board: CPE) -> float

Return board uptime in seconds.

Parameters:

Name Type Description Default

board

CPE

The board instance

required

Returns:

Type Description
float

board uptime in seconds

Source code in boardfarm3/use_cases/cpe.py
def get_seconds_uptime(board: CPE) -> float:
    """Return board uptime in seconds.

    :param board: The board instance
    :type board: CPE
    :return: board uptime in seconds
    :rtype: float
    """
    return board.sw.get_seconds_uptime()

is_ntp_synchronized

is_ntp_synchronized(board: CPE) -> bool

Get the NTP synchronization status.

Sample block of the output

.. code-block:: python

[
    {
        "remote": "2001:dead:beef:",
        "refid": ".XFAC.",
        "st": 16,
        "t": "u",
        "when": 65,
        "poll": 18,
        "reach": 0,
        "delay": 0.0,
        "offset": 0.0,
        "jitter": 0.0,
        "state": "*",
    }
]

This Use Case validates the 'state' from the parsed output and returns a bool based on its value. The meanings of the indicators are:

- '*' - synchronized candidate
- '#' - selected but not synchronized
- '+' - candidate to be selected
- [x/-/ /./None] - discarded candidate

Parameters:

Name Type Description Default

board

CPE

CPE device instance

required

Returns:

Type Description
bool

True if NTP is synchronized, false otherwise

Raises:

Type Description
ValueError

when the output is empty or has more than one list item

Source code in boardfarm3/use_cases/cpe.py
def is_ntp_synchronized(board: CPE) -> bool:
    """Get the NTP synchronization status.

    Sample block of the output

    .. code-block:: python

        [
            {
                "remote": "2001:dead:beef:",
                "refid": ".XFAC.",
                "st": 16,
                "t": "u",
                "when": 65,
                "poll": 18,
                "reach": 0,
                "delay": 0.0,
                "offset": 0.0,
                "jitter": 0.0,
                "state": "*",
            }
        ]

    This Use Case validates the 'state' from the parsed output and returns a bool based on
    the value present in it. The meanings of the indicators are given below:

    '*' - synchronized candidate
    '#' - selected but not synchronized
    '+' - candidate to be selected
    [x/-/ /./None] - discarded candidate

    :param board: CPE device instance
    :type board: CPE
    :raises ValueError: when the output is empty or has more than one list item
    :return: True if NTP is synchronized, false otherwise
    :rtype: bool
    """
    ntp_output = board.sw.get_ntp_sync_status()
    if len(ntp_output) == 0:
        msg = "No NTP server available to the device"
        raise ValueError(msg)
    if len(ntp_output) > _TOO_MANY_NTPS:
        msg = "Unclear NTP status. There is more than one NTP server present"
        raise ValueError(msg)
    return ntp_output[0]["state"] == "*"
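
The decision logic can be tried without a board by feeding it an already-parsed peer list. This is a minimal sketch: ntp_state_is_synced is a hypothetical name, and the limit of one peer is an assumed value for _TOO_MANY_NTPS, inferred from the error message.

```python
def ntp_state_is_synced(ntp_output: list) -> bool:
    """Mirror the checks of the use case on an already-parsed peer list."""
    if len(ntp_output) == 0:
        raise ValueError("No NTP server available to the device")
    if len(ntp_output) > 1:  # assumed value of _TOO_MANY_NTPS
        raise ValueError("Unclear NTP status. There is more than one NTP server present")
    # Only the '*' indicator counts as synchronized.
    return ntp_output[0]["state"] == "*"


synced_peer = {"remote": "2001:dead:beef:", "state": "*"}
candidate_peer = {"remote": "2001:dead:beef:", "state": "+"}
print(ntp_state_is_synced([synced_peer]))
print(ntp_state_is_synced([candidate_peer]))
```

A caller therefore has to handle ValueError for both the empty and the ambiguous case, not only the False return value.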

is_tr069_agent_running

is_tr069_agent_running(board: CPE) -> bool

Check if TR069 agent is running or not.

Parameters:

Name Type Description Default

board

CPE

The board instance

required

Returns:

Type Description
bool

True if agent is running, false otherwise

Source code in boardfarm3/use_cases/cpe.py
208
209
210
211
212
213
214
215
216
def is_tr069_agent_running(board: CPE) -> bool:
    """Check if TR069 agent is running or not.

    :param board: The board instance
    :type board: CPE
    :return: True if agent is running, false otherwise
    :rtype: bool
    """
    return board.sw.is_tr069_connected()

read_tcpdump

read_tcpdump(
    fname: str, board: CPE, protocol: str = "", opts: str = "", rm_pcap: bool = True
) -> str

Read the tcpdump packets and delete the capture file afterwards.

Parameters:

Name Type Description Default

fname

str

filename or the complete path of the pcap file

required

board

CPE

CPE device instance

required

protocol

str

protocol to filter, defaults to ""

''

opts

str

defaults to ""

''

rm_pcap

bool

defaults to True

True

Returns:

Type Description
str

output of tcpdump read command

Source code in boardfarm3/use_cases/cpe.py
def read_tcpdump(
    fname: str,
    board: CPE,
    protocol: str = "",
    opts: str = "",
    rm_pcap: bool = True,
) -> str:
    """Read the tcpdump packets and delete the capture file afterwards.

    :param fname: filename or the complete path of the pcap file
    :type fname: str
    :param board: CPE device instance
    :type board: CPE
    :param protocol: protocol to filter, defaults to ""
    :type protocol: str
    :param opts: defaults to ""
    :type opts: str
    :param rm_pcap: defaults to True
    :type rm_pcap: bool
    :return: output of tcpdump read command
    :rtype: str
    """
    return board.sw.nw_utility.read_tcpdump(
        fname,
        protocol=protocol,
        opts=opts,
        rm_pcap=rm_pcap,
    )

tcpdump

tcpdump(
    fname: str, interface: str, board: CPE, filters: dict | None = None
) -> Generator[str]

Contextmanager to perform tcpdump on the board.

Start tcpdump on the board console and kill it when the context exits.

Yields the process ID of the tcpdump capture that was started.

Parameters:

Name Type Description Default

fname

str

the filename or the complete path of the resource

required

interface

str

name of the interface on which traffic is captured

required

board

CPE

CPE device instance

required

filters

dict | None

filters as key-value pairs (e.g. {"-v": "", "-c": "4"})

None
Source code in boardfarm3/use_cases/cpe.py
@contextmanager
def tcpdump(
    fname: str,
    interface: str,
    board: CPE,
    filters: dict | None = None,
) -> Generator[str]:
    """Contextmanager to perform tcpdump on the board.

    Start ``tcpdump`` on the board console and kill it outside its scope

    :param fname: the filename or the complete path of the resource
    :type fname: str
    :param interface: interface name on which the tcp traffic will listen to
    :type interface: str
    :param board: CPE device instance
    :type board: CPE
    :param filters: filters as key value pair(eg: {"-v": "", "-c": "4"})
    :type filters: dict | None
    :yield: yields the process id of the tcp capture started
    :rtype: Generator[str, None, None]
    """
    pid: str = ""
    try:
        pid = board.sw.nw_utility.start_tcpdump(fname, interface, filters=filters)
        yield pid
    finally:
        board.sw.nw_utility.stop_tcpdump(pid)
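
The try/finally inside the context manager guarantees that the capture is stopped even when the test step inside the with block fails. The sketch below demonstrates that behaviour with FakeNetworkUtility, a hypothetical stand-in for board.sw.nw_utility that only records calls. The pid, file name and interface are made up.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Iterator


class FakeNetworkUtility:
    """Hypothetical stand-in for board.sw.nw_utility; records calls only."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def start_tcpdump(self, fname: str, interface: str, filters: dict | None = None) -> str:
        self.calls.append(f"start {fname} {interface}")
        return "1234"  # made-up process id

    def stop_tcpdump(self, pid: str) -> None:
        self.calls.append(f"stop {pid}")


@contextmanager
def capture(util: FakeNetworkUtility, fname: str, interface: str) -> Iterator[str]:
    """Start a capture, yield its pid, and always stop it on exit."""
    pid = ""
    try:
        pid = util.start_tcpdump(fname, interface)
        yield pid
    finally:
        util.stop_tcpdump(pid)


util = FakeNetworkUtility()
try:
    with capture(util, "/tmp/lan.pcap", "eth0") as pid:
        raise RuntimeError("test step failed")
except RuntimeError:
    pass
print(util.calls)
```

With the real use case the pattern is the same: run the traffic-generating steps inside the with block, then read the capture with read_tcpdump after the block has exited.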

transfer_file_via_scp

transfer_file_via_scp(
    source_dev: CPE,
    source_file: str,
    dest_file: str,
    dest_host: LAN | WAN,
    action: Literal["download", "upload"],
    port: int | str = 22,
    ipv6: bool = False,
) -> None

Copy files and directories between the board and the remote host.

Copy is made over SSH.

Parameters:

Name Type Description Default

source_dev

CPE

CPE device instance

required

source_file

str

path on the board

required

dest_file

str

path on the remote host

required

dest_host

LAN | WAN

the remote host instance

required

port

int | str

host port

22

action

Literal['download', 'upload']

scp action to perform, i.e. upload or download

required

ipv6

bool

whether scp should be done to IPv4 or IPv6, defaults to IPv4

False
Source code in boardfarm3/use_cases/cpe.py
def transfer_file_via_scp(  # pylint: disable=protected-access  # noqa: PLR0913
    source_dev: CPE,
    source_file: str,
    dest_file: str,
    dest_host: LAN | WAN,
    action: Literal["download", "upload"],
    port: int | str = 22,
    ipv6: bool = False,
) -> None:
    """Copy files and directories between the board and the remote host.

    Copy is made over SSH.

    :param source_dev: CPE device instance
    :type source_dev: CPE
    :param source_file: path on the board
    :type source_file: str
    :param dest_file: path on the remote host
    :type dest_file: str
    :param dest_host: the remote host instance
    :type dest_host: LAN | WAN
    :param action: scp action to perform, i.e. upload or download
    :type action: Literal["download", "upload"]
    :param port: host port, defaults to 22
    :type port: int | str
    :param ipv6: whether scp should be done to IPv4 or IPv6, defaults to IPv4
    :type ipv6: bool
    """
    (src, dst) = (
        (source_file, dest_file) if action == "upload" else (dest_file, source_file)
    )
    # TODO: private members should not be used, BOARDFARM-5040
    username = dest_host._username  # type: ignore[union-attr]  # noqa: SLF001
    password = dest_host._password  # type: ignore[union-attr]  # noqa: SLF001
    ip_addr = (
        dest_host.get_interface_ipv6addr(dest_host.iface_dut)
        if ipv6
        else dest_host.get_interface_ipv4addr(dest_host.iface_dut)
    )
    source_dev.sw.nw_utility.scp(ip_addr, port, username, password, src, dst, action)
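
The action argument only decides which path is treated as the scp source and which as the destination. A minimal sketch of that selection follows. pick_scp_paths is a hypothetical helper name and the paths are made up.

```python
from __future__ import annotations

from typing import Literal


def pick_scp_paths(
    source_file: str, dest_file: str, action: Literal["download", "upload"]
) -> tuple[str, str]:
    """Return (src, dst) in the order used by the use case."""
    # "upload" copies board -> remote host; anything else reverses the pair.
    return (source_file, dest_file) if action == "upload" else (dest_file, source_file)


print(pick_scp_paths("/tmp/board.log", "/root/board.log", "upload"))
print(pick_scp_paths("/tmp/board.log", "/root/board.log", "download"))
```

Note that source_file always names the path on the board and dest_file the path on the remote host, regardless of the direction of the copy.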

upload_file_to_tftp

upload_file_to_tftp(
    source_dev: CPE,
    source_file: str,
    tftp_server: LAN | WAN,
    path_on_tftpserver: str,
    ipv6: bool = False,
    timeout: int = 60,
) -> None

Transfer file onto tftp server.

.. hint:: This Use Case helps to copy files from the board to the tftp server

- can be used after a tcpdump on board

Parameters:

Name Type Description Default

source_dev

CPE

CPE device instance

required

source_file

str

Path on the board

required

tftp_server

LAN | WAN

the remote tftp server instance

required

path_on_tftpserver

str

Path on the tftp server

required

ipv6

bool

whether the transfer should use IPv4 or IPv6, defaults to IPv4

False

timeout

int

timeout value for the usecase

60

Raises:

Type Description
UseCaseFailure

when file not found

Source code in boardfarm3/use_cases/cpe.py
def upload_file_to_tftp(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    source_dev: CPE,
    source_file: str,
    tftp_server: LAN | WAN,
    path_on_tftpserver: str,
    ipv6: bool = False,
    timeout: int = 60,
) -> None:
    """Transfer file onto tftp server.

    .. hint:: This Use Case helps to copy files from the board to the tftp server

        - can be used after a tcpdump on board

    :param source_dev: CPE device instance
    :type source_dev: CPE
    :param source_file: Path on the board
    :type source_file: str
    :param tftp_server: the remote tftp server instance
    :type tftp_server: LAN | WAN
    :param path_on_tftpserver: Path on the tftp server
    :type path_on_tftpserver: str
    :param ipv6: whether the transfer should use IPv4 or IPv6, defaults to IPv4
    :type ipv6: bool
    :param timeout: timeout value for the usecase
    :type timeout: int
    :raises UseCaseFailure: when file not found
    """
    serv_tftp_folder = "/tftpboot"
    server_ip_addr = (
        tftp_server.get_interface_ipv6addr(tftp_server.iface_dut)
        if ipv6
        else tftp_server.get_interface_ipv4addr(tftp_server.iface_dut)
    )
    _, filename = os.path.split(source_file)
    file_location_on_server = f"{serv_tftp_folder}/{filename}"
    tftp_server.console.execute_command(
        f"chmod 777 {serv_tftp_folder}", timeout=timeout
    )
    source_dev.sw.nw_utility.tftp(
        server_ip_addr, source_file, filename, timeout=timeout
    )
    # move file to given tftp location and perform check of transfer
    mv_command = f"mv {file_location_on_server} {path_on_tftpserver}"
    output = tftp_server.console.execute_command(mv_command, timeout=timeout)
    if "No such file or directory" in output:
        msg = f"file not found {output}"
        raise UseCaseFailure(msg)
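
The file first lands in the server's /tftpboot folder under its base name and is then moved to path_on_tftpserver. The sketch below reproduces only the path handling, so the resulting mv command can be inspected offline. tftp_paths is a hypothetical helper name and the example paths are made up.

```python
from __future__ import annotations

import os


def tftp_paths(
    source_file: str, path_on_tftpserver: str, tftp_root: str = "/tftpboot"
) -> tuple[str, str]:
    """Return (landing path on the server, mv command) as built in the use case."""
    # Only the base name of the board-side path is kept on the server.
    _, filename = os.path.split(source_file)
    landing = f"{tftp_root}/{filename}"
    return landing, f"mv {landing} {path_on_tftpserver}"


print(tftp_paths("/tmp/capture.pcap", "/tftpboot/results/"))
```

If the transfer did not produce the landing file, the mv output contains "No such file or directory", which is what the use case turns into UseCaseFailure.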

device_getters

Device getters use cases.

Functions:

Name Description
device_getter

Provide device of type 'device_type'.

get_lan_clients

Return a list of LAN clients based on given count.

get_wan_clients

Return a list of WAN clients based on given count.

device_getter

device_getter(device_type: type[T]) -> T

Provide device of type 'device_type'.

Parameters:

Name Type Description Default

device_type

type[T]

Type of device to get

required

Returns:

Type Description
T

Instance of device

Raises:

Type Description
ValueError

if no device of given type is available or if more than 1 device of given type is available

Source code in boardfarm3/use_cases/device_getters.py
def device_getter(device_type: type[T]) -> T:
    """Provide device of type 'device_type'.

    :param device_type: Type of device to get
    :return: Instance of device
    :raises ValueError: if no device of given type is available or
        if more than 1 device of given type is available
    """
    devs = get_device_manager().get_devices_by_type(device_type)

    if len(devs) < 1:
        msg = f"There are no {device_type} devices available"
        raise ValueError(msg)
    if len(devs) > 1:
        msg = f"More than 1 {device_type} devices found"
        raise ValueError(msg)
    return devs[next(iter(devs))]
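
device_getter is an "exactly one" lookup: zero matches and multiple matches both raise ValueError. The sketch below applies the same rule to a plain dictionary. pick_single and the device names are hypothetical, and the dictionary stands in for the result of get_devices_by_type.

```python
from __future__ import annotations


def pick_single(devices: dict[str, object], type_name: str) -> object:
    """Return the only device in the mapping or raise ValueError."""
    if len(devices) < 1:
        raise ValueError(f"There are no {type_name} devices available")
    if len(devices) > 1:
        raise ValueError(f"More than 1 {type_name} devices found")
    # Exactly one entry: fetch it through its (only) key.
    return devices[next(iter(devices))]


print(pick_single({"board": "cpe-object"}, "CPE"))
```

For device types that can legitimately appear several times in a setup, such as LAN or WAN clients, get_lan_clients and get_wan_clients are the better fit.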

get_lan_clients

get_lan_clients(count: int) -> list[LAN]

Return a list of LAN clients based on given count.

Parameters:

Name Type Description Default

count

int

number of LAN clients

required

Returns:

Type Description
List[LAN]

list of LAN clients

Raises:

Type Description
DeviceNotFound

if count of LAN devices is invalid

Source code in boardfarm3/use_cases/device_getters.py
def get_lan_clients(count: int) -> list[LAN]:
    """Return a list of LAN clients based on given count.

    :param count: number of LAN clients
    :type count: int
    :return: list of LAN clients
    :rtype: List[LAN]
    :raises DeviceNotFound: if count of LAN devices is invalid
    """
    lan_devices = get_device_manager().get_devices_by_type(
        LAN,  # type: ignore[type-abstract]
    )
    if not 0 < count <= len(lan_devices):
        msg = f"Invalid count provided. Only {len(lan_devices)} LAN clients available"
        raise DeviceNotFound(msg)
    return list(lan_devices.values())[:count]
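
The chained comparison 0 < count <= len(lan_devices) rejects zero, negative and too-large counts in one expression. A minimal sketch of the same selection on a plain dictionary is shown below. first_n is a hypothetical name, and LookupError stands in for boardfarm's DeviceNotFound so the example stays self-contained.

```python
def first_n(devices: dict, count: int) -> list:
    """Return the first `count` values, validating the count like the use case."""
    if not 0 < count <= len(devices):
        # Stand-in for DeviceNotFound.
        raise LookupError(f"Invalid count provided. Only {len(devices)} clients available")
    # Dictionaries preserve insertion order, so the slice is deterministic.
    return list(devices.values())[:count]


lan_devices = {"lan": "LAN-1", "lan2": "LAN-2", "lan3": "LAN-3"}
print(first_n(lan_devices, 2))
```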

get_wan_clients

get_wan_clients(count: int) -> list[WAN]

Return a list of WAN clients based on given count.

Parameters:

Name Type Description Default

count

int

number of WAN clients

required

Returns:

Type Description
List[WAN]

list of WAN clients

Raises:

Type Description
DeviceNotFound

if count of WAN devices is invalid

Source code in boardfarm3/use_cases/device_getters.py
def get_wan_clients(count: int) -> list[WAN]:
    """Return a list of WAN clients based on given count.

    :param count: number of WAN clients
    :type count: int
    :return: list of WAN clients
    :rtype: List[WAN]
    :raises DeviceNotFound: if count of WAN devices is invalid
    """
    wan_devices = get_device_manager().get_devices_by_type(
        WAN,  # type: ignore[type-abstract]
    )
    if not 0 < count <= len(wan_devices):
        msg = f"Invalid count provided. Only {len(wan_devices)} WAN clients available"
        raise DeviceNotFound(msg)
    return list(wan_devices.values())[:count]

device_utilities

Miscellaneous Use Cases to interact with devices.

General tasks such as reading and setting device's date and time.

Functions:

Name Description
get_device_date

Get the device's date and time.

set_device_date

Set the dut date and time from device console.

get_device_date

get_device_date(device: LAN | WAN | WLAN | CPE) -> str | None

Get the device's date and time.

.. code-block:: python

# example output
"Friday, May 24, 2024 10:43:11"

Parameters:

Name Type Description Default

device

LAN | WAN | WLAN | CPE

device from which the date and time needs to be fetched

required

Returns:

Type Description
str | None

date from device console

Source code in boardfarm3/use_cases/device_utilities.py
def get_device_date(device: LAN | WAN | WLAN | CPE) -> str | None:
    """Get the device's date and time.

    .. code-block:: python

        # example output
        "Friday, May 24, 2024 10:43:11"

    :param device: device from which the date and time needs to be fetched
    :type device: LAN | WAN | WLAN | CPE
    :return: date from device console
    :rtype: str | None
    """
    if isinstance(device, CPE):
        return device.sw.get_date()
    return device.get_date()
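
The returned value is a plain string (or None), so a test that needs to compare times has to parse it. The sketch below parses the example output shown in the docstring. It assumes the device always answers in exactly that format and with English day and month names, which may not hold for every device. parse_device_date is a hypothetical helper.

```python
from datetime import datetime

# Format matching the documented example "Friday, May 24, 2024 10:43:11".
EXAMPLE_FORMAT = "%A, %B %d, %Y %H:%M:%S"


def parse_device_date(date_text: str) -> datetime:
    """Convert the example date string into a datetime object."""
    return datetime.strptime(date_text, EXAMPLE_FORMAT)


print(parse_device_date("Friday, May 24, 2024 10:43:11"))
```

Callers should check for None before parsing, because the signature allows the use case to return no date at all.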

set_device_date

set_device_date(device: LAN | WAN | WLAN | CPE, date: str) -> None

Set the dut date and time from device console.

Parameters:

Name Type Description Default

device

LAN | WAN | WLAN | CPE

device on which the date and time needs to be set

required

date

str

date value to set, e.g. Tue Dec 20 2022, 12:40:23 UTC

required

Raises:

Type Description
UseCaseFailure

fails the usecase if the date is not set properly

Source code in boardfarm3/use_cases/device_utilities.py
def set_device_date(device: LAN | WAN | WLAN | CPE, date: str) -> None:
    """Set the dut date and time from device console.

    :param device: device on which the date and time needs to be set
    :type device: LAN | WAN | WLAN | CPE
    :param date: value to be changed eg: Tue Dec 20 2022, 12:40:23 UTC, etc
    :type date: str
    :raises UseCaseFailure: fails the usecase if the date is not set properly
    """
    if isinstance(device, CPE):
        out = device.sw.set_date(date)
    else:
        out = device.set_date("-s", date)
    if not out:
        msg = f"Can't set the date '{date}' on '{device}'"
        raise UseCaseFailure(msg)

dhcp

Boardfarm LGI DHCP IPv4 Use Cases.

Classes:

Name Description
DHCPTraceData

Provides a DHCPTraceData data class.

Functions:

Name Description
configure_dhcp_inform

Configure dhclient.conf to send DHCPINFORM messages.

configure_dhcp_option125

Configure device's vendor-specific suboptions in DHCP option 125.

dhcp_renew_ipv4

Release and renew IPv4 in the device and return IPv4.

dhcp_renew_stateful_ipv6

Release and renew stateful IPv6 in the device and return IPv6.

dhcp_renew_stateless_ipv6

Release and renew stateless IPv6 in the device and return IPv6.

get_all_dhcp_options

Get all the DHCP options in a DHCP packet.

get_all_dhcpv6_options

Get all the DHCPv6 options in a DHCPv6 packet.

get_dhcp_option_details

Get all required option details when option is provided.

get_dhcp_packet_by_message

Get the DHCP packets for the particular message from the pcap file.

get_dhcp_suboption_details

Get all required sub option details when option & sub option are provided.

get_dhcpv6_packet_by_message

Get the DHCPv6 packets for the particular message from the pcap file.

parse_dhcp_trace

Read and filter the DHCP packets from the pcap file and return them.

parse_dhcpv6_trace

Read and filter the DHCPv6 packets from the pcap file.

remove_dhcp_inform_config

Remove the DHCPINFORM related configuration on dhclient.conf.

remove_dhcp_option125

Remove the information in DHCP option 125.

DHCPTraceData dataclass

DHCPTraceData(
    source: IPAddresses,
    destination: IPAddresses,
    dhcp_packet: RecursiveDict,
    dhcp_message_type: int,
)

Provides a DHCPTraceData data class.

Holds source, destination, dhcp_packet and dhcp_message_type.

configure_dhcp_inform

configure_dhcp_inform(client: LAN | WAN) -> None

Configure dhclient.conf to send DHCPINFORM messages.

Parameters:

Name Type Description Default

client

LAN | WAN

Device where dhclient.conf needs to be configured for DHCPINFORM.

required
Source code in boardfarm3/use_cases/dhcp.py
def configure_dhcp_inform(client: LAN | WAN) -> None:
    """Configure dhclient.conf to send DHCPINFORM messages.

    :param client: Device where dhclient.conf needs to be configured for DHCPINFORM.
    :type client: LAN | WAN
    """
    msg_type = "send dhcp-message-type 8;"
    out = client.console.execute_command(f"egrep '{msg_type}' /etc/dhcp/dhclient.conf")
    if not re.search(msg_type, out):
        # following statement is unreachable, according to mypy
        # however if you try with out = "" you can execute this branch
        # pylint:disable-next=line-too-long
        client.console.execute_command("cat>>/etc/dhcp/dhclient.conf<<EOF")  # type:ignore[unreachable]
        client.console.execute_command(msg_type)
        client.console.execute_command("EOF")
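
The use case is idempotent: the line is appended only when egrep does not already find it (DHCP message type 8 is DHCPINFORM). The same "append if missing" rule can be sketched on an in-memory copy of the file. ensure_inform_line is a hypothetical helper that works on a string instead of the device console.

```python
import re

MSG_TYPE = "send dhcp-message-type 8;"


def ensure_inform_line(conf_text: str) -> str:
    """Append the DHCPINFORM line unless the text already contains it."""
    if re.search(MSG_TYPE, conf_text):
        return conf_text
    # Add a separating newline only if the existing text does not end with one.
    sep = "" if conf_text.endswith("\n") or not conf_text else "\n"
    return f"{conf_text}{sep}{MSG_TYPE}\n"


once = ensure_inform_line("request subnet-mask;\n")
twice = ensure_inform_line(once)
print(once == twice)
```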

configure_dhcp_option125

configure_dhcp_option125(client: LAN | WAN) -> None

Configure device's vendor-specific suboptions in DHCP option 125.

This function modifies the device's dhclient.conf.

Parameters:

Name Type Description Default

client

LAN | WAN

Linux device to be configured.

required
Source code in boardfarm3/use_cases/dhcp.py
def configure_dhcp_option125(client: LAN | WAN) -> None:
    """Configure device's vendor-specific suboptions in DHCP option 125.

    This function modifies the device's `dhclient.conf`.

    :param client: Linux device to be configured.
    :type client: LAN | WAN
    """
    # this is a copy/paste of BFv2 boardfarm/lib/dhcpoption.py
    out = client.console.execute_command(
        "egrep 'request option-125' /etc/dhcp/dhclient.conf"
    )
    if not re.search("request option-125,", out):
        # unreachable seems to be a false positive by MyPy
        client.console.execute_command(  # type:ignore[unreachable]
            "sed -i -e "
            "'s|request |\\noption option-125 code 125 = string;\\n\\nrequest option-125, |' "
            "/etc/dhcp/dhclient.conf"
        )
        # details of Text for HexaDecimal value as
        # Enterprise code (3561) 00:00:0D:E9 length  (22)16
        # code 01  length 06  (BFVER0) 42:46:56:45:52:30
        # code 03  length 06  (BFCLAN)  42:46:43:4c:41:4e
        mac = client.get_interface_macaddr(client.iface_dut)
        value = "VAAU" + "".join(mac.split(":")[0:4]).upper()
        encoded_name = str.encode(value)
        hex_name = iter(binascii.hexlify(encoded_name).decode("utf-8"))
        code_02 = ":".join([f"{j}{k}" for j, k in zip(hex_name, hex_name)])
        len_02 = hex(len(value)).replace("0x", "").zfill(2)
        total_len = hex(18 + len(value)).replace("0x", "").zfill(2)
        option_125 = (
            f"00:00:0D:E9:{total_len}:01:06:44:38:42:36:42:37:02:"
            f"{len_02}:{code_02}:03:06:42:46:43:4c:41:4e"
        )
        client.execute_command("cat >> /etc/dhcp/dhclient.conf << EOF")
        client.execute_command(f"send option-125 = {option_125};")
        client.execute_command("")
        client.execute_command("EOF")
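
To make the byte layout easier to follow, here is a small sketch that reproduces only the string-building part of the function above for a made-up MAC address. The helper name `build_option_125` is hypothetical; the arithmetic mirrors the listing: the constant 18 covers the two fixed 6-byte sub-options (codes 01 and 03) plus the three 2-byte code/length headers.

```python
import binascii


def build_option_125(mac: str) -> str:
    """Hypothetical helper: build the option 125 hex string for a MAC address."""
    # Sub-option 02 payload: "VAAU" plus the first four MAC octets, upper-cased.
    value = "VAAU" + "".join(mac.split(":")[0:4]).upper()
    # Hex-encode the payload and regroup the digits into colon-separated bytes.
    hex_chars = iter(binascii.hexlify(value.encode()).decode("utf-8"))
    code_02 = ":".join(f"{j}{k}" for j, k in zip(hex_chars, hex_chars))
    len_02 = f"{len(value):02x}"
    total_len = f"{18 + len(value):02x}"
    return (
        f"00:00:0D:E9:{total_len}:01:06:44:38:42:36:42:37:02:"
        f"{len_02}:{code_02}:03:06:42:46:43:4c:41:4e"
    )


# Made-up MAC address, for illustration only.
print(build_option_125("aa:bb:cc:dd:ee:ff"))
```

For this MAC the payload is `VAAUAABBCCDD` (12 bytes), so the sub-option length is `0c` and the total data length is `1e` (30).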

dhcp_renew_ipv4

dhcp_renew_ipv4(host: LAN | WLAN) -> IPv4Address

Release and renew IPv4 in the device and return IPv4.

.. hint:: This Use Case implements statements from the test suite such as:

- Trigger DHCP DISCOVER for the LAN Client IPv4 acquisition
- Verify the IP acquisition on LAN devices
- Check if the LAN Client connected to CPE obtains both IPv4 and IPv6 address

Parameters:

Name Type Description Default

host

LAN | WLAN

host where the IP has to be renewed

required

Returns:

Type Description
IPv4Address

IPv4 address of the device

Source code in boardfarm3/use_cases/dhcp.py
def dhcp_renew_ipv4(host: LAN | WLAN) -> IPv4Address:
    """Release and renew IPv4 in the device and return IPv4.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Trigger DHCP DISCOVER for the LAN Client IPv4 acquisition
        - Verify the IP acquisition on LAN devices
        - Check if the LAN Client connected to CPE obtains both IPv4 and IPv6 address

    :param host: host where the IP has to be renewed
    :type host: LAN | WLAN
    :return: IPv4 address of the device
    :rtype: IPv4Address
    """
    host.release_dhcp(host.iface_dut)
    host.renew_dhcp(host.iface_dut)
    return IPv4Address(host.get_interface_ipv4addr(host.iface_dut))
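
The order of operations (release, renew, then read the address) can be exercised with a stand-in host. This is a sketch only: `FakeHost`, its interface name and the returned lease are all made up for illustration, and a real test would pass a boardfarm3 LAN or WLAN device instead.

```python
from ipaddress import IPv4Address


class FakeHost:
    """Hypothetical LAN/WLAN stand-in that records the calls it receives."""

    iface_dut = "eth1"

    def __init__(self) -> None:
        self.calls: list[str] = []

    def release_dhcp(self, iface: str) -> None:
        self.calls.append(f"release {iface}")

    def renew_dhcp(self, iface: str) -> None:
        self.calls.append(f"renew {iface}")

    def get_interface_ipv4addr(self, iface: str) -> str:
        self.calls.append(f"read {iface}")
        return "192.168.178.10"  # made-up lease for illustration


def renew_ipv4_sketch(host: FakeHost) -> IPv4Address:
    # Same three steps as the use case above.
    host.release_dhcp(host.iface_dut)
    host.renew_dhcp(host.iface_dut)
    return IPv4Address(host.get_interface_ipv4addr(host.iface_dut))


host = FakeHost()
address = renew_ipv4_sketch(host)
print(address, host.calls)
```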

dhcp_renew_stateful_ipv6

dhcp_renew_stateful_ipv6(host: LAN | WLAN) -> IPv6Address

Release and renew stateful IPv6 in the device and return IPv6.

.. hint:: This Use Case implements statements from the test suite such as:

- Initiate the IPv6 acquisition from LAN Ethernet client
- Initiate the IPv6 process from LAN Ethernet client
- Release and renew IPv6 address on LAN client

Parameters:

Name Type Description Default

host

LAN | WLAN

host where the IP has to be renewed

required

Returns:

Type Description
IPv6Address

IPv6 address of the device

Source code in boardfarm3/use_cases/dhcp.py
def dhcp_renew_stateful_ipv6(host: LAN | WLAN) -> IPv6Address:
    """Release and renew stateful IPv6 in the device and return IPv6.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Initiate the IPv6 acquisition from LAN Ethernet client
        - Initiate the IPv6 process from LAN Ethernet client
        - Release and renew IPv6 address on LAN client

    :param host: host where the IP has to be renewed
    :type host: LAN | WLAN
    :return: IPv6 address of the device
    :rtype: IPv6Address
    """
    host.release_ipv6(host.iface_dut)
    host.renew_ipv6(host.iface_dut)
    return IPv6Address(host.get_interface_ipv6addr(host.iface_dut))

dhcp_renew_stateless_ipv6

dhcp_renew_stateless_ipv6(host: LAN | WLAN) -> IPv6Address

Release and renew stateless IPv6 in the device and return IPv6.

.. hint:: This Use Case implements statements from the test suite such as:

- Initiate the IPv6 stateless acquisition from LAN Ethernet client
- Initiate the IPv6 stateless process from LAN Ethernet client
- Release and renew stateless IPv6 address on LAN client

Parameters:

Name Type Description Default

host

LAN | WLAN

host where the IP has to be renewed

required

Returns:

Type Description
IPv6Address

IPv6 address of the device

Source code in boardfarm3/use_cases/dhcp.py
def dhcp_renew_stateless_ipv6(host: LAN | WLAN) -> IPv6Address:
    """Release and renew stateless IPv6 in the device and return IPv6.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Initiate the IPv6 stateless acquisition from LAN Ethernet client
        - Initiate the IPv6 stateless process from LAN Ethernet client
        - Release and renew stateless IPv6 address on LAN client

    :param host: host where the IP has to be renewed
    :type host: LAN | WLAN
    :return: IPv6 address of the device
    :rtype: IPv6Address
    """
    host.release_ipv6(host.iface_dut, stateless=True)
    host.set_link_state(host.iface_dut, "down")
    host.set_link_state(host.iface_dut, "up")
    host.renew_ipv6(host.iface_dut, stateless=True)
    sleep(10)
    return IPv6Address(host.get_interface_ipv6addr(host.iface_dut))

get_all_dhcp_options

get_all_dhcp_options(packet: DHCPTraceData) -> RecursiveDict

Get all the DHCP options in a DHCP packet.

Parameters:

Name Type Description Default

packet

DHCPTraceData

desired packet from DHCP trace (only one packet)

required

Returns:

Type Description
RecursiveDict

all the DHCP options

Source code in boardfarm3/use_cases/dhcp.py
def get_all_dhcp_options(packet: DHCPTraceData) -> RecursiveDict:
    """Get all the DHCP options in a DHCP packet.

    :param packet: desired packet from DHCP trace (only one packet)
    :type packet: DHCPTraceData
    :return: all the DHCP options
    :rtype: RecursiveDict
    """
    return {
        key: value
        for key, value in packet.dhcp_packet.items()
        if "dhcp.option.type" in key
    }
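
As an illustration of the filter, the sketch below applies the same dictionary comprehension to a hand-written packet. `FakeTraceData` is a hypothetical stand-in for `DHCPTraceData`, and the key names and values in the sample are assumptions about what a tshark JSON dump might contain, not captured data.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeTraceData:
    """Hypothetical stand-in for DHCPTraceData; only dhcp_packet is modelled."""

    dhcp_packet: dict[str, Any]


def all_options_sketch(packet: FakeTraceData) -> dict[str, Any]:
    # Keep every entry whose key mentions "dhcp.option.type", including the
    # "_tree" entries and the numbered duplicates.
    return {
        key: value
        for key, value in packet.dhcp_packet.items()
        if "dhcp.option.type" in key
    }


packet = FakeTraceData(
    {
        "dhcp.type": "1",  # not an option entry, so it is filtered out
        "dhcp.option.type": "53",
        "dhcp.option.type_tree": {"dhcp.option.dhcp": "1"},
        "dhcp.option.type_1": "61",
        "dhcp.option.type_tree_1": {"dhcp.option.length": "7"},
    }
)
options = all_options_sketch(packet)
print(sorted(options))
```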

get_all_dhcpv6_options

get_all_dhcpv6_options(packet: DHCPV6TraceData) -> DHCPV6Options

Get all the DHCPv6 options in a DHCPv6 packet.

.. hint:: This Use Case implements statements from the test suite such as:

- DHCPv6 includes the [] option

Parameters:

Name Type Description Default

packet

DHCPV6TraceData

desired packet from DHCPv6 trace (only one packet)

required

Returns:

Type Description
DHCPV6Options

all the DHCPv6 options

Source code in boardfarm3/use_cases/dhcp.py
def get_all_dhcpv6_options(packet: DHCPV6TraceData) -> DHCPV6Options:
    """Get all the DHCPv6 options in a DHCPv6 packet.

    .. hint:: This Use Case implements statements from the test suite such as:

        - DHCPv6 includes the [] option


    :param packet: desired packet from DHCPv6 trace (only one packet)
    :type packet: DHCPV6TraceData
    :return: all the DHCPv6 options
    :rtype: DHCPV6Options
    """
    if packet.dhcpv6_message_type in [12, 13]:
        out = dict(packet.dhcpv6_packet["Relay Message"]["DHCPv6"].items())
    else:
        out = dict(packet.dhcpv6_packet.items())
    return DHCPV6Options(out)
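
The branch on message types 12 and 13 exists because relayed packets carry their options one level deeper, under `"Relay Message"` and then `"DHCPv6"`. A minimal sketch of that unwrapping follows; `FakeV6TraceData` is a hypothetical stand-in, the option names in the sample are made up, and a plain `dict` is returned where the real function wraps the result in `DHCPV6Options`.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeV6TraceData:
    """Hypothetical stand-in for DHCPV6TraceData."""

    dhcpv6_packet: dict[str, Any]
    dhcpv6_message_type: int


def options_sketch(packet: FakeV6TraceData) -> dict[str, Any]:
    # 12 = RELAY-FORW, 13 = RELAY-REPLY: options sit inside the relayed message.
    if packet.dhcpv6_message_type in (12, 13):
        return dict(packet.dhcpv6_packet["Relay Message"]["DHCPv6"])
    return dict(packet.dhcpv6_packet)


# Made-up option content, identical in both packets.
direct = FakeV6TraceData({"Client Identifier": {"DUID": "0001"}}, 1)
relayed = FakeV6TraceData(
    {"Relay Message": {"DHCPv6": {"Client Identifier": {"DUID": "0001"}}}}, 12
)
print(options_sketch(direct) == options_sketch(relayed))
```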

get_dhcp_option_details

get_dhcp_option_details(packet: DHCPTraceData, option: int) -> RecursiveDict

Get all required option details when option is provided.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify packet capture has option
- Verify [] is present in DHCP [] message
- Verify all the Mandatory_Fields are available in DHCP message

Parameters:

Name Type Description Default

packet

DHCPTraceData

the packet data structure

required

option

int

DHCP option

required

Returns:

Type Description
RecursiveDict

option Dict along with suboptions

Raises:

Type Description
UseCaseFailure

on failing to find the option

Source code in boardfarm3/use_cases/dhcp.py
def get_dhcp_option_details(packet: DHCPTraceData, option: int) -> RecursiveDict:
    """Get all required option details when option is provided.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify packet capture has option
        - Verify [] is present in DHCP [] message
        - Verify all the Mandatory_Fields are available in DHCP message

    :param packet: the packet data structure
    :type packet: DHCPTraceData
    :param option: DHCP option
    :type option: int
    :raises UseCaseFailure: on failing to find the option
    :return: option Dict along with suboptions
    :rtype: RecursiveDict
    """
    option_data = get_all_dhcp_options(packet)
    # pylint: disable=too-many-nested-blocks
    try:
        for key, value in option_data.items():
            if value == str(option):
                if re.search(r"_\d", key) is None:
                    out = option_data[key + "_tree"]
                else:
                    out = option_data[
                        key.split("_")[0]
                        + "_tree_"
                        + key.split("_")[len(key.split("_")) - 1]
                    ]
                    break
    except KeyError as exception:
        msg = f"Failed to find option {option!s}"
        raise UseCaseFailure(msg) from exception
    return out
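
The key arithmetic in the loop above maps an option "type" key to the key that holds its details. The sketch below isolates that mapping in a hypothetical helper, `tree_key`; the sample keys follow the naming seen in the listing, where repeated options carry a numeric suffix.

```python
import re


def tree_key(type_key: str) -> str:
    """Hypothetical helper: map an option type key to its details key."""
    if re.search(r"_\d", type_key) is None:
        # First occurrence: "dhcp.option.type" -> "dhcp.option.type_tree"
        return type_key + "_tree"
    # Numbered duplicate: "dhcp.option.type_3" -> "dhcp.option.type_tree_3"
    parts = type_key.split("_")
    return parts[0] + "_tree_" + parts[-1]


print(tree_key("dhcp.option.type"))
print(tree_key("dhcp.option.type_3"))
```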

get_dhcp_packet_by_message

get_dhcp_packet_by_message(
    trace: list[DHCPTraceData], message_type: str
) -> list[DHCPTraceData]

Get the DHCP packets for the particular message from the pcap file.

.. hint:: This Use Case implements statements from the test suite such as:

- Following messages are exchanged
- Discover, Offer, Request and Ack messages
- DHCP messages are exchanged

Parameters:

Name Type Description Default

trace

list[DHCPTraceData]

sequence of DHCP packets filtered from captured pcap file and stored in DHCPTraceData

required

message_type

str

DHCP message type according to RFC 2132; one of: DHCPDISCOVER, DHCPOFFER, DHCPREQUEST, DHCPDECLINE, DHCPACK, DHCPNAK, DHCPRELEASE, DHCPINFORM

required

Returns:

Type Description
List[DHCPTraceData]

Sequence of DHCP packets filtered with the message type

Source code in boardfarm3/use_cases/dhcp.py
def get_dhcp_packet_by_message(
    trace: list[DHCPTraceData],
    message_type: str,
) -> list[DHCPTraceData]:
    """Get the DHCP packets for the particular message from the pcap file.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Following messages are exchanged
        - Discover, Offer, Request and Ack messages
        - DHCP messages are exchanged

    :param trace: sequence of DHCP packets filtered from captured pcap file
                  and stored in DHCPTraceData
    :type trace: List[DHCPTraceData]
    :param message_type: DHCP message according to RFC2132 and could be any of:

        * DHCPDISCOVER,
        * DHCPOFFER,
        * DHCPREQUEST,
        * DHCPDECLINE,
        * DHCPACK,
        * DHCPNAK,
        * DHCPRELEASE,
        * DHCPINFORM

    :type message_type: str
    :return: Sequence of DHCP packets filtered with the message type
    :rtype: List[DHCPTraceData]
    """
    dhcp_message_dict = {
        "DHCPDISCOVER": 1,
        "DHCPOFFER": 2,
        "DHCPREQUEST": 3,
        "DHCPDECLINE": 4,
        "DHCPACK": 5,
        "DHCPNAK": 6,
        "DHCPRELEASE": 7,
        "DHCPINFORM": 8,
    }
    return list[DHCPTraceData](
        [
            packet
            for packet in trace
            if dhcp_message_dict.get(message_type) == packet.dhcp_message_type
        ],
    )
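
A short sketch of the lookup-and-filter behaviour, using a hypothetical `FakePacket` in place of `DHCPTraceData`. It also shows a consequence of using `dict.get`: a name that is not in the table (wrong case, typo) does not raise, it simply matches nothing.

```python
from dataclasses import dataclass


@dataclass
class FakePacket:
    """Hypothetical stand-in for DHCPTraceData; only the message type is kept."""

    dhcp_message_type: int


DHCP_MESSAGE_CODES = {
    "DHCPDISCOVER": 1,
    "DHCPOFFER": 2,
    "DHCPREQUEST": 3,
    "DHCPDECLINE": 4,
    "DHCPACK": 5,
    "DHCPNAK": 6,
    "DHCPRELEASE": 7,
    "DHCPINFORM": 8,
}


def packets_by_message(trace: list[FakePacket], message_type: str) -> list[FakePacket]:
    # Unknown names map to None, which never equals an integer message type.
    wanted = DHCP_MESSAGE_CODES.get(message_type)
    return [packet for packet in trace if packet.dhcp_message_type == wanted]


trace = [FakePacket(1), FakePacket(2), FakePacket(3), FakePacket(5)]
acks = packets_by_message(trace, "DHCPACK")
unknown = packets_by_message(trace, "ack")
print(acks, unknown)
```

A test that asserts on the length of the returned list therefore also catches misspelled message names.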

get_dhcp_suboption_details

get_dhcp_suboption_details(
    packet: DHCPTraceData, option: int, suboption: int
) -> RecursiveDict

Get all required sub option details when option & sub option are provided.

.. hint:: This Use Case implements statements from the test suite such as:

- DHCP option [] suboptions
- Verify [] suboptions are present in DHCP

Parameters:

Name Type Description Default

packet

DHCPTraceData

the packet data structure

required

option

int

DHCP option

required

suboption

int

DHCP sub option

required

Returns:

Type Description
RecursiveDict

suboption dictionary

Raises:

Type Description
UseCaseFailure

on failing to find the suboption

Source code in boardfarm3/use_cases/dhcp.py
def get_dhcp_suboption_details(
    packet: DHCPTraceData,
    option: int,
    suboption: int,
) -> RecursiveDict:
    """Get all required sub option details when option & sub option are provided.

    .. hint:: This Use Case implements statements from the test suite such as:

        - DHCP option [] suboptions
        - Verify [] suboptions are present in DHCP

    :param packet: the packet data structure
    :type packet: DHCPTraceData
    :param option: DHCP option
    :type option: int
    :param suboption: DHCP sub option
    :type suboption: int
    :raises UseCaseFailure: on failing to find the suboption
    :return: suboption dictionary
    :rtype: RecursiveDict
    """
    option_key_dict = {125: "dhcp.option.vi.enterprise_tree"}
    sub_option_data = get_dhcp_option_details(packet, option)
    out = {}
    if option in option_key_dict:
        sub_options = sub_option_data[option_key_dict[option]]
    else:
        sub_options = sub_option_data
    # pylint: disable=too-many-nested-blocks
    try:
        for key, value in sub_options.items():
            if "suboption" in key and value == str(suboption):
                if re.search(r"_\d", key) is None:
                    out = sub_options[key + "_tree"]
                else:
                    out = sub_options[
                        key.split("_")[0]
                        + "_tree_"
                        + key.split("_")[len(key.split("_")) - 1]
                    ]
                    break
        if not out:
            msg = (
                f"Failed to fetch suboption {suboption} for option {option} in"
                f" \n{sub_options}"
            )
            raise UseCaseFailure(msg)
    except KeyError as exception:
        msg = f"Failed to find suboption {suboption!s} "
        raise UseCaseFailure(msg) from exception
    return out

get_dhcpv6_packet_by_message

get_dhcpv6_packet_by_message(
    trace: list[DHCPV6TraceData], message_type: str
) -> list[DHCPV6TraceData]

Get the DHCPv6 packets for the particular message from the pcap file.

.. hint:: This Use Case implements statements from the test suite such as:

- Following messages are exchanged DHCPv6
- Discover, Offer, Request and Ack DHCPv6 messages
- DHCPv6 messages are exchanged

Parameters:

Name Type Description Default

trace

list[DHCPV6TraceData]

sequence of DHCPv6 packets filtered from captured pcap file and stored in DHCPV6TraceData

required

message_type

str

DHCPv6 message type according to RFC 3315; one of: SOLICIT, ADVERTISE, REQUEST, CONFIRM, RENEW, REBIND, REPLY, RELEASE, DECLINE, RECONFIGURE, INFORMATION-REQUEST, RELAY-FORW, RELAY-REPLY (the key this function looks up; RFC 3315 names the message RELAY-REPL)

required

Returns:

Type Description
List[DHCPV6TraceData]

Sequence of DHCPv6 packets filtered with the message type

Source code in boardfarm3/use_cases/dhcp.py
def get_dhcpv6_packet_by_message(
    trace: list[DHCPV6TraceData],
    message_type: str,
) -> list[DHCPV6TraceData]:
    """Get the DHCPv6 packets for the particular message from the pcap file.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Following messages are exchanged DHCPv6
        - Discover, Offer, Request and Ack DHCPv6 messages
        - DHCPv6 messages are exchanged

    :param trace: sequence of DHCPv6 packets filtered from captured pcap file
                  and stored in DHCPV6TraceData
    :type trace: List[DHCPV6TraceData]
    :param message_type: DHCP message according to RFC3315 and could be any of:

        * SOLICIT,
        * ADVERTISE,
        * REQUEST,
        * CONFIRM,
        * RENEW,
        * REBIND,
        * REPLY,
        * RELEASE,
        * DECLINE,
        * RECONFIGURE,
        * INFORMATION-REQUEST,
        * RELAY-FORW,
        * RELAY-REPLY

    :type message_type: str
    :return: Sequence of DHCPv6 packets filtered with the message type
    :rtype: List[DHCPV6TraceData]
    """
    dhcpv6_message_dict = {
        "SOLICIT": 1,
        "ADVERTISE": 2,
        "REQUEST": 3,
        "CONFIRM": 4,
        "RENEW": 5,
        "REBIND": 6,
        "REPLY": 7,
        "RELEASE": 8,
        "DECLINE": 9,
        "RECONFIGURE": 10,
        "INFORMATION-REQUEST": 11,
        "RELAY-FORW": 12,
        "RELAY-REPLY": 13,
    }
    return [
        packet
        for packet in trace
        if dhcpv6_message_dict.get(message_type) == packet.dhcpv6_message_type
    ]
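
One way a test might build on this filter is to check that a whole exchange is present in a trace. The sketch below is an assumption about usage, not boardfarm3 code: `FakeV6Packet` and `exchanged` are hypothetical, and the code table is copied from the listing above. Indexing the table directly (instead of `dict.get`) makes an unknown name such as `RELAY-REPL` fail loudly with a `KeyError`.

```python
from dataclasses import dataclass


@dataclass
class FakeV6Packet:
    """Hypothetical stand-in for DHCPV6TraceData."""

    dhcpv6_message_type: int


DHCPV6_MESSAGE_CODES = {
    "SOLICIT": 1,
    "ADVERTISE": 2,
    "REQUEST": 3,
    "CONFIRM": 4,
    "RENEW": 5,
    "REBIND": 6,
    "REPLY": 7,
    "RELEASE": 8,
    "DECLINE": 9,
    "RECONFIGURE": 10,
    "INFORMATION-REQUEST": 11,
    "RELAY-FORW": 12,
    "RELAY-REPLY": 13,
}


def exchanged(trace: list[FakeV6Packet], names: list[str]) -> bool:
    # True when every named message type occurs at least once in the trace.
    seen = {packet.dhcpv6_message_type for packet in trace}
    return all(DHCPV6_MESSAGE_CODES[name] in seen for name in names)


trace = [FakeV6Packet(1), FakeV6Packet(2), FakeV6Packet(3), FakeV6Packet(7)]
print(exchanged(trace, ["SOLICIT", "ADVERTISE", "REQUEST", "REPLY"]))
print(exchanged(trace, ["RENEW"]))
```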

parse_dhcp_trace

parse_dhcp_trace(
    on_which_device: LAN | WAN | Provisioner | LTS, fname: str, timeout: int = 30
) -> list[DHCPTraceData]

Read and filter the DHCP packets from the pcap file and return them.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify from the packet capture
- Verify that the following messages are exchanged
- Check that [] messages are exchanged

Parameters:

Name Type Description Default

on_which_device

LAN | WAN | Provisioner | LTS

Object of the device class where tcpdump is captured

required

fname

str

Name of the captured pcap file

required

timeout

int

time out for tshark read to be executed, defaults to 30

30

Returns:

Type Description
List[DHCPTraceData]

Sequence of DHCP packets filtered from captured pcap file

Raises:

Type Description
UseCaseFailure

on DHCP parse issue

Source code in boardfarm3/use_cases/dhcp.py
def parse_dhcp_trace(
    on_which_device: LAN | WAN | Provisioner | LTS,
    fname: str,
    timeout: int = 30,
) -> list[DHCPTraceData]:
    """Read and filter the DHCP packets from the pcap file and return them.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify from the packet capture
        - Verify that the following messages are exchanged
        - Check that [] messages are exchanged

    :param on_which_device: Object of the device class where tcpdump is captured
    :type on_which_device: LAN | WAN | Provisioner | LTS
    :param fname: Name of the captured pcap file
    :type fname: str
    :param timeout: time out for ``tshark read`` to be executed, defaults to 30
    :type timeout: int
    :raises UseCaseFailure: on DHCP parse issue
    :return: Sequence of DHCP packets filtered from captured pcap file
    :rtype: List[DHCPTraceData]
    """
    try:
        out = on_which_device.tshark_read_pcap(
            fname=fname,
            additional_args="-Y bootp -T json",
            timeout=timeout,
        )
        output: list[DHCPTraceData] = []
        data = "[" + out.split("[", 1)[-1].replace("\r\n", "")

        # replacing bootp to dhcp as the devices still use older tshark versions
        replaced_data = data.replace("bootp", "dhcp")
        decoder = JSONDecoder(
            object_pairs_hook=_manage_duplicates,  # type: ignore [arg-type]
        )
        obj = decoder.decode(replaced_data)
        output = [
            DHCPTraceData(
                IPAddresses(
                    element["_source"]["layers"]["ip"]["ip.src"],
                    None,
                    None,
                ),
                IPAddresses(
                    element["_source"]["layers"]["ip"]["ip.dst"],
                    None,
                    None,
                ),
                element["_source"]["layers"]["dhcp"],
                int(
                    element["_source"]["layers"]["dhcp"]["dhcp.option.type_tree"][
                        "dhcp.option.dhcp"
                    ],
                ),
            )
            for element in obj
        ]
    except Exception as exception:
        msg = f"Failed to parse DHCP packets due to {exception} "
        raise UseCaseFailure(msg) from exception
    return output
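
The text clean-up and decoding steps can be tried on a made-up console capture. The private `_manage_duplicates` hook is not shown on this page, so the sketch substitutes a hypothetical `keep_duplicates` hook that keeps repeated JSON keys by adding `_1`, `_2`, ... suffixes; that scheme is an assumption, chosen because it matches the numbered keys handled by `get_dhcp_option_details`.

```python
from json import JSONDecoder


def keep_duplicates(pairs: list[tuple[str, object]]) -> dict[str, object]:
    """Hypothetical hook: keep repeated keys by suffixing them with _1, _2, ..."""
    result: dict[str, object] = {}
    repeats: dict[str, int] = {}
    for key, value in pairs:
        if key in result:
            repeats[key] = repeats.get(key, 0) + 1
            result[f"{key}_{repeats[key]}"] = value
        else:
            result[key] = value
    return result


# Made-up console output: a command echo followed by tshark's JSON array.
raw = (
    "tshark -r capture.pcap -Y bootp -T json\r\n"
    '[{"bootp.option.type": "53",\r\n'
    ' "bootp.option.type": "61"}]'
)
# Drop everything before the first "[" and remove console line breaks.
data = "[" + raw.split("[", 1)[-1].replace("\r\n", "")
# Older tshark builds label the layer "bootp"; normalise it to "dhcp".
decoded = JSONDecoder(object_pairs_hook=keep_duplicates).decode(
    data.replace("bootp", "dhcp")
)
print(decoded)
```

Without a hook, the standard decoder would keep only the last value for a repeated key, so all but one DHCP option would be lost.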

parse_dhcpv6_trace

parse_dhcpv6_trace(
    on_which_device: LAN | WAN | Provisioner | LTS,
    fname: str,
    timeout: int = 30,
    additional_args: str = "dhcpv6",
) -> list[DHCPV6TraceData]

Read and filter the DHCPv6 packets from the pcap file.

.. hint:: This Use Case implements statements from the test suite such as:

- Check that the following messages are exchanged [] DHCPv6
- Verify from the packet capture that DHCPv6

Parameters:

Name Type Description Default

on_which_device

LAN | WAN | Provisioner | LTS

Object of the device class where tcpdump is captured

required

fname

str

Name of the captured pcap file

required

timeout

int

time out for tshark command to be executed, defaults to 30

30

additional_args

str

additional arguments for tshark command to display filtered output, defaults to dhcpv6

'dhcpv6'

Returns:

Type Description
List[DHCPV6TraceData]

sequence of DHCPv6 packets filtered from captured pcap file

Raises:

Type Description
UseCaseFailure

on failure to parse DHCPv6 data

Source code in boardfarm3/use_cases/dhcp.py
def parse_dhcpv6_trace(
    on_which_device: LAN | WAN | Provisioner | LTS,
    fname: str,
    timeout: int = 30,
    additional_args: str = "dhcpv6",
) -> list[DHCPV6TraceData]:
    """Read and filter the DHCPv6 packets from the pcap file.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Check that the following messages are exchanged [] DHCPv6
        - Verify from the packet capture that DHCPv6

    :param on_which_device: Object of the device class where tcpdump is captured
    :type on_which_device: LAN | WAN | Provisioner | LTS
    :param fname: Name of the captured pcap file
    :type fname: str
    :param timeout: time out for ``tshark`` command to be executed, defaults to 30
    :type timeout: int
    :param additional_args: additional arguments for tshark command to
                            display filtered output, defaults to dhcpv6
    :type additional_args: str
    :raises UseCaseFailure: on failure to parse DHCPv6 data
    :return: sequence of DHCPv6 packets filtered from captured pcap file
    :rtype: List[DHCPV6TraceData]
    """
    output: list[DHCPV6TraceData] = []
    key_list: list[str] = []
    val_list: list[str | dict] = []
    out = on_which_device.tshark_read_pcap(
        fname=fname, additional_args=f"-Y '{additional_args}' -T json", timeout=timeout
    )
    data = "[" + out.split("[", 1)[-1].replace("\r\n", "")
    try:
        obj = JSONDecoder(
            object_pairs_hook=_manage_duplicates,  # type: ignore [arg-type]
        ).decode(data)
    except JSONDecodeError as exception:
        msg = f"Failed to parse JSON due to {exception} "
        raise UseCaseFailure(msg) from exception
    try:
        for element in obj:
            # condition for mv3 eth packets because the packets are
            # not wrapped under Relay message.
            # the below logic updates the dhcp packet dict to have
            # the consistent format across gateways
            if "Relay Message" not in element["_source"]["layers"]["dhcpv6"]:
                pkt_dict = element["_source"]["layers"]["dhcpv6"]
                dhcp_dict = _parse_options(pkt_dict, key_list, val_list)
                _update_trace_data(output, dhcp_dict, element)
            else:
                _update_trace_data(
                    output, element["_source"]["layers"]["dhcpv6"], element
                )
        return output  # noqa: TRY300

    except KeyError as exception:
        msg = f"Failed due to missing key {exception} in dictionary"
        raise UseCaseFailure(msg) from exception
    except TypeError as exception:
        msg = f"Failed due to type error: {exception}"
        raise UseCaseFailure(msg) from exception

remove_dhcp_inform_config

remove_dhcp_inform_config(client: LAN | WAN) -> None

Remove the DHCPINFORM related configuration on dhclient.conf.

Parameters:

Name Type Description Default

client

LAN | WAN

Device from where the configuration needs to be removed.

required
Source code in boardfarm3/use_cases/dhcp.py
def remove_dhcp_inform_config(client: LAN | WAN) -> None:
    """Remove the DHCPINFORM related configuration on dhclient.conf.

    :param client: Device from where the configuration needs to be removed.
    :type client: LAN | WAN
    """
    client.console.execute_command(
        "sed -i '/dhcp-message-type 8/d' /etc/dhcp/dhclient.conf"
    )

remove_dhcp_option125

remove_dhcp_option125(client: LAN | WAN) -> None

Remove the information in DHCP option 125.

This function modifies the Linux device's dhclient.conf.

Parameters:

Name Type Description Default

client

LAN | WAN

Linux device to be configured.

required
Source code in boardfarm3/use_cases/dhcp.py
def remove_dhcp_option125(client: LAN | WAN) -> None:
    """Remove the information in DHCP option 125.

    This function modifies the Linux device's `dhclient.conf`.

    :param client: Linux device to be configured.
    :type client: LAN | WAN
    """
    # this is a copy/paste of BFv2 boardfarm/lib/dhcpoption.py
    client.console.execute_command(
        "sed -i -e 's|request option-125,|request |' /etc/dhcp/dhclient.conf"
    )
    client.console.execute_command("sed -i '/option-125/d' /etc/dhcp/dhclient.conf")
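
The effect of the two `sed` commands can be previewed on a string. This is a pure-Python approximation under stated assumptions: `remove_option125_sketch` is a hypothetical helper, the sample configuration is made up, and `str.replace(..., 1)` stands in for `sed`'s first-match-per-line substitution.

```python
def remove_option125_sketch(conf: str) -> str:
    """Hypothetical helper approximating the two sed commands on a string."""
    # Step 1: s|request option-125,|request | (first match on each line).
    lines = [
        line.replace("request option-125,", "request ", 1)
        for line in conf.splitlines()
    ]
    # Step 2: /option-125/d deletes every line that still mentions option-125.
    return "\n".join(line for line in lines if "option-125" not in line)


# Made-up dhclient.conf fragment after configure_dhcp_option125 has run.
sample = "\n".join(
    [
        "option option-125 code 125 = string;",
        "",
        "request option-125, subnet-mask;",
        "send option-125 = 00:00:0D:E9;",
    ]
)
cleaned = remove_option125_sketch(sample)
print(repr(cleaned))
```

The substitution has to run before the deletion; otherwise the whole `request` line would be removed along with the option.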

image_comparison

Compare images.

Functions:

Name Description
compare_images

Compare 2 images and return the similarity score.

compare_images

compare_images(
    first_image: Path,
    second_image: Path,
    ignore_areas: list[tuple[int, int, int, int]] | None,
    show_images: bool = False,
) -> dict[str, float64]

Compare 2 images and return the similarity score.

Parameters:

Name Type Description Default

first_image

Path

Usually the original image (usually a png)

required

second_image

Path

The new image (with same size as the first image)

required

ignore_areas

list[tuple[int, int, int, int]] | None

A list of coordinates for the zones (rectangles) to be ignored during the comparison. Each tuple (xtop, ytop, xbottom, ybottom) is one rectangle, where (xtop, ytop) is the top-left corner and (xbottom, ybottom) is the bottom-right corner.

required

show_images

bool

shows the images, NOT TO BE USED in CI, defaults to False

False

Returns:

Type Description
dict[str, np.float64]

the similarity Score of the original images if there is no ignore_areas otherwise returns the masked images similarity score

Source code in boardfarm3/use_cases/image_comparison.py
def compare_images(  # noqa: PLR0915  # pylint: disable=too-many-locals,too-many-statements
    first_image: Path,
    second_image: Path,
    ignore_areas: list[tuple[int, int, int, int]] | None,
    show_images: bool = False,
) -> dict[str, np.float64]:
    """Compare 2 images and return the similarity score.

    :param first_image: Usually the original image (usually a png)
    :type first_image: Path
    :param second_image: The new image (with same size as the first image)
    :type second_image: Path
    :param ignore_areas: A list of coordinates for the zones (rectangles) to be
                         ignored during the comparison (xtop, ytop, xbottom, ybottom)
                         1 tuple is one rectangle.

                         (xtop, ytop)
                              |-----------------|
                              |                 |
                              |                 |
                              |-----------------|(xbottom,ybottom)

    :type ignore_areas: list[tuple[int, int, int, int]] | None
    :param show_images: shows the images, NOT TO BE USED in CI, defaults to False
    :type show_images: bool
    :return: the similarity Score of the original images if there is no ignore_areas
             otherwise returns the masked images similarity score
    :rtype: np.float64
    """
    masked_score = None
    first = cv2.imread(str(first_image.absolute().resolve()))
    second = cv2.imread(str(second_image.absolute().resolve()))
    first_masked = None
    second_masked = None

    # this is needed for the structural_similarity comparison
    first_gray = cv2.cvtColor(first, cv2.COLOR_BGR2GRAY)
    second_gray = cv2.cvtColor(second, cv2.COLOR_BGR2GRAY)

    # this is the % of similarity between the 2 original images (i.e. unmasked)
    # NB: the images MUST BE of the SAME SIZE!!!!
    unmasked_score, unmasked_diff = structural_similarity(  # type: ignore[no-untyped-call]
        first_gray,
        second_gray,
        full=True,
    )
    msg = f"Unmasked Similarity Score: {unmasked_score * 100:.3f}%"
    _LOGGER.info(msg)
    diff = unmasked_diff = (unmasked_diff * 255).astype("uint8")

    # same as above but on the masked images (if any areas are to be ignored)
    if ignore_areas:
        # these are the same images with the areas to be ignored blacked out
        first_masked = _build_mask(first, ignore_areas)
        second_masked = _build_mask(second, ignore_areas)
        masked_score, masked_diff = structural_similarity(  # type: ignore[no-untyped-call]
            cv2.cvtColor(first_masked, cv2.COLOR_BGR2GRAY),
            cv2.cvtColor(second_masked, cv2.COLOR_BGR2GRAY),
            full=True,
        )
        msg = f"Masked Similarity Score: {masked_score * 100:.3f}%"
        _LOGGER.info(msg)

        # The diff image contains the actual image differences between the two images
        # and is represented as a floating point data type so we must convert the array
        # to 8-bit unsigned integers in the range [0,255] before we can use it with OpenCV
        diff = (masked_diff * 255).astype("uint8")

    # Threshold the difference image, followed by finding contours to
    # obtain the regions that differ between the two images
    thresh = cv2.threshold(
        unmasked_diff,
        0,
        255,
        cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU,
    )[1]
    contours = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if len(contours) == 2:  # noqa: PLR2004, SIM108
        contours = contours[0]  # type: ignore[assignment]
    else:
        contours = contours[1]  # type: ignore[assignment, unreachable]

    # Highlight differences
    mask = np.zeros(first.shape, dtype="uint8")
    filled = second.copy()

    for c in contours:
        area = cv2.contourArea(c)  # type: ignore[arg-type]
        if area > 100:  # noqa: PLR2004
            x, y, w, h = cv2.boundingRect(c)  # type: ignore[arg-type]
            cv2.rectangle(first, (x, y), (x + w, y + h), (36, 255, 12), 2)
            cv2.rectangle(second, (x, y), (x + w, y + h), (36, 255, 12), 2)
            cv2.drawContours(mask, [c], 0, (0, 255, 0), -1)  # type: ignore[list-item]
            cv2.drawContours(filled, [c], 0, (0, 255, 0), -1)  # type: ignore[list-item]

    cv2.imwrite(_append_qualifier(first_image, "contour"), first)
    cv2.imwrite(_append_qualifier(second_image, "contour"), second)
    cv2.imwrite(_append_qualifier(second_image, "diff"), diff)
    cv2.imwrite(_append_qualifier(second_image, "mask"), mask)
    cv2.imwrite(_append_qualifier(second_image, "filled"), filled)
    if first_masked is not None:
        cv2.imwrite(_append_qualifier(first_image, "masked"), first_masked)
    if second_masked is not None:
        cv2.imwrite(_append_qualifier(second_image, "masked"), second_masked)

    # NB: cv2.waitKey() is interactive and should not be used in CI environments
    # but you knew that already 😁!
    if show_images:
        cv2.imshow("first", first)
        cv2.imshow("second", second)
        cv2.imshow("diff", diff)
        cv2.imshow("mask", mask)
        cv2.imshow("filled", filled)
        if first_masked is not None:
            cv2.imshow("first_masked", first_masked)
        if second_masked is not None:
            cv2.imshow("second_masked", second_masked)
        cv2.waitKey()

    return (
        round(unmasked_score * 100, 3)
        if masked_score is None
        else round(masked_score * 100, 3)
    )
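
The listing above relies on a `_build_mask` helper that is not shown here. As a minimal sketch of the idea only (blacking out each ignored rectangle before scoring), the pure-Python function below works on a 2-D list of grayscale values. The name `black_out_areas` is hypothetical, and treating the bottom-right corner as exclusive is an assumption, not something the source confirms.

```python
def black_out_areas(image, ignore_areas):
    """Return a copy of a 2-D grayscale image with each rectangle set to 0.

    Hypothetical stand-in for the masking step; each area is
    (xtop, ytop, xbottom, ybottom), with the bottom-right corner
    assumed to be exclusive.
    """
    masked = [row[:] for row in image]  # copy so the input stays untouched
    height = len(masked)
    width = len(masked[0]) if masked else 0
    for xtop, ytop, xbottom, ybottom in ignore_areas:
        # Clamp the rectangle to the image bounds before blanking it.
        for y in range(max(ytop, 0), min(ybottom, height)):
            for x in range(max(xtop, 0), min(xbottom, width)):
                masked[y][x] = 0
    return masked


image = [[255] * 4 for _ in range(3)]
masked = black_out_areas(image, [(1, 0, 3, 2)])
```

Because both images receive the same blacked-out rectangles, those regions are identical in the two inputs and no longer contribute differences to the masked score.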

iperf

Common Iperf use cases.

Functions:

Name Description
get_iperf_logs

Check logs and return traffic flow.

parse_iperf_logs

Parse iperf logs and return bitrate, transfer etc.

set_device_interface_state

Toggle the interface based on the action passed.

start_iperf_ipv4

Initiate IPv4 traffic from source device to destination device.

start_iperf_ipv4_bidirectional

Initiate IPv4 bidirectional traffic from source device to destination device.

start_iperf_ipv4_downstream

Initiate IPv4 downstream traffic from source device to destination device.

start_iperf_ipv6

Initiate IPv6 traffic from source device to destination device.

start_iperf_ipv6_bidirectional

Initiate IPv6 bidirectional traffic from source device to destination device.

start_iperf_ipv6_downstream

Initiate IPv6 downstream traffic from source device to destination device.

stop_iperf_traffic

Stop the iPerf3 processes on sender as well as receiver.

get_iperf_logs

get_iperf_logs(iperf_data: IPerf3TrafficGenerator) -> dict

Check logs and return traffic flow.

Parameters:

Name Type Description Default

iperf_data

IPerf3TrafficGenerator

IPerf3TrafficGenerator, holds sender and receiver info.

required

Returns:

Type Description
dict

traffic logs of both server and client.

Source code in boardfarm3/use_cases/iperf.py
def get_iperf_logs(iperf_data: IPerf3TrafficGenerator) -> dict:
    """Check logs and return traffic flow.

    :param iperf_data: IPerf3TrafficGenerator, holds sender and receiver info.
    :type iperf_data: IPerf3TrafficGenerator
    :return: traffic logs of both server and client.
    :rtype: dict
    """
    server_logs = iperf_data.traffic_receiver.get_iperf_logs(iperf_data.server_log_file)
    client_logs = iperf_data.traffic_sender.get_iperf_logs(iperf_data.client_log_file)
    return {"client_logs": client_logs, "server_logs": server_logs}
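
To illustrate the shape of the returned dictionary without a real testbed, here is a minimal sketch using stand-in objects. `FakeDevice` and `FakeTrafficGenerator` are hypothetical and only mimic the attributes the listing above reads; they are not the real boardfarm3 classes.

```python
from dataclasses import dataclass


class FakeDevice:
    """Hypothetical device that returns canned log text per log file."""

    def __init__(self, logs):
        self._logs = logs

    def get_iperf_logs(self, log_file):
        return self._logs[log_file]


@dataclass
class FakeTrafficGenerator:
    """Hypothetical stand-in exposing only the fields used below."""

    traffic_sender: FakeDevice
    traffic_receiver: FakeDevice
    server_log_file: str
    client_log_file: str


def collect_logs(iperf_data):
    # Same lookups as the listing: server log from the receiver,
    # client log from the sender.
    server_logs = iperf_data.traffic_receiver.get_iperf_logs(iperf_data.server_log_file)
    client_logs = iperf_data.traffic_sender.get_iperf_logs(iperf_data.client_log_file)
    return {"client_logs": client_logs, "server_logs": server_logs}


generator = FakeTrafficGenerator(
    traffic_sender=FakeDevice({"client.log": "client output"}),
    traffic_receiver=FakeDevice({"server.log": "server output"}),
    server_log_file="server.log",
    client_log_file="client.log",
)
logs = collect_logs(generator)
```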

parse_iperf_logs

parse_iperf_logs(
    iperf_logs: str, is_client_log: bool = False, udp_only: bool | None = None
) -> dict[str, str]

Parse iperf logs and return bitrate, transfer etc.

Parameters:

Name Type Description Default

iperf_logs

str

client or server logs

required

is_client_log

bool

True if client logs are to be parsed, defaults to False

False

udp_only

bool | None

to be used if protocol is UDP only, backward compatibility with iperf version 2

None

Returns:

Type Description
dict[str, str]

dict with throughput, transfer, interval values.

Raises:

Type Description
UseCaseFailure

If unable to parse output

Source code in boardfarm3/use_cases/iperf.py
def parse_iperf_logs(
    iperf_logs: str, is_client_log: bool = False, udp_only: bool | None = None
) -> dict[str, str]:
    """Parse iperf logs and return bitrate, transfer etc.

    :param iperf_logs: client or server logs
    :type iperf_logs: str
    :param is_client_log: True if client logs are to be parsed, defaults to False
    :type is_client_log: bool
    :param udp_only: to be used if protocol is UDP only,
        backward compatibility with iperf version 2
    :type udp_only: bool, optional
    :raises UseCaseFailure: If unable to parse output
    :return: dict with throughput, transfer, interval values.
    :rtype: dict[str, str]
    """
    if udp_only:
        search_pattern = ""
    else:
        search_pattern = r"[\d]+\s+sender" if is_client_log else "receiver"
    if matching_object := re.search(
        r"([\d.]+-[\d.]+)\s+sec\s+([\d.]+\s+\w?Bytes)\s+([\d.]+\s+\w?bits/sec)\s+"
        f"{search_pattern}",
        iperf_logs,
    ):
        interval, transfer, bitrate = matching_object.groups()
        return {"Interval": interval, "Transfer": transfer, "Bitrate": bitrate}
    msg = "Sender / Receiver data not found in the output."  # type:ignore[unreachable]
    raise UseCaseFailure(msg)
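
The regular expression is easier to follow when run against a concrete log. The sketch below copies the pattern from the listing into a standalone function and applies it to a hand-written sample in the usual iperf3 summary layout. The sample text is illustrative rather than captured output, `parse_summary` is a hypothetical name, and `ValueError` stands in for `UseCaseFailure` so that the snippet has no boardfarm3 dependency.

```python
import re

# Pattern copied from the listing above: interval, transfer and bitrate columns.
_SUMMARY = r"([\d.]+-[\d.]+)\s+sec\s+([\d.]+\s+\w?Bytes)\s+([\d.]+\s+\w?bits/sec)\s+"

# Hand-written sample in the typical iperf3 summary layout (not real output).
SAMPLE_LOG = """\
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.00  sec  1.10 GBytes   941 Mbits/sec    0             sender
[  5]   0.00-10.04  sec  1.09 GBytes   936 Mbits/sec                  receiver
"""


def parse_summary(iperf_logs, is_client_log=False, udp_only=None):
    """Return interval, transfer and bitrate from the selected summary line."""
    if udp_only:
        suffix = ""  # no sender/receiver marker is required in this mode
    else:
        # Client logs: retransmit count followed by "sender"; otherwise "receiver".
        suffix = r"[\d]+\s+sender" if is_client_log else "receiver"
    match = re.search(_SUMMARY + suffix, iperf_logs)
    if match is None:
        # Stand-in for UseCaseFailure in the original listing.
        raise ValueError("Sender / Receiver data not found in the output.")
    interval, transfer, bitrate = match.groups()
    return {"Interval": interval, "Transfer": transfer, "Bitrate": bitrate}


client_summary = parse_summary(SAMPLE_LOG, is_client_log=True)
server_summary = parse_summary(SAMPLE_LOG)
```

With `is_client_log=True` the suffix requires a retransmit count before `sender`, so the first summary line is selected; otherwise the search skips it and matches the `receiver` line.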

set_device_interface_state

set_device_interface_state(
    device: LAN | WAN | WLAN, interface: str, action: Literal["up", "down"]
) -> None

Toggle the interface based on the action passed.

Parameters:

Name Type Description Default

device

LAN | WAN | WLAN

device instance

required

interface

str

name of the interface

required

action

Literal['up', 'down']

up or down

required
Source code in boardfarm3/use_cases/iperf.py
def set_device_interface_state(
    device: LAN | WAN | WLAN,
    interface: str,
    action: Literal["up", "down"],
) -> None:
    """Toggle the interface based on the action passed.

    :param device: device instance
    :type device: LAN | WAN | WLAN
    :param interface: name of the interface
    :type interface: str
    :param action: up or down
    :type action: Literal["up", "down"]
    """
    device.set_link_state(interface, action)

start_iperf_ipv4

start_iperf_ipv4(
    source_device: LAN | WAN | WLAN,
    destination_device: LAN | WAN | WLAN,
    source_port: int,
    time: int,
    udp_protocol: bool,
    direction: str | None = None,
    destination_port: int | None = None,
    bind_sender_ip: str | None = None,
    bind_receiver_ip: str | None = None,
    destination_ip: str | None = None,
    client_port: int | None = None,
    udp_only: bool | None = None,
) -> IPerf3TrafficGenerator

Initiate IPv4 traffic from source device to destination device.

Starts the iPerf3 server on the traffic receiver and triggers IPv4-only traffic from the source device.

If the traffic sender cannot be started, the receiver process is stopped.

.. hint:: This Use Case implements statements from the test suite such as:

- Start an iPerf server on LAN/WAN host
- Start an iPerf client on LAN/WAN host

Parameters:

Name Type Description Default

source_device

LAN | WAN | WLAN

device instance

required

destination_device

LAN | WAN | WLAN

device instance

required

source_port

int

source port to listen on/connect to

required

time

int

time in seconds to transmit

required

udp_protocol

bool

use UDP rather than TCP

required

direction

str | None

--reverse to run in reverse mode (server sends, client receives) or --bidir for bidirectional traffic, defaults to None

None

destination_port

int | None

destination port to listen on/connect to

None

bind_sender_ip

str | None

bind to the interface associated with the client address, defaults to None

None

bind_receiver_ip

str | None

bind to the interface associated with the host address, defaults to None

None

destination_ip

str | None

IPv4 address used for iPerf traffic, defaults to None

None

client_port

int | None

client port from where the traffic is getting started

None

udp_only

bool | None

to be used if protocol is UDP only, backward compatibility with iperf version 2

None

Returns:

Type Description
IPerf3TrafficGenerator

IPerf3TrafficGenerator data class that holds sender/receiver devices, their process ids and log file details

Source code in boardfarm3/use_cases/iperf.py
def start_iperf_ipv4(  # pylint: disable=too-many-arguments,R0914  # noqa: PLR0913
    source_device: LAN | WAN | WLAN,
    destination_device: LAN | WAN | WLAN,
    source_port: int,
    time: int,
    udp_protocol: bool,
    direction: str | None = None,
    destination_port: int | None = None,
    bind_sender_ip: str | None = None,
    bind_receiver_ip: str | None = None,
    destination_ip: str | None = None,
    client_port: int | None = None,
    udp_only: bool | None = None,
) -> IPerf3TrafficGenerator:
    """Initiate IPv4 traffic from source device to destination device.

    Starts the iPerf3 server on the traffic receiver and triggers IPv4-only
    traffic from the source device.

    If the traffic sender cannot be started, the receiver process is stopped.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Start an iPerf server on LAN/WAN host
        - Start an iPerf client on LAN/WAN host

    :param source_device: device instance
    :type source_device: LAN | WAN | WLAN
    :param destination_device: device instance
    :type destination_device: LAN | WAN | WLAN
    :param source_port: source port to listen on/connect to
    :type source_port: int
    :param time: time in seconds to transmit
    :type time: int
    :param udp_protocol: use UDP rather than TCP
    :type udp_protocol: bool
    :param direction: `--reverse` to run in reverse mode (server sends, client
        receives) or `--bidir` for bidirectional traffic, defaults to None
    :type direction: str | None
    :param destination_port: destination port to listen on/connect to
    :type destination_port: int | None
    :param bind_sender_ip: bind to the interface associated with the
        client address, defaults to None
    :type bind_sender_ip: str | None
    :param bind_receiver_ip: bind to the interface associated with the
        host address, defaults to None
    :type bind_receiver_ip: str | None
    :param destination_ip: IPv4 address used for iPerf traffic, defaults to None
    :type destination_ip: str | None
    :param client_port: client port from where the traffic is getting started
    :type client_port: int | None
    :param udp_only: to be used if protocol is UDP only,
        backward compatibility with iperf version 2
    :type udp_only: bool, optional
    :return: IPerf3TrafficGenerator data class that holds
        sender/receiver devices, their process ids and log
        file details
    :rtype: IPerf3TrafficGenerator
    """
    dest_ip = (
        destination_device.get_interface_ipv4addr(destination_device.iface_dut)
        if destination_ip is None
        else destination_ip
    )
    destination_port = source_port if destination_port is None else destination_port
    dest_pid, server_log_file = destination_device.start_traffic_receiver(
        destination_port, bind_to_ip=bind_receiver_ip, ip_version=4, udp_only=udp_only
    )
    try:
        source_pid, client_log_file = source_device.start_traffic_sender(
            dest_ip,
            source_port,
            bind_to_ip=bind_sender_ip,
            ip_version=4,
            udp_protocol=udp_protocol,
            time=time,
            direction=direction,
            client_port=client_port,
            udp_only=udp_only,
        )
    # Handles the scenario where the server started but the traffic sender (client)
    # could not be started: the receiver process is stopped, and the returned
    # IPerf3TrafficGenerator carries an empty pid for the sender.
    except CodeError:
        source_pid = None
        client_log_file = ""
        stop_iperf_traffic(
            IPerf3TrafficGenerator(
                source_device, source_pid, destination_device, dest_pid
            )
        )
    return IPerf3TrafficGenerator(
        source_device,
        source_pid,
        destination_device,
        dest_pid,
        server_log_file,
        client_log_file,
    )
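
The control flow above (start the receiver, try the sender, clean up the receiver if the sender fails) can be tried in isolation. The following is a minimal sketch with stand-in classes: `FakeHost`, `TrafficRecord`, `SenderStartError` and `start_traffic` are hypothetical names, not part of boardfarm3, and `SenderStartError` merely plays the role of the `CodeError` caught in the listing.

```python
from dataclasses import dataclass
from typing import Optional


class SenderStartError(Exception):
    """Hypothetical stand-in for the CodeError caught in the listing."""


@dataclass
class TrafficRecord:
    sender_pid: Optional[int]
    receiver_pid: Optional[int]


class FakeHost:
    """Hypothetical host that tracks which fake process ids are running."""

    def __init__(self, sender_works=True):
        self.sender_works = sender_works
        self.running = set()
        self._next_pid = 100

    def _spawn(self):
        pid = self._next_pid
        self._next_pid += 1
        self.running.add(pid)
        return pid

    def start_receiver(self, port):
        return self._spawn()

    def start_sender(self, dest_ip, port):
        if not self.sender_works:
            raise SenderStartError(f"cannot reach {dest_ip}:{port}")
        return self._spawn()

    def stop(self, pid):
        self.running.discard(pid)


def start_traffic(source, destination, dest_ip, port):
    receiver_pid = destination.start_receiver(port)
    try:
        sender_pid = source.start_sender(dest_ip, port)
    except SenderStartError:
        # The sender never started: record that and stop the orphaned receiver.
        sender_pid = None
        destination.stop(receiver_pid)
    return TrafficRecord(sender_pid, receiver_pid)


receiver_host = FakeHost()
failed = start_traffic(FakeHost(sender_works=False), receiver_host, "192.0.2.10", 5201)
```

A caller can detect the failure by checking whether the sender pid in the returned record is `None`.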

start_iperf_ipv4_bidirectional

start_iperf_ipv4_bidirectional(
    source_device: LAN | WAN | WLAN,
    destination_device: LAN | WAN | WLAN,
    source_port: int,
    time: int,
    udp_protocol: bool,
    destination_port: int | None = None,
    bind_sender_ip: str | None = None,
    bind_receiver_ip: str | None = None,
    destination_ip: str | None = None,
    udp_only: bool | None = None,
) -> IPerf3TrafficGenerator

Initiate IPv4 bidirectional traffic from source device to destination device.

Executes the start_iperf_ipv4 Use Case in bidirectional mode.

.. hint:: This Use Case implements statements from the test suite such as:

- Start an iPerf server on LAN/WAN host
- Start an iPerf client on LAN/WAN host

Parameters:

Name Type Description Default

source_device

LAN | WAN | WLAN

device instance

required

destination_device

LAN | WAN | WLAN

device instance

required

source_port

int

source port to listen on/connect to

required

time

int

time in seconds to transmit

required

udp_protocol

bool

use UDP rather than TCP

required

destination_port

int | None

destination port to listen on/connect to

None

bind_sender_ip

str | None

bind to the interface associated with the client address, defaults to None

None

bind_receiver_ip

str | None

bind to the interface associated with the host address, defaults to None

None

destination_ip

str | None

IPv4 address used for iPerf traffic, defaults to None

None

udp_only

bool | None

to be used if protocol is UDP only, backward compatibility with iperf version 2

None

Returns:

Type Description
IPerf3TrafficGenerator

IPerf3TrafficGenerator data class that holds sender/receiver devices, their process ids and log file details

Source code in boardfarm3/use_cases/iperf.py
def start_iperf_ipv4_bidirectional(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    source_device: LAN | WAN | WLAN,
    destination_device: LAN | WAN | WLAN,
    source_port: int,
    time: int,
    udp_protocol: bool,
    destination_port: int | None = None,
    bind_sender_ip: str | None = None,
    bind_receiver_ip: str | None = None,
    destination_ip: str | None = None,
    udp_only: bool | None = None,
) -> IPerf3TrafficGenerator:
    """Initiate IPv4 bidirectional traffic from source device to destination device.

    Executes the start_iperf_ipv4 Use Case in bidirectional mode.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Start an iPerf server on LAN/WAN host
        - Start an iPerf client on LAN/WAN host

    :param source_device: device instance
    :type source_device: LAN | WAN | WLAN
    :param destination_device: device instance
    :type destination_device: LAN | WAN | WLAN
    :param source_port: source port to listen on/connect to
    :type source_port: int
    :param time: time in seconds to transmit
    :type time: int
    :param udp_protocol: use UDP rather than TCP
    :type udp_protocol: bool
    :param destination_port: destination port to listen on/connect to
    :type destination_port: int | None
    :param bind_sender_ip: bind to the interface associated with the
        client address, defaults to None
    :type bind_sender_ip: str | None
    :param bind_receiver_ip: bind to the interface associated with the
        host address, defaults to None
    :type bind_receiver_ip: str | None
    :param destination_ip: IPv4 address used for iPerf traffic, defaults to None
    :type destination_ip: str | None
    :param udp_only: to be used if protocol is UDP only,
            backward compatibility with iperf version 2
    :type udp_only: bool, optional
    :return: IPerf3TrafficGenerator data class that holds
        sender/receiver devices, their process ids and log
        file details
    :rtype: IPerf3TrafficGenerator
    """
    return start_iperf_ipv4(
        source_device=source_device,
        destination_device=destination_device,
        direction="--bidir",
        source_port=source_port,
        time=time,
        udp_protocol=udp_protocol,
        destination_port=destination_port,
        bind_sender_ip=bind_sender_ip,
        bind_receiver_ip=bind_receiver_ip,
        destination_ip=destination_ip,
        udp_only=udp_only,
    )
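
The wrapper only differs from the general Use Case in the `direction` value it passes. How the device classes turn these arguments into a command line is not shown on this page, so the sketch below is an assumption: a hypothetical `build_iperf3_client_args` helper that maps the parameters onto standard iperf3 client options (`-c`, `-p`, `-t`, `-u`, `-4`/`-6`, `--reverse`, `--bidir`).

```python
def build_iperf3_client_args(
    dest_ip, port, time, udp_protocol, direction=None, ip_version=4
):
    """Build an iperf3 client argument list (illustrative, not boardfarm3 code)."""
    if ip_version not in (4, 6):
        raise ValueError(f"unsupported IP version: {ip_version}")
    args = ["iperf3", f"-{ip_version}", "-c", dest_ip, "-p", str(port), "-t", str(time)]
    if udp_protocol:
        args.append("-u")  # UDP instead of the default TCP
    if direction is not None:
        if direction not in ("--reverse", "--bidir"):
            raise ValueError(f"unsupported direction: {direction}")
        args.append(direction)
    return args


bidir_args = build_iperf3_client_args("192.0.2.10", 5201, 10, False, direction="--bidir")
reverse_udp_args = build_iperf3_client_args(
    "192.0.2.10", 5201, 10, True, direction="--reverse"
)
```

Note that `--bidir` is only available in newer iperf3 releases, so a real implementation may need to check the installed version first.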

start_iperf_ipv4_downstream

start_iperf_ipv4_downstream(
    source_device: LAN | WAN | WLAN,
    destination_device: LAN | WAN | WLAN,
    source_port: int,
    time: int,
    udp_protocol: bool,
    destination_port: int | None = None,
    bind_sender_ip: str | None = None,
    bind_receiver_ip: str | None = None,
    destination_ip: str | None = None,
    udp_only: bool | None = None,
) -> IPerf3TrafficGenerator

Initiate IPv4 downstream traffic from source device to destination device.

Executes the start_iperf_ipv4 Use Case in downstream mode.

.. hint:: This Use Case implements statements from the test suite such as:

- Start an iPerf server on LAN/WAN host
- Start an iPerf client on LAN/WAN host

Parameters:

Name Type Description Default

source_device

LAN | WAN | WLAN

device instance

required

destination_device

LAN | WAN | WLAN

device instance

required

source_port

int

source port to listen on/connect to

required

time

int

time in seconds to transmit

required

udp_protocol

bool

use UDP rather than TCP

required

destination_port

int | None

destination port to listen on/connect to

None

bind_sender_ip

str | None

bind to the interface associated with the client address, defaults to None

None

bind_receiver_ip

str | None

bind to the interface associated with the host address, defaults to None

None

destination_ip

str | None

IPv4 address used for iPerf traffic, defaults to None

None

udp_only

bool | None

to be used if protocol is UDP only, backward compatibility with iperf version 2

None

Returns:

Type Description
IPerf3TrafficGenerator

IPerf3TrafficGenerator data class that holds sender/receiver devices, their process ids and log file details

Source code in boardfarm3/use_cases/iperf.py
def start_iperf_ipv4_downstream(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    source_device: LAN | WAN | WLAN,
    destination_device: LAN | WAN | WLAN,
    source_port: int,
    time: int,
    udp_protocol: bool,
    destination_port: int | None = None,
    bind_sender_ip: str | None = None,
    bind_receiver_ip: str | None = None,
    destination_ip: str | None = None,
    udp_only: bool | None = None,
) -> IPerf3TrafficGenerator:
    """Initiate IPv4 downstream traffic from source device to destination device.

    Executes the start_iperf_ipv4 Use Case in downstream mode.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Start an iPerf server on LAN/WAN host
        - Start an iPerf client on LAN/WAN host

    :param source_device: device instance
    :type source_device: LAN | WAN | WLAN
    :param destination_device: device instance
    :type destination_device: LAN | WAN | WLAN
    :param source_port: source port to listen on/connect to
    :type source_port: int
    :param time: time in seconds to transmit
    :type time: int
    :param udp_protocol: use UDP rather than TCP
    :type udp_protocol: bool
    :param destination_port: destination port to listen on/connect to
    :type destination_port: int | None
    :param bind_sender_ip: bind to the interface associated with the
        client address, defaults to None
    :type bind_sender_ip: str | None
    :param bind_receiver_ip: bind to the interface associated with the
        host address, defaults to None
    :type bind_receiver_ip: str | None
    :param destination_ip: IPv4 address used for iPerf traffic, defaults to None
    :type destination_ip: str | None
    :param udp_only: to be used if protocol is UDP only,
            backward compatibility with iperf version 2
    :type udp_only: bool, optional
    :return: IPerf3TrafficGenerator data class that holds
        sender/receiver devices, their process ids and log
        file details
    :rtype: IPerf3TrafficGenerator
    """
    return start_iperf_ipv4(
        source_device=source_device,
        destination_device=destination_device,
        direction="--reverse",
        source_port=source_port,
        time=time,
        udp_protocol=udp_protocol,
        destination_port=destination_port,
        bind_sender_ip=bind_sender_ip,
        bind_receiver_ip=bind_receiver_ip,
        destination_ip=destination_ip,
        udp_only=udp_only,
    )

start_iperf_ipv6

start_iperf_ipv6(
    source_device: LAN | WAN | WLAN,
    destination_device: LAN | WAN | WLAN,
    source_port: int,
    time: int,
    udp_protocol: bool,
    direction: str | None = None,
    destination_port: int | None = None,
    bind_sender_ip: str | None = None,
    bind_receiver_ip: str | None = None,
    destination_ip: str | None = None,
    client_port: int | None = None,
    udp_only: bool | None = None,
) -> IPerf3TrafficGenerator

Initiate IPv6 traffic from source device to destination device.

Starts the iPerf3 server on the traffic receiver and triggers IPv6-only traffic from the source device.

If the traffic sender cannot be started, the receiver process is stopped.

.. hint:: This Use Case implements statements from the test suite such as:

- Start an iPerf server on LAN/WAN host
- Start an iPerf client on LAN/WAN host

Parameters:

Name Type Description Default

source_device

LAN | WAN | WLAN

device instance

required

destination_device

LAN | WAN | WLAN

device instance

required

source_port

int

source port to listen on/connect to

required

time

int

time in seconds to transmit

required

udp_protocol

bool

use UDP rather than TCP

required

direction

str | None

--reverse to run in reverse mode (server sends, client receives) or --bidir for bidirectional traffic, defaults to None

None

destination_port

int | None

destination port to listen on/connect to

None

bind_sender_ip

str | None

bind to the interface associated with the client address, defaults to None

None

bind_receiver_ip

str | None

bind to the interface associated with the host address, defaults to None

None

destination_ip

str | None

IPv6 address used for iPerf traffic, defaults to None

None

client_port

int | None

client port from where the traffic is getting started

None

udp_only

bool | None

to be used if protocol is UDP only, backward compatibility with iperf version 2

None

Returns:

Type Description
IPerf3TrafficGenerator

IPerf3TrafficGenerator data class that holds sender/receiver devices, their process ids and log file details

Source code in boardfarm3/use_cases/iperf.py
def start_iperf_ipv6(  # pylint: disable=too-many-arguments,R0914  # noqa: PLR0913
    source_device: LAN | WAN | WLAN,
    destination_device: LAN | WAN | WLAN,
    source_port: int,
    time: int,
    udp_protocol: bool,
    direction: str | None = None,
    destination_port: int | None = None,
    bind_sender_ip: str | None = None,
    bind_receiver_ip: str | None = None,
    destination_ip: str | None = None,
    client_port: int | None = None,
    udp_only: bool | None = None,
) -> IPerf3TrafficGenerator:
    """Initiate IPv6 traffic from source device to destination device.

    Starts the iPerf3 server on the traffic receiver and triggers IPv6-only
    traffic from the source device.

    If the traffic sender cannot be started, the receiver process is stopped.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Start an iPerf server on LAN/WAN host
        - Start an iPerf client on LAN/WAN host

    :param source_device: device instance
    :type source_device: LAN | WAN | WLAN
    :param destination_device: device instance
    :type destination_device: LAN | WAN | WLAN
    :param source_port: source port to listen on/connect to
    :type source_port: int
    :param time: time in seconds to transmit
    :type time: int
    :param udp_protocol: use UDP rather than TCP
    :type udp_protocol: bool
    :param direction: `--reverse` to run in reverse mode (server sends, client
        receives) or `--bidir` for bidirectional traffic, defaults to None
    :type direction: str | None
    :param destination_port: destination port to listen on/connect to
    :type destination_port: int | None
    :param bind_sender_ip: bind to the interface associated with the
        client address, defaults to None
    :type bind_sender_ip: str | None
    :param bind_receiver_ip: bind to the interface associated with the
        host address, defaults to None
    :type bind_receiver_ip: str | None
    :param destination_ip: IPv6 address used for iPerf traffic, defaults to None
    :type destination_ip: str | None
    :param client_port: client port from where the traffic is getting started
    :type client_port: int | None
    :param udp_only: to be used if protocol is UDP only,
            backward compatibility with iperf version 2
    :type udp_only: bool, optional
    :return: IPerf3TrafficGenerator data class that holds
        sender/receiver devices, their process ids and log
        file details
    :rtype: IPerf3TrafficGenerator
    """
    dest_ip6 = (
        destination_device.get_interface_ipv6addr(destination_device.iface_dut)
        if destination_ip is None
        else destination_ip
    )
    destination_port = source_port if destination_port is None else destination_port
    dest_pid, server_log_file = destination_device.start_traffic_receiver(
        destination_port, bind_to_ip=bind_receiver_ip, ip_version=6, udp_only=udp_only
    )
    try:
        source_pid, client_log_file = source_device.start_traffic_sender(
            dest_ip6,
            source_port,
            bind_to_ip=bind_sender_ip,
            ip_version=6,
            udp_protocol=udp_protocol,
            time=time,
            direction=direction,
            client_port=client_port,
            udp_only=udp_only,
        )
    # Handles the scenario where the server started but the traffic sender (client)
    # could not be started: the receiver process is stopped, and the returned
    # IPerf3TrafficGenerator carries an empty pid for the sender.
    except CodeError:
        source_pid = None
        client_log_file = ""
        stop_iperf_traffic(
            IPerf3TrafficGenerator(
                source_device, source_pid, destination_device, dest_pid
            )
        )
    return IPerf3TrafficGenerator(
        source_device,
        source_pid,
        destination_device,
        dest_pid,
        server_log_file,
        client_log_file,
    )
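
When `destination_ip` is supplied explicitly, the listing passes it to the sender unchanged. A test might want to confirm beforehand that the override really is an IPv6 address. The helper below is a hypothetical sketch of such a check using only the standard-library `ipaddress` module; it is not part of boardfarm3.

```python
import ipaddress


def check_destination_ip(destination_ip, ip_version):
    """Return the normalised address, or raise ValueError on a version mismatch."""
    address = ipaddress.ip_address(destination_ip)  # raises ValueError if malformed
    if address.version != ip_version:
        msg = f"{destination_ip} is IPv{address.version}, expected IPv{ip_version}"
        raise ValueError(msg)
    return str(address)


# 2001:db8::/32 is the IPv6 documentation prefix, used here purely as an example.
checked_ip = check_destination_ip("2001:DB8::1", 6)
```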

start_iperf_ipv6_bidirectional

start_iperf_ipv6_bidirectional(
    source_device: LAN | WAN | WLAN,
    destination_device: LAN | WAN | WLAN,
    source_port: int,
    time: int,
    udp_protocol: bool,
    destination_port: int | None = None,
    bind_sender_ip: str | None = None,
    bind_receiver_ip: str | None = None,
    destination_ip: str | None = None,
    udp_only: bool | None = None,
) -> IPerf3TrafficGenerator

Initiate IPv6 bidirectional traffic from source device to destination device.

Executes the start_iperf_ipv6 Use Case in bidirectional mode.

.. hint:: This Use Case implements statements from the test suite such as:

- Start an iPerf server on LAN/WAN host
- Start an iPerf client on LAN/WAN host

Parameters:

Name Type Description Default

source_device

LAN | WAN | WLAN

device instance for iperf client

required

destination_device

LAN | WAN | WLAN

device instance for iPerf server

required

source_port

int

server port to listen on/connect to

required

time

int

time in seconds to transmit

required

udp_protocol

bool

use UDP rather than TCP

required

destination_port

int | None

destination port to listen on/connect to, defaults to None

None

bind_sender_ip

str | None

bind to the interface associated with the client address, defaults to None

None

bind_receiver_ip

str | None

bind to the interface associated with the host address, defaults to None

None

destination_ip

str | None

IPv6 address used for iPerf traffic, defaults to None

None

udp_only

bool | None

to be used if protocol is UDP only, backward compatibility with iperf version 2

None

Returns:

Type Description
IPerf3TrafficGenerator

IPerf3TrafficGenerator data class that holds sender/receiver devices, their process ids and log file details

Source code in boardfarm3/use_cases/iperf.py
def start_iperf_ipv6_bidirectional(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    source_device: LAN | WAN | WLAN,
    destination_device: LAN | WAN | WLAN,
    source_port: int,
    time: int,
    udp_protocol: bool,
    destination_port: int | None = None,
    bind_sender_ip: str | None = None,
    bind_receiver_ip: str | None = None,
    destination_ip: str | None = None,
    udp_only: bool | None = None,
) -> IPerf3TrafficGenerator:
    """Initiate IPv6 bidirectional traffic from source device to destination device.

    Executes the start_iperf_ipv6 Use Case in bidirectional mode.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Start an iPerf server on LAN/WAN host
        - Start an iPerf client on LAN/WAN host

    :param source_device: device instance for iperf client
    :type source_device: LAN | WAN | WLAN
    :param destination_device: device instance for iPerf server
    :type destination_device: LAN | WAN | WLAN
    :param source_port: server port to listen on/connect to
    :type source_port: int
    :param time: time in seconds to transmit
    :type time: int
    :param udp_protocol: use UDP rather than TCP
    :type udp_protocol: bool
    :param destination_port: destination port to listen on/connect to, defaults to None
    :type destination_port: int | None
    :param bind_sender_ip: bind to the interface associated with the
        client address, defaults to None
    :type bind_sender_ip: str | None
    :param bind_receiver_ip: bind to the interface associated with the
        host address, defaults to None
    :type bind_receiver_ip: str | None
    :param destination_ip: IPv6 address used for iPerf traffic, defaults to None
    :type destination_ip: str | None
    :param udp_only: to be used if protocol is UDP only,
            backward compatibility with iperf version 2
    :type udp_only: bool | None
    :return: IPerf3TrafficGenerator data class that holds
        sender/receiver devices, their process ids and log
        file details
    :rtype: IPerf3TrafficGenerator
    """
    return start_iperf_ipv6(
        source_device=source_device,
        destination_device=destination_device,
        direction="--bidir",
        source_port=source_port,
        time=time,
        udp_protocol=udp_protocol,
        destination_port=destination_port,
        bind_sender_ip=bind_sender_ip,
        bind_receiver_ip=bind_receiver_ip,
        destination_ip=destination_ip,
        udp_only=udp_only,
    )

start_iperf_ipv6_downstream

start_iperf_ipv6_downstream(
    source_device: LAN | WAN | WLAN,
    destination_device: LAN | WAN | WLAN,
    source_port: int,
    time: int,
    udp_protocol: bool,
    destination_port: int | None = None,
    bind_sender_ip: str | None = None,
    bind_receiver_ip: str | None = None,
    destination_ip: str | None = None,
    udp_only: bool | None = None,
) -> IPerf3TrafficGenerator

Initiate IPv6 downstream traffic from source device to destination device.

Executes the start_iperf_ipv6 Use Case in downstream mode.

.. hint:: This Use Case implements statements from the test suite such as:

- Start an iPerf server on LAN/WAN host
- Start an iPerf client on LAN/WAN host

Parameters:

Name Type Description Default

source_device

LAN | WAN | WLAN

device instance for iperf client

required

destination_device

LAN | WAN | WLAN

device instance for iPerf server

required

source_port

int

server port to listen on/connect to

required

time

int

time in seconds to transmit

required

udp_protocol

bool

use UDP rather than TCP

required

destination_port

int | None

destination port to listen on/connect to, defaults to None

None

bind_sender_ip

str | None

bind to the interface associated with the client address, defaults to None

None

bind_receiver_ip

str | None

bind to the interface associated with the host address, defaults to None

None

destination_ip

str | None

IPv6 address used for iPerf traffic, defaults to None

None

udp_only

bool | None

to be used if protocol is UDP only, backward compatibility with iperf version 2

None

Returns:

Type Description
IPerf3TrafficGenerator

IPerf3TrafficGenerator data class that holds sender/receiver devices, their process ids and log file details

Source code in boardfarm3/use_cases/iperf.py
def start_iperf_ipv6_downstream(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    source_device: LAN | WAN | WLAN,
    destination_device: LAN | WAN | WLAN,
    source_port: int,
    time: int,
    udp_protocol: bool,
    destination_port: int | None = None,
    bind_sender_ip: str | None = None,
    bind_receiver_ip: str | None = None,
    destination_ip: str | None = None,
    udp_only: bool | None = None,
) -> IPerf3TrafficGenerator:
    """Initiate IPv6 downstream traffic from source device to destination device.

    Executes the start_iperf_ipv6 Use Case in downstream mode.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Start an iPerf server on LAN/WAN host
        - Start an iPerf client on LAN/WAN host

    :param source_device: device instance for iperf client
    :type source_device: LAN | WAN | WLAN
    :param destination_device: device instance for iPerf server
    :type destination_device: LAN | WAN | WLAN
    :param source_port: server port to listen on/connect to
    :type source_port: int
    :param time: time in seconds to transmit
    :type time: int
    :param udp_protocol: use UDP rather than TCP
    :type udp_protocol: bool
    :param destination_port: destination port to listen on/connect to, defaults to None
    :type destination_port: int | None
    :param bind_sender_ip: bind to the interface associated with the
        client address, defaults to None
    :type bind_sender_ip: str | None
    :param bind_receiver_ip: bind to the interface associated with the
        host address, defaults to None
    :type bind_receiver_ip: str | None
    :param destination_ip: IPv6 address used for iPerf traffic, defaults to None
    :type destination_ip: str | None
    :param udp_only: to be used if protocol is UDP only,
            backward compatibility with iperf version 2
    :type udp_only: bool | None
    :return: IPerf3TrafficGenerator data class that holds
        sender/receiver devices, their process ids and log
        file details
    :rtype: IPerf3TrafficGenerator
    """
    return start_iperf_ipv6(
        source_device=source_device,
        destination_device=destination_device,
        direction="--reverse",
        source_port=source_port,
        time=time,
        udp_protocol=udp_protocol,
        destination_port=destination_port,
        bind_sender_ip=bind_sender_ip,
        bind_receiver_ip=bind_receiver_ip,
        destination_ip=destination_ip,
        udp_only=udp_only,
    )
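
The bidirectional and downstream wrappers differ only in the `direction` string they forward (`--bidir` and `--reverse`). A minimal sketch of that pattern follows. `fake_start_iperf_ipv6` is a hypothetical stand-in that simply returns the keyword arguments it receives; it is not the boardfarm3 function, and the port and time values are made up.

```python
from __future__ import annotations

from functools import partial
from typing import Any


def fake_start_iperf_ipv6(**kwargs: Any) -> dict[str, Any]:
    """Hypothetical stand-in for start_iperf_ipv6: return the kwargs it received."""
    return kwargs


# Each wrapper pins one iperf3 direction flag and passes everything else through.
fake_bidirectional = partial(fake_start_iperf_ipv6, direction="--bidir")
fake_downstream = partial(fake_start_iperf_ipv6, direction="--reverse")

# Made-up arguments, for illustration only.
bidir_call = fake_bidirectional(source_port=5201, time=10, udp_protocol=False)
down_call = fake_downstream(source_port=5201, time=10, udp_protocol=False)
```

Keeping the direction as the only difference means a test can switch traffic direction by choosing a wrapper, without touching any other argument.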

stop_iperf_traffic

stop_iperf_traffic(iperf_generator: IPerf3TrafficGenerator) -> None

Stop the iPerf3 processes on sender as well as receiver.

Parameters:

Name Type Description Default

iperf_generator

IPerf3TrafficGenerator

data class that holds sender/receiver devices and their process IDs

required

Raises:

Type Description
UseCaseFailure

when either iPerf3 server or client PID can't be killed

Source code in boardfarm3/use_cases/iperf.py
def stop_iperf_traffic(iperf_generator: IPerf3TrafficGenerator) -> None:
    """Stop the iPerf3 processes on sender as well as receiver.

    :param iperf_generator: data class that holds sender/receiver devices and
        their process IDs
    :type iperf_generator: IPerf3TrafficGenerator
    :raises UseCaseFailure: when either iPerf3 server or client PID can't be killed
    """
    sender = (
        iperf_generator.traffic_sender.stop_traffic(iperf_generator.sender_pid)
        if iperf_generator.traffic_sender and iperf_generator.sender_pid
        else None
    )

    receiver = (
        iperf_generator.traffic_receiver.stop_traffic(iperf_generator.receiver_pid)
        if iperf_generator.traffic_receiver and iperf_generator.receiver_pid
        else None
    )

    if (iperf_generator.sender_pid and not sender) or (
        iperf_generator.receiver_pid and not receiver
    ):
        # Build one string; trailing commas here would turn msg into a tuple.
        msg = (
            "Either Sender(Client) or Receiver(Server) process cannot be killed: "
            f"{sender=} - {receiver=}"
        )
        raise UseCaseFailure(msg)
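
The failure condition above can be easier to follow in isolation. The sketch below reproduces the same decision with hypothetical stand-ins: `FakeDevice` and `FakeGenerator` are not boardfarm3 classes, and it assumes `stop_traffic` returns a truthy value when the process was stopped.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeDevice:
    """Hypothetical device whose stop_traffic() reports whether the kill worked."""

    kill_succeeds: bool

    def stop_traffic(self, pid: str) -> bool:
        return self.kill_succeeds


@dataclass
class FakeGenerator:
    """Hypothetical stand-in for IPerf3TrafficGenerator."""

    traffic_sender: FakeDevice | None
    sender_pid: str | None
    traffic_receiver: FakeDevice | None
    receiver_pid: str | None


def stop_failed(gen: FakeGenerator) -> bool:
    """Return True when a recorded PID could not be stopped."""
    sender = (
        gen.traffic_sender.stop_traffic(gen.sender_pid)
        if gen.traffic_sender and gen.sender_pid
        else None
    )
    receiver = (
        gen.traffic_receiver.stop_traffic(gen.receiver_pid)
        if gen.traffic_receiver and gen.receiver_pid
        else None
    )
    # A side only counts as failed if it had a PID to begin with.
    return bool(
        (gen.sender_pid and not sender) or (gen.receiver_pid and not receiver)
    )


both_ok = FakeGenerator(FakeDevice(True), "101", FakeDevice(True), "202")
receiver_stuck = FakeGenerator(FakeDevice(True), "101", FakeDevice(False), "202")
no_receiver_started = FakeGenerator(FakeDevice(True), "101", None, None)
```

A side that never recorded a PID is skipped rather than treated as a failure, which is why `no_receiver_started` does not trip the check.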

multicast

Multicast Use Cases.

This will include connecting to a multicast stream via iPerf, ip-mroute or smcroute.

Functions:

Name Description
join_iperf_multicast_asm_group

Start an iPerf server binding to a multicast address in background.

join_iperf_multicast_ssm_group

Start an iPerf server binding to a multicast address in background.

kill_all_iperf

Kill all iPerf sessions on target devices.

leave_iperf_multicast_group

Send IGMP leave to stop receiving multicast traffic.

parse_mcast_trace

Compare captured PCAP file against an expected sequence of packets.

start_iperf_multicast_stream

Start an iPerf client sending data on multicast address in background.

tcpdump

Start packet capture using tcpdump and kill the process at the end.

wait_for_multicast_stream_to_end

Wait for all multicast streams to end.

join_iperf_multicast_asm_group

join_iperf_multicast_asm_group(
    on_which_device: IperfDevice, multicast_group_addr: str, port: int
) -> IPerfSession

Start an iPerf server binding to a multicast address in background.

This Use Case is applicable for ASM (any-source multicast) channels (*,G)

Session will have the following parameters by default:

- 1s interval between periodic bandwidth, jitter, and loss reports.

The Use Case will return an iPerf Session object holding the following info:

- Target device class object on which the iperf command is executed
- PID of the iPerf session
- Multicast group address
- Multicast port
- CSV output file of the iPerf session

.. note::

- CSV output file can only be accessed once you leave the multicast group.

.. hint:: This Use Case implements statements from the test suite such as:

- Start an Ethernet LAN client to request CPE for
  IPv4 ASM traffic from WAN multicast server
- Start client to join/subscribe any source multicast channel (S,G)
  by sending IGMPv3 report

Parameters:

Name Type Description Default

on_which_device

IperfDevice

Object of the device that joins the mcast group

required

multicast_group_addr

str

multicast stream's group IP address to join

required

port

int

multicast stream's port number

required

Returns:

Type Description
IPerfSession

object holding data on the iPerf Session

Raises:

Type Description
UseCaseFailure

if a multicast stream with the same group address and port is already active on the device.

Source code in boardfarm3/use_cases/multicast.py
def join_iperf_multicast_asm_group(
    on_which_device: IperfDevice,
    multicast_group_addr: str,
    port: int,
) -> IPerfSession:
    r"""Start an iPerf server binding to a multicast address in background.

    This Use Case is applicable for ASM (any-source multicast) channels (\*,G)

    Session will have the following parameters by default:
        - 1s interval between periodic bandwidth, jitter,
          and loss reports.

    The Use Case will return an iPerf Session object holding
    following info:
    - Target device class object on which iperf command is executed
    - PID of the iPerf session
    - Multicast group address
    - Multicast port
    - CSV output file of the iperf session

    .. note::

        - CSV output file can only be accessed once you leave the multicast group.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Start an Ethernet LAN client to request CPE for
          IPv4 ASM traffic from WAN multicast server
        - Start client to join/subscribe any source multicast channel (S,G)
          by sending IGMPv3 report

    :param on_which_device: Object of the device that joins the mcast group
    :type on_which_device: IperfDevice
    :param multicast_group_addr: multicast stream's group IP address to join
    :type multicast_group_addr: str
    :param port: multicast stream's port number
    :type port: int
    :return: object holding data on the iPerf Session
    :rtype: IPerfSession
    :raises UseCaseFailure: if a multicast stream with the same group
        address and port is already active on the device.
    """
    dev = on_which_device
    ipv6_flag = ""

    if isinstance(ip_address(multicast_group_addr), IPv6Address):
        ipv6_flag = "-V"

    # Cannot have any iperf session running for the same multicast address and port
    if _is_multicast_stream_active(dev, multicast_group_addr, port):
        msg = (
            f"{dev} already has an iperf session with "
            f"port {port} and multicast address {multicast_group_addr}"
        )
        raise UseCaseFailure(msg)

    fname = f"mclient_{port}.txt"

    # run iperf, format result as CSV
    dev.console.execute_command(
        f"iperf {ipv6_flag} -s -f m -u -U -p {port} "
        f"-B {multicast_group_addr} "
        f"-i 1 -y C > {fname} &"
    )

    pid = dev.console.execute_command(
        f"pgrep iperf -a | grep {port} | awk '{{print$1}}'"
    )
    return IPerfSession(on_which_device, pid, multicast_group_addr, port, fname)
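
The command assembled above depends on the address family of the group. A minimal sketch of the same assembly as a pure function, which makes the resulting string easy to inspect; `build_asm_join_command` is a hypothetical helper and the addresses are illustrative:

```python
from ipaddress import IPv6Address, ip_address


def build_asm_join_command(group: str, port: int, outfile: str) -> str:
    """Assemble an iperf (version 2) UDP server command like the one above."""
    parts = ["iperf"]
    # ip_address() raises ValueError for strings that are not IP addresses.
    if isinstance(ip_address(group), IPv6Address):
        parts.append("-V")  # switch iperf to the IPv6 domain
    parts += ["-s", "-f", "m", "-u", "-U", "-p", str(port), "-B", group]
    parts += ["-i", "1", "-y", "C"]  # 1 s report interval, CSV output
    return " ".join(parts) + f" > {outfile} &"


ipv4_cmd = build_asm_join_command("239.1.1.1", 5001, "mclient_5001.txt")
ipv6_cmd = build_asm_join_command("ff38::8000:1", 5002, "mclient_5002.txt")
```

Joining a list of parts avoids the double space that an empty flag leaves in an f-string, although the shell ignores that extra space either way.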

join_iperf_multicast_ssm_group

join_iperf_multicast_ssm_group(
    on_which_device: IperfDevice,
    multicast_source_addr: str,
    multicast_group_addr: str,
    port: int,
) -> IPerfSession

Start an iPerf server binding to a multicast address in background.

This Use Case is applicable for SSM (source-specific multicast) channels (S,G)

Session will have the following parameters by default:

- 1s interval between periodic bandwidth, jitter, and loss reports.

The Use Case will return an iPerf Session object holding the following info:

- Target device class object on which the iperf command is executed
- PID of the iPerf session
- Multicast group address
- Multicast port
- CSV output file of the iPerf session

.. note::

- The multicast source will always be a WAN device.
- CSV output file can only be accessed once you leave the multicast group.

.. hint:: This Use Case implements statements from the test suite such as:

- Start an Ethernet LAN client to request CPE for
  IPv4 SSM traffic from WAN multicast server
- Start client to join/subscribe a specific source and Group (S,G)
  channel by sending IGMPv3 report

Parameters:

Name Type Description Default

on_which_device

IperfDevice

Object of the device that joins the mcast group

required

multicast_source_addr

str

WAN IP address used to run the mcast stream

required

multicast_group_addr

str

multicast stream's group IP address to join

required

port

int

multicast stream's port number

required

Returns:

Type Description
IPerfSession

object holding data on the iPerf Session

Raises:

Type Description
UseCaseFailure

if a multicast stream with the same group address and port is already active on the device.

Source code in boardfarm3/use_cases/multicast.py
def join_iperf_multicast_ssm_group(
    on_which_device: IperfDevice,
    multicast_source_addr: str,
    multicast_group_addr: str,
    port: int,
) -> IPerfSession:
    """Start an iPerf server binding to a multicast address in background.

    This Use Case is applicable for SSM (source-specific multicast) channels (S,G)

    Session will have the following parameters by default:
        - 1s interval between periodic bandwidth, jitter,
          and loss reports.

    The Use Case will return an iPerf Session object holding
    following info:
    - Target device class object on which iperf command is executed
    - PID of the iperf session
    - Multicast group address
    - Multicast port
    - CSV output file of the iPerf session

    .. note::

        - The multicast source will always be a WAN device.
        - CSV output file can only be accessed once you leave the multicast group.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Start an Ethernet LAN client to request CPE for
          IPv4 SSM traffic from WAN multicast server
        - Start client to join/subscribe a specific source and Group (S,G)
          channel by sending IGMPv3 report

    :param on_which_device: Object of the device that joins the mcast group
    :type on_which_device: IperfDevice
    :param multicast_source_addr: WAN IP address used to run the mcast stream
    :type multicast_source_addr: str
    :param multicast_group_addr: multicast stream's group IP address to join
    :type multicast_group_addr: str
    :param port: multicast stream's port number
    :type port: int
    :return: object holding data on the iPerf Session
    :rtype: IPerfSession
    :raises UseCaseFailure: if a multicast stream with the same group
        address and port is already active on the device.
    """
    dev = on_which_device
    ipv6_flag = ""

    if isinstance(ip_address(multicast_source_addr), IPv6Address) and isinstance(
        ip_address(multicast_group_addr), IPv6Address
    ):
        ipv6_flag = "-V"

    # Cannot have any iperf session running for the same multicast address and port
    if _is_multicast_stream_active(dev, multicast_group_addr, port):
        msg = (
            f"{dev} already has an iperf session with "
            f"port {port} and multicast address {multicast_group_addr}"
        )
        raise UseCaseFailure(msg)

    fname = f"mclient_{port}.txt"

    # run iperf, format result as CSV
    dev.console.execute_command(
        f"iperf {ipv6_flag} -s -f m -u -U -p {port} "
        f"-B {multicast_group_addr} --ssm-host {multicast_source_addr} "
        f"-i 1 -y C > {fname} &"
    )

    pid = dev.console.execute_command(
        f"pgrep iperf -a | grep {port} | awk '{{print$1}}'"
    )
    return IPerfSession(on_which_device, pid, multicast_group_addr, port, fname)
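
In the SSM variant, `-V` is only added when both the source and the group address are IPv6. The sketch below isolates that decision in a hypothetical helper, `ssm_ipv6_flag`. It also adds a guard against mixed address families; that guard is an assumption of this sketch and is not part of the code above.

```python
from ipaddress import ip_address


def ssm_ipv6_flag(source: str, group: str) -> str:
    """Return "-V" for an IPv6 (S,G) pair and "" for an IPv4 pair."""
    src, grp = ip_address(source), ip_address(group)
    if src.version != grp.version:
        # Extra guard, not present in the Use Case above.
        msg = f"source {source} and group {group} use different IP versions"
        raise ValueError(msg)
    return "-V" if grp.version == 6 else ""


ipv4_flag = ssm_ipv6_flag("192.0.2.10", "232.1.1.1")
ipv6_flag = ssm_ipv6_flag("2001:db8::10", "ff38::8000:1")
```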

kill_all_iperf

kill_all_iperf(device_list: list[IperfDevice]) -> None

Kill all iPerf sessions on target devices.

This should be called for cleaning purposes.

.. hint:: This Use Case implements statements from the test suite such as:

- Kill all iPerf sessions on target devices.

Parameters:

Name Type Description Default

device_list

list[IperfDevice]

list of target devices

required
Source code in boardfarm3/use_cases/multicast.py
def kill_all_iperf(device_list: list[IperfDevice]) -> None:
    """Kill all iPerf sessions on target devices.

    This should be called for cleaning purposes.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Kill all iPerf sessions on target devices.

    :param device_list: list of target devices
    :type device_list: list[IperfDevice]
    """
    for device in device_list:
        device.multicast.kill_all_iperf_sessions()

leave_iperf_multicast_group

leave_iperf_multicast_group(session: IPerfSession) -> IPerfResult

Send IGMP leave to stop receiving multicast traffic.

This is achieved by stopping the iPerf server bound to a multicast channel (ASM or SSM).

Executes kill -15 <iPerf session pid> on the target device. With IGMPv3, this sends a "block old sources" membership report.

Parameters:

Name Type Description Default

session

IPerfSession

Session object created during the join

required

Returns:

Type Description
IPerfResult

iPerf result

Raises:

Type Description
UseCaseFailure

If the device fails to leave the multicast group.

Source code in boardfarm3/use_cases/multicast.py
def leave_iperf_multicast_group(session: IPerfSession) -> IPerfResult:
    """Send IGMP leave to stop receiving multicast traffic.

    This is achieved by stopping the iPerf server bound
    to a multicast channel (ASM or SSM).

    Executes a kill -15 <iPerf session pid> on target device.
    In case of IGMPv3, will send a block old sources membership report.

    :param session: Session object created during the join
    :type session: IPerfSession
    :return: iPerf result
    :rtype: IPerfResult
    :raises UseCaseFailure: If the device fails to leave the multicast group.
    """
    dev = session.device

    if not _is_multicast_stream_active(dev, session.address, session.port):
        # Something is wrong, there should be a process ID always.

        err_msg = (
            f"iperf session with port {session.port} "
            f"and {session.address} multicast group does "
            f"not exist on {dev}"
        )

        raise UseCaseFailure(err_msg)

    # kill -15 iperf session
    dev.console.execute_command(f"kill -15 {session.pid}")
    out = dev.console.execute_command(f"cat {session.output_file}")

    # remove the file after reading results
    dev.console.execute_command(f"rm {session.output_file}")
    if not out.strip():
        return IPerfResult(None)

    csv = pd.read_csv(StringIO(out.strip()))
    cols = [
        "timestamp",
        "source_address",
        "source_port",
        "destination_address",
        "destination_port",
        "id",
        "interval",
        "transferred_bytes",
        "bandwidth",
        "jitter",
        "lost",
        "total",
    ]

    results = pd.DataFrame(csv.iloc[:, :-2].values, columns=cols)
    return IPerfResult(results)
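
The column mapping above keeps the first twelve CSV fields and drops the last two. A minimal standard-library sketch of the same mapping follows, without pandas. `parse_iperf_csv` is a hypothetical helper, and the sample line is made up purely to show the field order. Unlike `pd.read_csv` with its default `header` setting, `csv.reader` treats the first line as data.

```python
from __future__ import annotations

import csv
from io import StringIO

COLUMNS = [
    "timestamp",
    "source_address",
    "source_port",
    "destination_address",
    "destination_port",
    "id",
    "interval",
    "transferred_bytes",
    "bandwidth",
    "jitter",
    "lost",
    "total",
]


def parse_iperf_csv(text: str) -> list[dict[str, str]]:
    """Map the first 12 fields of every CSV row to the column names above."""
    rows = csv.reader(StringIO(text.strip()))
    return [dict(zip(COLUMNS, row[: len(COLUMNS)])) for row in rows if row]


# Made-up sample line with 14 fields, for illustration only.
sample = (
    "20240101120000,239.1.1.1,5001,192.0.2.10,40000,3,0.0-1.0,"
    "131072,1048576,0.015,0,89,0.000,0\n"
)
parsed = parse_iperf_csv(sample)
```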

parse_mcast_trace

parse_mcast_trace(
    dev: IperfDevice,
    fname: str,
    expected_sequence: list[tuple[str, ...]],
    ip_version: int = 4,
) -> list[tuple[str, ...]]

Compare captured PCAP file against an expected sequence of packets.

This returns a matched subset of the whole packet trace. The sequence of the matched packets must align with expected sequence. The length of the matched sequence is equal to expected sequence.

In case a packet is missing in captured sequence, an empty value is maintained in output at the same index as that of the expected sequence.

IP packets in expected sequence must follow the following order:

- IP source
- IP destination
- MAC source
- MAC destination
- IP protocol number (1 - ICMP, 2 - IGMP, 6 - TCP, 17 - UDP)
- IGMP version (v3 by default)
- IGMP Record Type number (5 - Allow new sources, 6 - Block old sources)
- IGMP Multicast Address (if provided in group records)
- IGMP Source Address (if provided in group records)

IPv6 packets will be parsed and following values are returned in a list:

- IPv6 source
- IPv6 destination
- MAC source
- MAC destination
- IPv6 Next Header (0 - ICMPv6, 6 - TCP, 17 - UDP)
- MLDv2 version (130 - MLDv2 Query, 143 - MLDv2 Report)
- MLDv2 Record Type number (5 - Allow new sources, 6 - Block old sources)
- MLDv2 Multicast Address (if provided in group records)
- MLDv2 Source Address (if provided in group records)

You can use * to mark a field as Any

.. hint:: This Use Case implements statements from the test suite such as:

- Check IGMPv3 report to subscribe to (S,G) from LAN on eRouter
  LAN interface
- Check Multicast traffic from WAN multicast server is received
  on eRouter WAN interface and forwarded to Ethernet LAN client

Parameters:

Name Type Description Default

dev

IperfDevice

Descriptor of iPerf capable device with PCAP file

required

fname

str

name of the PCAP file

required

expected_sequence

list[tuple[str, ...]]

expected sequence to match against captured sequence

required

ip_version

int

IP version, defaults to 4

4

Returns:

Type Description
list[tuple[str, ...]]

matched captured sequence against the expected sequence

Source code in boardfarm3/use_cases/multicast.py
def parse_mcast_trace(
    dev: IperfDevice,
    fname: str,
    expected_sequence: list[tuple[str, ...]],
    ip_version: int = 4,
) -> list[tuple[str, ...]]:
    """Compare captured PCAP file against an expected sequence of packets.

    This returns a matched subset of the whole packet trace.
    The sequence of the matched packets must align with expected sequence.
    The length of the matched sequence is equal to expected sequence.

    In case a packet is missing in captured sequence, an empty value is
    maintained in output at the same index as that of the expected sequence.

    IP packets in expected sequence must follow the following order:

        - IP source
        - IP destination
        - MAC source
        - MAC destination
        - IP protocol number (1 - ICMP, 2 - IGMP, 6 - TCP, 17 - UDP)
        - IGMP version (v3 by default)
        - IGMP Record Type number (5 - Allow new sources, 6 - Block old sources)
        - IGMP Multicast Address (if provided in group records)
        - IGMP Source Address (if provided in group records)

    IPv6 packets will be parsed and following values are returned in a list:

        - IPv6 source
        - IPv6 destination
        - MAC source
        - MAC destination
        - IPv6 Next Header (0 - ICMPv6, 6 - TCP, 17 - UDP)
        - MLDv2 version (130 - MLDv2 Query, 143 - MLDv2 Report)
        - MLDv2 Record Type number (5 - Allow new sources, 6 - Block old sources)
        - MLDv2 Multicast Address (if provided in group records)
        - MLDv2 Source Address (if provided in group records)

    You can use * to mark a field as Any

    .. hint:: This Use Case implements statements from the test suite such as:

        - Check IGMPv3 report to subscribe to (S,G) from LAN on eRouter
          LAN interface
        - Check Multicast traffic from WAN multicast server is received
          on eRouter WAN interface and forwarded to Ethernet LAN client

    :param dev: Descriptor of iPerf capable device with PCAP file
    :type dev: IperfDevice
    :param fname: name of the PCAP file
    :type fname: str
    :param expected_sequence: expected sequence to match against captured sequence
    :type expected_sequence: list[tuple[str, ...]]
    :param ip_version: IP version, defaults to 4
    :type ip_version: int
    :return: matched captured sequence against the expected sequence
    :rtype: list[tuple[str, ...]]
    """
    return dev.multicast.parse_mcast_trace(fname, expected_sequence, ip_version)
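
The matching rules described in the docstring are in-order matching, `*` as a wildcard, and an empty placeholder for a missing packet. They can be illustrated with a small standalone sketch. This is not the device-side implementation that `dev.multicast.parse_mcast_trace` delegates to; `match_sequence` is hypothetical, and the tuples are shortened to three fields (IP source, IP destination, protocol number) for brevity.

```python
from __future__ import annotations

Packet = tuple[str, ...]


def field_matches(expected: str, actual: str) -> bool:
    """A field matches when it is equal or the expectation is the wildcard."""
    return expected in ("*", actual)


def match_sequence(captured: list[Packet], expected: list[Packet]) -> list[Packet]:
    """Match expected packets in order; keep an empty tuple where none is found."""
    matched: list[Packet] = []
    position = 0  # never look before the previously matched packet
    for want in expected:
        found: Packet = ()
        for index in range(position, len(captured)):
            packet = captured[index]
            if len(packet) == len(want) and all(
                field_matches(e, a) for e, a in zip(want, packet)
            ):
                found = packet
                position = index + 1
                break
        matched.append(found)
    return matched


captured = [
    ("10.0.0.2", "224.0.0.22", "2"),
    ("192.0.2.10", "232.1.1.1", "17"),
    ("10.0.0.2", "224.0.0.22", "2"),
]
expected = [
    ("*", "224.0.0.22", "2"),
    ("192.0.2.10", "232.1.1.1", "17"),
    ("*", "232.9.9.9", "17"),  # never captured
]
result = match_sequence(captured, expected)
```

The result has the same length as `expected`, so a test can compare the two lists index by index and report exactly which expected packet was missing.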

start_iperf_multicast_stream

start_iperf_multicast_stream(
    on_which_device: WAN,
    multicast_group_addr: str,
    port: int,
    time: int,
    bit_rate: float,
) -> IPerfStream

Start an iPerf client sending data on multicast address in background.

Session will have the following parameters by default:

- TTL value set to 5

.. hint:: This Use Case implements statements from the test suite such as:

- Start multicast server on WAN network to provide
  the multicast traffic in unreserved multicast group IP
  range 232.0.0.0/8
- Start multicast server on WAN network to provide
  the multicast traffic in unreserved multicast group IP
  range FF38::8000:0/96
- Start multicast stream on a specific Group channel
  by sending IGMPv3 report

Parameters:

Name Type Description Default

on_which_device

WAN

WAN object that runs the multicast stream.

required

multicast_group_addr

str

multicast stream's group IP address

required

port

int

multicast stream's port number

required

time

int

total time the session should run for

required

bit_rate

float

bit_rate of data to be sent (in Mbps)

required

Returns:

Type Description
IPerfStream

object holding data on the iPerf stream.

Raises:

Type Description
UseCaseFailure

if a multicast stream with the same group address and port is already active on the device.

Source code in boardfarm3/use_cases/multicast.py
def start_iperf_multicast_stream(
    on_which_device: WAN,
    multicast_group_addr: str,
    port: int,
    time: int,
    bit_rate: float,
) -> IPerfStream:
    """Start an iPerf client sending data on multicast address in background.

    Session will have the following parameters by default:
        - TTL value set to 5

    .. hint:: This Use Case implements statements from the
       test suite such as:

        - Start multicast server on WAN network to provide
          the multicast traffic in unreserved multicast group IP
          range 232.0.0.0/8
        - Start multicast server on WAN network to provide
          the multicast traffic in unreserved multicast group IP
          range FF38::8000:0/96
        - Start multicast stream on a specific Group channel
          by sending IGMPv3 report

    :param on_which_device: WAN object that runs the multicast stream.
    :type on_which_device: WAN
    :param multicast_group_addr: multicast stream's group IP address
    :type multicast_group_addr: str
    :param port: multicast stream's port number
    :type port: int
    :param time: total time the session should run for
    :type time: int
    :param bit_rate: bit_rate of data to be sent (in Mbps)
    :type bit_rate: float
    :return: object holding data on the iPerf stream.
    :rtype: IPerfStream
    :raises UseCaseFailure: if a multicast stream with the same group
        address and port is already active on the device.
    """
    dev = on_which_device
    ipv6_flag = ""

    if isinstance(ip_address(multicast_group_addr), IPv6Address):
        ipv6_flag = "-V"

    # Ensure there is no existing stream with the same IP and port.
    if _is_multicast_stream_active(dev, multicast_group_addr, port):
        msg = (
            f"{dev} already has an iperf session with "
            f"port {port} and multicast address {multicast_group_addr}"
        )
        raise UseCaseFailure(msg)
    fname = f"mserver_{port}.txt"

    dev.console.execute_command(
        f"iperf {ipv6_flag} -u -f m -c {multicast_group_addr} "
        f"-p {port} --ttl 5 "
        f"-t {time} -b {bit_rate}m > {fname} &"
    )

    pid = dev.console.execute_command(
        f"pgrep iperf -a | grep {port} | awk '{{print$1}}'"
    )
    return IPerfStream(on_which_device, pid, multicast_group_addr, port, fname, time)

tcpdump

tcpdump(
    dev: IperfDevice, fname: str, filters: str | None = "ip multicast"
) -> Generator[str]

Start packet capture using tcpdump and kill the process at the end.

Applies specific filter for multicast traffic only.

.. hint:: This Use Case implements statements from the test suite such as:

- Capturing packets sent from and to eRouter WAN and eRouter LAN interfaces
- Make sure you can capture packets sent from and to eRouter WAN interface
  and Ethernet LAN client

Yields:

Type Description
str

process ID

Parameters:

Name Type Description Default

dev

IperfDevice

LAN, WAN or WLAN device instance

required

fname

str

name of the pcap file to which the capture will be stored

required

filters

str | None

additional filters for capture, defaults to "ip multicast"

'ip multicast'
Source code in boardfarm3/use_cases/multicast.py
@contextmanager
def tcpdump(
    dev: IperfDevice,
    fname: str,
    filters: str | None = "ip multicast",
) -> Generator[str]:
    """Start packet capture using tcpdump and kill the process at the end.

    Applies specific filter for multicast traffic only.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Capturing packets sent from and to eRouter WAN and eRouter LAN interfaces
        - Make sure you can capture packets sent from and to eRouter WAN interface
          and Ethernet LAN client

    :param dev: LAN, WAN or WLAN device instance
    :type dev: IperfDevice
    :param fname: name of the pcap file to which the capture will be stored
    :type fname: str
    :param filters: additional filters for capture, defaults to "ip multicast"
    :type filters: str | None
    :yield: process ID
    :rtype: Generator[str, None, None]
    """
    with dev.tcpdump_capture(fname, dev.iface_dut, filters) as pid:
        yield pid
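
The wrapping pattern above can be tried without a test bed. The following is a minimal sketch, assuming a stand-in device: `FakeDevice`, its event log, and the process ID `"1234"` are hypothetical and only mimic the call shape of `dev.tcpdump_capture(fname, dev.iface_dut, filters)` seen in the source; no real capture is started.

```python
from __future__ import annotations

from collections.abc import Generator
from contextlib import contextmanager


class FakeDevice:
    """Hypothetical stand-in for an IperfDevice; it only records events."""

    iface_dut = "eth1"

    def __init__(self) -> None:
        self.events: list[str] = []

    @contextmanager
    def tcpdump_capture(
        self, fname: str, interface: str, filters: str | None
    ) -> Generator[str, None, None]:
        self.events.append(f"start {fname} on {interface} with {filters!r}")
        try:
            yield "1234"  # made-up process ID
        finally:
            # Runs on normal exit and when the body raises.
            self.events.append("stop")


@contextmanager
def tcpdump(
    dev: FakeDevice, fname: str, filters: str | None = "ip multicast"
) -> Generator[str, None, None]:
    """Same body as the Use Case above, typed against the stub device."""
    with dev.tcpdump_capture(fname, dev.iface_dut, filters) as pid:
        yield pid


device = FakeDevice()
with tcpdump(device, "mcast.pcap") as capture_pid:
    device.events.append(f"traffic sent while pid {capture_pid} captures")
```

Because the Use Case is a context manager, the capture is stopped when the `with` block exits, so a test only needs to generate traffic inside the block.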

wait_for_multicast_stream_to_end

wait_for_multicast_stream_to_end(stream_list: list[IPerfStream]) -> None

Wait for all multicast streams to end.

The Use Case will wait for a time equal to the stream with the highest wait time.

If a stream from the list does not exit within the max wait time, then throw an error.

.. hint:: To be used along with the Use Case start_iperf_multicast_stream

Parameters:

Name Type Description Default

stream_list

list[IPerfStream]

List of IPerfStreams

required

Raises:

Type Description
ValueError

if empty list is passed

UseCaseFailure

if a session fails to exit.

Source code in boardfarm3/use_cases/multicast.py
def wait_for_multicast_stream_to_end(stream_list: list[IPerfStream]) -> None:
    """Wait for all multicast streams to end.

    The Use Case will wait for a time equal to the stream with
    the highest wait time.

    If a stream from the list does not exit within the
    max wait time, then throw an error.

    .. hint:: To be used along with the Use Case start_iperf_multicast_stream

    :param stream_list: List of IPerfStreams
    :type stream_list: list[IPerfStream]
    :raises ValueError: if empty list is passed
    :raises UseCaseFailure: if a session fails to exit.
    """
    if not stream_list:
        msg = "Cannot pass an empty session list!"
        raise ValueError(msg)

    max_time_to_wait = max(stream.time for stream in stream_list)

    # Should expect all streams to be closed by now
    # This is not asyncio, no high expectations
    sleep(max_time_to_wait)

    failed_sessions = []
    for stream in stream_list:
        # try twice before raising exception.
        dev = stream.device
        for _ in range(2):
            if not dev.console.execute_command(
                f"pgrep iperf -a | grep {stream.port}| grep {stream.address}"
            ):
                break
            sleep(1)
        else:
            dev.console.execute_command(f"kill -9 {stream.pid}")
            failed_sessions.append(
                f"{stream.address}:{stream.port} did not exit within {stream.time}s"
            )
        dev.console.execute_command(f"rm {stream.output_file}")

    if failed_sessions:
        # Prefix the header once, then list one failed session per line.
        msg = "Following sessions failed:\n" + "\n".join(failed_sessions)
        raise UseCaseFailure(msg)
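
The two-attempt check relies on Python's `for ... else`: the `else` branch runs only when the loop finishes without hitting `break`. The sketch below isolates that logic under stated assumptions: `Stream` is a hypothetical, trimmed-down stand-in for `IPerfStream`, and the `still_running` set replaces the `pgrep` call on the device. It also builds a report made of a header followed by one line per failed session.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Stream:
    """Hypothetical, trimmed-down stand-in for IPerfStream."""

    address: str
    port: int
    time: int


def collect_failures(streams: list[Stream], still_running: set[int]) -> list[str]:
    """Return one message per stream whose port is still listed as running."""
    failed = []
    for stream in streams:
        for _ in range(2):  # two attempts, as in the Use Case
            if stream.port not in still_running:
                break  # process is gone, so the else branch is skipped
        else:
            # Reached only when neither attempt hit ``break``.
            failed.append(
                f"{stream.address}:{stream.port} did not exit within {stream.time}s"
            )
    return failed


streams = [Stream("239.1.1.1", 5001, 10), Stream("239.1.1.2", 5002, 30)]
max_time_to_wait = max(stream.time for stream in streams)
failures = collect_failures(streams, still_running={5002})
report = "Following sessions failed:\n" + "\n".join(failures)
```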

networking

Common Networking use cases.

Functions:

Name Description
connect_via_ssh

SSH from a device to another.

create_tcp_session

Create a TCP session from source to destination device on a port.

create_tcp_udp_session

Create both TCP and UDP session from source to destination device on a port.

create_udp_session

Create a UDP session from source to destination device on a port.

delete_arp_table_entry

Delete ARP table entry.

disable_ipv6

Disable IPv6 on the specified interface.

dns_lookup

Perform dig command in the devices to resolve DNS.

enable_ipv6

Enable IPv6 on the specified interface.

flush_arp_cache

Flushes ARP cache entries.

get_arp_table_info

Fetch ARP entries.

get_interface_mac_addr

Get the MAC address of the provided interface.

get_ip6tables_list

Return ip6tables rules as dictionary.

get_ip6tables_policy

Get firewall's ip6tables policy.

get_iptables_list

Return iptables rules as dictionary.

get_iptables_policy

Return iptables policies as dictionary.

get_nslookup_data

Perform nslookup with given arguments and return the parsed results.

hping_flood

Validate SYN, UDP and ICMP flood operation.

http_get

Check if the given HTTP server is running.

is_client_ip_in_pool

Check for client IP in IP pool.

is_icmp_packet_present

Check whether the expected ICMP sequence matches with the captured sequence.

is_ip6table_empty

Return True if ip6tables is empty.

is_iptable_empty

Return True if iptables is empty.

netcat

Run netcat command to initiate brute force.

netstat_all_tcp

Get all TCP ports.

netstat_all_udp

Get all UDP ports.

netstat_listening_ports

Get all listening ports.

nmap_scan

Perform Complete scan on destination via nmap network utility on source device.

nping

Perform nping command and kill process once done.

ping

Ping remote host IP.

start_http_server

Start http server on given client.

trigger_ip_flood

Perform IP flooding via nmap network utility on source device.

verify_tunnel_packets

Verify the expected encapsulated info with the captured sequence.

connect_via_ssh

connect_via_ssh(
    from_which_device: SSHDeviceType,
    to_which_device: SSHDeviceType,
    protocol: int = 4,
    username: str = "root",
    password: str = "bigfoot1",
) -> bool

SSH from a device to another.

This use case validates if SSH is possible from a device to another.

Parameters:

Name Type Description Default

from_which_device

SSHDeviceType

Device initiating the SSH connection

required

to_which_device

SSHDeviceType

Target SSH device

required

protocol

int

IP address family, defaults to 4

4

username

str

SSH username, defaults to root

'root'

password

str

SSH password, defaults to bigfoot1

'bigfoot1'

Returns:

Type Description
bool

True if SSH successful, else False

Raises:

Type Description
ConnectionError

If connectivity exists, but SSH not successful

Source code in boardfarm3/use_cases/networking.py
def connect_via_ssh(
    from_which_device: SSHDeviceType,
    to_which_device: SSHDeviceType,
    protocol: int = 4,
    username: str = "root",
    password: str = "bigfoot1",  # noqa: S107
) -> bool:
    """SSH from a device to another.

    This use case validates if SSH is possible from a device to another.

    :param from_which_device: Device initiating the SSH connection
    :type from_which_device: SSHDeviceType
    :param to_which_device: Target SSH device
    :type to_which_device: SSHDeviceType
    :param protocol: IP address family, defaults to 4
    :type protocol: int
    :param username: SSH username, defaults to root
    :type username: str
    :param password: SSH password, defaults to bigfoot1
    :type password: str
    :raises ConnectionError: If connectivity exists, but SSH not successful
    :return: True if SSH successful, else False
    :rtype: bool
    """
    def_protocol = 4
    address = (
        to_which_device.ipv4_addr
        if protocol == def_protocol
        else to_which_device.ipv6_addr
    )
    ssh_command = (
        "ssh -o StrictHostKeyChecking=no -o ConnectTimeout=5 "
        f"-o UserKnownHostsFile=/dev/null {username}@{address} exit"
    )
    from_which_device.console.sendline(ssh_command)
    index = from_which_device.console.expect(
        ["assword:", DEFAULT_BASH_SHELL_PROMPT_PATTERN]
    )
    if index == 0:
        from_which_device.console.sendline(f"{password}")
        index = from_which_device.console.expect(
            ["assword:", DEFAULT_BASH_SHELL_PROMPT_PATTERN]
        )
        if index == 0:
            from_which_device.console.sendcontrol("c")
            from_which_device.console.expect(DEFAULT_BASH_SHELL_PROMPT_PATTERN)
            return False
        if from_which_device.console.execute_command("echo $?") != "0":
            msg = "SSH not successful - needs troubleshooting."
            raise ConnectionError(msg)
        return True
    return False
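
The branches above are easier to follow as a pure function. This is a minimal sketch, assuming a hypothetical helper `ssh_outcome` that takes the values the Use Case reads from the console: the index returned by each `expect` call (0 for the password prompt, 1 for the shell prompt) and the output of `echo $?`. It is not part of boardfarm3.

```python
from __future__ import annotations


def ssh_outcome(
    first_index: int,
    second_index: int | None = None,
    exit_status: str | None = None,
) -> bool:
    """Map the observed console events to the Use Case result."""
    if first_index != 0:
        # Shell prompt came back without a password request.
        return False
    if second_index == 0:
        # Password prompt shown again: the password was rejected.
        return False
    if exit_status != "0":
        # A password was accepted but the ssh command still failed.
        msg = "SSH not successful - needs troubleshooting."
        raise ConnectionError(msg)
    return True


no_prompt = ssh_outcome(1)
wrong_password = ssh_outcome(0, 0)
success = ssh_outcome(0, 1, "0")
```

Note that only the last case returns `True`; a non-zero exit status after an accepted password raises `ConnectionError` instead of returning `False`.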

create_tcp_session

create_tcp_session(
    source_device: LAN | WLAN | WAN,
    destination_device: LAN | WLAN | WAN | CPE,
    ip_type: str,
    port: str | int,
    max_retries: int = 4,
    timeout: int = 30,
) -> dict[str, str]

Create a TCP session from source to destination device on a port.

.. hint:: This Use Case implements statements from the test suite such as:

- Create a TCP session from source to destination device on a port.

Runs nmap network utility on source device.

Parameters:

Name Type Description Default

source_device

LAN | WLAN | WAN

Source device

required

destination_device

LAN | WLAN | WAN | CPE

destination device

required

ip_type

str

type of IP address: "ipv4", "ipv6"

required

port

str | int

port or range of ports: "666-999"

required

max_retries

int

maximum number of retries for nmap

4

timeout

int

pexpect timeout for the command in seconds, defaults to 30

30

Returns:

Type Description
dict[str,str]

XML output of the nmap command in form of dictionary

Source code in boardfarm3/use_cases/networking.py
def create_tcp_session(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    source_device: LAN | WLAN | WAN,
    destination_device: LAN | WLAN | WAN | CPE,
    ip_type: str,
    port: str | int,
    max_retries: int = 4,
    timeout: int = 30,
) -> dict[str, str]:
    """Create a TCP session from source to destination device on a port.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Create a TCP session from source to destination device on a port.

    Runs nmap network utility on source device.

    :param source_device: Source device
    :type source_device: LAN | WLAN | WAN
    :param destination_device: destination device
    :type destination_device: LAN | WLAN | WAN | CPE
    :param ip_type: type of IP address: "ipv4", "ipv6"
    :type ip_type: str
    :param port: port or range of ports: "666-999"
    :type port: str | int
    :param max_retries: maximum number of retries for nmap
    :type max_retries: int
    :param timeout: pexpect timeout for the command in seconds, defaults to 30
    :type timeout: int
    :return: XML output of the nmap command in form of dictionary
    :rtype: dict[str,str]
    """
    return nmap(
        source_device,
        destination_device,
        ip_type,
        port,
        "-sT",
        max_retries,
        timeout=timeout,
    )

create_tcp_udp_session

create_tcp_udp_session(
    source_device: LAN | WLAN | WAN,
    destination_device: LAN | WLAN | WAN | CPE,
    ip_type: str,
    port: str | int,
    max_retries: int = 4,
    timeout: int = 30,
) -> dict[str, str]

Create both TCP and UDP session from source to destination device on a port.

.. hint:: This Use Case implements statements from the test suite such as:

- Run nmap from client to erouter WAN IP.

Runs nmap network utility on source device.

Parameters:

Name Type Description Default

source_device

LAN | WLAN | WAN

source device

required

destination_device

LAN | WLAN | WAN | CPE

destination device

required

ip_type

str

type of IP address: "ipv4", "ipv6"

required

port

str | int

port or range of ports: "666-999"

required

max_retries

int

maximum number of retries for nmap

4

timeout

int

pexpect timeout for the command in seconds, defaults to 30

30

Returns:

Type Description
dict[str,str]

XML output of the nmap command in form of dictionary

Source code in boardfarm3/use_cases/networking.py
def create_tcp_udp_session(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    source_device: LAN | WLAN | WAN,
    destination_device: LAN | WLAN | WAN | CPE,
    ip_type: str,
    port: str | int,
    max_retries: int = 4,
    timeout: int = 30,
) -> dict[str, str]:
    """Create both TCP and UDP session from source to destination device on a port.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Run nmap from client to erouter WAN IP.

    Runs nmap network utility on source device.

    :param source_device: source device
    :type source_device: LAN | WLAN | WAN
    :param destination_device: destination device
    :type destination_device: LAN | WLAN | WAN | CPE
    :param ip_type: type of IP address: "ipv4", "ipv6"
    :type ip_type: str
    :param port: port or range of ports: "666-999"
    :type port: str | int
    :param max_retries: maximum number of retries for nmap
    :type max_retries: int
    :param timeout: pexpect timeout for the command in seconds, defaults to 30
    :type timeout: int
    :return: XML output of the nmap command in form of dictionary
    :rtype: dict[str,str]
    """
    return nmap(
        source_device,
        destination_device,
        ip_type,
        port,
        "-sU -sT",
        max_retries,
        timeout=timeout,
    )

create_udp_session

create_udp_session(
    source_device: LAN | WLAN | WAN,
    destination_device: LAN | WLAN | WAN | CPE,
    ip_type: str,
    port: str | int,
    max_retries: int,
    timeout: int = 30,
) -> dict[str, str]

Create a UDP session from source to destination device on a port.

.. hint:: This Use Case implements statements from the test suite such as:

- Create a UDP session from source to destination device on a port.

Runs nmap network utility on source device.

Parameters:

Name Type Description Default

source_device

LAN | WLAN | WAN

Source device

required

destination_device

LAN | WLAN | WAN | CPE

Destination device

required

ip_type

str

type of IP address: "ipv4", "ipv6"

required

port

str | int

port or range of ports: "666-999"

required

max_retries

int

maximum number of retries for nmap

required

timeout

int

pexpect timeout for the command in seconds, defaults to 30

30

Returns:

Type Description
dict[str,str]

XML output of the nmap command in form of dictionary

Source code in boardfarm3/use_cases/networking.py
def create_udp_session(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    source_device: LAN | WLAN | WAN,
    destination_device: LAN | WLAN | WAN | CPE,
    ip_type: str,
    port: str | int,
    max_retries: int,
    timeout: int = 30,
) -> dict[str, str]:
    """Create a UDP session from source to destination device on a port.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Create a UDP session from source to destination device on a port.

    Runs nmap network utility on source device.

    :param source_device: Source device
    :type source_device: LAN | WLAN | WAN
    :param destination_device: Destination device
    :type destination_device: LAN | WLAN | WAN | CPE
    :param ip_type: type of IP address: "ipv4", "ipv6"
    :type ip_type: str
    :param port: port or range of ports: "666-999"
    :type port: str | int
    :param max_retries: maximum number of retries for nmap
    :type max_retries: int
    :param timeout: pexpect timeout for the command in seconds, defaults to 30
    :type timeout: int
    :return: XML output of the nmap command in form of dictionary
    :rtype: dict[str,str]
    """
    return nmap(
        source_device,
        destination_device,
        ip_type,
        port,
        "-sU",
        max_retries,
        timeout=timeout,
    )
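
The three session Use Cases differ only in the scan-type flag they hand to `nmap`: `-sT` (TCP connect scan), `-sU` (UDP scan), or both. The sketch below makes that mapping explicit. `build_nmap_command` is a hypothetical helper written for illustration; how the boardfarm3 `nmap` helper actually assembles its command line is not shown in this page, so the option order here is an assumption.

```python
from __future__ import annotations

# Scan-type flags passed by the three Use Cases above.
SCAN_FLAGS = {
    "create_tcp_session": "-sT",
    "create_udp_session": "-sU",
    "create_tcp_udp_session": "-sU -sT",
}


def build_nmap_command(
    use_case: str,
    ip_address: str,
    port: str | int,
    max_retries: int,
    ipv6: bool = False,
) -> str:
    """Assemble an illustrative nmap command line (hypothetical helper)."""
    parts = ["nmap", SCAN_FLAGS[use_case]]
    if ipv6:
        parts.append("-6")  # nmap switch that enables IPv6 scanning
    parts += ["-p", str(port), "--max-retries", str(max_retries), ip_address]
    return " ".join(parts)


tcp_command = build_nmap_command("create_tcp_session", "192.168.0.10", "666-999", 4)
both_command = build_nmap_command(
    "create_tcp_udp_session", "2001:db8::10", 53, 4, ipv6=True
)
```

Unlike the other two Use Cases, `create_udp_session` has no default for `max_retries`, so callers must always pass it.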

delete_arp_table_entry

delete_arp_table_entry(device: LAN, ip: str, intf: str) -> None

Delete ARP table entry.

.. hint:: This Use Case implements statements from the test suite such as:

Delete ARP table entry

Parameters:

Name Type Description Default

device

LAN

device on which the ARP table entry is to be deleted

required

ip

str

ip of the host entry to be deleted

required

intf

str

interface on which the host needs to be deleted

required
Source code in boardfarm3/use_cases/networking.py
def delete_arp_table_entry(device: LAN, ip: str, intf: str) -> None:
    """Delete ARP table entry.

    .. hint:: This Use Case implements statements from the test suite such as:

        Delete ARP table entry

    :param device: device on which the ARP table entry is to be deleted
    :type device: LAN
    :param ip: ip of the host entry to be deleted
    :type ip: str
    :param intf: interface on which the host needs to be deleted
    :type intf: str
    """
    device.delete_arp_table_entry(ip, intf)

disable_ipv6

disable_ipv6(device: LAN | WLAN) -> None

Disable IPv6 on the specified interface.

The Use Case executes the following command:

- sysctl net.ipv6.conf.<interface>.disable_ipv6=1

.. hint:: This Use Case implements statements from the test suite such as:

- Disable IPv6 on the specified interface.

Parameters:

Name Type Description Default

device

LAN | WLAN

LAN or WLAN device object

required
Source code in boardfarm3/use_cases/networking.py
def disable_ipv6(device: LAN | WLAN) -> None:
    """Disable IPv6 on the specified interface.

    The use case executes the following commands:
        - sysctl net.ipv6.conf.<interface>.disable_ipv6=1

    .. hint:: This Use Case implements statements from the test suite such as:

        - Disable IPv6 on the specified interface.

    :param device: LAN or WLAN device object
    :type device: LAN | WLAN
    """
    device.disable_ipv6()

dns_lookup

dns_lookup(
    host: LAN | WAN, domain_name: str, ipv6: bool = False, opts: str = ""
) -> list[dict[str, Any]]

Perform dig command in the devices to resolve DNS.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify that IPv4 domain name can be resolved to IP.
- Verify the DNS IPv4 address assigned by the service provider

Parameters:

Name Type Description Default

host

LAN | WAN

host where the dig command has to be run

required

domain_name

str

domain name which needs lookup

required

ipv6

bool

flag to perform IPv4 or IPv6 lookup, defaults to False

False

opts

str

options to be provided to dig command, defaults to ""

''

Returns:

Type Description
list[dict[str, Any]]

returns dig output from jc parser

Raises:

Type Description
UseCaseFailure

when domain_name cannot resolve

Source code in boardfarm3/use_cases/networking.py
def dns_lookup(
    host: LAN | WAN, domain_name: str, ipv6: bool = False, opts: str = ""
) -> list[dict[str, Any]]:
    """Perform ``dig`` command in the devices to resolve DNS.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify that IPv4 domain name can be resolved to IP.
        - Verify the DNS IPv4 address assigned by the service provider

    :param host: host where the dig command has to be run
    :type host: LAN | WAN
    :param domain_name: domain name which needs lookup
    :type domain_name: str
    :param ipv6: flag to perform IPv4 or IPv6 lookup, defaults to False
    :type ipv6: bool
    :param opts: options to be provided to dig command, defaults to ""
    :type opts: str
    :return: returns dig output from jc parser
    :rtype: list[dict[str, Any]]
    :raises UseCaseFailure: when domain_name cannot resolve
    """
    record_type = "AAAA" if ipv6 else "A"
    result = host.dns_lookup(domain_name, record_type, opts)
    if result:
        return result
    msg = f"Failed to resolve {domain_name}"
    raise UseCaseFailure(msg)
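
The control flow above (choose the record type from the `ipv6` flag, return a non-empty result, otherwise raise) can be exercised with a stub. This is a sketch only: `FakeHost`, its canned record table, and the local `UseCaseFailure` class are hypothetical stand-ins, and the shape of the returned dictionaries is invented rather than taken from the real parser output.

```python
from __future__ import annotations

from typing import Any


class UseCaseFailure(Exception):
    """Local stand-in for the boardfarm3 exception of the same name."""


class FakeHost:
    """Hypothetical host that answers from a canned record table."""

    def __init__(self, records: dict[tuple[str, str], list[dict[str, Any]]]) -> None:
        self.records = records

    def dns_lookup(
        self, domain_name: str, record_type: str, opts: str = ""
    ) -> list[dict[str, Any]]:
        return self.records.get((domain_name, record_type), [])


def dns_lookup(
    host: FakeHost, domain_name: str, ipv6: bool = False, opts: str = ""
) -> list[dict[str, Any]]:
    """Same control flow as the Use Case above, typed against the stub."""
    record_type = "AAAA" if ipv6 else "A"
    result = host.dns_lookup(domain_name, record_type, opts)
    if result:
        return result
    msg = f"Failed to resolve {domain_name}"
    raise UseCaseFailure(msg)


host = FakeHost({("example.com", "A"): [{"answer": [{"data": "192.0.2.10"}]}]})
ipv4_result = dns_lookup(host, "example.com")

try:
    dns_lookup(host, "example.com", ipv6=True)  # no AAAA record in the table
except UseCaseFailure as error:
    failure_message = str(error)
```

An empty answer is treated as a failure, so tests that expect a name not to resolve should catch `UseCaseFailure` rather than check for an empty list.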

enable_ipv6

enable_ipv6(device: LAN | WLAN) -> None

Enable IPv6 on the specified interface.

The Use Case executes the following commands:

- sysctl net.ipv6.conf.<interface>.disable_ipv6=0
- sysctl net.ipv6.conf.<interface>.accept_ra=2

.. hint:: This Use Case implements statements from the test suite such as:

- Enable IPv6 on the specified interface.

Parameters:

Name Type Description Default

device

LAN | WLAN

LAN or WLAN device object

required
Source code in boardfarm3/use_cases/networking.py
def enable_ipv6(device: LAN | WLAN) -> None:
    """Enable IPv6 on the specified interface.

    The Use Case executes the following commands:
        - sysctl net.ipv6.conf.<interface>.disable_ipv6=0
        - sysctl net.ipv6.conf.<interface>.accept_ra=2

    .. hint:: This Use Case implements statements from the test suite such as:

        - Enable IPv6 on the specified interface.

    :param device: LAN or WLAN device object
    :type device: LAN | WLAN
    """
    device.enable_ipv6()
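
For reference, the `sysctl` settings named in the `enable_ipv6` and `disable_ipv6` docstrings can be written out for a concrete interface. The helper below is hypothetical and only formats the strings listed in those docstrings; the interface name `eth1` is an example, and how the device classes actually run the commands is not shown here.

```python
from __future__ import annotations


def ipv6_sysctl_commands(interface: str, enable: bool) -> list[str]:
    """Return the sysctl commands listed in the docstrings (hypothetical helper)."""
    prefix = f"sysctl net.ipv6.conf.{interface}"
    if enable:
        # Enabling also sets accept_ra=2, as listed for enable_ipv6.
        return [f"{prefix}.disable_ipv6=0", f"{prefix}.accept_ra=2"]
    return [f"{prefix}.disable_ipv6=1"]


enable_commands = ipv6_sysctl_commands("eth1", enable=True)
disable_commands = ipv6_sysctl_commands("eth1", enable=False)
```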

flush_arp_cache

flush_arp_cache(device: LAN) -> None

Flushes ARP cache entries.

.. hint:: This Use Case implements statements from the test suite such as:

Generate ARP Request for...
  • ping can be used after clearing the cache, and ARP packets can be observed in the packet capture

Parameters:

Name Type Description Default

device

LAN

device on which the cache is to be cleared

required
Source code in boardfarm3/use_cases/networking.py
def flush_arp_cache(device: LAN) -> None:
    """Flushes ARP cache entries.

    .. hint:: This Use Case implements statements from the test suite such as:

        Generate ARP Request for...

    - ping can be used after clearing the cache, and ARP packets can be
        observed in the packet capture

    :param device: device on which the cache is to be cleared
    :type device: LAN
    """
    device.flush_arp_cache()

get_arp_table_info

get_arp_table_info(device: LAN) -> list[dict[str, str]]

Fetch ARP entries.

.. hint:: This Use Case implements statements from the test suite such as:

Show ARP table / Get ARP entries

Parameters:

Name Type Description Default

device

LAN

device whose ARP table is to be fetched

required

Returns:

Type Description
list[dict[str, str]]

list of parsed ARP table entries

Source code in boardfarm3/use_cases/networking.py
def get_arp_table_info(device: LAN) -> list[dict[str, str]]:
    """Fetch ARP entries.

    .. hint:: This Use Case implements statements from the test suite such as:

        Show ARP table / Get ARP entries

    :param device: device whose ARP table is to be fetched
    :type device: LAN
    :return: list of parsed ARP table entries
    :rtype: list[dict[str, str]]
    """
    arp_table: list[dict[str, str]] = []
    out = device.get_arp_table()
    arp_regex = re.compile(
        r"(?P<address>\d+.\d+.\d+.\d+)\s+(?P<hw_type>\S+)\s+"
        r"(?P<hw_address>\S+)\s+(?P<flags_mask>\S+)\s+(?P<iface>\S+)"
    )
    for entry in out.splitlines()[1:]:
        arp_entry = re.search(arp_regex, entry)
        if arp_entry:
            arp_table.append(arp_entry.groupdict())
    return arp_table
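
To see what the parsed entries look like, the same five named groups can be run against sample text. This is a sketch under stated assumptions: `SAMPLE_OUTPUT` imitates `arp -n` style output with invented addresses, `parse_arp_table` is a hypothetical standalone copy of the loop above, and the dots in the address pattern are escaped so they match literal dots only.

```python
from __future__ import annotations

import re

# Illustrative ``arp -n`` style output; addresses and MACs are invented.
SAMPLE_OUTPUT = "\n".join(
    [
        "Address                  HWtype  HWaddress           Flags Mask            Iface",
        "192.168.1.1              ether   00:11:22:33:44:55   C                     eth0",
        "192.168.1.20             ether   aa:bb:cc:dd:ee:ff   C                     eth0",
        "192.168.1.30                     (incomplete)                              eth0",
    ]
)

ARP_REGEX = re.compile(
    r"(?P<address>\d+\.\d+\.\d+\.\d+)\s+(?P<hw_type>\S+)\s+"
    r"(?P<hw_address>\S+)\s+(?P<flags_mask>\S+)\s+(?P<iface>\S+)"
)


def parse_arp_table(output: str) -> list[dict[str, str]]:
    """Parse every line after the header into a dict of named groups."""
    arp_table = []
    for entry in output.splitlines()[1:]:
        match = ARP_REGEX.search(entry)
        if match:
            arp_table.append(match.groupdict())
    return arp_table


arp_table = parse_arp_table(SAMPLE_OUTPUT)
```

The pattern needs five whitespace-separated fields, so the incomplete entry in the sample (three fields) is skipped rather than returned with missing values.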

get_interface_mac_addr

get_interface_mac_addr(device: LAN | WLAN | WAN | CPE, interface: str) -> str

Get the MAC address of the provided interface.

.. hint:: This Use Case implements statements from the test suite such as:

- Get the mac address of the provided interface.

Parameters:

Name Type Description Default

device

LAN | WLAN | WAN | CPE

device having the interface

required

interface

str

interface name

required

Returns:

Type Description
str

MAC address of the provided interface

Source code in boardfarm3/use_cases/networking.py
def get_interface_mac_addr(device: LAN | WLAN | WAN | CPE, interface: str) -> str:
    """Get the MAC address of the provided interface.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Get the mac address of the provided interface.

    :param device: device having the interface
    :type device: LAN | WLAN | WAN | CPE
    :param interface: interface name
    :type interface: str
    :return: MAC address of the provided interface
    :rtype: str
    """
    return (
        device.sw.get_interface_mac_addr(interface)
        if isinstance(device, CPE)
        else device.get_interface_macaddr(interface)
    )

get_ip6tables_list

get_ip6tables_list(
    device: DeviceWithFwType, opts: str = "", extra_opts: str = "-nvL --line-number"
) -> dict[str, list[dict]]

Return ip6tables rules as dictionary.

Parameters:

Name Type Description Default

device

DeviceWithFwType

type of the device

required

opts

str

command line arguments for ip6tables command, defaults to ""

''

extra_opts

str

extra command line arguments for ip6tables command, defaults to -nvL --line-number

'-nvL --line-number'

Returns:

Type Description
dict[str, list[dict]]

ip6tables rules dictionary

Source code in boardfarm3/use_cases/networking.py
def get_ip6tables_list(
    device: DeviceWithFwType,
    opts: str = "",
    extra_opts: str = "-nvL --line-number",
) -> dict[str, list[dict]]:
    """Return ip6tables rules as dictionary.

    :param device: type of the device
    :type device: DeviceWithFwType
    :param opts: command line arguments for ip6tables command, defaults to ""
    :type opts: str
    :param extra_opts: extra command line arguments for ip6tables command,
        defaults to -nvL --line-number
    :type extra_opts: str
    :return: ip6tables rules dictionary
    :rtype: dict[str, list[dict]]
    """
    return (
        device.firewall.get_ip6tables_list(opts, extra_opts)
        if not isinstance(device, CPE)
        else device.sw.firewall.get_ip6tables_list(opts, extra_opts)
    )

get_ip6tables_policy

get_ip6tables_policy(
    device: DeviceWithFwType, opts: str = "", extra_opts: str = "-nvL --line-number"
) -> dict[str, str]

Get firewall's ip6tables policy.

.. hint:: This Use Case implements statements from the test suite such as:

- Get the Firewall's ip6tables policy

Parameters:

Name Type Description Default

device

DeviceWithFwType

device instance

required

opts

str

options for ip6tables command, defaults to ""

''

extra_opts

str

options for ip6tables command, defaults to "-nvL --line-number"

'-nvL --line-number'

Returns:

Type Description
dict[str, str]

dict of ip6tables policy

Source code in boardfarm3/use_cases/networking.py
def get_ip6tables_policy(
    device: DeviceWithFwType,
    opts: str = "",
    extra_opts: str = "-nvL --line-number",
) -> dict[str, str]:
    """Get firewall's ip6tables policy.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Get the Firewall's ip6tables policy

    :param device: device instance
    :type device: DeviceWithFwType
    :param opts: options for ip6tables command, defaults to ""
    :type opts: str
    :param extra_opts: options for ip6tables command, defaults to "-nvL --line-number"
    :type extra_opts: str
    :return: dict of ip6tables policy
    :rtype: dict[str, str]
    """
    return (
        device.firewall.get_ip6tables_policy(opts, extra_opts)
        if not isinstance(device, CPE)
        else device.sw.firewall.get_ip6tables_policy(opts, extra_opts)
    )

get_iptables_list

get_iptables_list(
    device: DeviceWithFwType, opts: str = "", extra_opts: str = "-nvL --line-number"
) -> dict[str, list[dict]]

Return iptables rules as dictionary.

Parameters:

Name Type Description Default

device

DeviceWithFwType

type of the device

required

opts

str

command line arguments for iptables command, defaults to ""

''

extra_opts

str

extra command line arguments for iptables command, defaults to -nvL --line-number

'-nvL --line-number'

Returns:

Type Description
dict[str, list[dict]]

iptables rules dictionary

Source code in boardfarm3/use_cases/networking.py
def get_iptables_list(
    device: DeviceWithFwType,
    opts: str = "",
    extra_opts: str = "-nvL --line-number",
) -> dict[str, list[dict]]:
    """Return iptables rules as dictionary.

    :param device: type of the device
    :type device: DeviceWithFwType
    :param opts: command line arguments for iptables command, defaults to ""
    :type opts: str
    :param extra_opts: extra command line arguments for iptables command,
        defaults to -nvL --line-number
    :type extra_opts: str
    :return: iptables rules dictionary
    :rtype: dict[str, list[dict]]
    """
    return (
        device.firewall.get_iptables_list(opts, extra_opts)
        if not isinstance(device, CPE)
        else device.sw.firewall.get_iptables_list(opts, extra_opts)
    )

get_iptables_policy

get_iptables_policy(
    device: DeviceWithFwType, opts: str = "", extra_opts: str = "-nvL --line-number"
) -> dict[str, str]

Return iptables policies as dictionary.

Parameters:

Name Type Description Default

device

DeviceWithFwType

type of the device

required

opts

str

command line arguments for iptables command, defaults to ""

''

extra_opts

str

extra command line arguments for iptables command, defaults to "-nvL --line-number"

'-nvL --line-number'

Returns:

Type Description
dict[str, str]

iptables policies dictionary

Source code in boardfarm3/use_cases/networking.py
def get_iptables_policy(
    device: DeviceWithFwType,
    opts: str = "",
    extra_opts: str = "-nvL --line-number",
) -> dict[str, str]:
    """Return iptables policies as dictionary.

    :param device: type of the device
    :type device: DeviceWithFwType
    :param opts: command line arguments for iptables command, defaults to ""
    :type opts: str
    :param extra_opts: extra command line arguments for iptables command,
        defaults to "-nvL --line-number"
    :type extra_opts: str
    :return: iptables policies dictionary
    :rtype: dict[str, str]
    """
    return (
        device.firewall.get_iptables_policy(opts, extra_opts)
        if not isinstance(device, CPE)
        else device.sw.firewall.get_iptables_policy(opts, extra_opts)
    )
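
The four firewall getters share one dispatch rule: a CPE exposes its firewall under `device.sw.firewall`, while every other device exposes it directly as `device.firewall`. The sketch below reproduces that rule with stubs. `FakeFirewall`, `FakeSoftware`, `LinuxDevice`, the local `CPE` class, and the policy values are all hypothetical and stand in for the real boardfarm3 types.

```python
from __future__ import annotations


class FakeFirewall:
    """Hypothetical firewall that returns a fixed policy table."""

    def __init__(self, policy: dict[str, str]) -> None:
        self._policy = policy

    def get_iptables_policy(
        self, opts: str = "", extra_opts: str = "-nvL --line-number"
    ) -> dict[str, str]:
        return dict(self._policy)


class FakeSoftware:
    def __init__(self, firewall: FakeFirewall) -> None:
        self.firewall = firewall


class CPE:
    """Local stand-in: the firewall lives under the ``sw`` component."""

    def __init__(self, firewall: FakeFirewall) -> None:
        self.sw = FakeSoftware(firewall)


class LinuxDevice:
    """Local stand-in for a LAN or WAN device with a direct firewall."""

    def __init__(self, firewall: FakeFirewall) -> None:
        self.firewall = firewall


def get_iptables_policy(
    device: CPE | LinuxDevice, opts: str = "", extra_opts: str = "-nvL --line-number"
) -> dict[str, str]:
    """Same dispatch as the Use Case above, typed against the stubs."""
    return (
        device.firewall.get_iptables_policy(opts, extra_opts)
        if not isinstance(device, CPE)
        else device.sw.firewall.get_iptables_policy(opts, extra_opts)
    )


lan_policy = get_iptables_policy(LinuxDevice(FakeFirewall({"INPUT": "ACCEPT"})))
cpe_policy = get_iptables_policy(CPE(FakeFirewall({"INPUT": "DROP"})))
```

Keeping the `isinstance` check inside the Use Case lets test code pass any supported device without knowing where its firewall component is attached.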

get_nslookup_data

get_nslookup_data(
    device: LAN | WAN, domain_name: str, opts: str = "", extra_opts: str = ""
) -> dict[str, Any]

Perform nslookup with given arguments and return the parsed results.

to get A records, pass -q=A in opts

to get AAAA records, pass -q=AAAA in opts

to just get the DNS records, opts and extra opts are not needed

Parameters:

Name Type Description Default

device

LAN | WAN

type of the device

required

domain_name

str

domain name to perform nslookup on

required

opts

str

nslookup command line options, defaults to ""

''

extra_opts

str

nslookup additional command line options, defaults to ""

''

Returns:

Type Description
dict[str, Any]

parsed nslookup results as dictionary

Source code in boardfarm3/use_cases/networking.py
def get_nslookup_data(
    device: LAN | WAN,
    domain_name: str,
    opts: str = "",
    extra_opts: str = "",
) -> dict[str, Any]:
    """Perform nslookup with given arguments and return the parsed results.

    to get A records, pass -q=A in opts

    to get AAAA records, pass -q=AAAA in opts

    to just get the DNS records, opts and extra opts are not needed

    :param device: type of the device
    :type device: LAN | WAN
    :param domain_name: domain name to perform nslookup on
    :type domain_name: str
    :param opts: nslookup command line options, defaults to ""
    :type opts: str
    :param extra_opts: nslookup additional command line options, defaults to ""
    :type extra_opts: str
    :return: parsed nslookup results as dictionary
    :rtype: dict[str, Any]
    """
    return device.nslookup.nslookup(domain_name, opts, extra_opts)
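
A minimal usage sketch, assuming a stand-in device so that it runs without a testbed. `FakeNslookup` and `FakeLan` are hypothetical stubs that only echo their arguments; the dictionary they return is not the real parser output.

```python
from __future__ import annotations

from typing import Any


class FakeNslookup:
    """Hypothetical stand-in for the device's nslookup helper."""

    def nslookup(
        self, domain_name: str, opts: str = "", extra_opts: str = ""
    ) -> dict[str, Any]:
        # Echo the arguments instead of running a real lookup.
        return {"domain": domain_name, "opts": opts, "extra_opts": extra_opts}


class FakeLan:
    """Hypothetical stand-in for a LAN device."""

    def __init__(self) -> None:
        self.nslookup = FakeNslookup()


def get_nslookup_data(device, domain_name, opts="", extra_opts=""):
    # Same delegation as the Use Case above.
    return device.nslookup.nslookup(domain_name, opts, extra_opts)


lan = FakeLan()
a_records = get_nslookup_data(lan, "example.com", opts="-q=A")
aaaa_records = get_nslookup_data(lan, "example.com", opts="-q=AAAA")
default_records = get_nslookup_data(lan, "example.com")
print(a_records["opts"], aaaa_records["opts"], repr(default_records["opts"]))
```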

hping_flood

hping_flood(
    device: LAN | WAN,
    protocol: str,
    target: str,
    packet_count: str,
    extra_args: str | None = None,
    pkt_interval: str = "",
) -> str

Validate SYN, UDP and ICMP flood operation.

.. hint:: This Use Case implements statements from the test suite such as:

- To validate SYN flood, ICMP flood (from WAN & LAN) and UDP flood (from WAN)

Parameters:

Name Type Description Default

device

LAN | WAN

object of the device class where tcpdump is captured

required

protocol

str

hping mode, for example 'S' for SYN flood, '1' for ping (ICMP) flood, '2' for UDP flood

required

target

str

target IP addr

required

packet_count

str

number of packets to be transmitted.

required

extra_args

str | None

extra arguments to be passed, defaults to None

None

pkt_interval

str

interval between packets in the form uX (wait X microseconds before sending the next packet), for example -i u1000; defaults to ""

''

Returns:

Type Description
str

command output

Source code in boardfarm3/use_cases/networking.py
def hping_flood(  # noqa: PLR0913
    device: LAN | WAN,
    protocol: str,
    target: str,
    packet_count: str,
    extra_args: str | None = None,
    pkt_interval: str = "",
) -> str:
    """Validate SYN, UDP and ICMP flood operation.

    .. hint:: This Use Case implements statements from the test suite such as:

        - To validate SYN flood, ICMP flood (from WAN & LAN) and UDP flood (from WAN)

    :param device: object of the device class where tcpdump is captured
    :type device: LAN | WAN
    :param protocol: hping mode, for example 'S' for SYN flood, '1' for ping
        (ICMP) flood, '2' for UDP flood
    :type protocol: str
    :param target: target IP addr
    :type target: str
    :param packet_count: number of packets to be transmitted.
    :type packet_count: str
    :param extra_args: extra arguments to be passed, defaults to None
    :type extra_args: str | None
    :param pkt_interval: interval between packets in the form uX (wait X
        microseconds before sending the next packet), for example -i u1000;
        defaults to ""
    :type pkt_interval: str
    :return: command output
    :rtype: str
    """
    return device.hping_flood(
        protocol=protocol,
        target=target,
        packet_count=packet_count,
        extra_args=extra_args,
        pkt_interval=pkt_interval,
    )
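
The mode strings listed for `protocol` can be kept in one place when a test drives several flood types. The sketch below is only an illustration: `FLOOD_MODES` and `flood_kwargs` are hypothetical names, and the `uX` form used for `pkt_interval` is an assumption based on the parameter description.

```python
from __future__ import annotations

# Mode strings taken from the parameter description above.
FLOOD_MODES = {"syn": "S", "icmp": "1", "udp": "2"}


def flood_kwargs(
    kind: str, target: str, count: int, interval_us: int | None = None
) -> dict[str, str | None]:
    """Build keyword arguments for hping_flood (hypothetical helper)."""
    if kind not in FLOOD_MODES:
        raise ValueError(f"Unsupported flood kind: {kind}")
    return {
        "protocol": FLOOD_MODES[kind],
        "target": target,
        # The Use Case expects the packet count as a string.
        "packet_count": str(count),
        "extra_args": None,
        # Assumption: the interval is passed in the "uX" form.
        "pkt_interval": f"u{interval_us}" if interval_us is not None else "",
    }


print(flood_kwargs("syn", "192.168.0.1", 1000, interval_us=1000))
```

The resulting dictionary could then be unpacked into the call, for example `hping_flood(device, **flood_kwargs(...))`.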

http_get

http_get(
    device: LAN | WAN,
    url: str,
    timeout: int = 20,
    no_proxy: bool = False,
    is_insecure: bool = False,
    follow_redirects: bool = False,
) -> HTTPResult

Check if the given HTTP server is running.

This Use Case executes a curl command with the given timeout from the given client. The destination is specified by the url parameter.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify HTTP server is accessible from [] via eRouter IP
- Verify that the HTTP server running on the client is accessible
- Try to connect to the HTTP server from [] client

Parameters:

Name Type Description Default

device

LAN | WAN

the device from which the HTTP GET request is sent

required

url

str

URL to request

required

timeout

int

connection timeout for the curl command in seconds, default 20

20

no_proxy

bool

if True, pass --noproxy '*' to curl so that no proxy is used, defaults to False

False

is_insecure

bool

if True, pass -k to curl to skip TLS certificate verification, defaults to False

False

follow_redirects

bool

if True, pass -L to curl to follow HTTP redirects, defaults to False

False

Returns:

Type Description
HTTPResult

parsed HTTP Get response

Source code in boardfarm3/use_cases/networking.py
def http_get(  # noqa: PLR0913
    device: LAN | WAN,
    url: str,
    timeout: int = 20,
    no_proxy: bool = False,
    is_insecure: bool = False,
    follow_redirects: bool = False,
) -> HTTPResult:
    """Check if the given HTTP server is running.

    This Use Case executes a curl command with the given timeout from the given
    client. The destination is specified by the url parameter.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify HTTP server is accessible from [] via eRouter IP
        - Verify that the HTTP server running on the client is accessible
        - Try to connect to the HTTP server from [] client

    :param device: the device from which the HTTP GET request is sent
    :type device: LAN | WAN
    :param url: URL to request
    :type url: str
    :param timeout: connection timeout for the curl command in seconds, default 20
    :type timeout: int
    :param no_proxy: if True, pass --noproxy '*' to curl, defaults to False
    :type no_proxy: bool
    :param is_insecure: if True, pass -k to curl to skip TLS certificate
        verification, defaults to False
    :type is_insecure: bool
    :param follow_redirects: if True, pass -L to curl to follow HTTP redirects,
        defaults to False
    :type follow_redirects: bool
    :return: parsed HTTP Get response
    :rtype: HTTPResult
    """
    options = ""
    if no_proxy:
        options += "--noproxy '*' "
    if is_insecure:
        options += "-k "
    if follow_redirects:
        options += "-L "
    return device.http_get(url, timeout, options)
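
The way the three boolean flags turn into a curl option string can be checked in isolation. The sketch below copies that logic into a standalone function; `build_curl_options` is a hypothetical name used only for illustration.

```python
def build_curl_options(
    no_proxy: bool = False,
    is_insecure: bool = False,
    follow_redirects: bool = False,
) -> str:
    """Mirror the option string assembled inside http_get."""
    options = ""
    if no_proxy:
        options += "--noproxy '*' "  # bypass any configured proxy
    if is_insecure:
        options += "-k "  # skip TLS certificate verification
    if follow_redirects:
        options += "-L "  # follow HTTP redirects
    return options


print(repr(build_curl_options(no_proxy=True, follow_redirects=True)))
```

Each enabled flag appends its option followed by a space, so the string is empty when all flags are left at False.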

is_client_ip_in_pool

is_client_ip_in_pool(
    pool_bounds: tuple[IPv4Address, IPv4Address], client: LAN | WAN
) -> bool

Check for client IP in IP pool.

.. hint:: This Use Case implements statements from the test suite such as:

- Configure the LAN client with Static IP from the higher range of the subnet
  defined in the config file and Default gateway is set to the lowest IP
  (eRouter LAN interface) address of subnet

Parameters:

Name Type Description Default

pool_bounds

tuple[IPv4Address, IPv4Address]

lowest and highest IP from DHCP pool

required

client

LAN | WAN

client to be checked

required

Returns:

Type Description
bool

True if the client IP is within the pool range

Source code in boardfarm3/use_cases/networking.py
def is_client_ip_in_pool(
    pool_bounds: tuple[ipaddress.IPv4Address, ipaddress.IPv4Address],
    client: LAN | WAN,
) -> bool:
    """Check for client IP in IP pool.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Configure the LAN client with Static IP from the higher range of the subnet
          defined in the config file and Default gateway is set to the lowest IP
          (eRouter LAN interface) address of subnet

    :param pool_bounds: lowest and highest IP from DHCP pool
    :type pool_bounds: tuple[ipaddress.IPv4Address, ipaddress.IPv4Address]
    :param client: client to be checked
    :type client: LAN | WAN
    :return: True if the client IP is within the pool range
    :rtype: bool
    """
    lan_ip_address = ipaddress.IPv4Address(
        client.get_interface_ipv4addr(client.iface_dut),
    )
    ip_range = ip_pool_to_list(*pool_bounds)
    return lan_ip_address in ip_range
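
The membership test can also be expressed as a direct comparison, since `ipaddress.IPv4Address` objects are ordered. This is a sketch under the assumption that the pool includes both bounds; `ip_pool_to_list` is not shown here, and `is_ip_in_pool` is a hypothetical name.

```python
from ipaddress import IPv4Address


def is_ip_in_pool(
    pool_bounds: tuple[IPv4Address, IPv4Address], ip: IPv4Address
) -> bool:
    """Return True if ip lies between the pool bounds (assumed inclusive)."""
    lowest, highest = pool_bounds
    return lowest <= ip <= highest


pool = (IPv4Address("192.168.0.10"), IPv4Address("192.168.0.100"))
print(is_ip_in_pool(pool, IPv4Address("192.168.0.42")))
print(is_ip_in_pool(pool, IPv4Address("192.168.0.200")))
```

Comparing against the bounds avoids building the full list of addresses, which matters only for large pools.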

is_icmp_packet_present

is_icmp_packet_present(
    captured_sequence: list[ICMPPacketData], expected_sequence: list[ICMPPacketData]
) -> bool

Check whether the expected ICMP sequence matches with the captured sequence.

Parameters:

Name Type Description Default

captured_sequence

list[ICMPPacketData]

Sequence of ICMP packets filtered from captured pcap file

required

expected_sequence

list[ICMPPacketData]

Example for IPv4 source and destination and query_code as 8 (Echo Request)

.. code-block:: python

    [
        ICMPPacketData(
            IPAddresses(IPv4Address("172.25.1.109"), None, None),
            IPAddresses(IPv4Address("192.168.178.22"), None, None),
            8,
        ),
    ]

.. hint:: This Use Case implements statements from the test suite such as:

- Check whether the expected ICMP sequence matches with the captured sequence.

required

Returns:

Type Description
bool

True if every expected ICMP packet is found, in order, in the captured sequence

Source code in boardfarm3/use_cases/networking.py
def is_icmp_packet_present(
    captured_sequence: list[ICMPPacketData],
    expected_sequence: list[ICMPPacketData],
) -> bool:
    """Check whether the expected ICMP sequence matches with the captured sequence.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Check whether the expected ICMP sequence matches with the captured sequence.

    :param captured_sequence: Sequence of ICMP packets filtered from captured pcap file
    :type captured_sequence: List[ICMPPacketData]
    :param expected_sequence: Example for IPv4 source and destination and ``query_code``
        as 8 (Echo Request)

            .. code-block:: python

                [
                    ICMPPacketData(
                        IPAddresses(IPv4Address("172.25.1.109"), None, None),
                        IPAddresses(IPv4Address("192.168.178.22"), None, None),
                        8,
                    ),
                ]

    :type expected_sequence: List[ICMPPacketData]
    :return: True if every expected ICMP packet is found, in order, in the
        captured sequence
    :rtype: bool
    """
    last_check = 0
    final_result = []
    for icmp_packet_expected in expected_sequence:
        for i in range(last_check, len(captured_sequence)):
            if captured_sequence[i] == icmp_packet_expected:
                last_check = i
                _LOGGER.info(
                    colored(
                        f"Verified ICMP packet:\t{icmp_packet_expected.source}\t"
                        f"-->>\t{icmp_packet_expected.destination}\tType:"
                        f" {icmp_packet_expected.query_code}",
                        color="green",
                    ),
                )
                final_result.append(True)
                break
        else:
            _LOGGER.info(
                colored(
                    "Couldn't verify ICMP packet:\t"
                    f"{icmp_packet_expected.source}\t-->>\t"
                    f"{icmp_packet_expected.destination}\tType:"
                    f" {icmp_packet_expected.query_code}",
                    color="red",
                ),
            )
            final_result.append(False)
    return all(final_result)
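
The ordered matching performed by the loop above can be tried out without a capture file. The sketch below mirrors the same loop without the logging; `Packet` is a hypothetical stand-in for `ICMPPacketData` and `is_sequence_present` is a name chosen for illustration.

```python
from typing import NamedTuple, Sequence


class Packet(NamedTuple):
    """Hypothetical stand-in for ICMPPacketData."""

    source: str
    destination: str
    query_code: int


def is_sequence_present(
    captured: Sequence[Packet], expected: Sequence[Packet]
) -> bool:
    """Check that every expected packet appears in order in captured."""
    last_check = 0
    results = []
    for wanted in expected:
        # Resume the search at the index of the previous match.
        for i in range(last_check, len(captured)):
            if captured[i] == wanted:
                last_check = i
                results.append(True)
                break
        else:
            results.append(False)
    return all(results)


request = Packet("10.0.0.2", "10.0.0.1", 8)  # Echo Request
reply = Packet("10.0.0.1", "10.0.0.2", 0)  # Echo Reply
captured = [request, reply]

print(is_sequence_present(captured, [request, reply]))
print(is_sequence_present(captured, [reply, request]))
```

Because the search resumes at the matched index rather than one past it, a single captured packet can satisfy the same expected packet listed twice in a row. An empty expected sequence yields True.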

is_ip6table_empty

is_ip6table_empty(
    device: DeviceWithFwType, opts: str = "", extra_opts: str = "-nvL --line-number"
) -> bool

Return True if ip6tables is empty.

Parameters:

Name Type Description Default

device

DeviceWithFwType

type of the device

required

opts

str

command line arguments for ip6tables command, defaults to ""

''

extra_opts

str

extra command line arguments for ip6tables command, defaults to "-nvL --line-number"

'-nvL --line-number'

Returns:

Type Description
bool

True if ip6tables is empty, False otherwise

Source code in boardfarm3/use_cases/networking.py
def is_ip6table_empty(
    device: DeviceWithFwType,
    opts: str = "",
    extra_opts: str = "-nvL --line-number",
) -> bool:
    """Return True if ip6tables is empty.

    :param device: type of the device
    :type device: DeviceWithFwType
    :param opts: command line arguments for ip6tables command, defaults to ""
    :type opts: str
    :param extra_opts: extra command line arguments for ip6tables command,
        defaults to "-nvL --line-number"
    :type extra_opts: str
    :return: True if ip6tables is empty, False otherwise
    :rtype: bool
    """
    return (
        device.firewall.is_ip6table_empty(opts, extra_opts)
        if not isinstance(device, CPE)
        else device.sw.firewall.is_ip6table_empty(opts, extra_opts)
    )

is_iptable_empty

is_iptable_empty(
    device: DeviceWithFwType, opts: str = "", extra_opts: str = "-nvL --line-number"
) -> bool

Return True if iptables is empty.

Parameters:

Name Type Description Default

device

DeviceWithFwType

type of the device

required

opts

str

command line arguments for iptables command, defaults to ""

''

extra_opts

str

extra command line arguments for iptables command, defaults to "-nvL --line-number"

'-nvL --line-number'

Returns:

Type Description
bool

True if iptables is empty, False otherwise

Source code in boardfarm3/use_cases/networking.py
def is_iptable_empty(
    device: DeviceWithFwType,
    opts: str = "",
    extra_opts: str = "-nvL --line-number",
) -> bool:
    """Return True if iptables is empty.

    :param device: type of the device
    :type device: DeviceWithFwType
    :param opts: command line arguments for iptables command, defaults to ""
    :type opts: str
    :param extra_opts: extra command line arguments for iptables command,
        defaults to "-nvL --line-number"
    :type extra_opts: str
    :return: True if iptables is empty, False otherwise
    :rtype: bool
    """
    return (
        device.firewall.is_iptable_empty(opts, extra_opts)
        if not isinstance(device, CPE)
        else device.sw.firewall.is_iptable_empty(opts, extra_opts)
    )
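
The firewall Use Cases all share the same dispatch: a CPE exposes its firewall under `sw`, while other devices expose it directly. The sketch below illustrates that dispatch with hypothetical stub classes (`Firewall`, `Software`, `CPE`, `LAN`); the real device classes carry far more behaviour.

```python
class Firewall:
    """Hypothetical firewall stub holding a list of rules."""

    def __init__(self, rules: list[str]) -> None:
        self._rules = rules

    def is_iptable_empty(self, opts: str = "", extra_opts: str = "") -> bool:
        return not self._rules


class Software:
    """Hypothetical stand-in for the software side of a CPE."""

    def __init__(self, firewall: Firewall) -> None:
        self.firewall = firewall


class CPE:
    """Hypothetical CPE stub: the firewall lives under ``sw``."""

    def __init__(self, firewall: Firewall) -> None:
        self.sw = Software(firewall)


class LAN:
    """Hypothetical LAN stub: the firewall is a direct attribute."""

    def __init__(self, firewall: Firewall) -> None:
        self.firewall = firewall


def is_iptable_empty(device, opts="", extra_opts="-nvL --line-number") -> bool:
    # Pick the firewall object according to the device type.
    firewall = device.sw.firewall if isinstance(device, CPE) else device.firewall
    return firewall.is_iptable_empty(opts, extra_opts)


print(is_iptable_empty(CPE(Firewall([]))))
print(is_iptable_empty(LAN(Firewall(["ACCEPT all"]))))
```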

netcat

netcat(device: LAN, host_ip: str, port: str, additional_args: str) -> None

Run netcat command to initiate brute force.

Parameters:

Name Type Description Default

device

LAN

lan device

required

host_ip

str

host ip address

required

port

str

port number of the host

required

additional_args

str

additional args to be provided in netcat command

required
Source code in boardfarm3/use_cases/networking.py
def netcat(device: LAN, host_ip: str, port: str, additional_args: str) -> None:
    """Run netcat command to initiate brute force.

    :param device: lan device
    :type device: LAN
    :param host_ip: host ip address
    :type host_ip: str
    :param port: port number of the host
    :type port: str
    :param additional_args: additional args to be provided in netcat command
    :type additional_args: str
    """
    device.netcat(host_ip, port, additional_args)

netstat_all_tcp

netstat_all_tcp(
    device: CPE | LAN | WAN, opts: str = "-at", extra_opts: str = ""
) -> DataFrame

Get all TCP ports.

Parameters:

Name Type Description Default

device

CPE | LAN | WAN

type of the device

required

opts

str

command line options

'-at'

extra_opts

str

extra command line options

''

Returns:

Type Description
DataFrame

parsed netstat output

Source code in boardfarm3/use_cases/networking.py
def netstat_all_tcp(
    device: CPE | LAN | WAN,
    opts: str = "-at",
    extra_opts: str = "",
) -> DataFrame:
    """Get all TCP ports.

    :param device: type of the device
    :type device: CPE | LAN | WAN
    :param opts: command line options
    :type opts: str
    :param extra_opts: extra command line options
    :type extra_opts: str
    :return: parsed netstat output
    :rtype: DataFrame
    """
    device_nw = __get_dev_s_network_utility(device)
    return device_nw.netstat(opts, extra_opts)

netstat_all_udp

netstat_all_udp(
    device: CPE | LAN | WAN, opts: str = "-au", extra_opts: str = ""
) -> DataFrame

Get all UDP ports.

Parameters:

Name Type Description Default

device

CPE | LAN | WAN

type of the device

required

opts

str

command line options

'-au'

extra_opts

str

extra command line options

''

Returns:

Type Description
DataFrame

parsed netstat output

Source code in boardfarm3/use_cases/networking.py
def netstat_all_udp(
    device: CPE | LAN | WAN,
    opts: str = "-au",
    extra_opts: str = "",
) -> DataFrame:
    """Get all UDP ports.

    :param device: type of the device
    :type device: CPE | LAN | WAN
    :param opts: command line options
    :type opts: str
    :param extra_opts: extra command line options
    :type extra_opts: str
    :return: parsed netstat output
    :rtype: DataFrame
    """
    device_nw = __get_dev_s_network_utility(device)
    return device_nw.netstat(opts, extra_opts)

netstat_listening_ports

netstat_listening_ports(
    device: CPE | LAN | WAN, opts: str = "-nlp", extra_opts: str = ""
) -> DataFrame

Get all listening ports.

Parameters:

Name Type Description Default

device

CPE | LAN | WAN

type of the device

required

opts

str

command line options

'-nlp'

extra_opts

str

extra command line options

''

Returns:

Type Description
DataFrame

parsed netstat output

Source code in boardfarm3/use_cases/networking.py
def netstat_listening_ports(
    device: CPE | LAN | WAN,
    opts: str = "-nlp",
    extra_opts: str = "",
) -> DataFrame:
    """Get all listening ports.

    :param device: type of the device
    :type device: CPE | LAN | WAN
    :param opts: command line options
    :type opts: str
    :param extra_opts: extra command line options
    :type extra_opts: str
    :return: parsed netstat output
    :rtype: DataFrame
    """
    device_nw = __get_dev_s_network_utility(device)
    return device_nw.netstat(opts, extra_opts)

nmap_scan

nmap_scan(
    source_device: LAN | WLAN | WAN,
    destination_device: LAN | WLAN | WAN | CPE,
    ip_type: str,
    timeout: int = 30,
) -> dict[str, str]

Perform Complete scan on destination via nmap network utility on source device.

.. hint:: This Use Case implements statements from the test suite such as:

- Perform Complete scan on destination via nmap network utility on source device

Parameters:

Name Type Description Default

source_device

LAN | WLAN | WAN

source device

required

destination_device

LAN | WLAN | WAN | CPE

destination device

required

ip_type

str

type of IP address: "ipv4", "ipv6"

required

timeout

int

pexpect timeout for the command in seconds, defaults to 30

30

Returns:

Type Description
dict[str,str]

XML output of the nmap command in form of dictionary

Source code in boardfarm3/use_cases/networking.py
def nmap_scan(
    source_device: LAN | WLAN | WAN,
    destination_device: LAN | WLAN | WAN | CPE,
    ip_type: str,
    timeout: int = 30,
) -> dict[str, str]:
    """Perform Complete scan on destination via nmap network utility on source device.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Perform Complete scan on destination via nmap network utility on source device

    :param source_device: source device
    :type source_device: LAN | WLAN | WAN
    :param destination_device: destination device
    :type destination_device: LAN | WLAN | WAN | CPE
    :param ip_type: type of IP address: "ipv4", "ipv6"
    :type ip_type: str
    :param timeout: pexpect timeout for the command in seconds, defaults to 30
    :type timeout: int
    :return: XML output of the nmap command in form of dictionary
    :rtype: dict[str,str]
    """
    return nmap(source_device, destination_device, ip_type, opts="-F", timeout=timeout)

nping

nping(
    device: LAN,
    interface_ip: str,
    ipv6: bool = False,
    extra_args: str = "",
    port_range: str = "0-65535",
    hit_count: str = "2",
    rate: str = "200",
    mode: str = "udp",
) -> Generator[str]

Perform nping command and kill process once done.

.. hint:: This Use Case implements statements from the test suite such as:

- Execute the command from the connected LAN Client to do nping
  on [] side network

:yield: process id

Parameters:

Name Type Description Default

device

LAN

connected client to perform nping command from.

required

interface_ip

str

interface ip addr

required

ipv6

bool

if ipv6 addr to be used, defaults to False

False

extra_args

str

any extra arguments

''

port_range

str

target port range, defaults to all ports

'0-65535'

hit_count

str

the number of times to target each host, defaults to 2

'2'

rate

str

num of packets per second to send, defaults to 200

'200'

mode

str

probe mode. tcp/udp/icmp etc protocol, defaults to udp

'udp'
Source code in boardfarm3/use_cases/networking.py
@contextmanager
def nping(  # pylint: disable=too-many-arguments # noqa: PLR0913
    device: LAN,
    interface_ip: str,
    ipv6: bool = False,
    extra_args: str = "",
    port_range: str = "0-65535",
    hit_count: str = "2",
    rate: str = "200",
    mode: str = "udp",
) -> Generator[str]:
    """Perform nping command and kill process once done.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Execute the command from the connected LAN Client to do nping
          on [] side network

    :param device: connected client to perform nping command from.
    :type device: LAN
    :param interface_ip: interface ip addr
    :type interface_ip: str
    :param ipv6: if ipv6 addr to be used, defaults to False
    :type ipv6: bool
    :param extra_args: any extra arguments
    :type extra_args: str
    :param port_range: target port range, defaults to all ports
    :type port_range: str
    :param hit_count: the number of times to target each host,
        defaults to 2
    :type hit_count: str
    :param rate: num of packets per second to send, defaults to 200
    :type rate: str
    :param mode: probe mode. tcp/udp/icmp etc protocol, defaults to udp
    :type mode: str
    :yield: process id
    :rtype: Generator[str, None, None]
    """
    pid: str = ""
    try:
        pid = device.start_nping(
            interface_ip, ipv6, extra_args, port_range, hit_count, rate, mode
        )
        yield pid
    finally:
        device.stop_nping(process_id=pid)
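
Because `nping` is a context manager, the process is stopped even when the body of the `with` block fails. The sketch below shows this with a hypothetical `FakeLan` stub that records calls instead of running nping; the argument list is shortened for brevity.

```python
from contextlib import contextmanager


class FakeLan:
    """Hypothetical stub that records start and stop calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def start_nping(self, *args) -> str:
        self.calls.append("start")
        return "4242"  # made-up process id

    def stop_nping(self, process_id: str) -> None:
        self.calls.append(f"stop {process_id}")


@contextmanager
def nping(device, interface_ip, mode="udp"):
    # Shortened copy of the Use Case above, for illustration only.
    pid = ""
    try:
        pid = device.start_nping(interface_ip, mode)
        yield pid
    finally:
        device.stop_nping(process_id=pid)


lan = FakeLan()
try:
    with nping(lan, "192.168.0.1") as pid:
        raise RuntimeError("test step failed")
except RuntimeError:
    pass

print(lan.calls)
```

The `finally` clause is what guarantees the cleanup; the recorded calls show the stop request following the start even though the block raised.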

ping

ping(
    device: LAN | WAN | WLAN,
    ping_ip: str,
    ping_count: int = 4,
    ping_interface: str | None = None,
    timeout: int = 50,
    json_output: bool = False,
    options: str = "",
) -> bool | dict[str, Any]

Ping remote host IP.

Return True if ping has 0% loss or parsed output in JSON if json_output=True flag is provided.

Parameters:

Name Type Description Default

device

LAN | WAN | WLAN

device on which ping is performed

required

ping_ip

str

IP to ping

required

ping_count

int

number of ping requests to send, defaults to 4

4

ping_interface

str | None

ping via interface, defaults to None

None

timeout

int

timeout, defaults to 50

50

json_output

bool

if True, return the parsed ping output as a dictionary instead of a bool, defaults to False

False

options

str

extra ping options, defaults to ""

''

Returns:

Type Description
bool | dict[str, Any]

bool or dict of ping output

Source code in boardfarm3/use_cases/networking.py
def ping(  # noqa: PLR0913
    device: LAN | WAN | WLAN,
    ping_ip: str,
    ping_count: int = 4,
    ping_interface: str | None = None,
    timeout: int = 50,
    json_output: bool = False,
    options: str = "",
) -> bool | dict[str, Any]:
    """Ping remote host IP.

    Return True if ping has 0% loss or parsed output in JSON if
    json_output=True flag is provided.

    :param device: device on which ping is performed
    :type device: LAN | WAN | WLAN
    :param ping_ip: IP to ping
    :type ping_ip: str
    :param ping_count: number of ping requests to send, defaults to 4
    :type ping_count: int
    :param ping_interface: ping via interface, defaults to None
    :type ping_interface: str | None
    :param timeout: timeout, defaults to 50
    :type timeout: int
    :param json_output: if True, return the parsed ping output as a dictionary
        instead of a bool, defaults to False
    :type json_output: bool
    :param options: extra ping options, defaults to ""
    :type options: str
    :return: bool or dict of ping output
    :rtype: bool | dict[str, Any]
    """
    return device.ping(
        ping_ip,
        ping_count,
        ping_interface,
        timeout=timeout,
        json_output=json_output,
        options=options,
    )

start_http_server

start_http_server(
    device: LAN | WAN, port: int | str, ip_version: str | int
) -> Generator

Start http server on given client.

.. hint:: This Use Case implements statements from the test suite such as:

- Start the HTTP server on the [] client

:yield: PID of the http server process

Parameters:

Name Type Description Default

device

LAN | WAN

device on which server will start

required

port

int | str

port on which the server listens for incoming connections

required

ip_version

str | int

IP version of the server; must be 4 or 6

required

Raises:

Type Description
ValueError

raised when an ip_version other than 4 or 6 is given

Source code in boardfarm3/use_cases/networking.py
@contextmanager
def start_http_server(
    device: LAN | WAN,
    port: int | str,
    ip_version: str | int,
) -> Generator:
    """Start http server on given client.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Start the HTTP server on the [] client

    :param device: device on which server will start
    :type device: LAN | WAN
    :param port: port on which the server listens for incoming connections
    :type port: int | str
    :param ip_version: IP version of the server; must be 4 or 6
    :type ip_version: str | int
    :raises ValueError: when an ip_version other than 4 or 6 is given
    :yield: PID of the http server process
    """
    port = str(port)
    ip_version = str(ip_version)
    if ip_version not in ["4", "6"]:
        reason = f"Invalid ip_version argument {ip_version}."
        raise ValueError(reason)
    # stop http service if running
    device.stop_http_service(port)
    try:
        yield device.start_http_service(port, ip_version)
    finally:
        device.stop_http_service(port)
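
The order of operations (validate, stop any stale server, start, stop again on exit) can be observed with a stub device. In this sketch `FakeWan` is hypothetical and only records calls; the context manager body is copied from the Use Case above.

```python
from contextlib import contextmanager


class FakeWan:
    """Hypothetical stub that records HTTP service calls."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, ...]] = []

    def start_http_service(self, port: str, ip_version: str) -> str:
        self.calls.append(("start", port, ip_version))
        return "1234"  # made-up PID

    def stop_http_service(self, port: str) -> None:
        self.calls.append(("stop", port))


@contextmanager
def start_http_server(device, port, ip_version):
    port = str(port)
    ip_version = str(ip_version)
    if ip_version not in ["4", "6"]:
        reason = f"Invalid ip_version argument {ip_version}."
        raise ValueError(reason)
    # Stop the HTTP service if it is already running.
    device.stop_http_service(port)
    try:
        yield device.start_http_service(port, ip_version)
    finally:
        device.stop_http_service(port)


wan = FakeWan()
with start_http_server(wan, 8080, 4) as pid:
    print("server PID:", pid)
print(wan.calls)

# An unsupported IP version is rejected before the device is touched.
other = FakeWan()
try:
    with start_http_server(other, 8080, 5):
        pass
except ValueError as exc:
    print(exc)
```

Integer and string arguments are both accepted because they are normalised with `str()` before use.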

trigger_ip_flood

trigger_ip_flood(
    source_device: LAN | WLAN | WAN,
    destination_device: LAN | WLAN | WAN | CPE,
    ip_type: str,
    port: str | int,
    min_rate: int,
    max_retries: int = 4,
    timeout: int = 30,
) -> dict[str, str]

Perform IP flooding via nmap network utility on source device.

.. hint:: This Use Case implements statements from the test suite such as:

- Perform IP flooding via nmap network utility on source device.

Parameters:

Name Type Description Default

source_device

LAN | WLAN | WAN

source device

required

destination_device

LAN | WLAN | WAN | CPE

destination device

required

ip_type

str

type of IP address: "ipv4", "ipv6"

required

port

str | int

port or range of ports: "666-999"

required

min_rate

int

send packets no slower than min_rate per second

required

max_retries

int

maximum number of retries for nmap, defaults to 4

4

timeout

int

pexpect timeout for the command in seconds, defaults to 30

30

Returns:

Type Description
dict[str,str]

XML output of the nmap command in form of dictionary

Source code in boardfarm3/use_cases/networking.py
def trigger_ip_flood(  # noqa: PLR0913
    source_device: LAN | WLAN | WAN,
    destination_device: LAN | WLAN | WAN | CPE,
    ip_type: str,
    port: str | int,
    min_rate: int,
    max_retries: int = 4,
    timeout: int = 30,
) -> dict[str, str]:
    """Perform IP flooding via nmap network utility on source device.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Perform IP flooding via nmap network utility on source device.

    :param source_device: source device
    :type source_device: LAN | WLAN | WAN
    :param destination_device: destination device
    :type destination_device: LAN | WLAN | WAN | CPE
    :param ip_type: type of IP address: "ipv4", "ipv6"
    :type ip_type: str
    :param port: port or range of ports: "666-999"
    :type port: str | int
    :param min_rate: send packets no slower than min_rate per second
    :type min_rate: int
    :param max_retries: maximum number of retries for nmap, defaults to 4
    :type max_retries: int
    :param timeout: pexpect timeout for the command in seconds, defaults to 30
    :type timeout: int
    :return: XML output of the nmap command in form of dictionary
    :rtype: dict[str,str]
    """
    return nmap(
        source_device,
        destination_device,
        ip_type,
        port,
        "-sS",
        max_retries,
        min_rate,
        timeout=timeout,
    )
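
To give a feel for what these arguments mean on the nmap command line, the sketch below assembles a comparable command string. It is an assumption-laden illustration: the `nmap` helper called above is not shown here and may build its command differently, and `build_nmap_command` is a hypothetical name. The flags themselves (`-6`, `-sS`, `-F`, `-p`, `--max-retries`, `--min-rate`) are standard nmap options.

```python
from __future__ import annotations


def build_nmap_command(
    target_ip: str,
    ip_type: str,
    port: str | int | None = None,
    opts: str = "",
    max_retries: int | None = None,
    min_rate: int | None = None,
) -> str:
    """Assemble an nmap command line (hypothetical helper)."""
    parts = ["nmap"]
    if ip_type == "ipv6":
        parts.append("-6")  # scan over IPv6
    if opts:
        parts.append(opts)  # for example -sS (SYN scan) or -F (fast scan)
    if port is not None:
        parts += ["-p", str(port)]  # single port or range such as 666-999
    if max_retries is not None:
        parts += ["--max-retries", str(max_retries)]
    if min_rate is not None:
        parts += ["--min-rate", str(min_rate)]
    parts.append(target_ip)
    return " ".join(parts)


print(
    build_nmap_command(
        "192.168.0.1", "ipv4", port="666-999", opts="-sS", max_retries=4, min_rate=1000
    )
)
print(build_nmap_command("2001:db8::1", "ipv6", opts="-F"))
```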

verify_tunnel_packets

verify_tunnel_packets(
    captured_sequence: list[tuple[str, ...]], expected_sequence: list[tuple[str, ...]]
) -> bool

Verify the expected encapsulated info with the captured sequence.

.. hint:: This Use Case implements statements from the test suite such as:

- Traffic from Ethernet LAN client to Internet, must be encapsulated like below

Parameters:

Name Type Description Default

captured_sequence

list[tuple[str, ...]]

Sequence of packets filtered from captured pcap file

required

expected_sequence

list[tuple[str, ...]]

Example for encapsulated traffic

.. code-block:: python

    [
        (
            "10.1.2.105,10.15.137.242",
            "44:d4:54:e1:9e:57,44:d4:54:e1:9e:57",
            "172.30.113.175,8.8.8.8",
            "52:54:00:67:85:42,52:54:00:67:85:42",
        )
    ]

required

Returns:

Type Description
bool

True if every expected packet is found, in order, in the captured sequence with the expected outer and inner layer encapsulation

Source code in boardfarm3/use_cases/networking.py
def verify_tunnel_packets(
    captured_sequence: list[tuple[str, ...]],
    expected_sequence: list[tuple[str, ...]],
) -> bool:
    """Verify the expected encapsulated info with the captured sequence.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Traffic from Ethernet LAN client to Internet, must be encapsulated like below

    :param captured_sequence: Sequence of packets filtered from captured pcap file
    :type captured_sequence: list[tuple[str, ...]]
    :param expected_sequence: Example for encapsulated traffic

            .. code-block:: python

                [
                    (
                        "10.1.2.105,10.15.137.242",
                        "44:d4:54:e1:9e:57,44:d4:54:e1:9e:57",
                        "172.30.113.175,8.8.8.8",
                        "52:54:00:67:85:42,52:54:00:67:85:42",
                    )
                ]

    :type expected_sequence: list[tuple[str, ...]]
    :return: True if every expected packet is found, in order, in the captured
        sequence with the expected outer and inner layer encapsulation
    :rtype: bool
    """
    final_result = []
    last_check = 0
    for packet in expected_sequence:
        for i in range(last_check, len(captured_sequence)):
            if all(
                expected == actual
                for expected, actual in zip(packet, captured_sequence[i])
                if expected != "*"
            ):
                last_check = i
                _LOGGER.debug(
                    "Verified encapsulated packets: %s,%s,%s,%s",
                    packet[0],
                    packet[1],
                    packet[2],
                    packet[3],
                )
                final_result.append(captured_sequence[i])
                break
        else:
            _LOGGER.debug(
                "Failed verification: %s,%s,%s,%s",
                packet[0],
                packet[1],
                packet[2],
                packet[3],
            )
            final_result.append(())
    return all(final_result)
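
The matching loop in the listing above skips any expected field equal to "*" and searches the captured sequence in order. The following is a minimal standalone sketch of that behaviour, not the boardfarm3 implementation: `match_in_order`, the shortened MAC strings, and the sample tuples are illustrative assumptions.

```python
def match_in_order(
    captured: list[tuple[str, ...]],
    expected: list[tuple[str, ...]],
) -> bool:
    """Return True if every expected tuple is found in captured, in order.

    A field equal to "*" in an expected tuple matches any captured value.
    """
    start = 0
    for wanted in expected:
        for index in range(start, len(captured)):
            if all(
                want == got
                for want, got in zip(wanted, captured[index])
                if want != "*"
            ):
                start = index  # later expectations resume from this packet
                break
        else:
            return False  # no captured packet matched this expectation
    return True


# Hypothetical capture: (outer IPs, outer MACs, inner IPs, inner MACs).
captured = [
    ("10.1.2.105,10.15.137.242", "aa:aa,bb:bb", "172.30.113.175,8.8.8.8", "cc:cc,dd:dd"),
    ("10.15.137.242,10.1.2.105", "bb:bb,aa:aa", "8.8.8.8,172.30.113.175", "dd:dd,cc:cc"),
]
request_then_reply = [
    ("10.1.2.105,10.15.137.242", "*", "172.30.113.175,8.8.8.8", "*"),
    ("10.15.137.242,10.1.2.105", "*", "8.8.8.8,172.30.113.175", "*"),
]
reply_then_request = list(reversed(request_then_reply))

in_order = match_in_order(captured, request_then_reply)
out_of_order = match_in_order(captured, reply_then_request)
```

With these sample tuples the wildcard lets the MAC fields differ freely, while swapping the order of the expected entries makes the check fail, because the search never moves backwards through the capture.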

ripv2

RIPv2 Use Cases library.

All Use Cases are independent of the device.

Functions:

Name Description
parse_rip_trace

Read and filter RIP packets from the captured file.

parse_rip_trace

parse_rip_trace(
    dev: LAN | WAN, fname: str, frame_time: bool, rm_pcap: bool
) -> list[RIPv2PacketData]

Read and filter RIP packets from the captured file.

The Routing Information Protocol (RIP) is one of a family of IP routing protocols. RIP is a simple distance-vector routing protocol. This Use Case parses RIP protocol packets.

.. code-block:: python

    # example usage
    cmts_packet_cap = parse_rip_trace(
        dev=LanClient, fname="some_capture.pcap", frame_time=False, rm_pcap=False
    )

.. hint:: This Use Case implements statements from the test suite such as:

- Check that CPE is sending and receiving RIPv2 route advertisements to the CMTS
- Verify that CPE is sending RIPv2 route advertisements every 30 seconds.

Parameters:

Name Type Description Default

dev

LAN | WAN

Device where the capture was taken (LAN or WAN)

required

fname

str

PCAP file to be read

required

frame_time

bool

If True, store the timestamp in RIPv2PacketData; else store None

required

rm_pcap

bool

If True, remove the PCAP file after reading; otherwise keep it

required

Returns:

Type Description
List[RIPv2PacketData]

list of RIP packets as

.. code-block:: python

    [
        RIPv2PacketData(
            source,
            destination,
            ip_address,
            subnet,
            frame_time,
        )
    ]

Raises:

Type Description
UseCaseFailure

when no RIPv2 trace is found in PCAP file

Source code in boardfarm3/use_cases/ripv2.py
def parse_rip_trace(  # pylint: disable=too-many-locals
    dev: LAN | WAN,
    fname: str,
    frame_time: bool,
    rm_pcap: bool,
) -> list[RIPv2PacketData]:
    """Read and filter RIP packets from the captured file.

    The Routing Information Protocol (RIP) is one of a family of IP routing
    protocols. RIP is a simple distance-vector routing protocol.
    This Use Case parses RIP protocol packets.

    .. code-block:: python

        # example usage
        cmts_packet_cap = parse_rip_trace(
            dev=LanClient, fname="some_capture.pcap", frame_time=False, rm_pcap=False
        )

    .. hint:: This Use Case implements statements from the test suite such as:

        - Check that CPE is sending and receiving RIPv2 route advertisements to the CMTS
        - Verify that CPE is sending RIPv2 route advertisements every 30 seconds.

    :param dev: device where captures were taken, LAN, WAN
    :type dev: LAN | WAN
    :param fname: PCAP file to be read
    :type fname: str
    :param frame_time: If True, store the timestamp in RIPv2PacketData; else store None
    :type frame_time: bool
    :param rm_pcap: If True, remove the PCAP file after reading; otherwise keep it
    :type rm_pcap: bool
    :raises UseCaseFailure: when no RIPv2 trace is found in PCAP file
    :return: list of RIP packets as

        .. code-block:: python

            [
                RIPv2PacketData(
                    source,
                    destination,
                    ip_address,
                    subnet,
                    frame_time,
                )
            ]

    :rtype: List[RIPv2PacketData]
    """
    output = []
    time_field = "-e frame.time_epoch" if frame_time else ""
    fields = (
        f" -Y rip -T fields -e ip.src -e ip.dst -e rip.ip -e rip.netmask {time_field}"
    )
    filter_str = fields
    raw_rip_packets = dev.tshark_read_pcap(
        fname=fname,
        additional_args=filter_str,
        rm_pcap=rm_pcap,
    )
    rip_packets = raw_rip_packets.split("This could be dangerous.\r\n")[-1]
    if not rip_packets:
        msg = f"No trace found in PCAP file {fname} with filters: {filter_str}"
        raise UseCaseFailure(msg)

    ftime = None
    for packet in rip_packets.splitlines():
        packet_fields = packet.split("\t")
        try:
            (
                src,
                dst,
                advertised_ips,
                netmask,
            ) = packet_fields[:4]

            if frame_time:
                ftime = datetime.fromtimestamp(
                    float(packet_fields[-1]),
                    tz=tz.tzlocal(),
                )

        except (IndexError, ValueError) as exception:
            msg = f"No RIPv2 trace found in PCAP file {fname}"
            raise UseCaseFailure(msg) from exception

        if advertised_ips:
            output.append(
                RIPv2PacketData(
                    source=IPv4Address(src),
                    destination=IPv4Address(dst),
                    ip_address=[IPv4Address(ip) for ip in advertised_ips.split(",")],
                    subnet=[ip_interface(mask) for mask in netmask.split(",")],
                    frame_time=ftime,
                ),
            )
    return output
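
One of the test statements quoted above asks whether advertisements are sent every 30 seconds. A minimal sketch of how the `frame_time` values could be checked is shown below, assuming the capture was parsed with `frame_time=True`. `RipAdvert` is a reduced stand-in for `RIPv2PacketData`, and `advert_intervals`, `sent_every`, the one-second tolerance, and the sample addresses are all hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from ipaddress import IPv4Address


@dataclass
class RipAdvert:
    """Stand-in for RIPv2PacketData, reduced to the fields used here."""

    source: IPv4Address
    frame_time: datetime


def advert_intervals(packets: list[RipAdvert], source: IPv4Address) -> list[float]:
    """Return the gaps in seconds between consecutive adverts from source."""
    times = sorted(p.frame_time for p in packets if p.source == source)
    return [(later - earlier).total_seconds() for earlier, later in zip(times, times[1:])]


def sent_every(
    packets: list[RipAdvert],
    source: IPv4Address,
    period: float = 30.0,
    tolerance: float = 1.0,  # assumed tolerance, not taken from boardfarm3
) -> bool:
    """Check that every gap is within tolerance of the expected period."""
    gaps = advert_intervals(packets, source)
    return bool(gaps) and all(abs(gap - period) <= tolerance for gap in gaps)


# Hypothetical capture: three adverts from the CPE, roughly 30 s apart.
cpe = IPv4Address("10.0.0.1")
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
packets = [
    RipAdvert(cpe, start),
    RipAdvert(cpe, start + timedelta(seconds=30.2)),
    RipAdvert(cpe, start + timedelta(seconds=59.9)),
]
periodic = sent_every(packets, cpe)
```

Requiring at least one gap means a capture with a single advertisement is reported as not periodic rather than passing vacuously.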

voice

Voice use cases library.

This module deals only with SIP endpoints. All APIs are independent of the board under test.

Functions:

Name Description
add_user_profile

Register user profile on the SIP Server.

answer_a_call

Answer a ringing call by target SIP agent.

answer_waiting_call

Answer the waiting call and hang up on the current call.

call_a_number

Have the Caller dial the given phone number.

call_a_phone

Call the Callee by the Caller.

disable_call_forwarding_busy

Disable call forwarding on a phone when busy.

disable_call_forwarding_no_answer

Disable call forwarding on a phone when not answered.

disable_unconditional_call_forwarding

Disable unconditional call forwarding on a phone.

disconnect_the_call

Disconnecting a call on a SIP agent.

enable_call_forwarding_busy

Enable call forwarding on a phone when busy.

enable_call_forwarding_no_answer

Enable call forwarding on a phone when there's no answer.

enable_unconditional_call_forwarding

Enable unconditional call forwarding on a phone.

get_sip_expiry_time

Get the call expiry timer from the config file.

initialize_phone

Configure the phone, and start the application.

is_call_connected

Verify if a call is connected.

is_call_dialing

Verify if a phone is dialing and a call is in progress.

is_call_ended

Verify if the call has been disconnected and ended.

is_call_idle

Verify if a phone is in idle state.

is_call_in_conference

Verify if a call is in conference.

is_call_not_answered

Verify if callee did not pick up the call.

is_call_on_hold

Verify if a call is on hold.

is_call_ringing

Verify if a ringtone is detected on a phone device.

is_call_waiting

Verify if the phone signals that a call is waiting on the other line.

is_code_ended

Verify if the dialed code or number has expired.

is_dialtone_detected

Verify if a dialtone is detected off_hook.

is_incall_connected

Verify if a call is incall connected.

is_incall_dialing

Verify if a phone is incall and call is dialing/in progress.

is_incall_playing_dialtone

Verify phone is connected on one line and playing dialtone on another line.

is_line_busy

Verify if the caller was notified that the callee is BUSY.

is_off_hook_warning

Verify phone has been left off-hook without use for an extended period.

is_playing_dialtone

Verify if the phone is playing a dialtone.

is_user_profile_present

Check whether the user profile is registered on the SIP Server or not.

merge_two_calls

Merge the two calls for conference calling.

place_call_offhold

Place an ongoing call on hold to off-hold.

place_call_onhold

Place an ongoing call on-hold.

press_R_button

Press the R button.

put_phone_offhook

Put the phone off hook.

remove_user_profile

Deregister user profile from the SIP Server.

set_sip_expiry_time

Modify the call expires timer in the config file of the sip_proxy.

shutdown_phone

Go on_hook and stop the phone application.

stop_and_start_sip_server

Stop and start the SIP server.

tcpdump

Start a packet capture using tcpdump and kill the process at the end.

toggle_call

Toggle between the calls.

add_user_profile

add_user_profile(where_to_add: VoiceServer, whom_to_add: VoiceClient) -> None

Register user profile on the SIP Server.

.. hint:: This Use Case implements statements from the test suite such as:

- Add user profile of (User) to the SIP Server.

Parameters:

Name Type Description Default

where_to_add

VoiceServer

SIP Server

required

whom_to_add

VoiceClient

Phone device to be registered

required

Raises:

Type Description
VoiceError

if the device already exists on the SIP Server

Source code in boardfarm3/use_cases/voice.py
def add_user_profile(where_to_add: VoiceServer, whom_to_add: VoiceClient) -> None:
    """Register user profile on the SIP Server.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Add user profile of (User) to the SIP Server.

    :param where_to_add: SIP Server
    :type where_to_add: VoiceServer
    :param whom_to_add: Phone device to be registered
    :type whom_to_add: VoiceClient
    :raises VoiceError: if the device already exists on the SIP Server
    """
    if whom_to_add.number in where_to_add.sipserver_get_online_users():
        msg = f"User {whom_to_add.name} is already registered"
        raise VoiceError(msg)
    where_to_add.sipserver_user_add(whom_to_add.number)
    # TODO: the need for restart should be specific to the SIP server device class
    where_to_add.sipserver_restart()

answer_a_call

answer_a_call(who_answers: VoiceClient) -> bool

Answer a ringing call by target SIP agent.

Execution order:

- Ensure there is a ring on the target agent.
- Pick up the call
- Ensure the line is connected

.. hint:: This Use Case implements statements from the test suite such as:

- (User) answers to (User)

Parameters:

Name Type Description Default

who_answers

VoiceClient

SIP agent who is supposed to answer the call.

required

Returns:

Type Description
bool

True if call is connected, else False

Raises:

Type Description
VoiceError

In case answering the call fails.

Source code in boardfarm3/use_cases/voice.py
def answer_a_call(who_answers: VoiceClient) -> bool:
    """Answer a ringing call by target SIP agent.

    Execution order:
    - Ensure there is a ring on the target agent.
    - Pick up the call
    - Ensure the line is connected

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) answers to (User)

    :param who_answers: SIP agent who is supposed to answer the call.
    :type who_answers: VoiceClient
    :raises VoiceError: In case answering the call fails.
    :return: True if call is connected, else False
    :rtype: bool
    """
    if not who_answers.is_ringing():
        msg = f"{who_answers.name} is not ringing!!"
        raise VoiceError(msg)
    who_answers.answer()

    return who_answers.is_connected()
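
The three-step execution order above (check for a ring, pick up, check the line) can be exercised without hardware. The sketch below is an illustration under stated assumptions: `FakePhone` is a hypothetical stub exposing only the methods the Use Case calls, `VoiceError` is a local stand-in for the boardfarm3 exception, and the local `answer_a_call` repeats the logic from the listing so the example runs on its own.

```python
class VoiceError(Exception):
    """Local stand-in for the boardfarm3 VoiceError exception."""


class FakePhone:
    """Hypothetical stub exposing only the methods answer_a_call uses."""

    def __init__(self, name: str, ringing: bool) -> None:
        self.name = name
        self._ringing = ringing
        self._connected = False

    def is_ringing(self) -> bool:
        return self._ringing

    def answer(self) -> None:
        # Picking up stops the ring and connects the line.
        self._ringing = False
        self._connected = True

    def is_connected(self) -> bool:
        return self._connected


def answer_a_call(who_answers: FakePhone) -> bool:
    """Same three steps as the Use Case: check ring, answer, check line."""
    if not who_answers.is_ringing():
        msg = f"{who_answers.name} is not ringing!!"
        raise VoiceError(msg)
    who_answers.answer()
    return who_answers.is_connected()


# A ringing phone is answered and reports a connected line.
connected = answer_a_call(FakePhone("lan_phone", ringing=True))

# A phone that is not ringing raises instead of returning False.
try:
    answer_a_call(FakePhone("idle_phone", ringing=False))
    raised = False
except VoiceError:
    raised = True
```

Note the two distinct failure modes: a missing ring raises `VoiceError`, whereas a pick-up that does not result in a connected line is reported through a `False` return value.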

answer_waiting_call

answer_waiting_call(who_answers: VoiceClient) -> None

Answer the waiting call and hang up on the current call.

.. hint:: This Use Case implements statements from the test suite such as:

- (User) presses R1

Parameters:

Name Type Description Default

who_answers

VoiceClient

SIP agent who is supposed to answer the call.

required
Source code in boardfarm3/use_cases/voice.py
def answer_waiting_call(who_answers: VoiceClient) -> None:
    """Answer the waiting call and hang up on the current call.

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) presses R1

    :param who_answers: SIP agent who is supposed to answer the call.
    :type who_answers: VoiceClient
    """
    who_answers.answer_waiting_call()

call_a_number

call_a_number(caller: VoiceClient, phone_nr: str) -> None

Have the Caller dial the given phone number.

Parameters:

Name Type Description Default

caller

VoiceClient

SIP agent that initiates the call

required

phone_nr

str

Phone number to be dialled

required
Source code in boardfarm3/use_cases/voice.py
def call_a_number(caller: VoiceClient, phone_nr: str) -> None:
    """Have the Caller dial the given phone number.

    :param caller: SIP agent that initiates the call
    :type caller: VoiceClient
    :param phone_nr: Phone number to be dialled
    :type phone_nr: str
    """
    caller.dial(phone_nr)

call_a_phone

call_a_phone(caller: VoiceClient, callee: VoiceClient) -> None

Call the Callee by the Caller.

.. hint:: This Use Case implements statements from the test suite such as:

- (User) calls (User)

Parameters:

Name Type Description Default

caller

VoiceClient

SIP agent who initiates the call

required

callee

VoiceClient

SIP agent who receives the call

required

Raises:

Type Description
VoiceError

In case the call fails.

Source code in boardfarm3/use_cases/voice.py
def call_a_phone(caller: VoiceClient, callee: VoiceClient) -> None:
    """Call the Callee by the Caller.

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) calls (User)

    :param caller: SIP agent who initiates the call
    :type caller: VoiceClient
    :param callee: SIP agent who receives the call
    :type callee: VoiceClient
    :raises VoiceError: In case the call fails.
    """
    try:
        assert caller is not callee, "Caller and Callee cannot be same!"  # noqa: S101
        caller.dial(callee.number)
    except Exception as exc:  # pylint: disable=broad-except  # BOARDFARM-4980
        msg = f"Failed to initiate a call between: {caller.name} --> {callee.name}"
        raise VoiceError(msg) from exc

disable_call_forwarding_busy

disable_call_forwarding_busy(who_disables: VoiceClient, sip_srv: SIPServer) -> None

Disable call forwarding on a phone when busy.

.. hint:: This Use Case implements statements from the test suite such as:

- Disable call forwarding on (User)

Parameters:

Name Type Description Default

who_disables

VoiceClient

Agent that disables call forwarding busy

required

sip_srv

SIPServer

SIP server to retrieve the Vertical Service Code from

required
Source code in boardfarm3/use_cases/voice.py
def disable_call_forwarding_busy(
    who_disables: VoiceClient,
    sip_srv: SIPServer,
) -> None:
    """Disable call forwarding on a phone when busy.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Disable call forwarding on (User)

    :param who_disables: Agent that disables call forwarding busy
    :type who_disables: VoiceClient
    :param sip_srv: SIP server to retrieve the Vertical Service Code from
    :type sip_srv: SIPServer
    """
    vsc = sip_srv.get_vsc_prefix("unset_cf_busy")
    who_disables.dial_feature_code(vsc)

disable_call_forwarding_no_answer

disable_call_forwarding_no_answer(
    who_disables: VoiceClient, sip_srv: SIPServer
) -> None

Disable call forwarding on a phone when not answered.

.. hint:: This Use Case implements statements from the test suite such as:

- Disable call forwarding no answer on (User)

Parameters:

Name Type Description Default

who_disables

VoiceClient

Agent that disables call forwarding no answer

required

sip_srv

SIPServer

SIP server to retrieve the Vertical Service Code from

required
Source code in boardfarm3/use_cases/voice.py
def disable_call_forwarding_no_answer(
    who_disables: VoiceClient,
    sip_srv: SIPServer,
) -> None:
    """Disable call forwarding on a phone when not answered.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Disable call forwarding no answer on (User)

    :param who_disables: Agent that disables call forwarding no answer
    :type who_disables: VoiceClient
    :param sip_srv: SIP server to retrieve the Vertical Service Code from
    :type sip_srv: SIPServer
    """
    vsc = sip_srv.get_vsc_prefix("unset_cf_no_answer")
    who_disables.dial_feature_code(vsc)

disable_unconditional_call_forwarding

disable_unconditional_call_forwarding(
    who_disables: VoiceClient, sip_srv: SIPServer
) -> None

Disable unconditional call forwarding on a phone.

Parameters:

Name Type Description Default

who_disables

VoiceClient

Agent that disables unconditional call forwarding

required

sip_srv

SIPServer

SIP server to retrieve the Vertical Service Code from

required
Source code in boardfarm3/use_cases/voice.py
def disable_unconditional_call_forwarding(
    who_disables: VoiceClient,
    sip_srv: SIPServer,
) -> None:
    """Disable unconditional call forwarding on a phone.

    :param who_disables: Agent that disables unconditional call forwarding
    :type who_disables: VoiceClient
    :param sip_srv: SIP server to retrieve the Vertical Service Code from
    :type sip_srv: SIPServer
    """
    vsc = sip_srv.get_vsc_prefix("unset_cf_unconditional")
    who_disables.dial_feature_code(vsc)

disconnect_the_call

disconnect_the_call(who_disconnects: VoiceClient) -> bool

Disconnecting a call on a SIP agent.

The Use Case does not verify whether a call is ongoing. It simply calls the on_hook implementation.

.. hint:: This Use Case implements statements from the test suite such as:

- (User) goes on hook.

Parameters:

Name Type Description Default

who_disconnects

VoiceClient

SIP agent who is supposed to disconnect the call.

required

Returns:

Type Description
bool

True if call is disconnected

Raises:

Type Description
VoiceError

In case disconnecting the call fails.

Source code in boardfarm3/use_cases/voice.py
def disconnect_the_call(who_disconnects: VoiceClient) -> bool:
    """Disconnecting a call on a SIP agent.

    The Use Case does not verify whether a call is ongoing.
    It simply calls the on_hook implementation.

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) goes on hook.

    :param who_disconnects: SIP agent who is supposed to disconnect the call.
    :type who_disconnects: VoiceClient
    :raises VoiceError: In case disconnecting the call fails.
    :return: True if call is disconnected
    :rtype: bool
    """
    try:
        who_disconnects.on_hook()
    except Exception as exc:  # pylint: disable=broad-except  # BOARDFARM-4981
        msg = f"Failed to disconnect the call: {who_disconnects.name}"
        raise VoiceError(msg) from exc
    return True

enable_call_forwarding_busy

enable_call_forwarding_busy(
    who_forwards: VoiceClient, forward_to: VoiceClient, sip_srv: SIPServer
) -> None

Enable call forwarding on a phone when busy.

As a result, calls are forwarded to another user.

.. hint:: This Use Case implements statements from the test suite such as:

- Enable Call Forwarding Busy on (User) to (User)

Parameters:

Name Type Description Default

who_forwards

VoiceClient

Agent that enables call forwarding busy

required

forward_to

VoiceClient

SIP client to which the agent forwards the call

required

sip_srv

SIPServer

SIP server

required
Source code in boardfarm3/use_cases/voice.py
def enable_call_forwarding_busy(
    who_forwards: VoiceClient,
    forward_to: VoiceClient,
    sip_srv: SIPServer,
) -> None:
    """Enable call forwarding on a phone when busy.

    As a result, calls are forwarded to another user.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Enable Call Forwarding Busy on (User) to (User)

    :param who_forwards: Agent that enables call forwarding busy
    :type who_forwards: VoiceClient
    :param forward_to: SIP client to which the agent forwards the call
    :type forward_to: VoiceClient
    :param sip_srv: SIP server
    :type sip_srv: SIPServer
    """
    vsc = sip_srv.get_vsc_prefix("set_cf_busy")
    if vsc.endswith("#"):
        who_forwards.dial_feature_code(f"{vsc[:-1]}{forward_to.number}#")
    else:
        who_forwards.dial_feature_code(f"{vsc}{forward_to.number}")
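
The branch on a trailing "#" in the listing above places the forwarding target before the terminator instead of after it. A minimal sketch of that string composition follows. `compose_forwarding_code` is a hypothetical helper, and the codes and number are made-up values for illustration, not the Vertical Service Codes of any particular SIP server.

```python
def compose_forwarding_code(vsc: str, forward_to_number: str) -> str:
    """Build the digit string dialled to enable call forwarding."""
    if vsc.endswith("#"):
        # Keep "#" as the terminator: strip it, append the number, re-add it.
        return f"{vsc[:-1]}{forward_to_number}#"
    return f"{vsc}{forward_to_number}"


# Hypothetical codes and number, for illustration only.
with_terminator = compose_forwarding_code("*90#", "3000")
without_terminator = compose_forwarding_code("*90", "3000")
```

Here `with_terminator` is "*903000#" and `without_terminator` is "*903000", so the target number always follows the code prefix regardless of how the server stores the code.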

enable_call_forwarding_no_answer

enable_call_forwarding_no_answer(
    who_forwards: VoiceClient, forward_to: VoiceClient, sip_srv: SIPServer
) -> None

Enable call forwarding on a phone when there's no answer.

As a result, calls are forwarded to another user.

.. hint:: This Use Case implements statements from the test suite such as:

- Enable Call Forwarding No Answer on (User) to (User)

Parameters:

Name Type Description Default

who_forwards

VoiceClient

Agent that enables call forwarding no answer

required

forward_to

VoiceClient

SIP client to which the agent forwards the call

required

sip_srv

SIPServer

SIP server to retrieve the Vertical Service Code from

required
Source code in boardfarm3/use_cases/voice.py
def enable_call_forwarding_no_answer(
    who_forwards: VoiceClient,
    forward_to: VoiceClient,
    sip_srv: SIPServer,
) -> None:
    """Enable call forwarding on a phone when there's no answer.

    As a result, calls are forwarded to another user.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Enable Call Forwarding No Answer on (User) to (User)

    :param who_forwards: Agent that enables call forwarding no answer
    :type who_forwards: VoiceClient
    :param forward_to: SIP client to which the agent forwards the call
    :type forward_to: VoiceClient
    :param sip_srv: SIP server to retrieve the Vertical Service Code from
    :type sip_srv: SIPServer
    """
    vsc = sip_srv.get_vsc_prefix("set_cf_no_answer")
    if vsc.endswith("#"):
        who_forwards.dial_feature_code(f"{vsc[:-1]}{forward_to.number}#")
    else:
        who_forwards.dial_feature_code(f"{vsc}{forward_to.number}")

enable_unconditional_call_forwarding

enable_unconditional_call_forwarding(
    who_forwards: VoiceClient, forward_to: VoiceClient, sip_srv: SIPServer
) -> None

Enable unconditional call forwarding on a phone.

As a result, calls are forwarded to another user.

Parameters:

Name Type Description Default

who_forwards

VoiceClient

Agent that enables unconditional call forwarding

required

forward_to

VoiceClient

SIP client to which the agent forwards the call

required

sip_srv

SIPServer

SIP server

required
Source code in boardfarm3/use_cases/voice.py
def enable_unconditional_call_forwarding(
    who_forwards: VoiceClient,
    forward_to: VoiceClient,
    sip_srv: SIPServer,
) -> None:
    """Enable unconditional call forwarding on a phone.

    As a result, calls are forwarded to another user.

    :param who_forwards: Agent that enables unconditional call forwarding
    :type who_forwards: VoiceClient
    :param forward_to: SIP client to which the agent forwards the call
    :type forward_to: VoiceClient
    :param sip_srv: SIP server
    :type sip_srv: SIPServer
    """
    vsc = sip_srv.get_vsc_prefix("set_cf_unconditional")
    if vsc.endswith("#"):
        who_forwards.dial_feature_code(f"{vsc[:-1]}{forward_to.number}#")
    else:
        who_forwards.dial_feature_code(f"{vsc}{forward_to.number}")

get_sip_expiry_time

get_sip_expiry_time(sip_proxy: VoiceServer) -> int

Get the call expiry timer from the config file.

Parameters:

Name Type Description Default

sip_proxy

VoiceServer

SIP server

required

Returns:

Type Description
int

expiry timer saved in the config

Raises:

Type Description
VoiceError

if the SIP Server is not installed or not running

Source code in boardfarm3/use_cases/voice.py
def get_sip_expiry_time(sip_proxy: VoiceServer) -> int:
    """Get the call expiry timer from the config file.

    :param sip_proxy: SIP server
    :type sip_proxy: VoiceServer
    :return: expiry timer saved in the config
    :rtype: int
    :raises VoiceError: if the SIP Server is not installed or not running
    """
    if sip_proxy.sipserver_status() in ["Not installed", "Not Running"]:
        err_msg = "Install the sipserver first"
        raise VoiceError(err_msg)
    return sip_proxy.sipserver_get_expire_timer()

initialize_phone

initialize_phone(target_phone: VoiceClient) -> None

Configure the phone, and start the application.

.. hint:: This Use Case implements statements from the test suite such as:

- Configure the phone, and start the application.

Parameters:

Name Type Description Default

target_phone

VoiceClient

Target phone to be initialized

required
Source code in boardfarm3/use_cases/voice.py
def initialize_phone(target_phone: VoiceClient) -> None:
    """Configure the phone, and start the application.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Configure the phone, and start the application.

    :param target_phone: Target phone to be initialized
    :type target_phone: VoiceClient
    """
    target_phone.phone_start()
    target_phone.on_hook()

is_call_connected

is_call_connected(who_is_connected: VoiceClient) -> bool

Verify if a call is connected.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify if a call is connected.

Parameters:

Name Type Description Default

who_is_connected

VoiceClient

SIP client on which connection needs to be checked

required

Returns:

Type Description
bool

True if call is connected.

Source code in boardfarm3/use_cases/voice.py
def is_call_connected(who_is_connected: VoiceClient) -> bool:
    """Verify if a call is connected.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify if a call is connected.

    :param who_is_connected: SIP client on which connection needs to be checked
    :type who_is_connected: VoiceClient
    :return: True if call is connected.
    :rtype: bool
    """
    return who_is_connected.is_connected()

is_call_dialing

is_call_dialing(who_is_dialing: VoiceClient) -> bool

Verify if a phone is dialing and a call is in progress.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify if a phone is dialing and a call in progress.

Parameters:

Name Type Description Default

who_is_dialing

VoiceClient

SIP agent used to verify dialing

required

Returns:

Type Description
bool

True if in progress dialing is detected

Source code in boardfarm3/use_cases/voice.py
def is_call_dialing(who_is_dialing: VoiceClient) -> bool:
    """Verify if a phone is dialing and a call in progress.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify if a phone is dialing and a call in progress.

    :param who_is_dialing: SIP agent used to verify dialing
    :type who_is_dialing: VoiceClient
    :return: True if in progress dialing is detected
    :rtype: bool
    """
    return who_is_dialing.is_dialing()

is_call_ended

is_call_ended(whose_call_ended: VoiceClient) -> bool

Verify if the call has been disconnected and ended.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify if the call has been disconnected and ended.

Parameters:

Name Type Description Default

whose_call_ended

VoiceClient

SIP Client

required

Returns:

Type Description
bool

True if call is disconnected

Source code in boardfarm3/use_cases/voice.py
def is_call_ended(whose_call_ended: VoiceClient) -> bool:
    """Verify if the call has been disconnected and ended.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify if the call has been disconnected and ended.

    :param whose_call_ended: SIP Client
    :type whose_call_ended: VoiceClient
    :return: True if call is disconnected
    :rtype: bool
    """
    return whose_call_ended.is_call_ended()

is_call_idle

is_call_idle(who_is_idle: VoiceClient) -> bool

Verify if a phone is in idle state.

.. hint:: This Use Case implements statements from the test suite such as:

- (User) goes on Hook
- Verify if a phone is in idle state.

Parameters:

Name Type Description Default

who_is_idle

VoiceClient

SIP agent used to verify idle

required

Returns:

Type Description
bool

True if idle is detected

Source code in boardfarm3/use_cases/voice.py
def is_call_idle(who_is_idle: VoiceClient) -> bool:
    """Verify if a phone is in idle state.

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) goes on Hook
        - Verify if a phone is in idle state.

    :param who_is_idle: SIP agent used to verify idle
    :type who_is_idle: VoiceClient
    :return: True if idle is detected
    :rtype: bool
    """
    return who_is_idle.is_idle()

is_call_in_conference

is_call_in_conference(who_in_conference: VoiceClient) -> bool

Verify if a call is in conference.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify if a call is in conference.

Parameters:

Name Type Description Default

who_in_conference

VoiceClient

SIP client on which conference state needs to be checked

required

Returns:

Type Description
bool

True if call is in conference state

Source code in boardfarm3/use_cases/voice.py
def is_call_in_conference(who_in_conference: VoiceClient) -> bool:
    """Verify if a call is in conference.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify if a call is in conference.

    :param who_in_conference: SIP client on which conference state needs to be checked
    :type who_in_conference: VoiceClient
    :return: True if call is in conference state
    :rtype: bool
    """
    return who_in_conference.is_in_conference()

is_call_not_answered

is_call_not_answered(whose_call: VoiceClient) -> bool

Verify if callee did not pick up the call.

The Caller will receive a NO CARRIER or TIMEOUT reply.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify if callee did not pick up the call.

Parameters:

Name Type Description Default

whose_call

VoiceClient

Callee

required

Returns:

Type Description
bool

True if not answered, else False

Source code in boardfarm3/use_cases/voice.py
def is_call_not_answered(whose_call: VoiceClient) -> bool:
    """Verify if callee did not pick up the call.

    The Caller will receive a NO CARRIER or TIMEOUT reply.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify if callee did not pick up the call.

    :param whose_call: Callee
    :type whose_call: VoiceClient
    :return: True if not answered, else False
    :rtype: bool
    """
    return whose_call.is_call_not_answered()

is_call_on_hold

is_call_on_hold(who_is_onhold: VoiceClient) -> bool

Verify if a call is on hold.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify if a call is on hold.

Parameters:

Name Type Description Default

who_is_onhold

VoiceClient

SIP client on which hold state needs to be checked

required

Returns:

Type Description
bool

True if call is on hold.

Source code in boardfarm3/use_cases/voice.py
def is_call_on_hold(who_is_onhold: VoiceClient) -> bool:
    """Verify if a call is on hold.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify if a call is on hold.

    :param who_is_onhold: SIP client on which hold state needs to be checked
    :type who_is_onhold: VoiceClient
    :return: True if call is on hold.
    :rtype: bool
    """
    return who_is_onhold.is_onhold()

is_call_ringing

is_call_ringing(who_is_ringing: VoiceClient) -> bool

Verify if a ringtone is detected on a phone device.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify if a ringtone is detected on a phone device.

Parameters:

Name Type Description Default

who_is_ringing

VoiceClient

SIP agent used to verify ringtone

required

Returns:

Type Description
bool

True if ringtone is detected

Source code in boardfarm3/use_cases/voice.py
def is_call_ringing(who_is_ringing: VoiceClient) -> bool:
    """Verify if a ringtone is detected on a phone device.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify if a ringtone is detected on a phone device.

    :param who_is_ringing: SIP agent used to verify ringtone
    :type who_is_ringing: VoiceClient
    :return: True if ringtone is detected
    :rtype: bool
    """
    return who_is_ringing.is_ringing()

is_call_waiting

is_call_waiting(who_is_waiting: VoiceClient) -> bool

Verify if the phone notifies for the call on other line to be waiting.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify if the phone notifies for the call on other line to be waiting.

Parameters:

Name Type Description Default

who_is_waiting

VoiceClient

SIP Client

required

Returns:

Type Description
bool

True if a call is waiting

Source code in boardfarm3/use_cases/voice.py
def is_call_waiting(who_is_waiting: VoiceClient) -> bool:
    """Verify if the phone notifies for the call on other line to be waiting.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify if the phone notifies for the call on other line to be waiting.

    :param who_is_waiting: SIP Client
    :type who_is_waiting: VoiceClient
    :return: True if a call is waiting
    :rtype: bool
    """
    return who_is_waiting.is_call_waiting()

is_code_ended

is_code_ended(whose_code_ended: VoiceClient) -> bool

Verify if the dialed code or number has expired.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify if the dialed code or number has expired.

Parameters:

Name Type Description Default

whose_code_ended

VoiceClient

SIP Client

required

Returns:

Type Description
bool

True if code ended

Source code in boardfarm3/use_cases/voice.py
def is_code_ended(whose_code_ended: VoiceClient) -> bool:
    """Verify if the dialed code or number has expired.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify if the dialed code or number has expired.

    :param whose_code_ended: SIP Client
    :type whose_code_ended: VoiceClient
    :return: True if code ended
    :rtype: bool
    """
    return whose_code_ended.is_code_ended()

is_dialtone_detected

is_dialtone_detected(agent: VoiceClient) -> bool

Verify if a dialtone is detected off_hook.

Device will be first placed on_hook. This is done to ensure disconnecting any previous sessions.

After verifying dial tone device will again go back to on_hook state.

.. hint:: This Use Case implements statements from the test suite such as:

- (User) goes off hook

Parameters:

Name Type Description Default

agent

VoiceClient

SIP agent used to verify dialtone.

required

Returns:

Type Description
bool

True if dialtone is detected.

Source code in boardfarm3/use_cases/voice.py
def is_dialtone_detected(agent: VoiceClient) -> bool:
    """Verify if a dialtone is detected off_hook.

    Device will be first placed on_hook.
    This is done to ensure disconnecting any previous sessions.

    After verifying dial tone device will again go back to on_hook state.

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) goes off hook

    :param agent: SIP agent used to verify dialtone.
    :type agent: VoiceClient
    :return: True if dialtone is detected.
    :rtype: bool
    """
    agent.on_hook()
    return agent.detect_dialtone()

is_incall_connected

is_incall_connected(who_is_incall_connected: VoiceClient) -> bool

Verify if a call is incall connected.

.. hint:: This Use Case implements statements from the test suite such as:

- (User) answers to (User)
- Verify if a call is incall connected.

Parameters:

Name Type Description Default

who_is_incall_connected

VoiceClient

SIP client on which connection needs to be checked

required

Returns:

Type Description
bool

True if phone is incall connected.

Source code in boardfarm3/use_cases/voice.py
def is_incall_connected(who_is_incall_connected: VoiceClient) -> bool:
    """Verify if a call is incall connected.

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) answers to (User)
        - Verify if a call is incall connected.

    :param who_is_incall_connected: SIP client on which connection needs to be checked
    :type who_is_incall_connected: VoiceClient
    :return: True if phone is incall connected.
    :rtype: bool
    """
    return who_is_incall_connected.is_incall_connected()

is_incall_dialing

is_incall_dialing(who_is_incall_dialing: VoiceClient) -> bool

Verify if a phone is incall and call is dialing/in progress.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify if a phone is incall and call is dialing/in progress.

Parameters:

Name Type Description Default

who_is_incall_dialing

VoiceClient

SIP agent used to verify incall dialing

required

Returns:

Type Description
bool

True if in progress dialing is detected

Source code in boardfarm3/use_cases/voice.py
def is_incall_dialing(who_is_incall_dialing: VoiceClient) -> bool:
    """Verify if a phone is incall and call is dialing/in progress.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify if a phone is incall and call is dialing/in progress.

    :param who_is_incall_dialing: SIP agent used to verify incall dialing
    :type who_is_incall_dialing: VoiceClient
    :return: True if in progress dialing is detected
    :rtype: bool
    """
    return who_is_incall_dialing.is_incall_dialing()

is_incall_playing_dialtone

is_incall_playing_dialtone(who_is_playing_incall_dialtone: VoiceClient) -> bool

Verify phone is connected on one line and playing dialtone on another line.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify phone is connected on one line and playing dialtone on another line.

Parameters:

Name Type Description Default

who_is_playing_incall_dialtone

VoiceClient

SIP Client

required

Returns:

Type Description
bool

True if call is incall playing dialtone state

Source code in boardfarm3/use_cases/voice.py
def is_incall_playing_dialtone(who_is_playing_incall_dialtone: VoiceClient) -> bool:
    """Verify phone is connected on one line and playing dialtone on another line.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify phone is connected on one line and playing dialtone on another line.

    :param who_is_playing_incall_dialtone: SIP Client
    :type who_is_playing_incall_dialtone: VoiceClient
    :return: True if call is incall playing dialtone state
    :rtype: bool
    """
    return who_is_playing_incall_dialtone.is_incall_playing_dialtone()

is_line_busy

is_line_busy(on_which_agent: VoiceClient, who_is_busy: VoiceClient) -> bool

To verify if the caller was notified that the callee is BUSY.

Some phones send this explicitly and also have RINGING 180 inside their trace.

.. hint:: This Use Case implements statements from the test suite such as:

- To verify if the caller was notified that the callee is BUSY.

Parameters:

Name Type Description Default

on_which_agent

VoiceClient

Caller who will receive BUSY

required

who_is_busy

VoiceClient

Callee who replies BUSY

required

Returns:

Type Description
bool

True if caller received BUSY, else False

Source code in boardfarm3/use_cases/voice.py
def is_line_busy(on_which_agent: VoiceClient, who_is_busy: VoiceClient) -> bool:
    """To verify if the caller was notified that the callee is BUSY.

    Some phones send this explicitly and also have RINGING 180
    inside their trace.

    .. hint:: This Use Case implements statements from the test suite such as:

        - To verify if the caller was notified that the callee is BUSY.

    :param on_which_agent: Caller who will receive BUSY
    :type on_which_agent: VoiceClient
    :param who_is_busy: Callee who replies BUSY
    :type who_is_busy: VoiceClient
    :return: True if caller received BUSY, else False
    :rtype: bool
    """
    assert on_which_agent is not who_is_busy, "Both args cannot be same"  # noqa: S101

    return on_which_agent.is_line_busy()
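
A point that is easy to miss in the listing above: only the caller is queried for the BUSY state, and `who_is_busy` is used solely for the identity check. The sketch below illustrates this with a hypothetical `FakePhone` class standing in for a real `VoiceClient`; the function body is repeated so the example runs without boardfarm3 or a test bed.

```python
class FakePhone:
    """Hypothetical stand-in for a VoiceClient; not part of boardfarm3."""

    def __init__(self, name: str, busy: bool) -> None:
        self.name = name
        self._busy = busy

    def is_line_busy(self) -> bool:
        return self._busy


def is_line_busy(on_which_agent: FakePhone, who_is_busy: FakePhone) -> bool:
    # Same body as the use case above, repeated so the sketch runs alone.
    assert on_which_agent is not who_is_busy, "Both args cannot be same"
    return on_which_agent.is_line_busy()


caller = FakePhone("caller", busy=True)
callee = FakePhone("callee", busy=False)

# The result comes from the caller's state; the callee is never queried.
caller_got_busy = is_line_busy(caller, callee)

# Passing the same object for both roles trips the assertion.
try:
    is_line_busy(caller, caller)
    same_agent_rejected = False
except AssertionError:
    same_agent_rejected = True
```

Because the check is a plain `assert`, it would be skipped if Python runs with assertions disabled (`-O`).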

is_off_hook_warning

is_off_hook_warning(who_has_offhook_warning: VoiceClient) -> bool

Verify phone has been left off-hook without use for an extended period.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify phone has been left off-hook without use for an extended period.

Parameters:

Name Type Description Default

who_has_offhook_warning

VoiceClient

SIP Client

required

Returns:

Type Description
bool

True if phone generates off hook warning state

Source code in boardfarm3/use_cases/voice.py
def is_off_hook_warning(who_has_offhook_warning: VoiceClient) -> bool:
    """Verify phone has been left off-hook without use for an extended period.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify phone has been left off-hook without use for an extended period.

    :param who_has_offhook_warning: SIP Client
    :type who_has_offhook_warning: VoiceClient
    :return: True if phone generates off hook warning state
    :rtype: bool
    """
    return who_has_offhook_warning.has_off_hook_warning()

is_playing_dialtone

is_playing_dialtone(who_is_playing_dialtone: VoiceClient) -> bool

Verify if the phone is playing a dialtone.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify if the phone is playing a dialtone.

Parameters:

Name Type Description Default

who_is_playing_dialtone

VoiceClient

SIP Client

required

Returns:

Type Description
bool

True if dialtone is playing else False

Source code in boardfarm3/use_cases/voice.py
def is_playing_dialtone(who_is_playing_dialtone: VoiceClient) -> bool:
    """Verify if the phone is playing a dialtone.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify if the phone is playing a dialtone.

    :param who_is_playing_dialtone: SIP Client
    :type who_is_playing_dialtone: VoiceClient
    :return: True if dialtone is playing else False
    :rtype: bool
    """
    return who_is_playing_dialtone.is_playing_dialtone()

is_user_profile_present

is_user_profile_present(sip_proxy: VoiceServer, whose_profile: VoiceClient) -> bool

Check whether the user profile is registered on the SIP Server or not.

.. hint:: This Use Case implements statements from the test suite such as:

- Make sure that (User) is successfully registered.

Parameters:

Name Type Description Default

sip_proxy

VoiceServer

SIP Server

required

whose_profile

VoiceClient

Phone device to be checked

required

Returns:

Type Description
bool

True if phone device is registered on the SIP Server

Source code in boardfarm3/use_cases/voice.py
def is_user_profile_present(sip_proxy: VoiceServer, whose_profile: VoiceClient) -> bool:
    """Check whether the user profile is registered on the SIP Server or not.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Make sure that (User) is successfully registered.

    :param sip_proxy: SIP Server
    :type sip_proxy: VoiceServer
    :param whose_profile: Phone device to be checked
    :type whose_profile: VoiceClient
    :return: True if phone device is registered on the SIP Server
    :rtype: bool
    """
    return whose_profile.number in sip_proxy.sipserver_get_online_users()

merge_two_calls

merge_two_calls(who_is_conferencing: VoiceClient) -> None

Merge the two calls for conference calling.

Call waiting must be enabled, and there must be a call on the other line to add to the conference.

.. hint:: This Use Case implements statements from the test suite such as:

- (User) presses R3 and initiates 3-Way Conference

Parameters:

Name Type Description Default

who_is_conferencing

VoiceClient

SIP agent that adds all calls in a conference.

required
Source code in boardfarm3/use_cases/voice.py
def merge_two_calls(who_is_conferencing: VoiceClient) -> None:
    """Merge the two calls for conference calling.

    Call waiting must be enabled.
    There must be a call on the other line to add to the conference.

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) presses R3 and initiates 3-Way Conference

    :param who_is_conferencing: SIP agent that adds all calls in a conference.
    :type who_is_conferencing: VoiceClient
    """
    client: SIPPhone = who_is_conferencing
    client.merge_two_calls()

place_call_offhold

place_call_offhold(who_places: VoiceClient) -> None

Take an ongoing call that is on hold off hold.

There must be an active call to be placed off hold.

.. hint:: This Use Case implements statements from the test suite such as:

- (User) presses flash hook

Parameters:

Name Type Description Default

who_places

VoiceClient

SIP agent that is supposed to place the call off-hold.

required
Source code in boardfarm3/use_cases/voice.py
def place_call_offhold(who_places: VoiceClient) -> None:
    """Take an ongoing call that is on hold off hold.

    There must be an active call to be placed off hold.

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) presses flash hook

    :param who_places: SIP agent that is supposed to place the call off-hold.
    :type who_places: VoiceClient
    """
    client: SIPPhone = who_places
    client.place_call_offhold()

place_call_onhold

place_call_onhold(who_places: VoiceClient) -> None

Place an ongoing call on-hold.

There must be an active call to be placed on hold.

.. hint:: This Use Case implements statements from the test suite such as:

- (User) presses flash hook

Parameters:

Name Type Description Default

who_places

VoiceClient

SIP agent that is supposed to place the call on-hold.

required

Raises:

Type Description
VoiceError

If there is no on-going call

Source code in boardfarm3/use_cases/voice.py
def place_call_onhold(who_places: VoiceClient) -> None:
    """Place an ongoing call on-hold.

    There must be an active call to be placed on hold.

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) presses flash hook

    :param who_places: SIP agent that is supposed to place the call on-hold.
    :type who_places: VoiceClient
    :raises VoiceError: If there is no on-going call
    """
    client: SIPPhone = who_places
    if not client.is_connected() and not client.is_incall_connected():
        msg = "No active call in place!!"
        raise VoiceError(msg)
    client.place_call_onhold()
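
The guard in the listing above raises only when both state checks are false, so a phone that is either connected or incall connected can be put on hold. A minimal sketch of that behaviour follows; `FakePhone` and the local `VoiceError` are hypothetical stand-ins for the boardfarm3 types, and the function body is repeated so the example runs on its own.

```python
class VoiceError(Exception):
    """Local stand-in for the boardfarm3 exception of the same name."""


class FakePhone:
    """Hypothetical SIP phone stand-in that tracks its hold state."""

    def __init__(self, connected: bool, incall_connected: bool) -> None:
        self._connected = connected
        self._incall_connected = incall_connected
        self.on_hold = False

    def is_connected(self) -> bool:
        return self._connected

    def is_incall_connected(self) -> bool:
        return self._incall_connected

    def place_call_onhold(self) -> None:
        self.on_hold = True


def place_call_onhold(who_places: FakePhone) -> None:
    # Same guard as the use case above: raise only if BOTH checks fail.
    if not who_places.is_connected() and not who_places.is_incall_connected():
        raise VoiceError("No active call in place!!")
    who_places.place_call_onhold()


# One true state is enough to pass the guard.
in_call = FakePhone(connected=False, incall_connected=True)
place_call_onhold(in_call)

# With no call at all, the use case refuses to act.
idle = FakePhone(connected=False, incall_connected=False)
try:
    place_call_onhold(idle)
    idle_rejected = False
except VoiceError:
    idle_rejected = True
```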

press_R_button

press_R_button(who_presses: VoiceClient) -> None

Press the R button.

Used when we put a call on hold, or during dialing.

.. hint:: This Use Case implements statements from the test suite such as:

- (User) presses R button

Parameters:

Name Type Description Default

who_presses

VoiceClient

Agent that presses the R button.

required
Source code in boardfarm3/use_cases/voice.py
def press_R_button(who_presses: VoiceClient) -> None:  # pylint: disable=invalid-name
    """Press the R button.

    Used when we put a call on hold, or during dialing.

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) presses R button

    :param who_presses: Agent that presses the R button.
    :type who_presses: VoiceClient
    """
    client: SIPPhone = who_presses
    client.press_R_button()

put_phone_offhook

put_phone_offhook(who_puts_offhook: VoiceClient) -> None

Put the phone off hook.

.. hint:: This Use Case implements statements from the test suite such as:

- (User) goes off Hook.

Parameters:

Name Type Description Default

who_puts_offhook

VoiceClient

SIP agent who puts phone off hook

required
Source code in boardfarm3/use_cases/voice.py
def put_phone_offhook(who_puts_offhook: VoiceClient) -> None:
    """Put the phone off hook.

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) goes off Hook.

    :param who_puts_offhook: SIP agent who puts phone off hook
    :type who_puts_offhook: VoiceClient
    """
    who_puts_offhook.off_hook()

remove_user_profile

remove_user_profile(where_to_remove: VoiceServer, whom_to_remove: VoiceClient) -> None

Deregister user profile from the SIP Server.

.. hint:: This Use Case implements statements from the test suite such as:

- Remove the user profile of (User A) from SIP Server.

Parameters:

Name Type Description Default

where_to_remove

VoiceServer

SIP Server

required

whom_to_remove

VoiceClient

Phone device to be removed

required

Raises:

Type Description
VoiceError

if the device does not already exist on the SIP Server

Source code in boardfarm3/use_cases/voice.py
def remove_user_profile(
    where_to_remove: VoiceServer,
    whom_to_remove: VoiceClient,
) -> None:
    """Deregister user profile from the SIP Server.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Remove the user profile of (User A) from SIP Server.

    :param where_to_remove: SIP Server
    :type where_to_remove: VoiceServer
    :param whom_to_remove: Phone device to be removed
    :type whom_to_remove: VoiceClient
    :raises VoiceError: if the device does not already exist on the SIP Server
    """
    if whom_to_remove.number not in where_to_remove.sipserver_get_online_users():
        msg = f"User {whom_to_remove.name} is not registered"
        raise VoiceError(msg)
    where_to_remove.remove_endpoint_from_sipserver(whom_to_remove.number)
    # TODO: the restart, as it might not be needed for some Voice servers, should be
    # part of the remove_endpoint_...() method
    where_to_remove.sipserver_restart()
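
The order of operations above (check registration, remove the endpoint, restart the server) can be exercised without a real SIP server. The sketch below uses hypothetical `FakeSipServer` and `FakePhone` classes and a local `VoiceError`; it assumes the online users are returned as a list of numbers, which may differ from what a real `VoiceServer` returns.

```python
class VoiceError(Exception):
    """Local stand-in for the boardfarm3 exception of the same name."""


class FakeSipServer:
    """Hypothetical VoiceServer stand-in holding registered numbers."""

    def __init__(self, users: list) -> None:
        self.users = list(users)
        self.restarts = 0

    def sipserver_get_online_users(self) -> list:
        # Assumption: a list of numbers; the real return type may differ.
        return list(self.users)

    def remove_endpoint_from_sipserver(self, endpoint: str) -> None:
        self.users.remove(endpoint)

    def sipserver_restart(self) -> None:
        self.restarts += 1


class FakePhone:
    """Hypothetical VoiceClient stand-in with a name and a number."""

    def __init__(self, name: str, number: str) -> None:
        self.name = name
        self.number = number


def remove_user_profile(where_to_remove: FakeSipServer, whom_to_remove: FakePhone) -> None:
    # Same steps as the use case above, repeated so the sketch runs alone.
    if whom_to_remove.number not in where_to_remove.sipserver_get_online_users():
        raise VoiceError(f"User {whom_to_remove.name} is not registered")
    where_to_remove.remove_endpoint_from_sipserver(whom_to_remove.number)
    where_to_remove.sipserver_restart()


server = FakeSipServer(["1000", "2000"])
user_a = FakePhone("user_a", "1000")

remove_user_profile(server, user_a)

# A second removal fails because the number is no longer registered.
try:
    remove_user_profile(server, user_a)
    second_removal_rejected = False
except VoiceError:
    second_removal_rejected = True
```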

set_sip_expiry_time

set_sip_expiry_time(sip_proxy: VoiceServer, to_what_time: int = 60) -> None

Modify the call expires timer in the config file of the sip_proxy.

.. hint:: This Use Case implements statements from the test suite such as:

- Modify the call expires timer in the config file of the sip_proxy.

Parameters:

Name Type Description Default

sip_proxy

VoiceServer

SIP Server

required

to_what_time

int

New expiry time to be set, defaults to 60

60

Raises:

Type Description
VoiceError

if the sipserver is not installed or not running

Source code in boardfarm3/use_cases/voice.py
def set_sip_expiry_time(sip_proxy: VoiceServer, to_what_time: int = 60) -> None:
    """Modify the call expires timer in the config file of the sip_proxy.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Modify the call expires timer in the config file of the sip_proxy.

    :param sip_proxy: SIP Server
    :type sip_proxy: VoiceServer
    :param to_what_time: New expiry time to be set, defaults to 60
    :type to_what_time: int
    :raises VoiceError: if the sipserver is not installed or not running
    """
    if sip_proxy.sipserver_status() in ["Not installed", "Not Running"]:
        msg = "Install the sipserver first"
        raise VoiceError(msg)
    sip_proxy.sipserver_set_expire_timer(
        to_timer=to_what_time,
    )
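
As the listing shows, the timer is only written when the server reports a status other than "Not installed" or "Not Running". A small sketch of both paths, using a hypothetical `FakeSipServer` and a local `VoiceError`; the status string "Running" is an assumption made for illustration.

```python
class VoiceError(Exception):
    """Local stand-in for the boardfarm3 exception of the same name."""


class FakeSipServer:
    """Hypothetical VoiceServer stand-in that records the timer it is given."""

    def __init__(self, status: str) -> None:
        self._status = status
        self.expire_timer = None

    def sipserver_status(self) -> str:
        return self._status

    def sipserver_set_expire_timer(self, to_timer: int) -> None:
        self.expire_timer = to_timer


def set_sip_expiry_time(sip_proxy: FakeSipServer, to_what_time: int = 60) -> None:
    # Same logic as the use case above, repeated so the sketch runs alone.
    if sip_proxy.sipserver_status() in ["Not installed", "Not Running"]:
        raise VoiceError("Install the sipserver first")
    sip_proxy.sipserver_set_expire_timer(to_timer=to_what_time)


# "Running" is an assumed status string, used only for this example.
running = FakeSipServer("Running")
set_sip_expiry_time(running)  # falls back to the default of 60

stopped = FakeSipServer("Not Running")
try:
    set_sip_expiry_time(stopped, to_what_time=120)
    stopped_rejected = False
except VoiceError:
    stopped_rejected = True
```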

shutdown_phone

shutdown_phone(target_phone: VoiceClient) -> None

Go on_hook and stop the phone application.

.. hint:: This Use Case implements statements from the test suite such as:

- Go on_hook and stop the phone application.

Parameters:

Name Type Description Default

target_phone

VoiceClient

Target phone to be shut down.

required
Source code in boardfarm3/use_cases/voice.py
def shutdown_phone(target_phone: VoiceClient) -> None:
    """Go on_hook and stop the phone application.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Go on_hook and stop the phone application.

    :param target_phone: Target phone to be shut down.
    :type target_phone: VoiceClient
    """
    dev = target_phone
    try:
        dev.on_hook()
    except Exception:  # pylint: disable=broad-except  # noqa: BLE001  # BOARDFARM-4982
        _LOGGER.warning(colored("Cannot put phone onhook", color="yellow"))
    dev.phone_kill()
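
The listing above treats a failed `on_hook()` as non-fatal: the error is logged and the phone application is stopped regardless. The sketch below shows that behaviour with a hypothetical `FakePhone`; it uses the standard `logging` module directly and leaves out the coloured output of the original.

```python
import logging

_LOGGER = logging.getLogger("voice_sketch")  # logger name chosen for this example


class FakePhone:
    """Hypothetical VoiceClient stand-in that records what was called."""

    def __init__(self, on_hook_fails: bool) -> None:
        self._on_hook_fails = on_hook_fails
        self.events = []

    def on_hook(self) -> None:
        if self._on_hook_fails:
            raise RuntimeError("simulated console failure")
        self.events.append("on_hook")

    def phone_kill(self) -> None:
        self.events.append("phone_kill")


def shutdown_phone(target_phone: FakePhone) -> None:
    # Same flow as the use case above: a failing on_hook() is only logged.
    try:
        target_phone.on_hook()
    except Exception:  # broad on purpose, mirroring the original
        _LOGGER.warning("Cannot put phone onhook")
    target_phone.phone_kill()


healthy = FakePhone(on_hook_fails=False)
shutdown_phone(healthy)

broken = FakePhone(on_hook_fails=True)
shutdown_phone(broken)  # does not raise; the phone is still killed
```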

stop_and_start_sip_server

stop_and_start_sip_server(sip_proxy: VoiceServer) -> Generator[None, Any, None]

Stop and start the SIP server.

:yield: in between stopping and starting the SIP server

Parameters:

Name Type Description Default

sip_proxy

VoiceServer

The SIP server to be restarted

required
Source code in boardfarm3/use_cases/voice.py
@contextmanager
def stop_and_start_sip_server(sip_proxy: VoiceServer) -> Generator[None, Any, None]:
    """Stop and start the SIP server.

    :param sip_proxy: The SIP server to be restarted
    :type sip_proxy: VoiceServer
    :yield: in between stopping and starting the SIP server
    :rtype: Generator[None, Any, None]
    """
    try:
        sip_proxy.stop()
        yield
    finally:
        sip_proxy.start()
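
Because `start()` sits in the `finally` clause, the server is started again even when the body of the `with` block raises. A minimal sketch of that guarantee, using a hypothetical `FakeSipServer` that only records calls; the context manager is repeated so the example runs on its own.

```python
from collections.abc import Generator
from contextlib import contextmanager
from typing import Any


class FakeSipServer:
    """Hypothetical VoiceServer stand-in that records stop/start calls."""

    def __init__(self) -> None:
        self.events = []

    def stop(self) -> None:
        self.events.append("stop")

    def start(self) -> None:
        self.events.append("start")


@contextmanager
def stop_and_start_sip_server(sip_proxy: FakeSipServer) -> Generator[None, Any, None]:
    # Same structure as the use case above.
    try:
        sip_proxy.stop()
        yield
    finally:
        sip_proxy.start()


server = FakeSipServer()
try:
    with stop_and_start_sip_server(server):
        # Test steps that need the server to be down would go here.
        server.events.append("test body")
        raise RuntimeError("simulated test failure")
except RuntimeError:
    pass
```

After this runs, `server.events` holds the stop, the test body, and the start, in that order.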

tcpdump

tcpdump(dev: VoiceServer, fname: str, filters: str = '') -> Generator[str]

Start a packet capture using tcpdump and kill the process at the end.

.. hint:: This Use Case implements statements from the test suite such as:

- Start the packets capture to be able to analyze SIP/RTP/RTCP protocol

:yield: process id

Parameters:

Name Type Description Default

dev

VoiceServer

device object for a VoiceServer

required

fname

str

name of the pcap file to which the capture will be stored

required

filters

str

additional filters for capture, defaults to ""

''
Source code in boardfarm3/use_cases/voice.py
@contextmanager
def tcpdump(
    dev: VoiceServer,
    fname: str,
    filters: str = "",
) -> Generator[str]:
    """Start a packet capture using tcpdump and kill the process at the end.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Start the packets capture to be able to analyze SIP/RTP/RTCP protocol

    :param dev: device object for a VoiceServer
    :type dev: VoiceServer
    :param fname: name of the pcap file to which the capture will be stored
    :type fname: str
    :param filters: additional filters for capture, defaults to ""
    :type filters: str
    :yield: process id
    :rtype: Generator[str, None, None]
    """
    with dev.tcpdump_capture(
        fname=fname,
        interface=dev.iface_dut,
        additional_args=f"-s0 {filters}",
    ) as pid:
        yield pid
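
The wrapper above forwards `filters` to the device by appending it to `-s0`, and yields whatever process id the device's own context manager provides. The sketch below traces those arguments with a hypothetical `FakeVoiceServer`; the interface name `eth1`, the pid `"1234"`, and the filter `port 5060` are made-up values.

```python
from contextlib import contextmanager


class FakeVoiceServer:
    """Hypothetical VoiceServer stand-in that records capture arguments."""

    iface_dut = "eth1"  # assumed interface name

    def __init__(self) -> None:
        self.calls = []

    @contextmanager
    def tcpdump_capture(self, fname, interface, additional_args):
        self.calls.append((fname, interface, additional_args))
        yield "1234"  # made-up process id


@contextmanager
def tcpdump(dev: FakeVoiceServer, fname: str, filters: str = ""):
    # Same delegation as the use case above.
    with dev.tcpdump_capture(
        fname=fname,
        interface=dev.iface_dut,
        additional_args=f"-s0 {filters}",
    ) as pid:
        yield pid


server = FakeVoiceServer()

with tcpdump(server, "sip.pcap", "port 5060") as pid:
    captured_pid = pid  # SIP traffic would be generated here

with tcpdump(server, "all.pcap"):
    pass  # default filters: only "-s0 " is passed on
```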

toggle_call

toggle_call(who_toggles: VoiceClient) -> None

Toggle between the calls.

.. hint:: This Use Case implements statements from the test suite such as:

- (User) presses R2 button

First validate that there is an incoming call on the other line. If there is not, throw an exception.

Parameters:

Name Type Description Default

who_toggles

VoiceClient

SIP agent who is supposed to toggle the call

required
Source code in boardfarm3/use_cases/voice.py
def toggle_call(who_toggles: VoiceClient) -> None:
    """Toggle between the calls.

    .. hint:: This Use Case implements statements from the test suite such as:

        - (User) presses R2 button

    First validate that there is an incoming call on the other line.
    If there is not, throw an exception.

    :param who_toggles: SIP agent who is supposed to toggle the call
    :type who_toggles: VoiceClient
    """
    who_toggles.toggle_call()

wifi

Wi-Fi Use Cases library.

All APIs are independent of board under test.

Functions:

Name Description
check_and_connect_to_wifi

Check if specific Wi-Fi is enabled and try to connect appropriate client.

connect_wifi_client

Connect client to Wi-Fi.

disconnect_wifi_client

Disconnect client from Wi-Fi.

enable_and_disable_monitor_mode

Enable and disable monitor mode.

get_bssid

Get the Wi-Fi Basic Service Set Identifier.

get_passphrase

Get the Wi-Fi passphrase.

get_ssid

Get the Wi-Fi SSID.

get_wifi_clients

Return list of wlan_devices based on filters network and band type.

is_wifi_connected

Get the state of the interface.

list_wifi_ssid

Return the list of Wi-Fi SSIDs.

scan_ssid_name

Scan for the particular SSID based on the network type and band.

wifi_check_ssid

Check the SSID provided is present in the scan list.

check_and_connect_to_wifi

check_and_connect_to_wifi(who_to_connect: WLAN, cpe: CPE) -> None

Check if specific Wi-Fi is enabled and try to connect appropriate client.

.. hint:: This Use Case implements statements from the test suite such as:

- Check if specific Wi-Fi is enabled and try to connect appropriate client.

Parameters:

Name Type Description Default

who_to_connect

WLAN

Wi-Fi client device

required

cpe

CPE

the CPE to connect to

required
Source code in boardfarm3/use_cases/wifi.py
def check_and_connect_to_wifi(who_to_connect: WLAN, cpe: CPE) -> None:
    """Check if specific Wi-Fi is enabled and try to connect appropriate client.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Check if specific Wi-Fi is enabled and try to connect appropriate client.

    :param who_to_connect: Wi-Fi client device
    :type who_to_connect: WLAN
    :param cpe: the CPE to connect to
    :type cpe: CPE
    """
    wifi = cpe.sw.wifi
    ssid, bssid, passphrase = wifi.enable_wifi(
        who_to_connect.network,
        who_to_connect.band,
    )
    # Connect appropriate client to the network
    who_to_connect.wifi_client_connect(
        ssid_name=ssid,
        password=passphrase,
        bssid=bssid,
        security_mode=who_to_connect.authentication,
    )

connect_wifi_client

connect_wifi_client(
    who_to_connect: WLAN,
    cpe: CPE,
    ssid: str | None = None,
    password: str | None = None,
    bssid: str | None = None,
) -> None

Connect client to Wi-Fi.

.. hint:: This Use Case implements statements from the test suite such as:

- Verify that the connection to 2.4GHz private Wi-Fi SSID is successful.

Parameters:

Name Type Description Default

who_to_connect

WLAN

client to connect

required

cpe

CPE

the CPE to connect to

required

ssid

str | None

SSID of the network to connect

None

password

str | None

password of the network to connect

None

bssid

str | None

BSSID of the network to connect

None
Source code in boardfarm3/use_cases/wifi.py
def connect_wifi_client(
    who_to_connect: WLAN,
    cpe: CPE,
    ssid: str | None = None,
    password: str | None = None,
    bssid: str | None = None,
) -> None:
    """Connect client to Wi-Fi.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Verify that the connection to 2.4GHz private Wi-Fi SSID is successful.

    :param who_to_connect: client to connect
    :type who_to_connect: WLAN
    :param cpe: the CPE to connect to
    :type cpe: CPE
    :param ssid: SSID of the network to connect
    :type ssid: str | None
    :param password: password of the network to connect
    :type password: str | None
    :param bssid: BSSID of the network to connect
    :type bssid: str | None
    """
    ssid = (
        ssid if ssid else get_ssid(who_to_connect.network, who_to_connect.band, cpe=cpe)
    )
    bssid = (
        bssid
        if bssid
        else get_bssid(who_to_connect.network, who_to_connect.band, cpe=cpe)
    )
    password = password if password else get_passphrase(who_to_connect.network, cpe=cpe)
    who_to_connect.wifi_client_connect(
        ssid_name=ssid,
        password=password,
        security_mode=who_to_connect.authentication,
        bssid=bssid,
    )
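
Each optional argument above is resolved with a truthiness test (`x if x else lookup`), so an empty string triggers the CPE lookup just like `None` does. The sketch below isolates that pattern; `resolve` and `fake_get_ssid` are hypothetical helpers written for this example, and the SSID values are made up.

```python
from typing import Callable, Optional


def resolve(explicit: Optional[str], lookup: Callable[[], Optional[str]]) -> Optional[str]:
    """Return the explicit value if it is truthy, else ask the lookup."""
    return explicit if explicit else lookup()


lookups = []


def fake_get_ssid() -> str:
    # Stand-in for get_ssid(network, band, cpe=cpe); records each call.
    lookups.append("ssid")
    return "cpe-private-2g"


ssid_given = resolve("lab-ssid", fake_get_ssid)  # explicit value wins, no lookup
ssid_default = resolve(None, fake_get_ssid)      # None falls back to the lookup
ssid_empty = resolve("", fake_get_ssid)          # "" is falsy, so it also falls back
```

Note that in the listing the passphrase lookup is called with the network only, while the SSID and BSSID lookups also take the band.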

disconnect_wifi_client

disconnect_wifi_client(who_to_disconnect: WLAN) -> None

Disconnect client from Wi-Fi.

.. hint:: This Use Case implements statements from the test suite such as:

- Disconnect the client from 5GHz Wi-Fi network
- Disconnect the client from 2.4GHz Wi-Fi network

Parameters:

Name Type Description Default

who_to_disconnect

WLAN

client to disconnect

required
Source code in boardfarm3/use_cases/wifi.py
def disconnect_wifi_client(who_to_disconnect: WLAN) -> None:
    """Disconnect client from Wi-Fi.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Disconnect the client from 5GHz Wi-Fi network
        - Disconnect the client from 2.4GHz Wi-Fi network

    :param who_to_disconnect: client to disconnect
    :type who_to_disconnect: WLAN
    """
    who_to_disconnect.wifi_disconnect()

enable_and_disable_monitor_mode

enable_and_disable_monitor_mode(device: WLAN) -> Generator[None, Any, None]

Enable and disable monitor mode.

:yield: in between enabling and disabling monitor mode

Parameters:

Name Type Description Default

device

WLAN

device instance

required
Source code in boardfarm3/use_cases/wifi.py
@contextmanager
def enable_and_disable_monitor_mode(device: WLAN) -> Generator[None, Any, None]:
    """Enable and disable monitor mode.

    :param device: device instance
    :type device: WLAN
    :yield: in between enabling and disabling monitor mode
    :rtype: Generator[None, Any, None]
    """
    try:
        device.enable_monitor_mode()
        yield
    finally:
        device.disable_monitor_mode()
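
Because the `yield` sits inside `try`/`finally`, monitor mode is switched off even when the body of the `with` block raises. The sketch below reproduces that shape with a hypothetical `FakeWlan` class that records calls instead of touching hardware; the local `monitor_mode` function is a copy made for illustration, not the boardfarm3 import.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Any, Generator


class FakeWlan:
    """Stand-in for a WLAN device; it only records which methods were called."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def enable_monitor_mode(self) -> None:
        self.calls.append("enable")

    def disable_monitor_mode(self) -> None:
        self.calls.append("disable")


@contextmanager
def monitor_mode(device: FakeWlan) -> Generator[None, Any, None]:
    # Same try/finally shape as enable_and_disable_monitor_mode.
    try:
        device.enable_monitor_mode()
        yield
    finally:
        device.disable_monitor_mode()


device = FakeWlan()
try:
    with monitor_mode(device):
        device.calls.append("capture")
        raise RuntimeError("capture failed")  # simulate a failing test step
except RuntimeError:
    pass

print(device.calls)  # ['enable', 'capture', 'disable']
```

Since `enable_monitor_mode()` is called inside the `try`, the disable call also runs if enabling itself fails.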

get_bssid

get_bssid(network: str, band: str, cpe: CPE, mode: str = 'console') -> str | None

Get the Wi-Fi Basic Service Set Identifier.

.. hint:: This Use Case implements statements from the test suite such as:

- Get the Wi-Fi Basic Service Set Identifier.

Parameters:

Name Type Description Default

network

str

network type of the client, e.g. private, guest, community

required

band

str

band of the client

required

cpe

CPE

the CPE that is beaming the SSID

required

mode

str

mode to get the BSSID. E.g. console (default), snmp, acs, dmcli

'console'

Returns:

Type Description
str | None

MAC physical address of the access point

Raises:

Type Description
NotImplementedError

not implemented for modes other than console

Source code in boardfarm3/use_cases/wifi.py
def get_bssid(network: str, band: str, cpe: CPE, mode: str = "console") -> str | None:
    """Get the Wi-Fi Basic Service Set Identifier.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Get the Wi-Fi Basic Service Set Identifier.

    :param network: network type of the client, e.g. private, guest, community
    :type network: str
    :param band: band of the client
    :type band: str
    :param cpe: the CPE that is beaming the SSID
    :type cpe: CPE
    :param mode: mode to get the BSSID. E.g. console (default), snmp, acs, dmcli
    :type mode: str
    :return: MAC physical address of the access point
    :rtype: str | None
    :raises NotImplementedError: not implemented for modes other than console
    """
    if mode == "console":
        return cpe.sw.wifi.get_bssid(network, band)
    msg = "Not implemented for modes other than console"
    raise NotImplementedError(msg)
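
Although the `mode` parameter lists `snmp`, `acs` and `dmcli`, only `console` is dispatched and every other value raises. The sketch below exercises both paths with a copy of the dispatch and a stand-in `cpe` built from `SimpleNamespace`; the lookup table, the band string `"5"` and the MAC address are assumptions made up for the example.

```python
from types import SimpleNamespace


def fake_get_bssid(network: str, band: str) -> str:
    # Made-up lookup table standing in for the console query.
    table = {("private", "5"): "aa:bb:cc:dd:ee:05"}
    return table[(network, band)]


# Stand-in object exposing only the cpe.sw.wifi.get_bssid attribute path.
cpe = SimpleNamespace(
    sw=SimpleNamespace(wifi=SimpleNamespace(get_bssid=fake_get_bssid)),
)


def get_bssid(network, band, cpe, mode="console"):
    # Copy of the dispatch shown above, so the example runs without a testbed.
    if mode == "console":
        return cpe.sw.wifi.get_bssid(network, band)
    msg = "Not implemented for modes other than console"
    raise NotImplementedError(msg)


bssid = get_bssid("private", "5", cpe)

try:
    get_bssid("private", "5", cpe, mode="snmp")
except NotImplementedError as exc:
    error = str(exc)
```

Callers that accept a user-supplied mode may want to catch `NotImplementedError` and report the unsupported mode explicitly.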

get_passphrase

get_passphrase(network: str, cpe: CPE) -> str

Get the Wi-Fi passphrase.

.. hint:: This Use Case implements statements from the test suite such as:

- Get the Wi-Fi passphrase.

Parameters:

Name Type Description Default

network

str

network type of the client, e.g. private, guest, community

required

cpe

CPE

the CPE that is beaming the SSID

required

Returns:

Type Description
str

encrypted password for a network

Source code in boardfarm3/use_cases/wifi.py
def get_passphrase(network: str, cpe: CPE) -> str:
    """Get the Wi-Fi passphrase.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Get the Wi-Fi passphrase.

    :param network: network type of the client, e.g. private, guest, community
    :type network: str
    :param cpe: the CPE that is beaming the SSID
    :type cpe: CPE
    :return: encrypted password for a network
    :rtype: str
    """
    iface = cpe.sw.wifi.wlan_ifaces[network]["5"]
    return cpe.sw.wifi.get_passphrase(iface=iface)
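
Unlike `get_ssid` and `get_bssid`, this function takes no `band` argument: it always reads the passphrase from the interface registered under the `"5"` key for the given network. The sketch below illustrates that lookup with a stand-in `cpe`; the interface names, the `"2.4"` key and the passphrases are invented for the example, and only the `wlan_ifaces[network]["5"]` access comes from the source above.

```python
from types import SimpleNamespace

# Hypothetical interface map: network type -> band -> interface name.
wlan_ifaces = {
    "private": {"2.4": "wl0", "5": "wl1"},
    "guest": {"2.4": "wl0.1", "5": "wl1.1"},
}
# Made-up passphrases keyed by interface name.
passphrases = {"wl1": "private-secret", "wl1.1": "guest-secret"}

wifi = SimpleNamespace(
    wlan_ifaces=wlan_ifaces,
    get_passphrase=lambda iface: passphrases[iface],
)
cpe = SimpleNamespace(sw=SimpleNamespace(wifi=wifi))


def get_passphrase(network, cpe):
    # Copy of the lookup shown above: the 5 GHz interface is always used.
    iface = cpe.sw.wifi.wlan_ifaces[network]["5"]
    return cpe.sw.wifi.get_passphrase(iface=iface)


private_pass = get_passphrase("private", cpe)
guest_pass = get_passphrase("guest", cpe)
```

In this sketch a network type missing from the map, such as `"community"`, raises `KeyError` at the dictionary lookup.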

get_ssid

get_ssid(network: str, band: str, cpe: CPE, mode: str = 'console') -> str | None

Get the Wi-Fi SSID.

.. hint:: This Use Case implements statements from the test suite such as:

- Get the Wi-Fi SSID.

Parameters:

Name Type Description Default

network

str

network type of the client, e.g. private, guest, community

required

band

str

Wi-Fi band of the client

required

cpe

CPE

the CPE that is beaming the SSID

required

mode

str

way to retrieve the information, defaults to "console"

'console'

Returns:

Type Description
str | None

SSID of the Wi-Fi for a given network type and band

Raises:

Type Description
NotImplementedError

not implemented for modes other than console

Source code in boardfarm3/use_cases/wifi.py
def get_ssid(
    network: str,
    band: str,
    cpe: CPE,
    mode: str = "console",
) -> str | None:
    """Get the Wi-Fi SSID.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Get the Wi-Fi SSID.

    :param network: network type of the client, e.g. private, guest, community
    :type network: str
    :param band: Wi-Fi band of the client
    :type band: str
    :param cpe: the CPE that is beaming the SSID
    :type cpe: CPE
    :param mode: mode to get the SSID, e.g. console (default), snmp, acs, dmcli
    :type mode: str
    :raises NotImplementedError: not implemented for modes other than console
    :return: SSID of the Wi-Fi for a given network type and band
    :rtype: str | None
    """
    if mode == "console":
        return cpe.sw.wifi.get_ssid(network, band)
    msg = "Not implemented for modes other than console"
    raise NotImplementedError(msg)

get_wifi_clients

get_wifi_clients(network: str, band: str) -> list[WLAN]

Return list of wlan_devices based on filters network and band type.

.. hint:: This Use Case implements statements from the test suite such as:

- Return list of wlan_devices based on filters network and band type.

Parameters:

Name Type Description Default

network

str

network type of the client, e.g. private, guest, community

required

band

str

band of the client in GHz

required

Returns:

Type Description
list[WLAN]

list of WLAN devices

Source code in boardfarm3/use_cases/wifi.py
def get_wifi_clients(network: str, band: str) -> list[WLAN]:
    """Return list of wlan_devices based on filters network and band type.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Return list of wlan_devices based on filters network and band type.

    :param network: network type of the client, e.g. private, guest, community
    :type network: str
    :param band: band of the client in GHz
    :type band: str
    :return: list of WLAN devices
    :rtype: list[WLAN]
    """
    wifi_devices = get_device_manager().get_devices_by_type(
        WLAN,  # type: ignore[type-abstract]
    )
    return [
        device
        for device in wifi_devices.values()
        if device.network == network and device.band == band
    ]
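
The filter compares `network` and `band` by exact string equality, so the values passed in must match whatever the devices report. A minimal sketch of the same comprehension over a hypothetical device map follows; `FakeWlan`, `filter_clients`, the device names and the band strings are assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass
class FakeWlan:
    """Stand-in for a WLAN device with only the attributes the filter reads."""

    name: str
    network: str
    band: str


# Hypothetical result of get_devices_by_type(WLAN): device name -> device.
wifi_devices = {
    "wlan1": FakeWlan("wlan1", "private", "5"),
    "wlan2": FakeWlan("wlan2", "private", "2.4"),
    "wlan3": FakeWlan("wlan3", "guest", "5"),
}


def filter_clients(devices, network, band):
    # Same comprehension as get_wifi_clients, applied to the stand-in map.
    return [
        device
        for device in devices.values()
        if device.network == network and device.band == band
    ]


matches = filter_clients(wifi_devices, "private", "5")
no_match = filter_clients(wifi_devices, "community", "5")
```

An empty list, rather than an exception, is returned when no device matches, so callers should check the result before indexing into it.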

is_wifi_connected

is_wifi_connected(device: WLAN) -> bool

Get the state of the interface.

Parameters:

Name Type Description Default

device

WLAN

device instance

required

Returns:

Type Description
bool

True if connected

Source code in boardfarm3/use_cases/wifi.py
def is_wifi_connected(device: WLAN) -> bool:
    """Get the state of the interface.

    :param device: device instance
    :type device: WLAN
    :return: True if connected
    :rtype: bool
    """
    return device.is_wlan_connected()

list_wifi_ssid

list_wifi_ssid(device: WLAN) -> list[str]

Return the list of Wi-Fi SSIDs.

.. hint:: This Use Case implements statements from the test suite such as:

- Return the list of Wi-Fi SSIDs.

Parameters:

Name Type Description Default

device

WLAN

WLAN device instance

required

Returns:

Type Description
list[str]

list of Wi-Fi SSIDs

Source code in boardfarm3/use_cases/wifi.py
def list_wifi_ssid(device: WLAN) -> list[str]:
    """Return the list of Wi-Fi SSIDs.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Return the list of Wi-Fi SSIDs.

    :param device: WLAN device instance
    :type device: WLAN
    :return: list of Wi-Fi SSIDs
    :rtype: list[str]
    """
    return device.list_wifi_ssids()

scan_ssid_name

scan_ssid_name(device: WLAN, cpe: CPE) -> bool

Scan for the particular SSID based on the network type and band.

.. hint:: This Use Case implements statements from the test suite such as:

- Scan for the particular SSID based on the network type and band.

Parameters:

Name Type Description Default

device

WLAN

WLAN device instance

required

cpe

CPE

the CPE to connect to

required

Returns:

Type Description
bool

true if the SSID is available, false otherwise

Source code in boardfarm3/use_cases/wifi.py
def scan_ssid_name(device: WLAN, cpe: CPE) -> bool:
    """Scan for the particular SSID based on the network type and band.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Scan for the particular SSID based on the network type and band.

    :param device: WLAN device instance
    :type device: WLAN
    :param cpe: the CPE to connect to
    :type cpe: CPE
    :return: true if the SSID is available, false otherwise
    :rtype: bool
    """
    ssid_name = get_ssid(device.network, device.band, cpe=cpe)
    return wifi_check_ssid(device, ssid_name)
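
`scan_ssid_name` chains two steps: it resolves the expected SSID from the CPE, then tests membership in the client's scan list. Since `get_ssid` is annotated as returning `str | None`, a `None` result would simply yield `False` from the membership test. The sketch below shows this with a hypothetical `FakeWlan` class and made-up SSIDs; the local `wifi_check_ssid` is a simplified copy for illustration.

```python
from __future__ import annotations


class FakeWlan:
    """Stand-in for a WLAN client that returns a fixed scan list."""

    def __init__(self, visible: list[str]) -> None:
        self.visible = visible

    def list_wifi_ssids(self) -> list[str]:
        return list(self.visible)


def wifi_check_ssid(device: FakeWlan, ssid_name: str | None) -> bool:
    # Plain membership test against the scan list.
    return ssid_name in device.list_wifi_ssids()


client = FakeWlan(["home-5g", "neighbour-wifi"])

found = wifi_check_ssid(client, "home-5g")
missing = wifi_check_ssid(client, "office-5g")
unresolved = wifi_check_ssid(client, None)  # SSID lookup returned nothing
```

A `False` result therefore does not distinguish between an SSID that is out of range and one that could not be resolved from the CPE.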

wifi_check_ssid

wifi_check_ssid(device: WLAN, ssid_name: str) -> bool

Check the SSID provided is present in the scan list.

.. hint:: This Use Case implements statements from the test suite such as:

- Check the SSID provided is present in the scan list.

Parameters:

Name Type Description Default

device

WLAN

WLAN device instance

required

ssid_name

str

SSID name to be verified

required

Returns:

Type Description
bool

true if the ssid_name is available, false otherwise

Source code in boardfarm3/use_cases/wifi.py
def wifi_check_ssid(device: WLAN, ssid_name: str) -> bool:
    """Check the SSID provided is present in the scan list.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Check the SSID provided is present in the scan list.

    :param device: WLAN device instance
    :type device: WLAN
    :param ssid_name: SSID name to be verified
    :type ssid_name: str
    :return: true if the ssid_name is available, false otherwise
    :rtype: bool
    """
    return ssid_name in list_wifi_ssid(device)