Skip to content

lib

Boardfarm libs.

Modules:

Name Description
SNMPv2

SNMP v2 module for SNMP communication.

boardfarm_config

Boardfarm environment config module.

boardfarm_pexpect

Boardfarm pexpect session module.

connection_factory

Connection decider module.

connections

Boardfarm connections package.

cpe_sw

Common libraries of CPE Sw component.

custom_typing

Boardfarm3 custom type hints package.

dataclass

Boardfarm3 dataclasses.

device_manager

Boardfarm device manager.

dmcli

RDKB dmcli command line interface module.

docker_factory

Docker Factory v2 related libraries.

gui

Keep SonarQube happy.

hal

Hardware abstraction layers for different CPE components.

interactive_shell

Boardfarm interactive shell module.

mibs_compiler

MIBs to JSON compiler module.

multicast

Multicast library.

network_utils

Network utilities module.

networking

Boardfarm networking module.

odh

Boardfarm ODH client Package.

parsers

Linux command output parsers.

power

Power module.

python_executor

Python executor module.

regexlib

Useful regexes library.

shell_prompt

Boardfarm v3 base device shell prompts.

utils

Boardfarm common utilities module.

wrappers

Boardfarm decorators module.

SNMPv2

SNMP v2 module for SNMP communication.

Classes:

Name Description
SNMPv2

SNMP v2 module for SNMP communication.

SNMPv2

SNMP v2 module for SNMP communication.

Initialize SNMPv2.

Parameters:

Name Type Description Default

device

WAN

device instance

required

target_ip

str

target ip address

required

mibs_compiler

MibsCompiler

mibs compiler instance

required

Methods:

Name Description
snmpbulkget

Perform SNMP bulkget on the device with given arguments.

snmpget

Perform an snmpget with given arguments.

snmpset

Perform an snmpset with given arguments.

snmpwalk

Perform an snmpwalk with given arguments.

Source code in boardfarm3/lib/SNMPv2.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def __init__(
    self,
    device: WAN,
    target_ip: str,
    mibs_compiler: MibsCompiler,
) -> None:
    """Initialize SNMPv2.

    :param device: device instance
    :type device: WAN
    :param target_ip: target ip address
    :type target_ip: str
    :param mibs_compiler: mibs compiler instance
    :type mibs_compiler: MibsCompiler
    """
    self._device = device
    self._target_ip = target_ip
    self._mibs_compiler = mibs_compiler

snmpbulkget

snmpbulkget(
    mib_name: str,
    index: int | None = None,
    community: str = "private",
    non_repeaters: int = 0,
    max_repetitions: int = 10,
    retries: int = 3,
    timeout: int = 100,
    extra_args: str = "",
    cmd_timeout: int = 30,
) -> list[tuple[str, str, str]]

Perform SNMP bulkget on the device with given arguments.

Parameters:

Name Type Description Default
mib_name
str

mib name used to perform snmp

required
index
int | None

index used along with mib_name, defaults to None

None
community
str

SNMP Community string, defaults to "private"

'private'
non_repeaters
int

value treated as get request, defaults to 0

0
max_repetitions
int

value treated as get next operation, defaults to 10

10
retries
int

no.of time commands are executed on exception, defaults to 3

3
timeout
int

timeout in seconds, defaults to 100

100
extra_args
str

extra arguments to be passed in the command, defaults to ""

''
cmd_timeout
int

timeout to wait for command to give otuput

30

Returns:

Type Description
list[tuple[str, str, str]]

output of snmpbulkget command

Raises:

Type Description
SNMPError

when MIB is not available

Source code in boardfarm3/lib/SNMPv2.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def snmpbulkget(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    self,
    mib_name: str,
    index: int | None = None,
    community: str = "private",
    non_repeaters: int = 0,
    max_repetitions: int = 10,
    retries: int = 3,
    timeout: int = 100,
    extra_args: str = "",
    cmd_timeout: int = 30,
) -> list[tuple[str, str, str]]:
    """Perform SNMP bulkget on the device with given arguments.

    :param mib_name: mib name used to perform snmp
    :type mib_name: str
    :param index: index used along with mib_name, defaults to None
    :type index: int, optional
    :param community: SNMP Community string, defaults to "private"
    :type community: str
    :param non_repeaters: value treated as get request, defaults to 0
    :type non_repeaters: int
    :param max_repetitions: value treated as get next operation, defaults to 10
    :type max_repetitions: int
    :param retries:  no.of time commands are executed on exception, defaults to 3
    :type retries: int
    :param timeout: timeout in seconds, defaults to 100
    :type timeout: int
    :param extra_args: extra arguments to be passed in the command, defaults to ""
    :type extra_args: str
    :param cmd_timeout: timeout to wait for command to give otuput
    :type cmd_timeout: int
    :raises SNMPError: when MIB is not available
    :return: output of snmpbulkget command
    :rtype: list[tuple[str, str, str]]
    """
    if mib_name:
        try:
            oid = self._get_mib_oid(mib_name)
            if index:
                oid = f"{oid}.{index!s}"
        except (ValueError, SNMPError) as exception:
            msg = f"MIB not available, Error: {exception}"
            raise SNMPError(msg) from exception
    else:
        oid = ""
    cmd = (
        f"snmpbulkget -v2c -On{extra_args} -Cn{non_repeaters} -Cr{max_repetitions}"
        f" -c {community} -t {timeout} -r {retries} {self._target_ip} {oid}"
        if extra_args
        else f"snmpbulkget -v2c -Cn{non_repeaters} -Cr{max_repetitions}"
        f" -c {community} -t {timeout} -r {retries} {self._target_ip} {oid}"
    )
    return self._parse_snmpbulk_output(
        self._device.execute_snmp_command(cmd, timeout=cmd_timeout)
    )

snmpget

snmpget(
    mib_name: str,
    index: int = 0,
    community: str = "private",
    extra_args: str = "",
    timeout: int = 10,
    retries: int = 3,
    cmd_timeout: int = 30,
) -> tuple[str, str, str]

Perform an snmpget with given arguments.

Parameters:

Name Type Description Default
mib_name
str

mib name used to perform snmp

required
index
int

index used along with mib_name

0
community
str

SNMP Community string that allows access to DUT, defaults to 'private'

'private'
extra_args
str

Any extra arguments to be passed in the command

''
timeout
int

timeout in seconds

10
retries
int

the no. of time the commands are executed on exception/timeout

3
cmd_timeout
int

timeout to wait for command to give otuput

30

Returns:

Type Description
Tuple[str, str, str]

value, value type and complete output

Source code in boardfarm3/lib/SNMPv2.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def snmpget(  # noqa: PLR0913 # pylint: disable=too-many-arguments
    self,
    mib_name: str,
    index: int = 0,
    community: str = "private",
    extra_args: str = "",
    timeout: int = 10,
    retries: int = 3,
    cmd_timeout: int = 30,
) -> tuple[str, str, str]:
    """Perform an snmpget with given arguments.

    :param mib_name: mib name used to perform snmp
    :type mib_name: str
    :param index: index used along with mib_name
    :type index: int
    :param community: SNMP Community string that allows access to DUT,
                        defaults to 'private'
    :type community: str
    :param extra_args: Any extra arguments to be passed in the command
    :type extra_args: str
    :param timeout: timeout in seconds
    :type timeout: int
    :param retries: the no. of time the commands are executed on exception/timeout
    :type retries: int
    :param cmd_timeout: timeout to wait for command to give otuput
    :type cmd_timeout: int
    :return: value, value type and complete output
    :rtype: Tuple[str, str, str]
    """
    oid = self._get_mib_oid(mib_name) + f".{index!s}"
    output = self._run_snmp_command(
        "snmpget",
        community,
        oid,
        timeout,
        retries,
        extra_args=extra_args,
        cmd_timeout=cmd_timeout,
    )
    return self._parse_snmp_output(oid, output)

snmpset

snmpset(
    mib_name: str,
    value: str,
    stype: str,
    index: int = 0,
    community: str = "private",
    extra_args: str = "",
    timeout: int = 10,
    retries: int = 3,
    cmd_timeout: int = 30,
) -> tuple[str, str, str]

Perform an snmpset with given arguments.

Parameters:

Name Type Description Default
mib_name
str

mib name used to perform snmp

required
value
str

value to be set for the mib name

required
stype
str

defines the datatype of value to be set for mib_name stype: one of i, u, t, a, o, s, x, d, b i: INTEGER, u: unsigned INTEGER, t: TIMETICKS, a: IPADDRESS o: OBJID, s: STRING, x: HEX STRING, d: DECIMAL STRING, b: BITS U: unsigned int64, I: signed int64, F: float, D: double

required
index
int

index used along with mib_name

0
community
str

SNMP Community string that allows access to DUT, defaults to 'private'

'private'
extra_args
str

Any extra arguments to be passed in the command

''
timeout
int

timeout in seconds

10
retries
int

the no. of time the commands are executed on exception/timeout

3
cmd_timeout
int

timeout to wait for command to give otuput

30

Returns:

Type Description
Tuple[str, str, str]

value, value type and complete output

Source code in boardfarm3/lib/SNMPv2.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def snmpset(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    self,
    mib_name: str,
    value: str,
    stype: str,
    index: int = 0,
    community: str = "private",
    extra_args: str = "",
    timeout: int = 10,
    retries: int = 3,
    cmd_timeout: int = 30,
) -> tuple[str, str, str]:
    """Perform an snmpset with given arguments.

    :param mib_name: mib name used to perform snmp
    :type mib_name: str
    :param value: value to be set for the mib name
    :type value: str
    :param stype: defines the datatype of value to be set for mib_name
                    stype: one of i, u, t, a, o, s, x, d, b
                    i: INTEGER, u: unsigned INTEGER, t: TIMETICKS, a: IPADDRESS
                    o: OBJID, s: STRING, x: HEX STRING, d: DECIMAL STRING, b: BITS
                    U: unsigned int64, I: signed int64, F: float, D: double
    :type stype: str
    :param index: index used along with mib_name
    :type index: int
    :param community: SNMP Community string that allows access to DUT,
                    defaults to 'private'
    :type community: str
    :param extra_args: Any extra arguments to be passed in the command
    :type extra_args: str
    :param timeout: timeout in seconds
    :type timeout: int
    :param retries: the no. of time the commands are executed on exception/timeout
    :type retries: int
    :param cmd_timeout: timeout to wait for command to give otuput
    :type cmd_timeout: int
    :return: value, value type and complete output
    :rtype: Tuple[str, str, str]
    """
    oid = self._get_mib_oid(mib_name) + f".{index!s}"
    if re.findall(r"\s", value.strip()) and stype == "s":
        value = f"{value!r}"
    if str(value).lower().startswith("0x"):
        set_value = f"{stype} '{value[2:].upper()}'"
    else:
        set_value = f"{stype} '{value}'"
    output = self._run_snmp_command(
        "snmpset",
        community,
        oid,
        timeout,
        retries,
        set_value=set_value,
        extra_args=extra_args,
        cmd_timeout=cmd_timeout,
    )
    return self._parse_snmp_output(oid, output, value)

snmpwalk

snmpwalk(
    mib_name: str,
    index: int | None = None,
    community: str = "private",
    retries: int = 3,
    timeout: int = 100,
    extra_args: str = "",
    cmd_timeout: int = 30,
) -> tuple[dict[str, list[str]], str]

Perform an snmpwalk with given arguments.

Parameters:

Name Type Description Default
mib_name
str

mib name used to perform snmp

required
index
int | None

index used along with mib_name

None
community
str

SNMP Community string that allows access to DUT, defaults to 'private'

'private'
retries
int

the no. of time the commands are executed on exception/timeout

3
timeout
int

timeout in seconds

100
extra_args
str

Any extra arguments to be passed in the command

''
cmd_timeout
int

timeout to wait for command to give otuput

30

Returns:

Type Description
Tuple[Dict[str, List[str]], str]

dictionary of mib_oid as key and list(value, type) as value and complete output

Raises:

Type Description
SNMPError

no matching output

Source code in boardfarm3/lib/SNMPv2.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def snmpwalk(  # noqa: PLR0913 # pylint: disable=too-many-arguments
    self,
    mib_name: str,
    index: int | None = None,
    community: str = "private",
    retries: int = 3,
    timeout: int = 100,
    extra_args: str = "",
    cmd_timeout: int = 30,
) -> tuple[dict[str, list[str]], str]:
    """Perform an snmpwalk with given arguments.

    :param mib_name: mib name used to perform snmp
    :type mib_name: str
    :param index: index used along with mib_name
    :type index: int
    :param community: SNMP Community string that allows access to DUT,
                        defaults to 'private'
    :type community: str
    :param retries: the no. of time the commands are executed on exception/timeout
    :type retries: int
    :param timeout: timeout in seconds
    :type timeout: int
    :param extra_args: Any extra arguments to be passed in the command
    :type extra_args: str
    :param cmd_timeout: timeout to wait for command to give otuput
    :type cmd_timeout: int
    :return: dictionary of mib_oid as key and list(value, type) as value
                        and complete output
    :rtype: Tuple[Dict[str, List[str]], str]
    :raises SNMPError: no matching output
    """
    if mib_name:
        try:
            oid = self._get_mib_oid(mib_name)
            if index:
                oid = oid + f".{index!s}"
        except (ValueError, SNMPError) as exception:
            msg = f"MIB not available, Error: {exception}"
            raise SNMPError(msg) from exception
    else:
        oid = ""
    output = self._run_snmp_command(
        "snmpwalk",
        community,
        oid,
        timeout,
        retries,
        extra_args=extra_args,
        cmd_timeout=cmd_timeout,
    )
    return self._parse_snmpwalk_output(oid, output)

boardfarm_config

Boardfarm environment config module.

Classes:

Name Description
BoardfarmConfig

Boardfarm environment config.

Functions:

Name Description
get_inventory_config

Return inventory config based on given arguments.

get_json

Get the inventory json either from a URL or a system path.

parse_boardfarm_config

Get environment config from given json files.

BoardfarmConfig

BoardfarmConfig(
    merged_config: list[dict],
    env_config: dict[str, Any],
    inventory_config: dict[str, Any],
)

Boardfarm environment config.

Initialize boardfarm config.

Parameters:

Name Type Description Default

merged_config

list[dict]

merged devices config

required

env_config

dict[str, Any]

environment configuration

required

inventory_config

dict[str, Any]

inventory configuration

required

Methods:

Name Description
get_board_model

Return the env config ["environment_def"]["board"]["model"].

get_board_sku

Return the env config ["environment_def"]["board"]["SKU"] value.

get_board_station_number

Get station number of the board.

get_device_config

Get device merged config.

get_devices_config

Get merged devices config.

get_prov_mode

Return the provisioning mode of the DUT.

Attributes:

Name Type Description
env_config dict[str, Any]

Environment config dictionary.

inventory_config dict[str, Any]

Inventory config dictionary.

resource_name str

Resource name.

Source code in boardfarm3/lib/boardfarm_config.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def __init__(
    self,
    merged_config: list[dict],
    env_config: dict[str, Any],
    inventory_config: dict[str, Any],
):
    """Initialize boardfarm config.

    :param merged_config: merged devices config
    :param env_config: environment configuration
    :param inventory_config: inventory configuration
    """
    self._env_config = env_config
    self._inventory_config = inventory_config
    self._merged_devices_config = merged_config

env_config property

env_config: dict[str, Any]

Environment config dictionary.

Returns:

Type Description
dict[str, Any]

env config response

inventory_config property

inventory_config: dict[str, Any]

Inventory config dictionary.

Returns:

Type Description
dict[str, Any]

inventory config response

resource_name cached property

resource_name: str

Resource name.

Returns:

Type Description
str

resource name of the board

get_board_model

get_board_model() -> str

Return the env config ["environment_def"]["board"]["model"].

Returns:

Type Description
str

Board model

Raises:

Type Description
EnvConfigError

when given model is unknown

Source code in boardfarm3/lib/boardfarm_config.py
102
103
104
105
106
107
108
109
110
111
112
113
114
def get_board_model(self) -> str:
    """Return the env config ["environment_def"]["board"]["model"].

    :return: Board model
    :raises EnvConfigError: when given model is unknown
    """
    try:
        return self.env_config["environment_def"]["board"]["model"]
    except (KeyError, AttributeError) as e:
        msg = "Unable to find board.model entry in env config."
        raise EnvConfigError(
            msg,
        ) from e

get_board_sku

get_board_sku() -> str

Return the env config ["environment_def"]["board"]["SKU"] value.

Returns:

Type Description
str

SKU value

Raises:

Type Description
EnvConfigError

when given sku is unknown

Source code in boardfarm3/lib/boardfarm_config.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def get_board_sku(self) -> str:
    """Return the env config ["environment_def"]["board"]["SKU"] value.

    :return: SKU value
    :raises EnvConfigError: when given sku is unknown
    """
    try:
        return self.env_config["environment_def"]["board"]["SKU"]
    except (KeyError, AttributeError) as e:
        msg = "Board SKU is not found in env config."
        raise EnvConfigError(msg) from e

get_board_station_number

get_board_station_number() -> int

Get station number of the board.

Returns:

Type Description
int

station number

Source code in boardfarm3/lib/boardfarm_config.py
63
64
65
66
67
68
def get_board_station_number(self) -> int:
    """Get station number of the board.

    :returns: station number
    """
    return int(self.resource_name.split("-")[-1])

get_device_config

get_device_config(device_name: str) -> dict[str, Any]

Get device merged config.

Parameters:

Name Type Description Default
device_name
str

device name

required

Returns:

Type Description
dict[str, Any]

merged device config

Raises:

Type Description
EnvConfigError

when given device name is unknown

Source code in boardfarm3/lib/boardfarm_config.py
77
78
79
80
81
82
83
84
85
86
87
88
def get_device_config(self, device_name: str) -> dict[str, Any]:
    """Get device merged config.

    :param device_name: device name
    :returns: merged device config
    :raises EnvConfigError: when given device name is unknown
    """
    for device_config in self._merged_devices_config:
        if device_config.get("name") == device_name:
            return device_config
    msg = f"{device_name} - Unknown device name"
    raise EnvConfigError(msg)

get_devices_config

get_devices_config() -> list[dict]

Get merged devices config.

Returns:

Type Description
list[dict]

merged devices config

Source code in boardfarm3/lib/boardfarm_config.py
70
71
72
73
74
75
def get_devices_config(self) -> list[dict]:
    """Get merged devices config.

    :returns: merged devices config
    """
    return self._merged_devices_config

get_prov_mode

get_prov_mode() -> str

Return the provisioning mode of the DUT.

Returns:

Type Description
str

ipv4, ipv6, dslite, dualstack, disabled

Raises:

Type Description
EnvConfigError

when given sku is unknown

Source code in boardfarm3/lib/boardfarm_config.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def get_prov_mode(self) -> str:
    """Return the provisioning mode of the DUT.

    :return: ipv4, ipv6, dslite, dualstack, disabled
    :raises EnvConfigError: when given sku is unknown
    """
    try:
        return self.env_config["environment_def"]["board"][
            "eRouter_Provisioning_mode"
        ]
    except (KeyError, AttributeError) as e:
        msg = "Unable to find eRouter_Provisioning_mode entry in env config."
        raise EnvConfigError(
            msg,
        ) from e

get_inventory_config

get_inventory_config(resource_name: str, inventory_json_path: str) -> dict[str, Any]

Return inventory config based on given arguments.

Parameters:

Name Type Description Default

resource_name

str

inventory resource name

required

inventory_json_path

str

inventory json config path

required

Returns:

Type Description
dict[str, Any]

inventory configuration

Raises:

Type Description
EnvConfigError

on resource name not found in inventory config

EnvConfigError

on invalid location config

Source code in boardfarm3/lib/boardfarm_config.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def get_inventory_config(
    resource_name: str,
    inventory_json_path: str,
) -> dict[str, Any]:
    """Return inventory config based on given arguments.

    :param resource_name: inventory resource name
    :type resource_name: str
    :param inventory_json_path: inventory json config path
    :type inventory_json_path: str
    :raises EnvConfigError: on resource name not found in inventory config
    :raises EnvConfigError: on invalid location config
    :return: inventory configuration
    :rtype: dict[str, Any]
    """
    full_inventory_config = get_json(inventory_json_path)
    if resource_name not in full_inventory_config:
        msg = f"{resource_name!r} resource not found in inventory config"
        raise EnvConfigError(
            msg,
        )
    inventory_config = full_inventory_config.get(resource_name)
    if "location" in inventory_config:
        if locations := full_inventory_config.get(
            "locations",
            {},
        ):  # optional, lab dependent
            inventory_config["devices"] += locations[
                inventory_config.pop("location")
            ].get("devices", [])
        else:
            msg = f"{inventory_config['location']!r} invalid location config"
            raise EnvConfigError(msg)
    for device in inventory_config.get("devices", []):
        if device["name"] == "board":
            device["resource_name"] = resource_name
            break
    return inventory_config

get_json

get_json(resource_name: str) -> dict[str, Any]

Get the inventory json either from a URL or a system path.

Parameters:

Name Type Description Default

resource_name

str

inventory resource name

required

Returns:

Type Description
dict[str, Any]

the inventory json from the specified path

Source code in boardfarm3/lib/boardfarm_config.py
196
197
198
199
200
201
202
203
204
205
206
207
208
def get_json(resource_name: str) -> dict[str, Any]:
    """Get the inventory json either from a URL or a system path.

    :param resource_name: inventory resource name
    :type resource_name: str
    :return: the inventory json from the specified path
    :rtype: dict[str, Any]
    """
    if resource_name.startswith(("http://", "https://")):
        json_dict = requests.get(resource_name, timeout=30).text
    else:
        json_dict = Path(resource_name).read_text(encoding="utf-8")
    return cast("dict[str, Any]", json.loads(json_dict))

parse_boardfarm_config

parse_boardfarm_config(
    inventory_config: dict[str, Any], env_json_config: dict[str, Any]
) -> BoardfarmConfig

Get environment config from given json files.

Parameters:

Name Type Description Default

inventory_config

dict[str, Any]

inventory config

required

env_json_config

dict[str, Any]

environment config

required

Returns:

Type Description
BoardfarmConfig

boardfarm config instance

Source code in boardfarm3/lib/boardfarm_config.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def parse_boardfarm_config(
    inventory_config: dict[str, Any],
    env_json_config: dict[str, Any],
) -> BoardfarmConfig:
    """Get environment config from given json files.

    :param inventory_config: inventory config
    :type inventory_config: dict[str, Any]
    :param env_json_config: environment config
    :type env_json_config: dict[str, Any]
    :return: boardfarm config instance
    :rtype: BoardfarmConfig
    """
    # disable jsonmerge debug logs
    logging.getLogger("jsonmerge").setLevel(logging.WARNING)
    wifi_devices = [
        device
        for device in inventory_config["devices"]
        if device["type"] in ["bf_wlan", "debian_wifi"]
    ]
    lan_devices = [
        device
        for device in inventory_config["devices"]
        if device["type"] in ["bf_lan", "debian_lan"]
    ]
    other_devices = [
        device
        for device in inventory_config["devices"]
        if device not in wifi_devices + lan_devices
    ]
    merged_devices_config = []
    environment_def = env_json_config.get("environment_def")
    for device in other_devices:
        device_name = device.get("name")
        merged_devices_config.append(
            (
                jsonmerge.merge(device, environment_def[device_name])
                if device_name in environment_def
                else device
            ),
        )
    merged_devices_config += _merge_with_lan_config(lan_devices, env_json_config)
    merged_devices_config += _merge_with_wifi_config(wifi_devices, env_json_config)
    return BoardfarmConfig(merged_devices_config, env_json_config, inventory_config)

boardfarm_pexpect

Boardfarm pexpect session module.

Classes:

Name Description
BoardfarmPexpect

Boardfarm pexpect session.

BoardfarmPexpect

BoardfarmPexpect(
    session_name: str,
    command: str,
    save_console_logs: str,
    args: list[str],
    **kwargs: dict[str, Any],
)

Boardfarm pexpect session.

Initialize boardfarm pexpect.

Parameters:

Name Type Description Default

session_name

str

pexpect session name

required

command

str

command to start pexpect session

required

save_console_logs

str

save console logs to the disk

required

args

list[str]

additional arguments to the command

required

kwargs

dict[str, Any]

may contains the 'env' for pexpect, optional

{}

Methods:

Name Description
execute_command

Execute a command in the pexpect session.

get_last_output

Get last output from the buffer.

start_interactive_session

Start interactive pexpect session.

Source code in boardfarm3/lib/boardfarm_pexpect.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def __init__(
    self,
    session_name: str,
    command: str,
    save_console_logs: str,
    args: list[str],
    **kwargs: dict[str, Any],
) -> None:
    """Initialize boardfarm pexpect.

    :param session_name: pexpect session name
    :type session_name: str
    :param command: command to start pexpect session
    :type command: str
    :param save_console_logs: save console logs to the disk
    :type save_console_logs: str
    :param args: additional arguments to the command
    :type args: list[str]
    :param kwargs: may contains the 'env' for pexpect, optional
    :type kwargs: dict[str, Any]
    """
    kwargs.setdefault("env", {"PATH": os.getenv("PATH"), "TERM": "dumb"})
    super().__init__(
        command=command,
        args=list(args),
        encoding="utf-8",
        dimensions=(24, 240),
        codec_errors="ignore",
        env=kwargs.get("env"),
    )
    self._configure_logging(session_name, save_console_logs)

execute_command abstractmethod

execute_command(command: str, timeout: int = -1) -> str

Execute a command in the pexpect session.

Parameters:

Name Type Description Default
command
str

command to execute

required
timeout
int

timeout in seconds. Defaults to -1

-1

Returns:

Type Description
str

output of given command execution

Source code in boardfarm3/lib/boardfarm_pexpect.py
124
125
126
127
128
129
130
131
132
@abstractmethod
def execute_command(self, command: str, timeout: int = -1) -> str:
    """Execute a command in the pexpect session.

    :param command: command to execute
    :param timeout: timeout in seconds. Defaults to -1
    :returns: output of given command execution
    """
    raise NotImplementedError

get_last_output

get_last_output() -> str

Get last output from the buffer.

Returns:

Type Description
str

last output from the buffer

Source code in boardfarm3/lib/boardfarm_pexpect.py
117
118
119
120
121
122
def get_last_output(self) -> str:
    """Get last output from the buffer.

    :returns: last output from the buffer
    """
    return self.before.strip()

start_interactive_session

start_interactive_session() -> None

Start interactive pexpect session.

Source code in boardfarm3/lib/boardfarm_pexpect.py
134
135
136
137
def start_interactive_session(self) -> None:
    """Start interactive pexpect session."""
    with disable_logs("pexpect"):
        self.interact()

connection_factory

Connection decider module.

Functions:

Name Description
connection_factory

Return connection of given type.

connection_factory

connection_factory(
    connection_type: str, connection_name: str, **kwargs: Any
) -> BoardfarmPexpect

Return connection of given type.

Parameters:

Name Type Description Default

connection_type

str

type of the connection

required

connection_name

str

name of the connection

required

kwargs

Any

arguments to the connection

{}

Returns:

Type Description
BoardfarmPexpect

BoardfarmPexpect: connection of given type

Raises:

Type Description
EnvConfigError

when given connection type is not supported

Source code in boardfarm3/lib/connection_factory.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def connection_factory(
    connection_type: str,
    connection_name: str,
    **kwargs: Any,  # noqa: ANN401
) -> BoardfarmPexpect:
    """Return connection of given type.

    :param connection_type: type of the connection
    :param connection_name: name of the connection
    :param kwargs: arguments to the connection
    :returns: BoardfarmPexpect: connection of given type
    :raises EnvConfigError: when given connection type is not supported
    """
    connection_dispatcher = {
        "ssh_connection": SSHConnection,
        "authenticated_ssh": SSHConnection,
        "ldap_authenticated_serial": LdapAuthenticatedSerial,
        "local_cmd": LocalCmd,
        "serial": SerialConnection,
        "ser2net": _ser2net_param_parser,
        "telnet": _telnet_param_parser,
    }
    connection_obj = connection_dispatcher.get(connection_type)
    if connection_obj is not None and callable(connection_obj):
        if connection_type == "ssh_connection":
            kwargs.pop("password")
        return connection_obj(connection_name, **kwargs)
    # Handle unsupported connection types
    msg = f"Unsupported connection type: {connection_type}"
    raise EnvConfigError(msg)

connections

Boardfarm connections package.

Modules:

Name Description
connect_and_run

Connect and run module.

ldap_authenticated_serial

SSH connection module.

local_cmd

Connect to a device with a local command.

ser2net_connection

ser2net connection module.

serial_connection

Connect to a device with a local serial command.

ssh_connection

SSH connection module.

telnet

Telnet connection module.

connect_and_run

Connect and run module.

Classes:

Name Description
RuntimeConnectable

Runtine connectable protocol class.

Functions:

Name Description
connect_and_run

Connect run and disconnect to console at runtime.

RuntimeConnectable

Runtine connectable protocol class.

Methods:

Name Description
connect_console

Connect to the console.

disconnect_console

Disconnect from the console.

is_console_connected

Get status of the connection.

connect_console
connect_console() -> None

Connect to the console.

Source code in boardfarm3/lib/connections/connect_and_run.py
16
17
def connect_console(self) -> None:
    """Connect to the console."""
disconnect_console
disconnect_console() -> None

Disconnect from the console.

Source code in boardfarm3/lib/connections/connect_and_run.py
19
20
def disconnect_console(self) -> None:
    """Disconnect from the console."""
is_console_connected
is_console_connected() -> bool

Get status of the connection.

Source code in boardfarm3/lib/connections/connect_and_run.py
22
23
def is_console_connected(self) -> bool:
    """Get status of the connection."""

connect_and_run

connect_and_run(func: Callable[P, T]) -> Callable[P, T]

Connect run and disconnect to console at runtime.

Note: This is implemented only for instance methods

Parameters:

Name Type Description Default
func
Callable[P, T]

the decorated method

required

Returns:

Type Description
Callable[P, T]

True or False

Source code in boardfarm3/lib/connections/connect_and_run.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def connect_and_run(func: Callable[P, T]) -> Callable[P, T]:
    """Connect run and disconnect to console at runtime.

    Note: This is implemented only for instance methods

    :param func: the decorated method
    :return: True or False
    :rtype: Callable[P, T]
    """

    def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        instance = args[0]
        exc_to_raise = None
        conn_exception = None
        if not isinstance(instance, RuntimeConnectable):
            msg = (
                f"Provided instance {instance} do not ,"
                f"follows the protocol RuntimeConnectable .i.e {RuntimeConnectable}"
            )
            raise TypeError(msg)

        if not instance.is_console_connected():
            # Adding a retry
            for _ in range(10):
                try:
                    instance.connect_console()
                    break
                except DeviceConnectionError as exc:
                    conn_exception = exc
                    sleep(15)
            else:
                raise conn_exception

        try:
            output = func(*args, **kwargs)
        except Exception as e:  # noqa: BLE001 pylint: disable=W0718
            exc_to_raise = e
        finally:
            if instance.is_console_connected():
                instance.disconnect_console()
        if exc_to_raise:
            raise exc_to_raise
        return output

    return wrapper

ldap_authenticated_serial

SSH connection module.

Classes:

Name Description
LdapAuthenticatedSerial

Connect to a serial with ldap credentials.

LdapAuthenticatedSerial

LdapAuthenticatedSerial(
    name: str,
    ip_addr: str,
    ldap_credentials: str,
    shell_prompt: list[str],
    port: int = 22,
    save_console_logs: str = "",
    **kwargs: dict[str, Any],
)

Connect to a serial with ldap credentials.

Initialize ldap authenticated serial connection.

Parameters:

Name Type Description Default
name
str

connection name

required
ip_addr
str

server ip address

required
ldap_credentials
str

ldap credentials

required
shell_prompt
list[str]

shell prompt patterns

required
port
int

port number, defaults to 22

22
save_console_logs
str

save console logs to disk, defaults to ""

''
kwargs
dict[str, Any]

other keyword arguments

{}

Raises:

Type Description
ValueError

invalid LDAP credentials

Methods:

Name Description
check_output

Return an output of the command.

execute_command

Execute a command in the SSH session.

execute_command_async

Execute a command in the SSH session.

get_last_output

Get last output from the buffer.

login_to_server

Login to serial server.

login_to_server_async

Login to serial server.

start_interactive_session

Start interactive pexpect session.

sudo_sendline

Add sudo in the sendline if username is root.

Source code in boardfarm3/lib/connections/ldap_authenticated_serial.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def __init__(  # noqa: PLR0913
    self,
    name: str,
    ip_addr: str,
    ldap_credentials: str,
    shell_prompt: list[str],
    port: int = 22,
    save_console_logs: str = "",
    **kwargs: dict[str, Any],  # ignore other arguments  # noqa: ARG002
) -> None:
    """Initialize ldap authenticated serial connection.

    :param name: connection name
    :type name: str
    :param ip_addr: server ip address
    :type ip_addr: str
    :param ldap_credentials: ldap credentials
    :type ldap_credentials: str
    :param shell_prompt: shell prompt patterns
    :type shell_prompt: list[str]
    :param port: port number, defaults to 22
    :type port: int
    :param save_console_logs: save console logs to disk, defaults to ""
    :type save_console_logs: str
    :param kwargs: other keyword arguments
    :raises ValueError: invalid LDAP credentials
    """
    if ";" not in ldap_credentials:
        msg = "Invalid LDAP credentials"
        raise ValueError(msg)
    username, password = ldap_credentials.split(";")
    super().__init__(
        name,
        ip_addr,
        username,
        shell_prompt,
        port,
        password,
        save_console_logs,
    )
check_output
check_output(cmd: str, timeout: int = 30) -> str

Return an output of the command.

Parameters:

Name Type Description Default
cmd
str

command to execute

required
timeout
int

timeout for command execute, defaults to 30

30

Returns:

Type Description
str

command output

Raises:

Type Description
BoardfarmException

if command doesn't execute in specified timeout

Source code in boardfarm3/lib/connections/ssh_connection.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def check_output(self, cmd: str, timeout: int = 30) -> str:
    """Return an output of the command.

    :param cmd: command to execute
    :param timeout: timeout for command execute, defaults to 30
    :raises BoardfarmException: if command doesn't execute in specified timeout
    :return: command output
    """
    self.sendline("\n" + cmd)
    self.expect_exact(cmd, timeout=timeout)
    try:
        self.expect(self._shell_prompt, timeout=timeout)
    except Exception as e:
        self.sendcontrol("c")
        msg = (
            f"Command did not complete within {timeout} seconds. "
            f"{self.name} prompt was not seen."
        )
        raise BoardfarmException(
            msg,
        ) from e
    return str(self.before.strip())
execute_command
execute_command(command: str, timeout: int = -1) -> str

Execute a command in the SSH session.

Parameters:

Name Type Description Default
command
str

command to execute

required
timeout
int

timeout in seconds. defaults to -1

-1

Returns:

Type Description
str

command output

Source code in boardfarm3/lib/connections/ssh_connection.py
115
116
117
118
119
120
121
122
123
124
125
126
def execute_command(self, command: str, timeout: int = -1) -> str:
    """Execute a command in the SSH session.

    :param command: command to execute
    :param timeout: timeout in seconds. defaults to -1
    :returns: command output
    """
    self.sendline(command)
    self.expect_exact(command)
    self.expect(self.linesep)
    self.expect(self._shell_prompt, timeout=timeout)
    return self.get_last_output()
execute_command_async async
execute_command_async(command: str, timeout: int = -1) -> str

Execute a command in the SSH session.

Parameters:

Name Type Description Default
command
str

command to execute

required
timeout
int

timeout in seconds. defaults to -1

-1

Returns:

Type Description
str

command output

Source code in boardfarm3/lib/connections/ssh_connection.py
128
129
130
131
132
133
134
135
136
137
138
139
async def execute_command_async(self, command: str, timeout: int = -1) -> str:
    """Execute a command in the SSH session.

    :param command: command to execute
    :param timeout: timeout in seconds. defaults to -1
    :returns: command output
    """
    self.sendline(command)
    await self.expect_exact(command, async_=True)
    await self.expect(self.linesep, async_=True)
    await self.expect(self._shell_prompt, timeout=timeout, async_=True)
    return self.get_last_output()
get_last_output
get_last_output() -> str

Get last output from the buffer.

Returns:

Type Description
str

last output from the buffer

Source code in boardfarm3/lib/boardfarm_pexpect.py
117
118
119
120
121
122
def get_last_output(self) -> str:
    """Get last output from the buffer.

    :returns: last output from the buffer
    """
    return self.before.strip()
login_to_server
login_to_server(password: str | None = None) -> None

Login to serial server.

Parameters:

Name Type Description Default
password
str | None

LDAP password

None

Raises:

Type Description
DeviceConnectionError

failed to connect to device via serial

Source code in boardfarm3/lib/connections/ldap_authenticated_serial.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def login_to_server(self, password: str | None = None) -> None:
    """Login to serial server.

    :param password: LDAP password
    :type password: str
    :raises DeviceConnectionError: failed to connect to device via serial
    """
    if password is None:
        password = self._password
    if self.expect(["Password:", pexpect.EOF, pexpect.TIMEOUT]):
        raise DeviceConnectionError(_CONNECTION_FAILED_STR)
    self.sendline(password)

    if (
        self.expect_exact(
            ["OpenGear Serial Server", pexpect.TIMEOUT, pexpect.EOF],
            timeout=10,
        )
        == _EOF_INDEX
    ):
        raise DeviceConnectionError(_CONNECTION_FAILED_STR)
    # In case of SSH communication over different geological WAN:
    # The SSH channel does not start to display data post connection.
    # Instead the user needs to enter some key to refresh. e.g. ENTER
    # This is generally due to poor connection.
    # Providing a few input below and flushing the buffer after 5 sec.
    self.sendline()
    self.sendline()
    self.expect(pexpect.TIMEOUT, timeout=5)
login_to_server_async async
login_to_server_async(password: str | None = None) -> None

Login to serial server.

Parameters:

Name Type Description Default
password
str | None

LDAP password

None

Raises:

Type Description
DeviceConnectionError

failed to connect to device via serial

Source code in boardfarm3/lib/connections/ldap_authenticated_serial.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
async def login_to_server_async(self, password: str | None = None) -> None:
    """Login to serial server.

    :param password: LDAP password
    :type password: str
    :raises DeviceConnectionError: failed to connect to device via serial
    """
    if password is None:
        password = self._password
    if await self.expect(["Password:", pexpect.EOF, pexpect.TIMEOUT], async_=True):
        raise DeviceConnectionError(_CONNECTION_FAILED_STR)
    self.sendline(password)

    if (
        await self.expect_exact(
            ["OpenGear Serial Server", pexpect.TIMEOUT, pexpect.EOF],
            timeout=10,
            async_=True,
        )
        == _EOF_INDEX
    ):
        raise DeviceConnectionError(_CONNECTION_FAILED_STR)
    # In case of SSH communication over different geological WAN:
    # The SSH channel does not start to display data post connection.
    # Instead the user needs to enter some key to refresh. e.g. ENTER
    # This is generally due to poor connection.
    # Providing a few input below and flushing the buffer after 5 sec.
    self.sendline()
    self.sendline()
    await self.expect(pexpect.TIMEOUT, timeout=5, async_=True)
start_interactive_session
start_interactive_session() -> None

Start interactive pexpect session.

Source code in boardfarm3/lib/boardfarm_pexpect.py
134
135
136
137
def start_interactive_session(self) -> None:
    """Start interactive pexpect session."""
    with disable_logs("pexpect"):
        self.interact()
sudo_sendline
sudo_sendline(cmd: str) -> None

Add sudo in the sendline if username is root.

Parameters:

Name Type Description Default
cmd
str

command to send

required
Source code in boardfarm3/lib/connections/ssh_connection.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def sudo_sendline(self, cmd: str) -> None:
    """Add sudo in the sendline if username is root.

    :param cmd: command to send
    """
    if self._username != "root":
        self.sendline("sudo true")
        password_requested = self.expect(
            [*self._shell_prompt, "password for .*:", "Password:"],
        )
        if password_requested:
            self.sendline(self._password)
            self.expect(self._shell_prompt)
        cmd = "sudo " + cmd
    self.sendline(cmd)

local_cmd

Connect to a device with a local command.

Classes:

Name Description
LocalCmd

Connect to a device with a local command.

LocalCmd

LocalCmd(
    name: str,
    conn_command: str,
    save_console_logs: str,
    shell_prompt: list[str] | None = None,
    args: list[str] | None = None,
    **kwargs: dict[str, Any],
)

Connect to a device with a local command.

Initialize local command connection.

Parameters:

Name Type Description Default
name
str

connection name

required
conn_command
str

command to start the session

required
save_console_logs
str

save console logs to disk

required
shell_prompt
list[str] | None

shell prompt pattern, defaults to None

None
args
list[str] | None

arguments to the command, defaults to None

None
kwargs
dict[str, Any]

additional keyword args

{}

Methods:

Name Description
execute_command

Execute a command in the local command session.

get_last_output

Get last output from the buffer.

login_to_server

Login.

start_interactive_session

Start interactive pexpect session.

Source code in boardfarm3/lib/connections/local_cmd.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(  # pylint: disable=too-many-arguments
    self,  # pylint: disable=unused-argument
    name: str,
    conn_command: str,
    save_console_logs: str,
    shell_prompt: list[str] | None = None,
    args: list[str] | None = None,
    **kwargs: dict[str, Any],
) -> None:
    """Initialize local command connection.

    :param name: connection name
    :type name: str
    :param conn_command: command to start the session
    :type conn_command: str
    :param save_console_logs: save console logs to disk
    :type save_console_logs: str
    :param shell_prompt: shell prompt pattern, defaults to None
    :type shell_prompt: list[str]
    :param args: arguments to the command, defaults to None
    :type args: list[str], optional
    :param kwargs: additional keyword args
    """
    self._shell_prompt = shell_prompt
    if args is None:
        args = []
    super().__init__(
        name,
        conn_command,
        save_console_logs,
        args,
        **kwargs,
    )
execute_command
execute_command(command: str, timeout: int = -1) -> str

Execute a command in the local command session.

Parameters:

Name Type Description Default
command
str

command to execute

required
timeout
int

timeout in seconds. defaults to -1

-1

Returns:

Type Description
str

command output

Source code in boardfarm3/lib/connections/local_cmd.py
83
84
85
86
87
88
89
90
91
92
93
94
95
def execute_command(self, command: str, timeout: int = -1) -> str:
    """Execute a command in the local command session.

    :param command: command to execute
    :param timeout: timeout in seconds. defaults to -1
    :returns: command output
    """
    self.sendline(command)
    self.expect_exact(command)
    self.expect(self.linesep)
    # TODO: is this needed? is the shell prompt of Local(Jenkins or any user)?
    self.expect(self._shell_prompt, timeout=timeout)
    return self.get_last_output()
get_last_output
get_last_output() -> str

Get last output from the buffer.

Returns:

Type Description
str

last output from the buffer

Source code in boardfarm3/lib/boardfarm_pexpect.py
117
118
119
120
121
122
def get_last_output(self) -> str:
    """Get last output from the buffer.

    :returns: last output from the buffer
    """
    return self.before.strip()
login_to_server
login_to_server(password: str | None = None) -> None

Login.

Parameters:

Name Type Description Default
password
str | None

ssh password

None

Raises:

Type Description
DeviceConnectionError

connection failed via local command

ValueError

if shell prompt is unavailable

Source code in boardfarm3/lib/connections/local_cmd.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def login_to_server(self, password: str | None = None) -> None:
    """Login.

    :param password: ssh password
    :raises DeviceConnectionError: connection failed via local command
    :raises ValueError: if shell prompt is unavailable
    """
    if password is not None:
        if self.expect(
            ["password:", pexpect.EOF, pexpect.TIMEOUT],
        ):
            raise DeviceConnectionError(_CONNECTION_FAILED_STR)
        self.sendline(password)
    # TODO: temp fix for now. To be decided if shell prompt to be used.
    if not self._shell_prompt:
        raise ValueError(_SHELL_PROMPT_UNAVAILABLE_STR)
    if (
        self.expect(
            [
                pexpect.EOF,
                pexpect.TIMEOUT,
                *self._shell_prompt,
            ],
        )
        < _CONNECTION_ERROR_THRESHOLD
    ):
        raise DeviceConnectionError(_CONNECTION_FAILED_STR)
start_interactive_session
start_interactive_session() -> None

Start interactive pexpect session.

Source code in boardfarm3/lib/boardfarm_pexpect.py
134
135
136
137
def start_interactive_session(self) -> None:
    """Start interactive pexpect session."""
    with disable_logs("pexpect"):
        self.interact()

ser2net_connection

ser2net connection module.

Classes:

Name Description
Ser2NetConnection

Allow telnet session to be established with the ser2net daemon.

Ser2NetConnection

Ser2NetConnection(
    session_name: str, command: str, save_console_logs: str, args: list[str]
)

Allow telnet session to be established with the ser2net daemon.

Requires the ser2net daemon to be running. Configuration to be stored in /etc/ser2net.conf Several devices can be connected to a host without the need for a terminal server. The following is a sample configuration for a single console:

2001:telnet:0:/dev/ttyUSB0:115200 NONE 1STOPBIT 8DATABITS XONXOFF banner max-connections=1

The telnet:0 disables the timeout on the telnet session. No authentication needed.

Initialize the Ser2Net connection.

Parameters:

Name Type Description Default
session_name
str

pexpect session name

required
command
str

command to start the pexpect session

required
save_console_logs
str

save console logs to disk

required
args
list[str]

additional arguments to the command

required

Methods:

Name Description
close

Close the connection.

execute_command

Execute a command in the Telnet session.

execute_command_async

Execute a command in the Telnet session.

get_last_output

Get last output from the buffer.

login_to_server

Login to Ser2Net server.

login_to_server_async

Login to Ser2Net server using asyncio.

start_interactive_session

Start interactive pexpect session.

Source code in boardfarm3/lib/connections/ser2net_connection.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(
    self,
    session_name: str,
    command: str,
    save_console_logs: str,
    args: list[str],
) -> None:
    """Initialize the Ser2Net connection.

    :param session_name: pexpect session name
    :type session_name: str
    :param command: command to start the pexpect session
    :type command: str
    :param save_console_logs: save console logs to disk
    :type save_console_logs: str
    :param args: additional arguments to the command
    :type args: list[str  |  list[str]]
    """
    self._ip_addr, self._port = args[0], args[1]
    super().__init__(
        session_name,
        command,
        save_console_logs,
        args,
    )
close
close(force: bool = True) -> None

Close the connection.

Parameters:

Name Type Description Default
force
bool

True to send a SIGKILL, False for SIGINT/UP, default True

True
Source code in boardfarm3/lib/connections/telnet.py
114
115
116
117
118
119
120
121
122
def close(self, force: bool = True) -> None:
    """Close the connection.

    :param force: True to send a SIGKILL, False for SIGINT/UP, default True
    :type force: bool
    """
    self.sendcontrol("]")
    self.sendline("q")
    super().close(force=force)
execute_command
execute_command(command: str, timeout: int = 30) -> str

Execute a command in the Telnet session.

Parameters:

Name Type Description Default
command
str

command to be executed

required
timeout
int

timeout for command execute, defaults to 30

30

Returns:

Type Description
str

command output

Source code in boardfarm3/lib/connections/telnet.py
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def execute_command(self, command: str, timeout: int = 30) -> str:
    """Execute a command in the Telnet session.

    :param command: command to be executed
    :type command: str
    :param timeout: timeout for command execute, defaults to 30
    :type timeout: int
    :return: command output
    :rtype: str
    """
    self.sendline(command)
    self.expect_exact(command)
    self.expect(self._shell_prompt, timeout=timeout)
    return self.get_last_output()
execute_command_async async
execute_command_async(command: str, timeout: int = -1) -> str

Execute a command in the Telnet session.

Parameters:

Name Type Description Default
command
str

command to execute

required
timeout
int

timeout in seconds. defaults to -1

-1

Returns:

Type Description
str

command output

Source code in boardfarm3/lib/connections/telnet.py
101
102
103
104
105
106
107
108
109
110
111
112
async def execute_command_async(self, command: str, timeout: int = -1) -> str:
    """Execute a command in the Telnet session.

    :param command: command to execute
    :param timeout: timeout in seconds. defaults to -1
    :returns: command output
    """
    self.sendline(command)
    await self.expect_exact(command, async_=True)
    await self.expect(self.linesep, async_=True)
    await self.expect(self._shell_prompt, timeout=timeout, async_=True)
    return self.get_last_output()
get_last_output
get_last_output() -> str

Get last output from the buffer.

Returns:

Type Description
str

last output from the buffer

Source code in boardfarm3/lib/boardfarm_pexpect.py
117
118
119
120
121
122
def get_last_output(self) -> str:
    """Get last output from the buffer.

    :returns: last output from the buffer
    """
    return self.before.strip()
login_to_server
login_to_server(password: str | None = None) -> None

Login to Ser2Net server.

Parameters:

Name Type Description Default
password
str | None

Telnet password

None

Raises:

Type Description
DeviceConnectionError

connection failed to Telnet server

Source code in boardfarm3/lib/connections/ser2net_connection.py
72
73
74
75
76
77
78
79
80
81
82
83
84
def login_to_server(self, password: str | None = None) -> None:
    """Login to Ser2Net server.

    :param password: Telnet password
    :raises DeviceConnectionError: connection failed to Telnet server
    """
    super().login_to_server(password)
    if self.expect(
        [f"ser2net port.*{self._port}", pexpect.TIMEOUT],
        timeout=10,
    ):
        msg = f"ser2net: Failed to run 'telnet {self._ip_addr} {self._port}'"
        raise DeviceConnectionError(msg)
login_to_server_async async
login_to_server_async(password: str | None = None) -> None

Login to Ser2Net server using asyncio.

Parameters:

Name Type Description Default
password
str | None

Telnet password

None

Raises:

Type Description
DeviceConnectionError

connection failed to Telnet server

Source code in boardfarm3/lib/connections/ser2net_connection.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
async def login_to_server_async(self, password: str | None = None) -> None:
    """Login to Ser2Net server using asyncio.

    :param password: Telnet password
    :raises DeviceConnectionError: connection failed to Telnet server
    """
    await super().login_to_server_async(password)
    if (
        await self.expect(
            [f"ser2net port.*{self._port}", pexpect.TIMEOUT],
            timeout=10,
            async_=True,
        )
        == 1
    ):
        msg = f"ser2net: Failed to run 'telnet {self._ip_addr} {self._port}'"
        raise DeviceConnectionError(msg)
start_interactive_session
start_interactive_session() -> None

Start interactive pexpect session.

Source code in boardfarm3/lib/boardfarm_pexpect.py
134
135
136
137
def start_interactive_session(self) -> None:
    """Start interactive pexpect session."""
    with disable_logs("pexpect"):
        self.interact()

serial_connection

Connect to a device with a local serial command.

Basically a local command with no authentication on connection.

Classes:

Name Description
SerialConnection

Connect to a device with local serail command.

SerialConnection

SerialConnection(
    name: str,
    conn_command: str,
    save_console_logs: str,
    shell_prompt: list[str] | None = None,
    args: list[str] | None = None,
    **kwargs: dict[str, Any],
)

Connect to a device with local serail command.

No authentication needed. Just connect!

Initialize local command serial connection.

No authentication!

Parameters:

Name Type Description Default
name
str

connection name

required
conn_command
str

command to start the session

required
save_console_logs
str

save console logs to disk

required
shell_prompt
list[str] | None

shell prompt pattern, defaults to None

None
args
list[str] | None

arguments to the command, defaults to None

None
kwargs
dict[str, Any]

additional keyword args

{}

Raises:

Type Description
DeviceConnectionError

on connection failure

Methods:

Name Description
execute_command

Execute a command in the local command session.

get_last_output

Get last output from the buffer.

login_to_server

Do not do anything, just connect.

start_interactive_session

Start interactive pexpect session.

Source code in boardfarm3/lib/connections/serial_connection.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def __init__(  # pylint: disable=too-many-arguments
    self,  # pylint: disable=unused-argument
    name: str,
    conn_command: str,
    save_console_logs: str,
    shell_prompt: list[str] | None = None,
    args: list[str] | None = None,
    **kwargs: dict[str, Any],
) -> None:
    """Initialize local command serial connection.

    No authentication!

    :param name: connection name
    :type name: str
    :param conn_command: command to start the session
    :type conn_command: str
    :param save_console_logs: save console logs to disk
    :type save_console_logs: str
    :param shell_prompt: shell prompt pattern, defaults to None
    :type shell_prompt: list[str]
    :param args: arguments to the command, defaults to None
    :type args: list[str], optional
    :param kwargs: additional keyword args
    :raises DeviceConnectionError: on connection failure
    """
    if args is None:
        args = conn_command.split()
        conn_command = args.pop(0)
    if kwargs.get("env") is None:
        # some serial commands need a terminal that is not "dumb"
        kwargs["env"] = {
            "PATH": os.getenv("PATH"),
            "TERM": os.getenv("TERM") if os.getenv("TERM") else "xterm",
        }
    super().__init__(
        name, conn_command, save_console_logs, shell_prompt, args, **kwargs
    )
    try:
        self.expect("Terminal ready", 5)
    except EOF as exc:
        raise DeviceConnectionError(self.before) from exc
execute_command
execute_command(command: str, timeout: int = -1) -> str

Execute a command in the local command session.

Parameters:

Name Type Description Default
command
str

command to execute

required
timeout
int

timeout in seconds. defaults to -1

-1

Returns:

Type Description
str

command output

Source code in boardfarm3/lib/connections/serial_connection.py
73
74
75
76
77
78
79
80
81
82
83
84
85
def execute_command(self, command: str, timeout: int = -1) -> str:
    """Execute a command in the local command session.

    :param command: command to execute
    :param timeout: timeout in seconds. defaults to -1
    :returns: command output
    """
    self.sendline(command)
    self.expect_exact(command)
    self.expect(self.linesep)
    # TODO: is this needed? is the shell prompt of Local(Jenkins or any user)?
    self.expect(self._shell_prompt, timeout=timeout)
    return self.get_last_output()
get_last_output
get_last_output() -> str

Get last output from the buffer.

Returns:

Type Description
str

last output from the buffer

Source code in boardfarm3/lib/boardfarm_pexpect.py
117
118
119
120
121
122
def get_last_output(self) -> str:
    """Get last output from the buffer.

    :returns: last output from the buffer
    """
    return self.before.strip()
login_to_server
login_to_server(password: str | None = None) -> None

Do not do anything, just connect.

Parameters:

Name Type Description Default
password
str | None

unused

None
Source code in boardfarm3/lib/connections/serial_connection.py
67
68
69
70
71
def login_to_server(self, password: str | None = None) -> None:
    """Do not do anything, just connect.

    :param password: unused
    """
start_interactive_session
start_interactive_session() -> None

Start interactive pexpect session.

Source code in boardfarm3/lib/boardfarm_pexpect.py
134
135
136
137
def start_interactive_session(self) -> None:
    """Start interactive pexpect session."""
    with disable_logs("pexpect"):
        self.interact()

ssh_connection

SSH connection module.

Classes:

Name Description
SSHConnection

Connect to a device via SSH.

SSHConnection

SSHConnection(
    name: str,
    ip_addr: str,
    username: str,
    shell_prompt: list[str],
    port: int = 22,
    password: str | None = None,
    save_console_logs: str = "",
    **kwargs: dict[str, Any],
)

Connect to a device via SSH.

Initialize SSH connection.

Parameters:

Name Type Description Default
name
str

connection name

required
ip_addr
str

ip address

required
username
str

ssh username

required
shell_prompt
list[str]

shell prompt pattern

required
port
int

port number, defaults to 22

22
password
str | None

password, defaults to None

None
save_console_logs
str

save console logs, defaults to ""

''
kwargs
dict[str, Any]

other keyword arguments

{}

Methods:

Name Description
check_output

Return an output of the command.

execute_command

Execute a command in the SSH session.

execute_command_async

Execute a command in the SSH session.

get_last_output

Get last output from the buffer.

login_to_server

Login to SSH session.

login_to_server_async

Login to SSH session.

start_interactive_session

Start interactive pexpect session.

sudo_sendline

Add sudo in the sendline if username is root.

Source code in boardfarm3/lib/connections/ssh_connection.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    self,  # pylint: disable=unused-argument
    name: str,
    ip_addr: str,
    username: str,
    shell_prompt: list[str],
    port: int = 22,
    password: str | None = None,
    save_console_logs: str = "",
    **kwargs: dict[str, Any],  # ignore other arguments  # noqa: ARG002
) -> None:
    """Initialize SSH connection.

    :param name: connection name
    :type name: str
    :param ip_addr: ip address
    :type ip_addr: str
    :param username: ssh username
    :type username: str
    :param shell_prompt: shell prompt pattern
    :type shell_prompt: list[str]
    :param port: port number, defaults to 22
    :type port: int
    :param password: password, defaults to None
    :type password: str
    :param save_console_logs: save console logs, defaults to ""
    :type save_console_logs: str
    :param kwargs: other keyword arguments
    """
    self._shell_prompt = shell_prompt
    self._username = username
    self._password = password
    args = [
        f"{username}@{ip_addr}",
        f"-p {port}",
        "-o StrictHostKeyChecking=no",
        "-o UserKnownHostsFile=/dev/null",
        "-o ServerAliveInterval=60",
        "-o ServerAliveCountMax=10",
        "-o IdentitiesOnly=yes",
        "-o HostKeyAlgorithms=+ssh-rsa",
    ]
    super().__init__(name, "ssh", save_console_logs, args)
check_output
check_output(cmd: str, timeout: int = 30) -> str

Return an output of the command.

Parameters:

Name Type Description Default
cmd
str

command to execute

required
timeout
int

timeout for command execute, defaults to 30

30

Returns:

Type Description
str

command output

Raises:

Type Description
BoardfarmException

if command doesn't execute in specified timeout

Source code in boardfarm3/lib/connections/ssh_connection.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def check_output(self, cmd: str, timeout: int = 30) -> str:
    """Return an output of the command.

    :param cmd: command to execute
    :param timeout: timeout for command execute, defaults to 30
    :raises BoardfarmException: if command doesn't execute in specified timeout
    :return: command output
    """
    self.sendline("\n" + cmd)
    self.expect_exact(cmd, timeout=timeout)
    try:
        self.expect(self._shell_prompt, timeout=timeout)
    except Exception as e:
        self.sendcontrol("c")
        msg = (
            f"Command did not complete within {timeout} seconds. "
            f"{self.name} prompt was not seen."
        )
        raise BoardfarmException(
            msg,
        ) from e
    return str(self.before.strip())
execute_command
execute_command(command: str, timeout: int = -1) -> str

Execute a command in the SSH session.

Parameters:

Name Type Description Default
command
str

command to execute

required
timeout
int

timeout in seconds. defaults to -1

-1

Returns:

Type Description
str

command output

Source code in boardfarm3/lib/connections/ssh_connection.py
115
116
117
118
119
120
121
122
123
124
125
126
def execute_command(self, command: str, timeout: int = -1) -> str:
    """Execute a command in the SSH session.

    :param command: command to execute
    :param timeout: timeout in seconds. defaults to -1
    :returns: command output
    """
    self.sendline(command)
    self.expect_exact(command)
    self.expect(self.linesep)
    self.expect(self._shell_prompt, timeout=timeout)
    return self.get_last_output()
execute_command_async async
execute_command_async(command: str, timeout: int = -1) -> str

Execute a command in the SSH session.

Parameters:

Name Type Description Default
command
str

command to execute

required
timeout
int

timeout in seconds. defaults to -1

-1

Returns:

Type Description
str

command output

Source code in boardfarm3/lib/connections/ssh_connection.py
128
129
130
131
132
133
134
135
136
137
138
139
async def execute_command_async(self, command: str, timeout: int = -1) -> str:
    """Execute a command in the SSH session.

    :param command: command to execute
    :param timeout: timeout in seconds. defaults to -1
    :returns: command output
    """
    self.sendline(command)
    await self.expect_exact(command, async_=True)
    await self.expect(self.linesep, async_=True)
    await self.expect(self._shell_prompt, timeout=timeout, async_=True)
    return self.get_last_output()
get_last_output
get_last_output() -> str

Get last output from the buffer.

Returns:

Type Description
str

last output from the buffer

Source code in boardfarm3/lib/boardfarm_pexpect.py
117
118
119
120
121
122
def get_last_output(self) -> str:
    """Get last output from the buffer.

    :returns: last output from the buffer
    """
    return self.before.strip()
login_to_server
login_to_server(password: str | None = None) -> None

Login to SSH session.

Parameters:

Name Type Description Default
password
str | None

ssh password

None

Raises:

Type Description
DeviceConnectionError

connection failed to SSH server

Source code in boardfarm3/lib/connections/ssh_connection.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def login_to_server(self, password: str | None = None) -> None:
    """Login to SSH session.

    :param password: ssh password
    :raises DeviceConnectionError: connection failed to SSH server
    """
    if password is None:
        password = self._password
    if self.expect(
        ["password:", pexpect.EOF, pexpect.TIMEOUT],
    ):
        raise DeviceConnectionError(_CONNECTION_FAILED_STR)
    self.sendline(password)
    if (
        self.expect(
            [
                pexpect.EOF,
                pexpect.TIMEOUT,
                *self._shell_prompt,
            ],
        )
        < _CONNECTION_ERROR_THRESHOLD
    ):
        raise DeviceConnectionError(_CONNECTION_FAILED_STR)
login_to_server_async async
login_to_server_async(password: str | None = None) -> None

Login to SSH session.

Parameters:

Name Type Description Default
password
str | None

ssh password

None

Raises:

Type Description
DeviceConnectionError

connection failed to SSH server

Source code in boardfarm3/lib/connections/ssh_connection.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
async def login_to_server_async(self, password: str | None = None) -> None:
    """Login to SSH session.

    :param password: ssh password
    :raises DeviceConnectionError: connection failed to SSH server
    """
    if password is None:
        password = self._password
    if await self.expect(
        ["password:", pexpect.EOF, pexpect.TIMEOUT],
        async_=True,
    ):
        raise DeviceConnectionError(_CONNECTION_FAILED_STR)
    self.sendline(password)
    if (
        await self.expect(
            [
                pexpect.EOF,
                pexpect.TIMEOUT,
                *self._shell_prompt,
            ],
            async_=True,
        )
        < _CONNECTION_ERROR_THRESHOLD
    ):
        raise DeviceConnectionError(_CONNECTION_FAILED_STR)
start_interactive_session
start_interactive_session() -> None

Start interactive pexpect session.

Source code in boardfarm3/lib/boardfarm_pexpect.py
134
135
136
137
def start_interactive_session(self) -> None:
    """Start interactive pexpect session."""
    with disable_logs("pexpect"):
        self.interact()
sudo_sendline
sudo_sendline(cmd: str) -> None

Add sudo in the sendline if username is root.

Parameters:

Name Type Description Default
cmd
str

command to send

required
Source code in boardfarm3/lib/connections/ssh_connection.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def sudo_sendline(self, cmd: str) -> None:
    """Add sudo in the sendline if username is root.

    :param cmd: command to send
    """
    if self._username != "root":
        self.sendline("sudo true")
        password_requested = self.expect(
            [*self._shell_prompt, "password for .*:", "Password:"],
        )
        if password_requested:
            self.sendline(self._password)
            self.expect(self._shell_prompt)
        cmd = "sudo " + cmd
    self.sendline(cmd)

telnet

Telnet connection module.

Classes:

Name Description
TelnetConnection

A simple telnet session.

TelnetConnection

TelnetConnection(
    session_name: str, command: str, save_console_logs: str, args: list[str]
)

A simple telnet session.

Initialize the Ser2Net connection.

Parameters:

Name Type Description Default
session_name
str

pexpect session name

required
command
str

command to start the pexpect session

required
save_console_logs
str

save console logs to disk

required
args
list[str]

additional arguments to the command

required

Methods:

Name Description
close

Close the connection.

execute_command

Execute a command in the Telnet session.

execute_command_async

Execute a command in the Telnet session.

get_last_output

Get last output from the buffer.

login_to_server

Login to Telnet server.

login_to_server_async

Login to Telnet seerver using asyncio.

start_interactive_session

Start interactive pexpect session.

Source code in boardfarm3/lib/connections/telnet.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(
    self,
    session_name: str,
    command: str,
    save_console_logs: str,
    args: list[str],
) -> None:
    """Initialize the Ser2Net connection.

    :param session_name: pexpect session name
    :type session_name: str
    :param command: command to start the pexpect session
    :type command: str
    :param save_console_logs: save console logs to disk
    :type save_console_logs: str
    :param args: additional arguments to the command
    :type args: list[str]
    """
    self._ip_addr, self._port, self._shell_prompt = args[0], args[1], args.pop(2)
    super().__init__(
        session_name=session_name,
        command=command,
        save_console_logs=save_console_logs,
        args=args,
    )
close
close(force: bool = True) -> None

Close the connection.

Parameters:

Name Type Description Default
force
bool

True to send a SIGKILL, False for SIGINT/UP, default True

True
Source code in boardfarm3/lib/connections/telnet.py
114
115
116
117
118
119
120
121
122
def close(self, force: bool = True) -> None:
    """Close the connection.

    :param force: True to send a SIGKILL, False for SIGINT/UP, default True
    :type force: bool
    """
    self.sendcontrol("]")
    self.sendline("q")
    super().close(force=force)
execute_command
execute_command(command: str, timeout: int = 30) -> str

Execute a command in the Telnet session.

Parameters:

Name Type Description Default
command
str

command to be executed

required
timeout
int

timeout for command execute, defaults to 30

30

Returns:

Type Description
str

command output

Source code in boardfarm3/lib/connections/telnet.py
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def execute_command(self, command: str, timeout: int = 30) -> str:
    """Execute a command in the Telnet session.

    :param command: command to be executed
    :type command: str
    :param timeout: timeout for command execute, defaults to 30
    :type timeout: int
    :return: command output
    :rtype: str
    """
    self.sendline(command)
    self.expect_exact(command)
    self.expect(self._shell_prompt, timeout=timeout)
    return self.get_last_output()
execute_command_async async
execute_command_async(command: str, timeout: int = -1) -> str

Execute a command in the Telnet session.

Parameters:

Name Type Description Default
command
str

command to execute

required
timeout
int

timeout in seconds. defaults to -1

-1

Returns:

Type Description
str

command output

Source code in boardfarm3/lib/connections/telnet.py
101
102
103
104
105
106
107
108
109
110
111
112
async def execute_command_async(self, command: str, timeout: int = -1) -> str:
    """Execute a command in the Telnet session.

    :param command: command to execute
    :param timeout: timeout in seconds. defaults to -1
    :returns: command output
    """
    self.sendline(command)
    await self.expect_exact(command, async_=True)
    await self.expect(self.linesep, async_=True)
    await self.expect(self._shell_prompt, timeout=timeout, async_=True)
    return self.get_last_output()
get_last_output
get_last_output() -> str

Get last output from the buffer.

Returns:

Type Description
str

last output from the buffer

Source code in boardfarm3/lib/boardfarm_pexpect.py
117
118
119
120
121
122
def get_last_output(self) -> str:
    """Get last output from the buffer.

    :returns: last output from the buffer
    """
    return self.before.strip()
login_to_server
login_to_server(password: str | None = None) -> None

Login to Telnet server.

Parameters:

Name Type Description Default
password
str | None

Telnet password

None

Raises:

Type Description
DeviceConnectionError

connection failed to Telnet server

Source code in boardfarm3/lib/connections/telnet.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def login_to_server(self, password: str | None = None) -> None:
    """Login to Telnet server.

    :param password: Telnet password
    :raises DeviceConnectionError: connection failed to Telnet server
    """
    if password is not None:
        msg = "Authenticated Telnet not supported."
        raise DeviceConnectionError(msg)
    if self.expect(
        [
            f"Connected to {self._ip_addr}",
            "Escape character is '^]'.",
            pexpect.TIMEOUT,
        ],
        timeout=10,
    ):
        msg = f"Failed to run 'telnet {self._ip_addr} {self._port}'"
        raise DeviceConnectionError(
            msg,
        )
login_to_server_async async
login_to_server_async(password: str | None = None) -> None

Login to Telnet seerver using asyncio.

Parameters:

Name Type Description Default
password
str | None

Telnet password (currently unused)

None

Raises:

Type Description
DeviceConnectionError

connection failed to Telnet server

Source code in boardfarm3/lib/connections/telnet.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
async def login_to_server_async(self, password: str | None = None) -> None:
    """Login to Telnet seerver using asyncio.

    :param password: Telnet password (currently unused)
    :raises DeviceConnectionError: connection failed to Telnet server
    """
    if password is not None:
        msg = "Authenticated Telnet not supported."
        raise DeviceConnectionError(msg)
    if (
        await self.expect(
            [
                f"Connected to {self._ip_addr}",
                "Escape character is '^]'.",
                pexpect.TIMEOUT,
            ],
            timeout=10,
            async_=True,
        )
        > 1
    ):
        msg = f"Failed to run 'telnet {self._ip_addr} {self._port}'"
        raise DeviceConnectionError(msg)
start_interactive_session
start_interactive_session() -> None

Start interactive pexpect session.

Source code in boardfarm3/lib/boardfarm_pexpect.py
134
135
136
137
def start_interactive_session(self) -> None:
    """Start interactive pexpect session."""
    with disable_logs("pexpect"):
        self.interact()

cpe_sw

Common libraries of CPE Sw component.

Classes:

Name Description
CPESwLibraries

CPE SW common libraries.

CPESwLibraries

CPESwLibraries(hardware: CPEHW)

CPE SW common libraries.

Initialise the CPE software.

Parameters:

Name Type Description Default

hardware

CPEHW

cpe hardware instance

required

Methods:

Name Description
add_info_to_file

Add data into a file.

enable_logs

Enable logs for given component.

factory_reset

Perform factory reset CPE via given method.

finalize_boot

Validate board settings post boot.

get_board_logs

Get the console log for the time period mentioned.

get_boottime_log

Return the boot time log from the board.

get_date

Get the system date and time.

get_file_content

Get the content of the given file.

get_interface_ipv4_netmask

Return given interface IPv4 netmask.

get_interface_ipv4addr

Return given interface IPv4 address.

get_interface_ipv6addr

Return given interface IPv6 address.

get_interface_link_local_ipv6_addr

Return given interface link local IPv6 address.

get_interface_mac_addr

Return given interface mac address.

get_interface_mtu_size

Get the MTU size of the interface in bytes.

get_load_avg

Return current load average of the cable modem.

get_memory_utilization

Return current memory utilization of the cable modem.

get_ntp_sync_status

Execute ntpq command to get the synchronization status.

get_provision_mode

Return provision mode.

get_running_processes

Return the currently running processes in the CPE via the ps command.

get_seconds_uptime

Return uptime in seconds.

get_tr069_log

Return the TR-069 log from the board.

is_link_up

Check given interface is up or not.

is_online

Is the device online.

is_production

Is production software.

is_tr069_connected

Is TR-69 agent is connected.

kill_process_immediately

Kill the process based on the provided PID.

read_event_logs

Return the event logs from the logread command.

reset

Perform reset via given method.

set_date

Set the device's date and time.

verify_cpe_is_booting

Verify CPE is booting.

wait_for_boot

Wait for CPE to boot.

Attributes:

Name Type Description
aftr_iface str

AFTR interface name.

cpe_id str

TR069 CPE ID.

dmcli DMCLIAPI

Dmcli instance running in CPE Software.

erouter_iface str

e-Router interface name.

firewall IptablesFirewall

Firewall component of cpe software.

guest_iface str

Guest network interface name.

gui_password str

GUI login password.

json_values dict[str, Any]

CPE Specific JSON values.

lan_gateway_ipv4 IPv4Address

LAN Gateway IPv4 address.

lan_gateway_ipv6 IPv6Address

LAN Gateway IPv6 address.

lan_iface str

LAN interface name.

lan_network_ipv4 IPv4Network

LAN IPv4 network.

nw_utility NetworkUtility

Network utility component of cpe software.

tr69_cpe_id str

TR-69 CPE Identifier.

version str

CPE software version.

wifi WiFiHal

Wifi instance CPE Software.

Source code in boardfarm3/lib/cpe_sw.py
32
33
34
35
36
37
38
39
40
41
def __init__(self, hardware: CPEHW) -> None:
    """Initialise the CPE software.

    :param hardware: cpe hardware instance
    :type hardware: CPEHW
    """
    self._hw = hardware
    self._nw_utility = NetworkUtility(self._get_console("networking"))
    self._firewall = IptablesFirewall(self._get_console("networking"))
    self._dmcli = DMCLIAPI(hardware.get_console("console"))

aftr_iface abstractmethod property

aftr_iface: str

AFTR interface name.

cpe_id abstractmethod property

cpe_id: str

TR069 CPE ID.

dmcli property

dmcli: DMCLIAPI

Dmcli instance running in CPE Software.

Returns:

Type Description
DMCLIAPI

the object instance

erouter_iface abstractmethod property

erouter_iface: str

e-Router interface name.

firewall property

firewall: IptablesFirewall

Firewall component of cpe software.

Returns:

Type Description
IptablesFirewall

Network firewall libraries

guest_iface abstractmethod property

guest_iface: str

Guest network interface name.

gui_password abstractmethod property

gui_password: str

GUI login password.

json_values abstractmethod property

json_values: dict[str, Any]

CPE Specific JSON values.

lan_gateway_ipv4 abstractmethod property

lan_gateway_ipv4: IPv4Address

LAN Gateway IPv4 address.

lan_gateway_ipv6 property

lan_gateway_ipv6: IPv6Address

LAN Gateway IPv6 address.

Returns:

Type Description
IPv6Address

IPv6 address of LAN GW

lan_iface abstractmethod property

lan_iface: str

LAN interface name.

lan_network_ipv4 property

lan_network_ipv4: IPv4Network

LAN IPv4 network.

Returns:

Type Description
IPv4Network

LAN IPv4 network

nw_utility property

nw_utility: NetworkUtility

Network utility component of cpe software.

Returns:

Type Description
NetworkUtility

Network utility libraries

tr69_cpe_id abstractmethod property

tr69_cpe_id: str

TR-69 CPE Identifier.

version abstractmethod property

version: str

CPE software version.

This will reload after each flash.

wifi abstractmethod property

wifi: WiFiHal

Wifi instance CPE Software.

add_info_to_file

add_info_to_file(to_add: str, fname: str) -> None

Add data into a file.

Parameters:

Name Type Description Default
to_add
str

contents/data to be added to a file.

required
fname
str

filename with absolute path

required
Source code in boardfarm3/lib/cpe_sw.py
422
423
424
425
426
427
428
429
430
def add_info_to_file(self, to_add: str, fname: str) -> None:
    """Add data into a file.

    :param to_add: contents/data to be added to a file.
    :type to_add: str
    :param fname: filename with absolute path
    :type fname: str
    """
    self._get_console("default_shell").execute_command(f"echo {to_add} >> {fname}")

enable_logs

enable_logs(component: str, flag: str = 'enable') -> None

Enable logs for given component.

Parameters:

Name Type Description Default
component
str

component name

required
flag
str

flag name, Default: "enable"

'enable'

Raises:

Type Description
NotImplementedError

pending implementation

Source code in boardfarm3/lib/cpe_sw.py
272
273
274
275
276
277
278
279
def enable_logs(self, component: str, flag: str = "enable") -> None:
    """Enable logs for given component.

    :param component: component name
    :param flag: flag name, Default: "enable"
    :raises NotImplementedError: pending implementation
    """
    raise NotImplementedError

factory_reset abstractmethod

factory_reset(method: str | None = None) -> bool

Perform factory reset CPE via given method.

Parameters:

Name Type Description Default
method
str | None

factory reset method. Default None.

None

Returns:

Type Description
bool

True on successful factory reset, False otherwise

Source code in boardfarm3/templates/cpe/cpe_sw.py
139
140
141
142
143
144
145
146
147
@abstractmethod
def factory_reset(self, method: str | None = None) -> bool:
    """Perform factory reset CPE via given method.

    :param method: factory reset method. Default None.
    :type method: str | None
    :return: True on successful factory reset, False otherwise
    """
    raise NotImplementedError

finalize_boot abstractmethod

finalize_boot() -> bool

Validate board settings post boot.

Returns:

Type Description
bool

True on successful validation

Source code in boardfarm3/templates/cpe/cpe_sw.py
381
382
383
384
385
386
387
@abstractmethod
def finalize_boot(self) -> bool:
    """Validate board settings post boot.

    :return: True on successful validation
    """
    raise NotImplementedError

get_board_logs

get_board_logs(timeout: int = 300) -> str

Get the console log for the time period mentioned.

Parameters:

Name Type Description Default
timeout
int

time value to collect the logs for, defaults to 300

300

Returns:

Type Description
str

Console logs for a given time period

Raises:

Type Description
BoardfarmException

if the console is not initialised, i.e. board has no console connections

Source code in boardfarm3/lib/cpe_sw.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def get_board_logs(self, timeout: int = 300) -> str:
    """Get the console log for the time period mentioned.

    :param timeout: time value to collect the logs for, defaults to 300
    :type timeout: int
    :return: Console logs for a given time period
    :rtype: str
    :raises BoardfarmException: if the console is not initialised,
        i.e. board has no console connections
    """
    if self._console:
        self._console.sendline()
        self._console.expect(pexpect.TIMEOUT, timeout=timeout)
        return str(self._console.before)
    msg = "Console obj is not initialized"
    raise BoardfarmException(msg)

get_boottime_log

get_boottime_log() -> list[str]

Return the boot time log from the board.

Raises:

Type Description
NotImplementedError

pending implementation

Source code in boardfarm3/lib/cpe_sw.py
393
394
395
396
397
398
def get_boottime_log(self) -> list[str]:
    """Return the boot time log from the board.

    :raises NotImplementedError: pending implementation
    """
    raise NotImplementedError

get_date

get_date() -> str | None

Get the system date and time.

.. code-block:: python

# example output
donderdag, mei 23, 2024 14:23:39

Returns:

Type Description
str | None

date

Source code in boardfarm3/lib/cpe_sw.py
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
def get_date(self) -> str | None:
    """Get the system date and time.

    .. code-block:: python

        # example output
        donderdag, mei 23, 2024 14:23:39


    :return: date
    :rtype: str | None
    """
    cpe_date = self._console.execute_command("date '+%A, %B %d, %Y %T'")
    date = re.search(
        r"(\w+,\s\w+\s\d+,\s\d+\s(([0-1]?[0-9])|(2[0-3])):[0-5][0-9]:[0-5][0-9])",
        cpe_date,
    )
    if date is not None:
        return date.group(0)
    return None

get_file_content

get_file_content(fname: str, timeout: int) -> str

Get the content of the given file.

Parameters:

Name Type Description Default
fname
str

name of the file with absolute path

required
timeout
int

timeout value to fetch the file content

required

Returns:

Type Description
str

contents of the file

Source code in boardfarm3/lib/cpe_sw.py
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def get_file_content(self, fname: str, timeout: int) -> str:
    """Get the content of the given file.

    :param fname: name of the file with absolute path
    :type fname: str
    :param timeout: timeout value to fetch the file content
    :type timeout: int
    :return: contents of the file
    :rtype: str
    """
    return self._get_console("default_shell").execute_command(
        f"cat {fname}",
        timeout,
    )

get_interface_ipv4_netmask

get_interface_ipv4_netmask(interface: str) -> IPv4Address

Return given interface IPv4 netmask.

Parameters:

Name Type Description Default
interface
str

name of the interface

required

Returns:

Type Description
IPv4Address

netmask of the interface

Source code in boardfarm3/lib/cpe_sw.py
185
186
187
188
189
190
191
192
193
194
195
196
197
def get_interface_ipv4_netmask(self, interface: str) -> IPv4Address:
    """Return given interface IPv4 netmask.

    :param interface: name of the interface
    :type interface: str
    :return: netmask of the interface
    :rtype: IPv4Address
    """
    output = self._get_console("networking").execute_command(
        f"ifconfig {interface}",
    )
    netmask = output.split("Mask:")[-1].split()[0]
    return IPv4Address(netmask)

get_interface_ipv4addr

get_interface_ipv4addr(interface: str) -> str

Return given interface IPv4 address.

Parameters:

Name Type Description Default
interface
str

interface name

required

Returns:

Type Description
str

IPv4 address

Raises:

Type Description
ValueError

if unable to parse IPv4Address

Source code in boardfarm3/lib/cpe_sw.py
156
157
158
159
160
161
162
163
164
165
166
167
def get_interface_ipv4addr(self, interface: str) -> str:
    """Return given interface IPv4 address.

    :param interface: interface name
    :return: IPv4 address
    :rtype: str
    :raises ValueError: if unable to parse IPv4Address
    """
    if ips := self._get_nw_interface_ip_address(interface, is_ipv6=False):
        return ips[0]
    msg = f"Failed to get IPv4 address of {interface} interface"
    raise ValueError(msg)

get_interface_ipv6addr

get_interface_ipv6addr(interface: str) -> str

Return given interface IPv6 address.

Parameters:

Name Type Description Default
interface
str

interface name

required

Returns:

Type Description
str

IPv6 address

Source code in boardfarm3/lib/cpe_sw.py
169
170
171
172
173
174
175
def get_interface_ipv6addr(self, interface: str) -> str:
    """Return given interface IPv6 address.

    :param interface: interface name
    :return: IPv6 address
    """
    return self._get_interface_ipv6_address(interface, "global")
get_interface_link_local_ipv6_addr(interface: str) -> str

Return given interface link local IPv6 address.

Parameters:

Name Type Description Default
str

interface name

required

Returns:

Type Description
str

link local IPv6 address

Source code in boardfarm3/lib/cpe_sw.py
177
178
179
180
181
182
183
def get_interface_link_local_ipv6_addr(self, interface: str) -> str:
    """Return given interface link local IPv6 address.

    :param interface: interface name
    :return: link local IPv6 address
    """
    return self._get_interface_ipv6_address(interface, address_type="link-local")

get_interface_mac_addr

get_interface_mac_addr(interface: str) -> str

Return given interface mac address.

Parameters:

Name Type Description Default
interface
str

interface name

required

Returns:

Type Description
str

mac address of the given interface

Source code in boardfarm3/lib/cpe_sw.py
215
216
217
218
219
220
221
222
223
224
225
def get_interface_mac_addr(self, interface: str) -> str:
    """Return given interface mac address.

    :param interface: interface name
    :return: mac address of the given interface
    """
    return (
        self._get_console("networking")
        .execute_command(f"cat /sys/class/net/{interface}/address")
        .strip()
    )

get_interface_mtu_size abstractmethod

get_interface_mtu_size(interface: str) -> int

Get the MTU size of the interface in bytes.

Parameters:

Name Type Description Default
interface
str

name of the interface

required

Returns:

Type Description
int

size of the MTU in bytes

Source code in boardfarm3/templates/cpe/cpe_sw.py
395
396
397
398
399
400
401
402
403
404
@abstractmethod
def get_interface_mtu_size(self, interface: str) -> int:
    """Get the MTU size of the interface in bytes.

    :param interface: name of the interface
    :type interface: str
    :return: size of the MTU in bytes
    :rtype: int
    """
    raise NotImplementedError

get_load_avg

get_load_avg() -> float

Return current load average of the cable modem.

Returns:

Type Description
float

current load average for the past minute

Source code in boardfarm3/lib/cpe_sw.py
234
235
236
237
238
239
240
241
242
243
244
def get_load_avg(self) -> float:
    """Return current load average of the cable modem.

    :return: current load average for the past minute
    :rtype: float
    """
    return float(
        self._get_console("default_shell")
        .execute_command(r"cat /proc/loadavg | cut -d\  -f1")
        .strip(),
    )

get_memory_utilization

get_memory_utilization() -> dict[str, int]

Return current memory utilization of the cable modem.

Returns:

Type Description
dict[str, int]

current memory utilization of cpe

Source code in boardfarm3/lib/cpe_sw.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
def get_memory_utilization(self) -> dict[str, int]:
    """Return current memory utilization of the cable modem.

    :return: current memory utilization of cpe
    :rtype: dict[str, int]
    """
    memory_keys = []
    memory_val: list[str] = []
    result = self._get_console("default_shell").execute_command("free -m")
    if regex_match := re.search(
        r"Mem:\s+((\d+)(\s+)(\d+)(\s+)(\d+)(\s+)(\d+)(\s+)(\d+)(\s+)(\d+))",
        result,
    ):
        memory_keys = [
            "total",
            "used",
            "free",
            "shared",
            "cache",
            "available",
        ]
        memory_val = regex_match.group(1).strip().split()
    # ignoring pylint error as fixing it would cause mypy to complain
    # about unreachable code...
    return dict(zip(memory_keys, map(int, memory_val)))

get_ntp_sync_status

get_ntp_sync_status() -> list[dict[str, Any]]

Execute ntpq command to get the synchronization status.

Raises:

Type Description
NotImplementedError

pending implementation

Source code in boardfarm3/lib/cpe_sw.py
376
377
378
379
380
381
382
383
def get_ntp_sync_status(
    self,
) -> list[dict[str, Any]]:
    """Execute ntpq command to get the synchronization status.

    :raises NotImplementedError: pending implementation
    """
    raise NotImplementedError

get_provision_mode abstractmethod

get_provision_mode() -> str

Return provision mode.

Source code in boardfarm3/templates/cpe/cpe_sw.py
117
118
119
120
@abstractmethod
def get_provision_mode(self) -> str:
    """Return provision mode."""
    raise NotImplementedError

get_running_processes

get_running_processes(ps_options: str | None = '-A') -> Iterable[ParsedPSOutput]

Return the currently running processes in the CPE via the ps command.

.. code-block:: python

# parsed_ps_output
[
    {"pid": 1, "tty": None, "time": "00:02:29", "cmd": "init"},
    {"pid": 2, "tty": None, "time": "00:00:00", "cmd": "kthreadd"},
    {"pid": 3, "tty": None, "time": "00:00:36", "cmd": "ksoftirqd/0"},
    {"pid": 5, "tty": None, "time": "00:00:00", "cmd": "kworker/0:0H"},
    ...
    {'pid': 2613, 'tty': None, 'time': '00:03:25', 'cmd': 'CcspTr069PaSsp'}
    ...
]

Parameters:

Name Type Description Default
ps_options
str | None

The options to be passed to the ps command, defaults to "-A"

'-A'

Returns:

Type Description
Iterable[ParsedPSOutput]

the currently running processes as a parsed tuple of dictionaries

Source code in boardfarm3/lib/cpe_sw.py
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def get_running_processes(
    self, ps_options: str | None = "-A"
) -> Iterable[ParsedPSOutput]:
    """Return the currently running processes in the CPE via the `ps` command.

    .. code-block:: python

        # parsed_ps_output
        [
            {"pid": 1, "tty": None, "time": "00:02:29", "cmd": "init"},
            {"pid": 2, "tty": None, "time": "00:00:00", "cmd": "kthreadd"},
            {"pid": 3, "tty": None, "time": "00:00:36", "cmd": "ksoftirqd/0"},
            {"pid": 5, "tty": None, "time": "00:00:00", "cmd": "kworker/0:0H"},
            ...
            {'pid': 2613, 'tty': None, 'time': '00:03:25', 'cmd': 'CcspTr069PaSsp'}
            ...
        ]

    :param ps_options: The options to be passed to the ps command, defaults to "-A"
    :type ps_options: str | None
    :return: the currently running processes as a parsed tuple of dictionaries
    :rtype: Iterable[ParsedPSOutput]
    """
    return cast(
        "tuple[ParsedPSOutput]",
        parse(
            "ps",
            self._get_console("default_shell").execute_command(f"ps {ps_options}"),
        ),
    )

get_seconds_uptime

get_seconds_uptime() -> float

Return uptime in seconds.

Returns:

Type Description
float

uptime in seconds

Raises:

Type Description
ValueError

if failed to get uptime

Source code in boardfarm3/lib/cpe_sw.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def get_seconds_uptime(self) -> float:
    """Return uptime in seconds.

    :return: uptime in seconds
    :rtype: float
    :raises ValueError: if failed to get uptime
    """
    result = self._get_console("default_shell").execute_command("cat /proc/uptime")
    regex_match = re.search(r"((\d+)\.(\d{2}))(\s)(\d+)\.(\d{2})", result)
    if regex_match is None:
        msg = "Failed to get the uptime"
        raise ValueError(msg)
    return float(regex_match[1])

get_tr069_log

get_tr069_log() -> list[str]

Return the TR-069 log from the board.

Raises:

Type Description
NotImplementedError

pending implementation

Source code in boardfarm3/lib/cpe_sw.py
400
401
402
403
404
405
def get_tr069_log(self) -> list[str]:
    """Return the TR-069 log from the board.

    :raises NotImplementedError: pending implementation
    """
    raise NotImplementedError
is_link_up(interface: str, pattern: str = 'BROADCAST,MULTICAST,UP') -> bool

Check given interface is up or not.

Parameters:

Name Type Description Default
str

interface name, defaults to "BROADCAST,MULTICAST,UP"

required
str

interface state

'BROADCAST,MULTICAST,UP'

Returns:

Type Description
bool

True if the link is up

Source code in boardfarm3/lib/cpe_sw.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
def is_link_up(
    self,
    interface: str,
    pattern: str = "BROADCAST,MULTICAST,UP",
) -> bool:
    """Check given interface is up or not.

    :param interface: interface name, defaults to "BROADCAST,MULTICAST,UP"
    :type interface: str
    :param pattern: interface state
    :type pattern: str
    :return: True if the link is up
    :rtype: bool
    """
    return is_link_up(self._get_console("networking"), interface, pattern)

is_online

is_online() -> bool

Is the device online.

Returns:

Type Description
bool

True if the device is online, False otherwise

Raises:

Type Description
ValueError

if erouter mode not in ["dual", "ipv4", "ipv6"]

Source code in boardfarm3/lib/cpe_sw.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def is_online(self) -> bool:
    """Is the device online.

    :return: True if the device is online, False otherwise
    :raises ValueError: if erouter mode not in ["dual", "ipv4", "ipv6"]
    """
    mode = self._hw.config.get("eRouter_Provisioning_mode", "dual")
    if mode not in ["dual", "ipv4", "ipv6"]:
        msg = f"Unsupported mode: {mode}"
        raise ValueError(msg)
    online: bool = True
    try:
        if mode in ["dual", "ipv4"]:
            online &= bool(self.get_interface_ipv4addr(self.erouter_iface))
        if mode in ["dual", "ipv6"]:
            online &= bool(self.get_interface_ipv6addr(self.erouter_iface))
    except ValueError:
        online = False
    return online

is_production abstractmethod

is_production() -> bool

Is production software.

Production software has limited capabilities.

Source code in boardfarm3/templates/cpe/cpe_sw.py
122
123
124
125
126
127
128
@abstractmethod
def is_production(self) -> bool:
    """Is production software.

    Production software has limited capabilities.
    """
    raise NotImplementedError

is_tr069_connected

is_tr069_connected() -> bool

Is TR-69 agent is connected.

Raises:

Type Description
NotImplementedError

pending implementation

Source code in boardfarm3/lib/cpe_sw.py
227
228
229
230
231
232
def is_tr069_connected(self) -> bool:
    """Is TR-69 agent is connected.

    :raises NotImplementedError: pending implementation
    """
    raise NotImplementedError

kill_process_immediately

kill_process_immediately(pid: int) -> None

Kill the process based on the provided PID.

Parameters:

Name Type Description Default
pid
int

process number

required
Source code in boardfarm3/lib/cpe_sw.py
385
386
387
388
389
390
391
def kill_process_immediately(self, pid: int) -> None:
    """Kill the process based on the provided PID.

    :param pid: process number
    :type pid: int
    """
    self._get_console("default_shell").execute_command(f"kill -9 {pid}")

read_event_logs

read_event_logs() -> JSONDictType | list[JSONDictType] | Iterator[JSONDictType]

Return the event logs from the logread command.

.. code-block:: python

# example output
[
    {
        "priority": None,
        "date": "May  8 02:28:26",
        "hostname": "mv1cbn-arm",
        "tag": "kern",
        "content": ".debug kernel: PP MC Session delete: failed for src...",
    },
    {
        "priority": None,
        "date": "May  8 02:42:46",
        "hostname": "mv1cbn-arm",
        "tag": "user",
        "content": ".debug MCPROXY: del mroute gaddr: 239.255.255.250, ...",
    },
    {
        "priority": None,
        "date": "May  8 02:42:56",
        "hostname": "mv1cbn-arm",
        "tag": "kern",
        "content": ".debug kernel: PP MC Session delete: failed for src...",
    },
    {
        "priority": None,
        "date": "May  8 02:57:16",
        "hostname": "mv1cbn-arm",
        "tag": "user",
        "content": ".debug MCPROXY: del mroute gaddr: 239.255.255.250, ...",
    },
]

Returns:

Type Description
JSONDictType | list[JSONDictType] | Iterator[JSONDictType]

the event logs from the logread command.

Source code in boardfarm3/lib/cpe_sw.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
def read_event_logs(
    self,
) -> JSONDictType | list[JSONDictType] | Iterator[JSONDictType]:
    """Return the event logs from the `logread` command.

    .. code-block:: python

        # example output
        [
            {
                "priority": None,
                "date": "May  8 02:28:26",
                "hostname": "mv1cbn-arm",
                "tag": "kern",
                "content": ".debug kernel: PP MC Session delete: failed for src...",
            },
            {
                "priority": None,
                "date": "May  8 02:42:46",
                "hostname": "mv1cbn-arm",
                "tag": "user",
                "content": ".debug MCPROXY: del mroute gaddr: 239.255.255.250, ...",
            },
            {
                "priority": None,
                "date": "May  8 02:42:56",
                "hostname": "mv1cbn-arm",
                "tag": "kern",
                "content": ".debug kernel: PP MC Session delete: failed for src...",
            },
            {
                "priority": None,
                "date": "May  8 02:57:16",
                "hostname": "mv1cbn-arm",
                "tag": "user",
                "content": ".debug MCPROXY: del mroute gaddr: 239.255.255.250, ...",
            },
        ]

    :return: the event logs from the `logread` command.
    :rtype: JSONDictType | list[JSONDictType] | Iterator[JSONDictType]
    """
    return parse(
        "syslog-bsd",
        self._get_console("default_shell").execute_command("logread"),
    )

reset abstractmethod

reset(method: str | None = None) -> None

Perform reset via given method.

Parameters:

Name Type Description Default
method
str | None

reset method. Default None

None
Source code in boardfarm3/templates/cpe/cpe_sw.py
130
131
132
133
134
135
136
137
@abstractmethod
def reset(self, method: str | None = None) -> None:
    """Perform reset via given method.

    :param method: reset method. Default None
    :type method: str | None
    """
    raise NotImplementedError

set_date

set_date(date_string: str) -> bool

Set the device's date and time.

It should execute date -s {date_string} on the device's console.

Parameters:

Name Type Description Default
date_string
str

value to be changed

required

Returns:

Type Description
bool

True if set is successful

Source code in boardfarm3/lib/cpe_sw.py
453
454
455
456
457
458
459
460
461
462
463
464
def set_date(self, date_string: str) -> bool:
    """Set the device's date and time.

    It should execute `date -s {date_string}` on the device's console.

    :param date_string: value to be changed
    :type date_string: str
    :return: True if set is successful
    :rtype: bool
    """
    cmd_out = self._console.execute_command(f"date -s {date_string}")
    return bool(cmd_out)

verify_cpe_is_booting abstractmethod

verify_cpe_is_booting() -> None

Verify CPE is booting.

Source code in boardfarm3/templates/cpe/cpe_sw.py
112
113
114
115
@abstractmethod
def verify_cpe_is_booting(self) -> None:
    """Verify CPE is booting."""
    raise NotImplementedError

wait_for_boot abstractmethod

wait_for_boot() -> None

Wait for CPE to boot.

Source code in boardfarm3/templates/cpe/cpe_sw.py
149
150
151
152
@abstractmethod
def wait_for_boot(self) -> None:
    """Wait for CPE to boot."""
    raise NotImplementedError

custom_typing

Boardfarm3 custom type hints package.

Modules:

Name Description
cpe

Collection on Enum to be used.

dhcp

DHCP type hints for Provisioner Templates.

jc

Type hints for jc parsed output.

cpe

Collection on Enum to be used.

Classes:

Name Description
CPEInterfaces

Define all the interfaces on CPE.

HostInterfaces

Define all the interfaces on the Host device.

CPEInterfaces

Define all the interfaces on CPE.

The corresponding value should be available as an instance on board object.

HostInterfaces

Define all the interfaces on the Host device.

The corresponding value should be available as an instance on the device object.

dhcp

DHCP type hints for Provisioner Templates.

jc

Type hints for jc parsed output.

Classes:

Name Description
ParsedPSOutput

Typing for parsed output returned by jc for the ps command.

ParsedPSOutput

Typing for parsed output returned by jc for the ps command.

dataclass

Boardfarm3 dataclasses.

These are used to store/load information required by usecases.

Modules:

Name Description
dhcp

Dataclasses to store DHCP info.

interface

Data classes to store IP interface info.

network_models

Data classes to store all models related to network.

packets

Data classes to store network packets.

dhcp

Dataclasses to store DHCP info.

Classes:

Name Description
DHCPV6Options

DHCPV6Options data class.

DHCPV6TraceData

DHCPv6TraceData data class.

DHCPV6Options dataclass

DHCPV6Options(option_data: ResultDict)

DHCPV6Options data class.

Attributes:

Name Type Description
option_1 ResultDict | None

DHCP IPv6 option 1.

option_14 ResultDict | None

DHCP IPv6 option 14.

option_16 ResultDict | None

DHCP IPv6 option 16.

option_17 ResultDict | None

DHCP IPv6 option 17.

option_2 ResultDict | None

DHCP IPv6 option 2.

option_20 ResultDict | None

DHCP IPv6 option 20.

option_23 ResultDict | None

DHCP IPv6 option 23.

option_24 ResultDict | None

DHCP IPv6 option 24.

option_25 ResultDict | None

DHCP IPv6 option 25.

option_26 ResultDict | None

DHCP IPv6 option 26.

option_3 ResultDict | None

DHCP IPv6 option 3.

option_5 ResultDict | None

DHCP IPv6 option 5.

option_6 ResultDict | None

DHCP IPv6 option 6.

option_64 ResultDict | None

DHCP IPv6 option 64.

option_8 ResultDict | None

DHCP IPv6 option 8.

option_1 property
option_1: ResultDict | None

DHCP IPv6 option 1.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 1.

option_14 property
option_14: ResultDict | None

DHCP IPv6 option 14.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 14.

option_16 property
option_16: ResultDict | None

DHCP IPv6 option 16.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 16.

option_17 property
option_17: ResultDict | None

DHCP IPv6 option 17.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 17.

option_2 property
option_2: ResultDict | None

DHCP IPv6 option 2.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 2.

option_20 property
option_20: ResultDict | None

DHCP IPv6 option 20.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 20.

option_23 property
option_23: ResultDict | None

DHCP IPv6 option 23.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 23.

option_24 property
option_24: ResultDict | None

DHCP IPv6 option 24.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 24.

option_25 property
option_25: ResultDict | None

DHCP IPv6 option 25.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 25.

option_26 property
option_26: ResultDict | None

DHCP IPv6 option 26.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 26.

option_3 property
option_3: ResultDict | None

DHCP IPv6 option 3.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 3.

option_5 property
option_5: ResultDict | None

DHCP IPv6 option 5.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 5.

option_6 property
option_6: ResultDict | None

DHCP IPv6 option 6.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 6.

option_64 property
option_64: ResultDict | None

DHCP IPv6 option 64.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 64.

option_8 property
option_8: ResultDict | None

DHCP IPv6 option 8.

Returns:

Type Description
ResultDict | None

DHCP IPv6 option 8.

DHCPV6TraceData dataclass

DHCPV6TraceData(
    source: IPAddresses,
    destination: IPAddresses,
    dhcpv6_packet: ResultDict,
    dhcpv6_message_type: int,
)

DHCPv6TraceData data class.

Holds source, destination, DHCPv6_packet and DHCPv6_message_type.

interface

Data classes to store IP interface info.

Classes:

Name Description
IPAddresses

To store IP address information.

IPAddresses dataclass

IPAddresses(
    ipv4: IPv4Address | None,
    ipv6: IPv6Address | None,
    link_local_ipv6: IPv6Address | None,
)

To store IP address information.

network_models

Data classes to store all models related to network.

Classes:

Name Description
IPerf3TrafficGenerator

IPerf3TrafficGenerator data classes.

IPerf3TrafficGenerator dataclass

IPerf3TrafficGenerator(
    traffic_sender: LAN | WAN | WLAN,
    sender_pid: int | None,
    traffic_receiver: LAN | WAN | WLAN,
    receiver_pid: int | None,
    server_log_file: str = "",
    client_log_file: str = "",
)

IPerf3TrafficGenerator data classes.

It holds sender/receiver devices and their process IDs.

packets

Data classes to store network packets.

Classes:

Name Description
ICMPPacketData

ICMP packet data class.

RIPv2PacketData

Class to hold RIP packet details for Use Case.

ICMPPacketData dataclass

ICMPPacketData(query_code: int, source: IPAddresses, destination: IPAddresses)

ICMP packet data class.

To hold all the packet information specific to ICMP packets.

source and destination could be either IPv4 or IPv6 addresses. query_code defines the type of message received or sent and could be among the following:

* Type 0 = Echo Reply
* Type 8 = Echo Request
* Type 9 = Router Advertisement
* Type 10 = Router Solicitation
* Type 13 = Timestamp Request
* Type 14 = Timestamp Reply

RIPv2PacketData dataclass

RIPv2PacketData(
    source: IPv4Address,
    destination: IPv4Address,
    ip_address: list[IPv4Address],
    subnet: list[IPv4Interface | IPv6Interface],
    frame_time: datetime | None = None,
)

Class to hold RIP packet details for Use Case.

device_manager

Boardfarm device manager.

Classes:

Name Description
DeviceManager

Manages all the devices in the environment.

Functions:

Name Description
get_device_manager

Return device manager instance if already instantiated.

DeviceManager

DeviceManager(plugin_manager: PluginManager)

Manages all the devices in the environment.

Initialize device manager.

Parameters:

Name Type Description Default

plugin_manager

PluginManager

plugin manager

required

Raises:

Type Description
ValueError

when DeviceManager is already initialized

Methods:

Name Description
get_device_by_type

Get first device of the given type.

get_devices_by_type

Get devices of given type.

register_device

Register a device as plugin with boardfarm.

unregister_device

Unregister a device from boardfarm.

Source code in boardfarm3/lib/device_manager.py
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(self, plugin_manager: PluginManager) -> None:
    """Initialize device manager.

    :param plugin_manager: plugin manager
    :raises ValueError: when DeviceManager is already initialized
    """
    global _DEVICE_MANAGER_INSTANCE  # pylint: disable=global-statement  # noqa: PLW0603
    if _DEVICE_MANAGER_INSTANCE is not None:
        msg = "DeviceManager is already initialized."  # type: ignore[unreachable]
        raise ValueError(msg)
    self._plugin_manager = plugin_manager
    _DEVICE_MANAGER_INSTANCE = self

get_device_by_type

get_device_by_type(device_type: type[T]) -> T

Get first device of the given type.

In order to get all devices of given type use get_devices_by_type.

Parameters:

Name Type Description Default
device_type
type[T]

device type

required

Returns:

Type Description
T

device of given type

Raises:

Type Description
DeviceNotFound

when device of given type not available

Source code in boardfarm3/lib/device_manager.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def get_device_by_type(self, device_type: type[T]) -> T:
    """Get first device of the given type.

    In order to get all devices of given type use get_devices_by_type.

    :param device_type: device type
    :returns: device of given type
    :raises DeviceNotFound: when device of given type not available
    """
    for _, plugin in self._plugin_manager.list_name_plugin():
        if isinstance(plugin, device_type):
            return plugin
    msg = f"No device available of type {device_type}"
    raise DeviceNotFound(msg)

get_devices_by_type

get_devices_by_type(device_type: type[T]) -> dict[str, T]

Get devices of given type.

Parameters:

Name Type Description Default
device_type
type[T]

device type

required

Returns:

Type Description
dict[str, T]

devices of given type

Source code in boardfarm3/lib/device_manager.py
42
43
44
45
46
47
48
49
50
51
52
def get_devices_by_type(self, device_type: type[T]) -> dict[str, T]:
    """Get devices of given type.

    :param device_type: device type
    :returns: devices of given type
    """
    return {
        name: plugin
        for name, plugin in self._plugin_manager.list_name_plugin()
        if isinstance(plugin, device_type)
    }

register_device

register_device(device: BoardfarmDevice) -> None

Register a device as plugin with boardfarm.

Parameters:

Name Type Description Default
device
BoardfarmDevice

device instance to register

required
Source code in boardfarm3/lib/device_manager.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def register_device(self, device: BoardfarmDevice) -> None:
    """Register a device as plugin with boardfarm.

    :param device: device instance to register
    :type device: BoardfarmDevice
    """
    # During the registration of a plugin to the boardfarm, Pluggy calls all
    # of its properties. However, if a plugin has a method that throws an
    # exception, it can cause the boardfarm to crash. To address this issue
    # with Pluggy, we ignore the NotImplementedError and NotSupportedError
    # exceptions while registering a plugin.
    with mock.patch.object(
        device.__class__,
        "__getattribute__",
        _get_attribute_with_ignore_exception,
    ):
        self._plugin_manager.register(device, device.device_name)

unregister_device

unregister_device(device_name: str) -> None

Unregister a device from boardfarm.

Parameters:

Name Type Description Default
device_name
str

name of device to unregister

required
Source code in boardfarm3/lib/device_manager.py
87
88
89
90
91
92
def unregister_device(self, device_name: str) -> None:
    """Unregister a device from boardfarm.

    :param device_name: name of device to unregister
    """
    self._plugin_manager.set_blocked(device_name)

get_device_manager

get_device_manager() -> DeviceManager

Return device manager instance if already instantiated.

When you run boardfarm it will initialize the DeviceManager.

Returns:

Type Description
DeviceManager

device manager instance

Raises:

Type Description
ValueError

when device manager in not instantiated

Source code in boardfarm3/lib/device_manager.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def get_device_manager() -> DeviceManager:
    """Return device manager instance if already instantiated.

    When you run boardfarm it will initialize the DeviceManager.

    :raises ValueError: when device manager in not instantiated
    :return: device manager instance
    :rtype: DeviceManager
    """
    if _DEVICE_MANAGER_INSTANCE is None:
        msg = "DeviceManager is not instantiated."
        raise ValueError(msg)
    return _DEVICE_MANAGER_INSTANCE

dmcli

RDKB dmcli command line interface module.

Classes:

Name Description
DMCLIAPI

RDKB dmcli command line interface utility.

DMCLIError

Raise this on DMCLI command line utility errors.

DMCLIOut

DMCLI command output data.

DMCLIAPI

RDKB dmcli command line interface utility.

Initialize DMCLIAPI.

Parameters:

Name Type Description Default

console

BoardfarmPexpect

console instance which has dmcli utility

required

Methods:

Name Description
AddObject

Add object via dmcli.

DelObject

Add object via dmcli.

GPV

Get given parameter value via dmcli.

SPV

Set given parameter via dmcli.

Source code in boardfarm3/lib/dmcli.py
34
35
36
37
38
39
40
def __init__(self, console: BoardfarmPexpect) -> None:
    """Initialize DMCLIAPI.

    :param console: console instance which has dmcli utility
    :type console: BoardfarmPexpect
    """
    self._console = console

AddObject

AddObject(param: str) -> DMCLIOut

Add object via dmcli.

Parameters:

Name Type Description Default
param
str

param to be added

required

Returns:

Type Description
DMCLIOut

dmcli output object

Source code in boardfarm3/lib/dmcli.py
67
68
69
70
71
72
73
74
def AddObject(self, param: str) -> DMCLIOut:  # pylint: disable=invalid-name
    """Add object via dmcli.

    :param param: param to be added
    :return: dmcli output object
    :rtype: DMCLIOut
    """
    return self._trigger_dmcli_cmd("addtable", param)

DelObject

DelObject(param: str) -> DMCLIOut

Add object via dmcli.

Parameters:

Name Type Description Default
param
str

param to delete

required

Returns:

Type Description
DMCLIOut

dmcli output object

Source code in boardfarm3/lib/dmcli.py
106
107
108
109
110
111
112
113
def DelObject(self, param: str) -> DMCLIOut:  # pylint: disable=invalid-name
    """Add object via dmcli.

    :param param: param to delete
    :return: dmcli output object
    :rtype: DMCLIOut
    """
    return self._trigger_dmcli_cmd("deltable", param)

GPV

GPV(param: str) -> DMCLIOut

Get given parameter value via dmcli.

Parameters:

Name Type Description Default
param
str

param to get

required

Returns:

Type Description
DMCLIOut

dmcli output object

Source code in boardfarm3/lib/dmcli.py
 96
 97
 98
 99
100
101
102
103
104
def GPV(self, param: str) -> DMCLIOut:  # pylint: disable=invalid-name
    """Get given parameter value via dmcli.

    :param param: param to get
    :type param: str
    :return: dmcli output object
    :rtype: DMCLIOut
    """
    return self._trigger_dmcli_cmd("getvalues", param)

SPV

SPV(
    param: str, value: str, type_set: str = "string", sleep_timeout: float = 0.0
) -> DMCLIOut

Set given parameter via dmcli.

Parameters:

Name Type Description Default
param
str

param in which value is to be set

required
value
str

value to be set

required
type_set
str

type of value set, defaults to string

'string'
sleep_timeout
float

sleep values when SPV, defaults to 0.0

0.0

Returns:

Type Description
DMCLIOut

dmcli output object

Source code in boardfarm3/lib/dmcli.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def SPV(  # pylint: disable=invalid-name
    self,
    param: str,
    value: str,
    type_set: str = "string",
    sleep_timeout: float = 0.0,
) -> DMCLIOut:
    """Set given parameter via dmcli.

    :param param: param in which value is to be set
    :param value: value to be set
    :param type_set: type of value set, defaults to string
    :param sleep_timeout: sleep values when SPV, defaults to 0.0
    :return: dmcli output object
    :rtype: DMCLIOut
    """
    return self._trigger_dmcli_cmd(
        "setvalues", f"{param} {type_set} {value}", sleep_timeout
    )

DMCLIError

Raise this on DMCLI command line utility errors.

DMCLIOut dataclass

DMCLIOut(status: str, rtype: str, rval: str, console_out: str)

DMCLI command output data.

Properties: status, rtype, rval, console_out

docker_factory

Docker Factory v2 related libraries.

Modules:

Name Description
docker_compose_generator

Library to generate docker-compose.yml payload to docker factory v2.

docker_compose_generator

Library to generate docker-compose.yml payload to docker factory v2.

Classes:

Name Description
DockerComposeGenerator

Class to manage docker-compose.yml payload for docker factory v2.

DockerComposeGenerator

DockerComposeGenerator(boardfarm_config: BoardfarmConfig)

Class to manage docker-compose.yml payload for docker factory v2.

Initialize the YMLManager for Docker Factory v2.

Parameters:

Name Type Description Default
boardfarm_config
BoardfarmConfig

Boardfarm Config instance

required

Methods:

Name Description
generate_docker_compose

Generate the docker-compose yml to be used as payload for docker factory.

Source code in boardfarm3/lib/docker_factory/docker_compose_generator.py
23
24
25
26
27
28
29
30
31
32
def __init__(self, boardfarm_config: BoardfarmConfig) -> None:
    """Initialize the YMLManager for Docker Factory v2.

    :param boardfarm_config: Boardfarm Config instance
    :type boardfarm_config: BoardfarmConfig
    """
    self._templates_path = Path(__file__).parent / "templates"
    self._boardfarm_config = boardfarm_config
    self._devices_list: list[str] = []
    self._get_devices(self._boardfarm_config.env_config)
generate_docker_compose
generate_docker_compose() -> dict[str, Any]

Generate the docker-compose yml to be used as payload for docker factory.

Returns:

Type Description
dict[str, Any]

The docker compose payload for docker factory

Source code in boardfarm3/lib/docker_factory/docker_compose_generator.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def generate_docker_compose(self) -> dict[str, Any]:
    """Generate the docker-compose yml to be used as payload for docker factory.

    :return: The docker compose payload for docker factory
    :rtype: dict[str, Any]
    """
    base_compose = self._generate_base_compose()
    for device in sorted(self._devices_list):
        base_compose = self._merge_dicts(
            base_compose,
            self._generate_device_compose(device),
        )
    return jsonmerge.merge(
        base_compose,
        self._generate_orchestrator_compose(list(base_compose["services"].keys())),
    )

gui

Keep SonarQube happy.

Modules:

Name Description
gui_helper

Module for Selenium webdriver interaction implementations.

prplos

Keep ruff happy.

gui_helper

Module for Selenium webdriver interaction implementations.

Classes:

Name Description
GuiHelper

GuiHelper class to get webdrivers for testing.

GuiHelperNoProxy

GuiHelper without proxy.

ScreenshotListener

Take a screenshot on exceptions/events.

Functions:

Name Description
element_is_present_by_css

Determine if element is present by its css selector.

firefox_webproxy_driver

Initialize proxy firefox webdriver.

get_web_driver

Return proxy webdriver.

save_screenshot

Save screenshot of the driver window.

GuiHelper

GuiHelper(
    device: LAN | WAN | WLAN, default_delay: int = 20, output_dir: str | None = None
)

GuiHelper class to get webdrivers for testing.

GUI helper class.

Parameters:

Name Type Description Default
device
LAN | WAN | WLAN

device instance

required
default_delay
int

default delay in seconds, defaults to 20

20
output_dir
str | None

output directory path, defaults to None

None

Methods:

Name Description
get_web_driver

Return event firing web driver.

get_webdriver_without_event_firing

Return webdriver without the EventFiringWebDriver.

Source code in boardfarm3/lib/gui/gui_helper.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
def __init__(
    self,
    device: LAN | WAN | WLAN,
    default_delay: int = 20,
    output_dir: str | None = None,
) -> None:
    """GUI helper class.

    :param device: device instance
    :type device: LAN | WAN | WLAN
    :param default_delay: default delay in seconds, defaults to 20
    :type default_delay: int
    :param output_dir: output directory path, defaults to None
    :type output_dir: str | None
    """
    if output_dir is None:
        output_dir = pathlib.Path.cwd().joinpath("results").as_posix()
    self._device = device
    self._default_delay = default_delay
    self._test_name = get_pytest_name()
    self._screenshot_path = str(
        pathlib.PurePosixPath(output_dir).joinpath(self._test_name)
    )
get_web_driver
get_web_driver() -> EventFiringWebDriver

Return event firing web driver.

Returns:

Type Description
EventFiringWebDriver

web driver instance

Source code in boardfarm3/lib/gui/gui_helper.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
def get_web_driver(self) -> EventFiringWebDriver:
    """Return event firing web driver.

    :return: web driver instance
    :rtype: EventFiringWebDriver
    """
    webdriver = get_web_driver(
        self._device, self._default_delay, headless=self._headless
    )
    event_firing_webdriver = EventFiringWebDriver(
        webdriver, ScreenshotListener(self._screenshot_path)
    )
    event_firing_webdriver.screenshot_path = self._screenshot_path
    return event_firing_webdriver
get_webdriver_without_event_firing
get_webdriver_without_event_firing(headless: bool = True) -> WebDriver

Return webdriver without the EventFiringWebDriver.

Parameters:

Name Type Description Default
headless
bool

run in headless mode, defaults to True

True

Returns:

Type Description
WebDriver

web driver instance

Source code in boardfarm3/lib/gui/gui_helper.py
381
382
383
384
385
386
387
388
389
390
391
def get_webdriver_without_event_firing(self, headless: bool = True) -> WebDriver:
    """Return webdriver without the EventFiringWebDriver.

    :param headless: run in headless mode, defaults to True
    :type headless: bool
    :return: web driver instance
    :rtype: WebDriver
    """
    driver = get_web_driver(self._device, self._default_delay, headless=headless)
    driver.screenshot_path = self._screenshot_path  # type: ignore[attr-defined]
    return driver

GuiHelperNoProxy

GuiHelperNoProxy(
    default_delay: int = 20, output_dir: str | None = None, headless: bool = True
)

GuiHelper without proxy.

GUI helper class without proxy.

Parameters:

Name Type Description Default
default_delay
int

default delay in seconds, defaults to 20

20
output_dir
str | None

output directory path, defaults to None

None
headless
bool

turns on/off headless mode, defaults to True

True

Methods:

Name Description
get_web_driver

Return event firing web driver.

get_webdriver_without_event_firing

Return webdriver without the EventFiringWebDriver.

Source code in boardfarm3/lib/gui/gui_helper.py
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def __init__(
    self,
    default_delay: int = 20,
    output_dir: str | None = None,
    headless: bool = True,
) -> None:
    """GUI helper class without proxy.

    :param default_delay: default delay in seconds, defaults to 20
    :type default_delay: int
    :param output_dir: output directory path, defaults to None
    :type output_dir: str
    :param headless: turns on/off headless mode, defaults to True
    :type headless: bool
    """
    super().__init__(None, default_delay, output_dir)
    self._headless = headless
get_web_driver
get_web_driver() -> EventFiringWebDriver

Return event firing web driver.

Returns:

Type Description
EventFiringWebDriver

web driver instance

Source code in boardfarm3/lib/gui/gui_helper.py
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def get_web_driver(self) -> EventFiringWebDriver:
    """Return event firing web driver.

    :return: web driver instance
    :rtype: EventFiringWebDriver
    """
    webdriver = firefox_webproxy_driver(
        http_proxy="",
        default_delay=self._default_delay,
        headless=self._headless,
    )
    webdriver.maximize_window()
    event_firing_webdriver = EventFiringWebDriver(
        webdriver, ScreenshotListener(self._screenshot_path)
    )
    event_firing_webdriver.screenshot_path = self._screenshot_path
    return event_firing_webdriver
get_webdriver_without_event_firing
get_webdriver_without_event_firing(headless: bool = True) -> WebDriver

Return webdriver without the EventFiringWebDriver.

Parameters:

Name Type Description Default
headless
bool

run in headless mode, defaults to True

True

Returns:

Type Description
WebDriver

web driver instance

Source code in boardfarm3/lib/gui/gui_helper.py
381
382
383
384
385
386
387
388
389
390
391
def get_webdriver_without_event_firing(self, headless: bool = True) -> WebDriver:
    """Return webdriver without the EventFiringWebDriver.

    :param headless: run in headless mode, defaults to True
    :type headless: bool
    :return: web driver instance
    :rtype: WebDriver
    """
    driver = get_web_driver(self._device, self._default_delay, headless=headless)
    driver.screenshot_path = self._screenshot_path  # type: ignore[attr-defined]
    return driver

ScreenshotListener

ScreenshotListener(screenshot_path: str)

Take a screenshot on exceptions/events.

This allows to capture screenshot based on selenium web driver events. Capturing screenshot can be varied by setting the logging.root.level When logging.root.level set to :

1. NOTSET - takes screenshots for on_exception and before_click events
2. INFO   - takes screenshots for on_exception, before_click and
                    after_change_value_of events
3. DEBUG  - takes screenshot for all the events

Init method.

Parameters:

Name Type Description Default
screenshot_path
str

the screenshot destination dir

required

Methods:

Name Description
after_change_value_of

Capture screenshot after change_value_of event.

after_click

Capture screenshot after click event.

after_close

Capture screenshot after close event.

after_execute_script

Capture screenshot after execute_script event.

after_navigate_to

Capture screenshot after navigate_to event.

before_change_value_of

Capture screenshot before change_value_of event.

before_click

Capture screenshot before click event.

before_close

Capture screenshot before close event.

before_execute_script

Capture screenshot before execute_script event.

before_navigate_to

Capture screenshot before navigate_to event.

before_quit

Capture screenshot before quit event.

capture_screenshot

Capture screenshot and save name.ext to disk.

on_exception

Capture screenshot on exception.

Source code in boardfarm3/lib/gui/gui_helper.py
52
53
54
55
56
57
58
59
def __init__(self, screenshot_path: str):
    """Init method.

    :param screenshot_path: the screenshot destination dir
    :type screenshot_path: str
    """
    super().__init__()
    self.screenshot_path = screenshot_path
after_change_value_of
after_change_value_of(element: WebElement, driver: WebDriver) -> None

Capture screenshot after change_value_of event.

Parameters:

Name Type Description Default
element
WebElement

unused

required
driver
WebDriver

web driver

required
Source code in boardfarm3/lib/gui/gui_helper.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def after_change_value_of(
    self,
    element: WebElement,  # noqa: ARG002
    driver: WebDriver,
) -> None:
    """Capture screenshot after change_value_of event.

    :param element: unused
    :type element: WebElement
    :param driver: web driver
    :type driver: WebDriver
    """
    if self.verbose_debug_enabled or self.debug_enabled:
        self.capture_screenshot(driver, "after_change_value_of")
after_click
after_click(element: WebElement, driver: WebDriver) -> None

Capture screenshot after click event.

Parameters:

Name Type Description Default
element
WebElement

unused

required
driver
WebDriver

web driver

required
Source code in boardfarm3/lib/gui/gui_helper.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def after_click(
    self,
    element: WebElement,  # noqa: ARG002
    driver: WebDriver,
) -> None:
    """Capture screenshot after click event.

    :param element: unused
    :type element: WebElement
    :param driver: web driver
    :type driver: WebDriver
    """
    if self.verbose_debug_enabled:
        self.capture_screenshot(driver, "after_click")
after_close
after_close(driver: WebDriver) -> None

Capture screenshot after close event.

Parameters:

Name Type Description Default
driver
WebDriver

web driver

required
Source code in boardfarm3/lib/gui/gui_helper.py
241
242
243
244
245
246
247
248
def after_close(self, driver: WebDriver) -> None:
    """Capture screenshot after close event.

    :param driver: web driver
    :type driver: WebDriver
    """
    if self.verbose_debug_enabled:
        self.capture_screenshot(driver, "after_close")
after_execute_script
after_execute_script(script: str, driver: WebDriver) -> None

Capture screenshot after execute_script event.

Parameters:

Name Type Description Default
script
str

unused

required
driver
WebDriver

web driver

required
Source code in boardfarm3/lib/gui/gui_helper.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def after_execute_script(
    self,
    script: str,  # noqa: ARG002
    driver: WebDriver,
) -> None:
    """Capture screenshot after execute_script event.

    :param script: unused
    :type script: str
    :param driver: web driver
    :type driver: WebDriver
    """
    if self.verbose_debug_enabled:
        self.capture_screenshot(driver, "after_execute_script")
after_navigate_to
after_navigate_to(url: str, driver: WebDriver) -> None

Capture screenshot after navigate_to event.

Parameters:

Name Type Description Default
url
str

unused

required
driver
WebDriver

web driver

required
Source code in boardfarm3/lib/gui/gui_helper.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def after_navigate_to(
    self,
    url: str,  # noqa: ARG002
    driver: WebDriver,
) -> None:
    """Capture screenshot after navigate_to event.

    :param url: unused
    :type url: str
    :param driver: web driver
    :type driver: WebDriver
    """
    if self.verbose_debug_enabled:
        self.capture_screenshot(driver, "after_navigate_to")
before_change_value_of
before_change_value_of(element: WebElement, driver: WebDriver) -> None

Capture screenshot before change_value_of event.

Parameters:

Name Type Description Default
element
WebElement

unused

required
driver
WebDriver

web driver

required
Source code in boardfarm3/lib/gui/gui_helper.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def before_change_value_of(
    self,
    element: WebElement,  # noqa: ARG002
    driver: WebDriver,
) -> None:
    """Capture screenshot before change_value_of event.

    :param element: unused
    :type element: WebElement
    :param driver: web driver
    :type driver: WebDriver
    """
    if self.verbose_debug_enabled:
        self.capture_screenshot(driver, "before_change_value_of")
before_click
before_click(element: WebElement, driver: WebDriver) -> None

Capture screenshot before click event.

Parameters:

Name Type Description Default
element
WebElement

unused

required
driver
WebDriver

web driver

required
Source code in boardfarm3/lib/gui/gui_helper.py
143
144
145
146
147
148
149
150
151
152
153
154
155
def before_click(
    self,
    element: WebElement,  # noqa: ARG002
    driver: WebDriver,
) -> None:
    """Capture screenshot before click event.

    :param element: unused
    :type element: WebElement
    :param driver: web driver
    :type driver: WebDriver
    """
    self.capture_screenshot(driver, "before_click")
before_close
before_close(driver: WebDriver) -> None

Capture screenshot before close event.

Parameters:

Name Type Description Default
driver
WebDriver

web driver

required
Source code in boardfarm3/lib/gui/gui_helper.py
232
233
234
235
236
237
238
239
def before_close(self, driver: WebDriver) -> None:
    """Capture screenshot before close event.

    :param driver: web driver
    :type driver: WebDriver
    """
    if self.verbose_debug_enabled:
        self.capture_screenshot(driver, "before_close")
before_execute_script
before_execute_script(script: str, driver: WebDriver) -> None

Capture screenshot before execute_script event.

Parameters:

Name Type Description Default
script
str

unused

required
driver
WebDriver

web driver

required
Source code in boardfarm3/lib/gui/gui_helper.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def before_execute_script(
    self,
    script: str,  # noqa: ARG002
    driver: WebDriver,
) -> None:
    """Capture screenshot before execute_script event.

    :param script: unused
    :type script: str
    :param driver: web driver
    :type driver: WebDriver
    """
    if self.verbose_debug_enabled:
        self.capture_screenshot(driver, "before_execute_script")
before_navigate_to
before_navigate_to(url: str, driver: WebDriver) -> None

Capture screenshot before navigate_to event.

Parameters:

Name Type Description Default
url
str

unused

required
driver
WebDriver

web driver

required
Source code in boardfarm3/lib/gui/gui_helper.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def before_navigate_to(
    self,
    url: str,  # noqa: ARG002
    driver: WebDriver,
) -> None:
    """Capture screenshot before navigate_to event.

    :param url: unused
    :type url: str
    :param driver: web driver
    :type driver: WebDriver
    """
    if self.verbose_debug_enabled:
        self.capture_screenshot(driver, "before_navigate_to")
before_quit
before_quit(driver: WebDriver) -> None

Capture screenshot before quit event.

Parameters:

Name Type Description Default
driver
WebDriver

web driver

required
Source code in boardfarm3/lib/gui/gui_helper.py
250
251
252
253
254
255
256
257
def before_quit(self, driver: WebDriver) -> None:
    """Capture screenshot before quit event.

    :param driver: web driver
    :type driver: WebDriver
    """
    if self.verbose_debug_enabled:
        self.capture_screenshot(driver, "before_quit")
capture_screenshot
capture_screenshot(driver: WebDriver, name: str, ext: str = 'png') -> None

Capture screenshot and save name.ext to disk.

Parameters:

Name Type Description Default
driver
WebDriver

web driver

required
name
str

the filename (with path if needed)

required
ext
str

the extension, defaults to png

'png'
Source code in boardfarm3/lib/gui/gui_helper.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def capture_screenshot(
    self, driver: WebDriver, name: str, ext: str = "png"
) -> None:
    """Capture screenshot and save name.ext to disk.

    :param driver: web driver
    :type driver: WebDriver
    :param name: the filename (with path if needed)
    :type name: str
    :param ext: the extension, defaults to png
    :type ext: str
    """
    WebDriverWait(driver, 10).until(
        lambda drv: drv.execute_script(
            "return document.readyState",
        )
        == "complete"
    )
    now = datetime.now(tz=UTC)
    abs_path = (
        "_".join([self.screenshot_path, now.strftime("%Y%m%d_%H%M%S%f"), name])
        + "."
        + ext
    )

    def gui_page_size(dimension: str) -> str:
        return driver.execute_script(
            "return document.body.parentNode.scroll" + dimension
        )

    # Setting screen size twice because one is not enough for Selenium :)
    driver.set_window_size(gui_page_size("Width"), gui_page_size("Height"))
    driver.set_window_size(gui_page_size("Width"), gui_page_size("Height"))

    driver.get_screenshot_as_file(abs_path)
    driver.maximize_window()  # Restore window to fit screen
    _LOGGER.debug("Screenshot saved as '%s'", abs_path)
on_exception
on_exception(exception: Exception, driver: WebDriver) -> None

Capture screenshot on exception.

Parameters:

Name Type Description Default
exception
Exception

unused

required
driver
WebDriver

web driver

required
Source code in boardfarm3/lib/gui/gui_helper.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
def on_exception(
    self,
    exception: Exception,  # noqa: ARG002
    driver: WebDriver,
) -> None:
    """Capture screenshot on exception.

    :param exception: unused
    :type exception: Exception
    :param driver: web driver
    :type driver: WebDriver
    """
    self.capture_screenshot(driver, "Exception")

element_is_present_by_css

element_is_present_by_css(element_css: str) -> Callable[[AnyDriver], WebElement | bool]

Determine if element is present by its css selector.

Parameters:

Name Type Description Default
element_css
str

element css selector

required

Returns:

Type Description
Callable[[EC.AnyDriver], WebDriver | EventFiringWebDriver]

True if element is present, False otherwise

Source code in boardfarm3/lib/gui/gui_helper.py
394
395
396
397
398
399
400
401
402
403
404
def element_is_present_by_css(
    element_css: str,
) -> Callable[[EC.AnyDriver], WebElement | bool]:  # type: ignore[name-defined]
    """Determine if element is present by its css selector.

    :param element_css: element css selector
    :type element_css: str
    :return: True if element is present, False otherwise
    :rtype: Callable[[EC.AnyDriver], WebDriver | EventFiringWebDriver]
    """
    return EC.presence_of_element_located((By.CSS_SELECTOR, element_css))

firefox_webproxy_driver

firefox_webproxy_driver(
    http_proxy: str, default_delay: int, headless: bool = False
) -> Firefox

Initialize proxy firefox webdriver.

Parameters:

Name Type Description Default
http_proxy
str

proxy ip and port number

required
default_delay
int

selenium default delay in seconds

required
headless
bool

headless state, default to False

False

Returns:

Type Description
Firefox

gui selenium web driver instance

Source code in boardfarm3/lib/gui/gui_helper.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
def firefox_webproxy_driver(
    http_proxy: str, default_delay: int, headless: bool = False
) -> Firefox:
    """Initialize proxy firefox webdriver.

    :param http_proxy: proxy ip and port number
    :type http_proxy: str
    :param default_delay: selenium default delay in seconds
    :type default_delay: int
    :param headless: headless state, default to False
    :type headless: bool
    :return: gui selenium web driver instance
    :rtype: Firefox
    """
    options = Options()
    if headless:
        options.add_argument("--headless")
    if http_proxy:
        gateway_ip, port = http_proxy.split(":")
        options.set_preference("network.proxy.type", 1)
        options.set_preference("network.proxy.socks", gateway_ip)
        options.set_preference("network.proxy.socks_port", int(port))
        options.set_preference("network.proxy.socks_version", 5)
        options.set_preference("network.proxy.socks_remote_dns", True)
    options.set_preference("security.enterprise_roots.enabled", True)
    # number 2 is to save the file to the above current location instead of downloads
    options.set_preference("browser.download.folderList", 2)
    # added the download dir as /tmp
    options.set_preference("browser.download.dir", "/tmp")  # noqa: S108
    # open the file without asking any questions
    options.set_preference(
        "browser.helperApps.neverAsk.openFile",
        (
            "text/anytext,text/comma-separated-values,"
            "text/csv,application/octet-stream,text/plain"
        ),
    )
    # save the file without asking any questions
    options.set_preference("browser.helperApps.neverAsk.saveToDisk", "text/plain")
    options.headless = headless  # type: ignore[attr-defined]
    # Make DEBUG logs less polluted with selenium.webdriver.remote.remote_connection
    # and urllib3.connectionpool messages
    LOGGER.setLevel(logging.WARNING)
    log.setLevel(logging.WARNING)
    driver = Firefox(options=options)
    driver.implicitly_wait(default_delay)
    driver.set_page_load_timeout(default_delay)
    return driver

get_web_driver

get_web_driver(
    device: LAN | WAN | WLAN, default_delay: int, headless: bool = True
) -> WebDriver

Return proxy webdriver.

Http proxy (dante) must be running on provided device.

Parameters:

Name Type Description Default
device
LAN | WAN | WLAN

device instance

required
default_delay
int

default delay in seconds

required
headless
bool

headless state, default to True

True

Returns:

Type Description
WebDriver

configured Firefox webdriver instance

Source code in boardfarm3/lib/gui/gui_helper.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def get_web_driver(
    device: LAN | WAN | WLAN,
    default_delay: int,
    headless: bool = True,
) -> WebDriver:
    """Return proxy webdriver.

    Http proxy (dante) must be running on provided device.

    :param device: device instance
    :type device: LAN | WAN | WLAN
    :param default_delay: default delay in seconds
    :type default_delay: int
    :param headless: headless state, default to True
    :type headless: bool
    :return: configured Firefox webdriver instance
    :rtype: WebDriver
    """
    webdriver = firefox_webproxy_driver(
        http_proxy=device.http_proxy,
        default_delay=default_delay,
        headless=headless,
    )
    webdriver.maximize_window()
    return webdriver

save_screenshot

save_screenshot(driver: WebDriver | EventFiringWebDriver) -> None

Save screenshot of the driver window.

Parameters:

Name Type Description Default
driver
WebDriver | EventFiringWebDriver

webdriver instance

required
Source code in boardfarm3/lib/gui/gui_helper.py
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def save_screenshot(driver: WebDriver | EventFiringWebDriver) -> None:
    """Save screenshot of the driver window.

    :param driver: webdriver instance
    :type driver: WebDriver | EventFiringWebDriver
    """
    total_width = driver.execute_script("return document.body.parentNode.scrollWidth")
    total_height = driver.execute_script("return document.body.parentNode.scrollHeight")
    # Setting screen size twice because one is not enough for Selenium :)
    driver.set_window_size(total_width, total_height)
    driver.set_window_size(total_width, total_height)
    path = (
        "_".join(
            [
                driver.screenshot_path,  # type: ignore[union-attr]
                datetime.now(tz=UTC).strftime("%Y%m%d_%H%M%S%f"),
            ]
        )
        + "_exception.png"
    )
    driver.get_screenshot_as_file(path)

prplos

Keep ruff happy.

Modules:

Name Description
pages

Keep ruff happy.

pages

Keep ruff happy.

Modules:

Name Description
home

PrplOS GUI Home Page.

lcm

PrplOS GUI LCM Page.

login

Login Page POM.

page_helper

Page Helper Module.

prplos_base_pom

PrplOS Base Page Oobject Module (POM).

wan

PrplOS GUI WAN Page.

wifi

PrplOS GUI WiFi Page.

home

PrplOS GUI Home Page.

Classes:

Name Description
HomePage

Page Object for Home page.

HomePage
HomePage(
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
)

Page Object for Home page.

Initialize HomePage POM.

Parameters:

Name Type Description Default
driver WebDriver | EventFiringWebDriver

webdriver instance

required
gw_ip str | IPv4Address

gateway ip address

required
fluent_wait int

timeout in seconds to load the page, defaults to 20

20

Methods:

Name Description
__getattribute__

Take a screenshot and save to a folder upon TimeoutException.

click_networking_submenu

Ckick on WAN menu option.

is_page_loaded

Verify the home page is completely loaded.

logout

Log out.

wait_until_loaded

Wait until the page is loaded completely.

Attributes:

Name Type Description
cpu_info_element WebElement

CPU info box.

memory_info_element WebElement

Memory info box.

system_info_element WebElement

System info box.

wan_info_element WebElement

WAN info element.

Source code in boardfarm3/lib/gui/prplos/pages/home.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(
    self,
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
) -> None:
    """Initialize HomePage POM.

    :param driver: webdriver instance
    :type driver: WebDriver | EventFiringWebDriver
    :param gw_ip: gateway ip address
    :type gw_ip: str | IPv4Address
    :param fluent_wait: timeout in seconds to load the page, defaults to 20
    :type fluent_wait: int
    """
    super().__init__(driver, gw_ip, fluent_wait)
    self.wait.until(self.is_page_loaded)
cpu_info_element property
cpu_info_element: WebElement

CPU info box.

Returns:

Type Description
WebElement

the web element

memory_info_element property
memory_info_element: WebElement

Memory info box.

Returns:

Type Description
WebElement

the web element

system_info_element property
system_info_element: WebElement

System info box.

Returns:

Type Description
WebElement

the web element

wan_info_element property
wan_info_element: WebElement

WAN info element.

Returns:

Type Description
WebElement

the web element

__getattribute__
__getattribute__(name: str) -> None

Take a screenshot and save to a folder upon TimeoutException.

Parameters:

Name Type Description Default
name str

the attribute

required

Returns:

Type Description
related to the attribute

the attribute value

Raises:

Type Description
TimeoutException

if the attr was not found

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __getattribute__(self, name: str) -> None:
    """Take a screenshot and save to a folder upon TimeoutException.

    :param name: the attribute
    :type name: str
    :return: the attribute value
    :rtype: related to the attribute
    :raises TimeoutException: if the attr was not found
    """
    try:
        return super().__getattribute__(name)
    except TimeoutException:
        now = datetime.now(UTC)
        sc_path = (
            "_".join(
                [
                    self.driver.screenshot_path,  # type: ignore[union-attr]
                    now.strftime("%Y%m%d_%H%M%S%f"),
                ],
            )
            + "_TimeoutException.png"
        )
        self.driver.get_screenshot_as_file(sc_path)
        _LOGGER.debug("Screenshot saved as '%s'", sc_path)
        raise
click_networking_submenu
click_networking_submenu(element: WebElement) -> None

Ckick on WAN menu option.

Parameters:

Name Type Description Default
element WebElement

the element to click

required
Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
115
116
117
118
119
120
121
122
123
124
def click_networking_submenu(self, element: WebElement) -> None:
    """Ckick on WAN menu option.

    :param element: the element to click
    :type element: WebElement
    """
    self.actions.move_to_element(  # type: ignore[attr-defined]
        element,
    ).pause(1).click().perform()
    time.sleep(1)
is_page_loaded
is_page_loaded(driver: WebDriver | EventFiringWebDriver) -> bool

Verify the home page is completely loaded.

Parameters:

Name Type Description Default
driver WebDriver | EventFiringWebDriver

webdriver instance

required

Returns:

Type Description
bool

True if home page is loaded, Otherwise False

Source code in boardfarm3/lib/gui/prplos/pages/home.py
39
40
41
42
43
44
45
46
47
48
49
def is_page_loaded(self, driver: WebDriver | EventFiringWebDriver) -> bool:
    """Verify the home page is completely loaded.

    :param driver: webdriver instance
    :type driver: WebDriver | EventFiringWebDriver
    :return: True if home page is loaded, Otherwise False
    :rtype: bool
    """
    return self.system_info_element.is_displayed() and driver.execute_script(
        "return document.readyState == 'complete'"
    )
logout
logout() -> None

Log out.

All the pages derived from the POM have logged in!

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
126
127
128
129
130
131
132
def logout(self) -> None:
    """Log out.

    All the pages derived from the POM have logged in!
    """
    get_element_by_css(self, ".btn").click()
    get_element_by_css(self, ".logo")
wait_until_loaded
wait_until_loaded(timeout: int | None = None) -> bool

Wait until the page is loaded completely.

Generic method to wait upto fluent_wait time or upto timeout specified by the user until the page is loaded and return the status

Parameters:

Name Type Description Default
timeout int | None

timeout in seconds to load the page, defaults to None

None

Returns:

Type Description
bool

True if page is loaded, Otherwise False

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def wait_until_loaded(self, timeout: int | None = None) -> bool:
    """Wait until the page is loaded completely.

    Generic method to wait upto fluent_wait time or upto timeout specified
    by the user until the page is loaded and return the status

    :param timeout: timeout in seconds to load the page, defaults to None
    :type timeout: int | None, default None
    :return: True if page is loaded, Otherwise False
    :rtype: bool
    """
    return wait_until_loaded(self, timeout)
lcm

PrplOS GUI LCM Page.

Classes:

Name Description
LCMPage

Page Object for LCM page.

LCMPage
LCMPage(
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
)

Page Object for LCM page.

Initialize LCMPage POM.

Parameters:

Name Type Description Default
driver WebDriver | EventFiringWebDriver

webdriver instance

required
gw_ip str | IPv4Address

gateway ip address

required
fluent_wait int

timeout in seconds to load the page, defaults to 20

20

Methods:

Name Description
__getattribute__

Take a screenshot and save to a folder upon TimeoutException.

click_networking_submenu

Ckick on WAN menu option.

is_page_loaded

Verify the home page is completely loaded.

logout

Log out.

wait_until_loaded

Wait until the page is loaded completely.

Attributes:

Name Type Description
networking_lcm_header WebElement

The LCM header element.

Source code in boardfarm3/lib/gui/prplos/pages/lcm.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def __init__(
    self,
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
) -> None:
    """Initialize LCMPage POM.

    :param driver: webdriver instance
    :type driver: WebDriver | EventFiringWebDriver
    :param gw_ip: gateway ip address
    :type gw_ip: str | IPv4Address
    :param fluent_wait: timeout in seconds to load the page, defaults to 20
    :type fluent_wait: int
    """
    super().__init__(driver, gw_ip, fluent_wait)
    try:
        if self.networking_lcm_header.is_displayed():
            return  # Do not click open sub-menu if it is already open
    except NoSuchElementException:
        pass
    self.click_networking_submenu(get_element_by_css(self, _SUBMENU_CSS))
    self.wait.until(self.is_page_loaded)
networking_lcm_header property
networking_lcm_header: WebElement

The LCM header element.

Returns:

Type Description
WebElement

the web element

Raises:

Type Description
NoSuchElementException

if not found

__getattribute__
__getattribute__(name: str) -> None

Take a screenshot and save to a folder upon TimeoutException.

Parameters:

Name Type Description Default
name str

the attribute

required

Returns:

Type Description
related to the attribute

the attribute value

Raises:

Type Description
TimeoutException

if the attr was not found

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __getattribute__(self, name: str) -> None:
    """Take a screenshot and save to a folder upon TimeoutException.

    :param name: the attribute
    :type name: str
    :return: the attribute value
    :rtype: related to the attribute
    :raises TimeoutException: if the attr was not found
    """
    try:
        return super().__getattribute__(name)
    except TimeoutException:
        now = datetime.now(UTC)
        sc_path = (
            "_".join(
                [
                    self.driver.screenshot_path,  # type: ignore[union-attr]
                    now.strftime("%Y%m%d_%H%M%S%f"),
                ],
            )
            + "_TimeoutException.png"
        )
        self.driver.get_screenshot_as_file(sc_path)
        _LOGGER.debug("Screenshot saved as '%s'", sc_path)
        raise
click_networking_submenu
click_networking_submenu(element: WebElement) -> None

Ckick on WAN menu option.

Parameters:

Name Type Description Default
element WebElement

the element to click

required
Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
115
116
117
118
119
120
121
122
123
124
def click_networking_submenu(self, element: WebElement) -> None:
    """Ckick on WAN menu option.

    :param element: the element to click
    :type element: WebElement
    """
    self.actions.move_to_element(  # type: ignore[attr-defined]
        element,
    ).pause(1).click().perform()
    time.sleep(1)
is_page_loaded
is_page_loaded(driver: WebDriver | EventFiringWebDriver) -> bool

Verify the home page is completely loaded.

Parameters:

Name Type Description Default
driver WebDriver | EventFiringWebDriver

webdriver instance

required

Returns:

Type Description
bool

True if home page is loaded

Source code in boardfarm3/lib/gui/prplos/pages/lcm.py
51
52
53
54
55
56
57
58
59
60
61
def is_page_loaded(self, driver: WebDriver | EventFiringWebDriver) -> bool:
    """Verify the home page is completely loaded.

    :param driver: webdriver instance
    :type driver: WebDriver | EventFiringWebDriver
    :return: True if home page is loaded
    :rtype: bool
    """
    return self.networking_lcm_header.is_displayed() and driver.execute_script(
        "return document.readyState == 'complete'"
    )
logout
logout() -> None

Log out.

All the pages derived from the POM have logged in!

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
126
127
128
129
130
131
132
def logout(self) -> None:
    """Log out.

    All the pages derived from the POM have logged in!
    """
    get_element_by_css(self, ".btn").click()
    get_element_by_css(self, ".logo")
wait_until_loaded
wait_until_loaded(timeout: int | None = None) -> bool

Wait until the page is loaded completely.

Generic method to wait upto fluent_wait time or upto timeout specified by the user until the page is loaded and return the status

Parameters:

Name Type Description Default
timeout int | None

timeout in seconds to load the page, defaults to None

None

Returns:

Type Description
bool

True if page is loaded, Otherwise False

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def wait_until_loaded(self, timeout: int | None = None) -> bool:
    """Wait until the page is loaded completely.

    Generic method to wait upto fluent_wait time or upto timeout specified
    by the user until the page is loaded and return the status

    :param timeout: timeout in seconds to load the page, defaults to None
    :type timeout: int | None, default None
    :return: True if page is loaded, Otherwise False
    :rtype: bool
    """
    return wait_until_loaded(self, timeout)
login

Login Page POM.

Classes:

Name Description
LoginPage

Page Object for Login page.

LoginPage
LoginPage(
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
    use_https: bool = False,
)

Page Object for Login page.

Initialize LoginPage POM.

Parameters:

Name Type Description Default
driver WebDriver | EventFiringWebDriver

webdriver instance

required
gw_ip str | IPv4Address

gateway ip address

required
fluent_wait int

timeout in seconds to load the page, defaults to 20

20
use_https bool

whether to use http/s, defaults to False

False

Methods:

Name Description
is_page_loaded

Verify the home page is completely loaded.

login

Login to the UI.

Attributes:

Name Type Description
logo_element WebElement

Logo element.

password_box_element WebElement

Password textbox element.

username_box_element WebElement

Username textbox element.

Source code in boardfarm3/lib/gui/prplos/pages/login.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(
    self,
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
    use_https: bool = False,
) -> None:
    """Initialize LoginPage POM.

    :param driver: webdriver instance
    :type driver: WebDriver | EventFiringWebDriver
    :param gw_ip: gateway ip address
    :type gw_ip: str | IPv4Address
    :param fluent_wait: timeout in seconds to load the page, defaults to 20
    :type fluent_wait: int
    :param use_https: whether to use http/s, defaults to False
    :type use_https: bool
    """
    self.driver = driver
    self.wait = WebDriverWait(
        driver=self.driver,  # type: ignore[type-var]
        timeout=fluent_wait,
    )
    self.actions = ActionChains(
        driver if isinstance(driver, WebDriver) else driver.wrapped_driver
    )
    self.fluent_wait = fluent_wait
    if str(gw_ip) not in self.driver.current_url:
        self.driver.get(f"https://{gw_ip}" if use_https else f"http://{gw_ip}")
    driver.set_page_load_timeout(fluent_wait * 4)  # type: ignore[union-attr]
    self.wait.until(self.is_page_loaded)
logo_element property
logo_element: WebElement

Logo element.

Returns:

Type Description
WebElement

The web element

password_box_element property
password_box_element: WebElement

Password textbox element.

Returns:

Type Description
WebElement

The web element

username_box_element property
username_box_element: WebElement

Username textbox element.

Returns:

Type Description
WebElement

The web element

is_page_loaded
is_page_loaded(driver: WebDriver | EventFiringWebDriver) -> bool

Verify the home page is completely loaded.

Parameters:

Name Type Description Default
driver WebDriver | EventFiringWebDriver

webdriver instance

required

Returns:

Type Description
bool

True if home page is loaded, Otherwise False

Source code in boardfarm3/lib/gui/prplos/pages/login.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def is_page_loaded(self, driver: WebDriver | EventFiringWebDriver) -> bool:
    """Verify the home page is completely loaded.

    :param driver: webdriver instance
    :type driver: WebDriver | EventFiringWebDriver
    :return: True if home page is loaded, Otherwise False
    :rtype: bool
    """
    return (
        self.logo_element.is_displayed()
        and self.username_box_element.is_displayed()
        and self.password_box_element.is_displayed()
        and driver.execute_script("return document.readyState == 'complete'")
    )
login
login(user: str, password: str) -> None

Login to the UI.

Login to the UI or if the UI is not at the Login page then performs first installation operation

Parameters:

Name Type Description Default
user str

login username

required
password str

login password

required
Source code in boardfarm3/lib/gui/prplos/pages/login.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def login(self, user: str, password: str) -> None:
    """Login to the UI.

    Login to the UI or if the UI is not at the Login page then performs
    first installation operation

    :param user: login username
    :type user: str
    :param password: login password
    :type password: str
    """
    self.username_box_element.send_keys(user)
    self.username_box_element.send_keys(Keys.ENTER)
    self.password_box_element.send_keys(password)
    self.password_box_element.send_keys(Keys.ENTER)
    self._wait_until_logged_in()
page_helper

Page Helper Module.

Functions:

Name Description
get_element_by_css

Get an element by CSS selector (wait if needed).

initialize

Initialize the page obj.

wait_until_loaded

Wait until the page is loaded completely.

get_element_by_css
get_element_by_css(page: Any, element: str) -> WebElement

Get an element by CSS selector (wait if needed).

Parameters:

Name Type Description Default
page Any

the webpage

required
element str

the element

required

Returns:

Type Description
WebElement

the web element

Raises:

Type Description
NoSuchElementException

if the element is not found

Source code in boardfarm3/lib/gui/prplos/pages/page_helper.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def get_element_by_css(page: Any, element: str) -> WebElement:  # noqa: ANN401
    """Get an element by CSS selector (wait if needed).

    :param page: the webpage
    :type page: Any
    :param element: the element
    :type element: str
    :raises NoSuchElementException: if the element is not found
    :return: the web element
    :rtype: WebElement
    """
    if page.wait.until(element_is_present_by_css(element)):
        return page.driver.find_element(by=By.CSS_SELECTOR, value=element)
    msg = f"{element} not found in {page.driver.current_url}"
    raise NoSuchElementException(msg)
initialize
initialize(
    page: Any,
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
    use_https: bool = False,
    user: str = PRPLOS_USER,
    password: str = PRPLOS_PASSWORD,
) -> None

Initialize the page obj.

Parameters:

Name Type Description Default
page Any

the page

required
driver WebDriver | EventFiringWebDriver

the web driver

required
gw_ip str | IPv4Address

the gateway ip

required
fluent_wait int

browser fluent wait, defaults to 20

20
use_https bool

use htpp/s, defaults to False

False
user str

username, defaults to admin

PRPLOS_USER
password str

login password, defaults to admin

PRPLOS_PASSWORD
Source code in boardfarm3/lib/gui/prplos/pages/page_helper.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def initialize(  # noqa: PLR0913
    page: Any,  # noqa: ANN401
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
    use_https: bool = False,
    user: str = PRPLOS_USER,
    password: str = PRPLOS_PASSWORD,
) -> None:
    """Initialize the page obj.

    :param page: the page
    :type page: Any
    :param driver: the web driver
    :type driver: WebDriver | EventFiringWebDriver
    :param gw_ip: the gateway ip
    :type gw_ip: str | IPv4Address
    :param fluent_wait: browser fluent wait, defaults to 20
    :type fluent_wait: int
    :param use_https: use htpp/s, defaults to False
    :type use_https: bool
    :param user: username, defaults to admin
    :type user: str
    :param password: login password, defaults to admin
    :type password: str
    """
    page.driver = driver
    page.wait = WebDriverWait(
        driver=page.driver,  # type: ignore[type-var]
        timeout=fluent_wait,
    )
    page.actions = ActionChains(
        driver if isinstance(driver, WebDriver) else driver.wrapped_driver
    )
    page.fluent_wait = fluent_wait
    if str(gw_ip) not in page.driver.current_url:
        page.driver.get(f"https://{gw_ip}" if use_https else f"http://{gw_ip}")
    driver.set_page_load_timeout(fluent_wait * 4)
    if not isinstance(page, LoginPage):
        loginpage = LoginPage(page.driver, gw_ip)
        wait_until_loaded(loginpage)
        loginpage.login(user, password)
        return

    page.wait.until(page.is_page_loaded)
wait_until_loaded
wait_until_loaded(page: Any, timeout: int | None = None) -> bool

Wait until the page is loaded completely.

Generic method to wait upto fluent_wait time or upto timeout specified by the user until the page is loaded and return the status

Parameters:

Name Type Description Default
page Any

the page obj

required
timeout int | None

timeout in seconds to load the page, defaults to None

None

Returns:

Type Description
bool

True if page is loaded

Source code in boardfarm3/lib/gui/prplos/pages/page_helper.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def wait_until_loaded(page: Any, timeout: int | None = None) -> bool:  # noqa: ANN401
    """Wait until the page is loaded completely.

    Generic method to wait upto fluent_wait time or upto timeout specified
    by the user until the page is loaded and return the status

    :param page: the page obj
    :type page: Any
    :param timeout: timeout in seconds to load the page, defaults to None
    :type timeout: int | None
    :return: True if page is loaded
    :rtype: bool
    """
    if not timeout:
        timeout = page.fluent_wait
    timeout = int(time.time()) + timeout
    output = False
    while int(time.time()) <= timeout and not output:
        try:
            output = page.is_page_loaded(page.driver)  # pylint: disable=E1111
        except (  # noqa: PERF203
            NoSuchElementException,
            TimeoutException,
            StaleElementReferenceException,
        ):
            output = False
    if not output:
        save_screenshot(page.driver)
    return output
prplos_base_pom

PrplOS Base Page Oobject Module (POM).

Classes:

Name Description
PrplOSBasePOM

Contains objects that are common to all the pages of the PrplOS GUI.

PrplOSBasePOM
PrplOSBasePOM(
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
    use_https: bool = False,
)

Contains objects that are common to all the pages of the PrplOS GUI.

Initialize the class.

Parameters:

Name Type Description Default
driver WebDriver | EventFiringWebDriver

webdriver instance

required
gw_ip str | IPv4Address

gateway ip address

required
fluent_wait int

timeout in seconds to load the page, defaults to 20

20
use_https bool

flag to specify whether http or https, defaults to false

False

Methods:

Name Description
__getattribute__

Take a screenshot and save to a folder upon TimeoutException.

click_networking_submenu

Ckick on WAN menu option.

is_page_loaded

Verify the page is completely loaded.

logout

Log out.

wait_until_loaded

Wait until the page is loaded completely.

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(
    self,
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
    use_https: bool = False,
) -> None:
    """Initialize the class.

    :param driver: webdriver instance
    :type driver: Union[WebDriver, EventFiringWebDriver]
    :param gw_ip: gateway ip address
    :type gw_ip: Union[str, IPv4Address]
    :param fluent_wait: timeout in seconds to load the page, defaults to 20
    :type fluent_wait: int
    :param use_https: flag to specify whether http or https, defaults to false
    :type use_https: bool
    """
    initialize(
        page=self,
        driver=driver,
        gw_ip=gw_ip,
        fluent_wait=fluent_wait,
        use_https=use_https,
    )
__getattribute__
__getattribute__(name: str) -> None

Take a screenshot and save to a folder upon TimeoutException.

Parameters:

Name Type Description Default
name str

the attribute

required

Returns:

Type Description
related to the attribute

the attribute value

Raises:

Type Description
TimeoutException

if the attr was not found

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __getattribute__(self, name: str) -> None:
    """Take a screenshot and save to a folder upon TimeoutException.

    :param name: the attribute
    :type name: str
    :return: the attribute value
    :rtype: related to the attribute
    :raises TimeoutException: if the attr was not found
    """
    try:
        return super().__getattribute__(name)
    except TimeoutException:
        now = datetime.now(UTC)
        sc_path = (
            "_".join(
                [
                    self.driver.screenshot_path,  # type: ignore[union-attr]
                    now.strftime("%Y%m%d_%H%M%S%f"),
                ],
            )
            + "_TimeoutException.png"
        )
        self.driver.get_screenshot_as_file(sc_path)
        _LOGGER.debug("Screenshot saved as '%s'", sc_path)
        raise
click_networking_submenu
click_networking_submenu(element: WebElement) -> None

Ckick on WAN menu option.

Parameters:

Name Type Description Default
element WebElement

the element to click

required
Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
115
116
117
118
119
120
121
122
123
124
def click_networking_submenu(self, element: WebElement) -> None:
    """Ckick on WAN menu option.

    :param element: the element to click
    :type element: WebElement
    """
    self.actions.move_to_element(  # type: ignore[attr-defined]
        element,
    ).pause(1).click().perform()
    time.sleep(1)
is_page_loaded
is_page_loaded(_driver: WebDriver | EventFiringWebDriver) -> bool

Verify the page is completely loaded.

Must be overridden in the derived class!

Raises:

Type Description
AttributeError

if not overriden

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
102
103
104
105
106
107
108
109
110
111
112
113
def is_page_loaded(
    self,
    _driver: WebDriver | EventFiringWebDriver,
) -> bool:
    """Verify the page is completely loaded.

    Must be overridden  in the derived class!

    :raises AttributeError: if not overriden
    """
    msg = "is_page_loaded must be defined in derived class"
    raise AttributeError(msg)
logout
logout() -> None

Log out.

All the pages derived from the POM have logged in!

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
126
127
128
129
130
131
132
def logout(self) -> None:
    """Log out.

    All the pages derived from the POM have logged in!
    """
    get_element_by_css(self, ".btn").click()
    get_element_by_css(self, ".logo")
wait_until_loaded
wait_until_loaded(timeout: int | None = None) -> bool

Wait until the page is loaded completely.

Generic method to wait upto fluent_wait time or upto timeout specified by the user until the page is loaded and return the status

Parameters:

Name Type Description Default
timeout int | None

timeout in seconds to load the page, defaults to None

None

Returns:

Type Description
bool

True if page is loaded, Otherwise False

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def wait_until_loaded(self, timeout: int | None = None) -> bool:
    """Wait until the page is loaded completely.

    Generic method to wait upto fluent_wait time or upto timeout specified
    by the user until the page is loaded and return the status

    :param timeout: timeout in seconds to load the page, defaults to None
    :type timeout: int | None, default None
    :return: True if page is loaded, Otherwise False
    :rtype: bool
    """
    return wait_until_loaded(self, timeout)
wan

PrplOS GUI WAN Page.

Classes:

Name Description
WANPage

Page Object for WAN page.

WANPage
WANPage(
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
)

Page Object for WAN page.

Initialize WANPage POM.

Parameters:

Name Type Description Default
driver WebDriver | EventFiringWebDriver

webdriver instance

required
gw_ip str | IPv4Address

gateway ip address

required
fluent_wait int

timeout in seconds to load the page, defaults to 20

20

Methods:

Name Description
__getattribute__

Take a screenshot and save to a folder upon TimeoutException.

click_networking_submenu

Ckick on WAN menu option.

is_page_loaded

Verify the home page is completely loaded.

logout

Log out.

wait_until_loaded

Wait until the page is loaded completely.

Attributes:

Name Type Description
networking_wan_header WebElement

The WAN header element.

Source code in boardfarm3/lib/gui/prplos/pages/wan.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def __init__(
    self,
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
) -> None:
    """Initialize WANPage POM.

    :param driver: webdriver instance
    :type driver: WebDriver | EventFiringWebDriver
    :param gw_ip: gateway ip address
    :type gw_ip: str | IPv4Address
    :param fluent_wait: timeout in seconds to load the page, defaults to 20
    :type fluent_wait: int
    """
    super().__init__(driver, gw_ip, fluent_wait)
    try:
        if self.networking_wan_header.is_displayed():
            return  # Do not click open sub-menu if it is already open
    except NoSuchElementException:
        pass
    self.click_networking_submenu(get_element_by_css(self, _SUBMENU_CSS))
    self.wait.until(self.is_page_loaded)
networking_wan_header property
networking_wan_header: WebElement

The WAN header element.

Returns:

Type Description
WebElement

the web element

Raises:

Type Description
NoSuchElementException

if the correct header is not found

__getattribute__
__getattribute__(name: str) -> None

Take a screenshot and save to a folder upon TimeoutException.

Parameters:

Name Type Description Default
name str

the attribute

required

Returns:

Type Description
related to the attribute

the attribute value

Raises:

Type Description
TimeoutException

if the attr was not found

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __getattribute__(self, name: str) -> None:
    """Take a screenshot and save to a folder upon TimeoutException.

    :param name: the attribute
    :type name: str
    :return: the attribute value
    :rtype: related to the attribute
    :raises TimeoutException: if the attr was not found
    """
    try:
        return super().__getattribute__(name)
    except TimeoutException:
        now = datetime.now(UTC)
        sc_path = (
            "_".join(
                [
                    self.driver.screenshot_path,  # type: ignore[union-attr]
                    now.strftime("%Y%m%d_%H%M%S%f"),
                ],
            )
            + "_TimeoutException.png"
        )
        self.driver.get_screenshot_as_file(sc_path)
        _LOGGER.debug("Screenshot saved as '%s'", sc_path)
        raise
click_networking_submenu
click_networking_submenu(element: WebElement) -> None

Ckick on WAN menu option.

Parameters:

Name Type Description Default
element WebElement

the element to click

required
Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
115
116
117
118
119
120
121
122
123
124
def click_networking_submenu(self, element: WebElement) -> None:
    """Ckick on WAN menu option.

    :param element: the element to click
    :type element: WebElement
    """
    self.actions.move_to_element(  # type: ignore[attr-defined]
        element,
    ).pause(1).click().perform()
    time.sleep(1)
is_page_loaded
is_page_loaded(driver: WebDriver | EventFiringWebDriver) -> bool

Verify the home page is completely loaded.

Parameters:

Name Type Description Default
driver WebDriver | EventFiringWebDriver

webdriver instance

required

Returns:

Type Description
bool

True if home page is loaded, Otherwise False

Source code in boardfarm3/lib/gui/prplos/pages/wan.py
51
52
53
54
55
56
57
58
59
60
61
def is_page_loaded(self, driver: WebDriver | EventFiringWebDriver) -> bool:
    """Verify the home page is completely loaded.

    :param driver: webdriver instance
    :type driver: WebDriver | EventFiringWebDriver
    :return: True if home page is loaded, Otherwise False
    :rtype: bool
    """
    return self.networking_wan_header.is_displayed() and driver.execute_script(
        "return document.readyState == 'complete'"
    )
logout
logout() -> None

Log out.

All the pages derived from the POM have logged in!

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
126
127
128
129
130
131
132
def logout(self) -> None:
    """Log out.

    All the pages derived from the POM have logged in!
    """
    get_element_by_css(self, ".btn").click()
    get_element_by_css(self, ".logo")
wait_until_loaded
wait_until_loaded(timeout: int | None = None) -> bool

Wait until the page is loaded completely.

Generic method to wait upto fluent_wait time or upto timeout specified by the user until the page is loaded and return the status

Parameters:

Name Type Description Default
timeout int | None

timeout in seconds to load the page, defaults to None

None

Returns:

Type Description
bool

True if page is loaded, Otherwise False

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def wait_until_loaded(self, timeout: int | None = None) -> bool:
    """Wait until the page is loaded completely.

    Generic method to wait upto fluent_wait time or upto timeout specified
    by the user until the page is loaded and return the status

    :param timeout: timeout in seconds to load the page, defaults to None
    :type timeout: int | None, default None
    :return: True if page is loaded, Otherwise False
    :rtype: bool
    """
    return wait_until_loaded(self, timeout)
wifi

PrplOS GUI WiFi Page.

Classes:

Name Description
WiFiPage

Page Object for WiFi page.

WiFiPage
WiFiPage(
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
)

Page Object for WiFi page.

Initialize WiFiPage POM.

Parameters:

Name Type Description Default
driver WebDriver | EventFiringWebDriver

webdriver instance

required
gw_ip str | IPv4Address

gateway ip address

required
fluent_wait int

timeout in seconds to load the page, defaults to 20

20

Methods:

Name Description
__getattribute__

Take a screenshot and save to a folder upon TimeoutException.

click_networking_submenu

Ckick on WAN menu option.

is_page_loaded

Verify the home page is completely loaded.

logout

Log out.

wait_until_loaded

Wait until the page is loaded completely.

Attributes:

Name Type Description
networking_wifi_header WebElement

The WiFi header element.

Source code in boardfarm3/lib/gui/prplos/pages/wifi.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def __init__(
    self,
    driver: WebDriver | EventFiringWebDriver,
    gw_ip: str | IPv4Address,
    fluent_wait: int = 20,
) -> None:
    """Initialize WiFiPage POM.

    :param driver: webdriver instance
    :type driver: WebDriver | EventFiringWebDriver
    :param gw_ip: gateway ip address
    :type gw_ip: str | IPv4Address
    :param fluent_wait: timeout in seconds to load the page, defaults to 20
    :type fluent_wait: int
    """
    super().__init__(driver, gw_ip, fluent_wait)
    try:
        if self.networking_wifi_header.is_displayed():
            return  # Do not click open sub-menu if it is already open
    except NoSuchElementException:
        pass
    self.click_networking_submenu(get_element_by_css(self, _SUBMENU_CSS))
    self.wait.until(self.is_page_loaded)
networking_wifi_header property
networking_wifi_header: WebElement

The WiFi header element.

Returns:

Type Description
WebElement

the web element

Raises:

Type Description
NoSuchElementException

if not found

__getattribute__
__getattribute__(name: str) -> None

Take a screenshot and save to a folder upon TimeoutException.

Parameters:

Name Type Description Default
name str

the attribute

required

Returns:

Type Description
related to the attribute

the attribute value

Raises:

Type Description
TimeoutException

if the attr was not found

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __getattribute__(self, name: str) -> None:
    """Take a screenshot and save to a folder upon TimeoutException.

    :param name: the attribute
    :type name: str
    :return: the attribute value
    :rtype: related to the attribute
    :raises TimeoutException: if the attr was not found
    """
    try:
        return super().__getattribute__(name)
    except TimeoutException:
        now = datetime.now(UTC)
        sc_path = (
            "_".join(
                [
                    self.driver.screenshot_path,  # type: ignore[union-attr]
                    now.strftime("%Y%m%d_%H%M%S%f"),
                ],
            )
            + "_TimeoutException.png"
        )
        self.driver.get_screenshot_as_file(sc_path)
        _LOGGER.debug("Screenshot saved as '%s'", sc_path)
        raise
click_networking_submenu
click_networking_submenu(element: WebElement) -> None

Ckick on WAN menu option.

Parameters:

Name Type Description Default
element WebElement

the element to click

required
Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
115
116
117
118
119
120
121
122
123
124
def click_networking_submenu(self, element: WebElement) -> None:
    """Ckick on WAN menu option.

    :param element: the element to click
    :type element: WebElement
    """
    self.actions.move_to_element(  # type: ignore[attr-defined]
        element,
    ).pause(1).click().perform()
    time.sleep(1)
is_page_loaded
is_page_loaded(driver: WebDriver | EventFiringWebDriver) -> bool

Verify the home page is completely loaded.

Parameters:

Name Type Description Default
driver WebDriver | EventFiringWebDriver

webdriver instance

required

Returns:

Type Description
bool

True if home page is loaded

Source code in boardfarm3/lib/gui/prplos/pages/wifi.py
51
52
53
54
55
56
57
58
59
60
61
def is_page_loaded(self, driver: WebDriver | EventFiringWebDriver) -> bool:
    """Verify the home page is completely loaded.

    :param driver: webdriver instance
    :type driver: WebDriver | EventFiringWebDriver
    :return: True if home page is loaded
    :rtype: bool
    """
    return self.networking_wifi_header.is_displayed() and driver.execute_script(
        "return document.readyState == 'complete'"
    )
logout
logout() -> None

Log out.

All the pages derived from the POM have logged in!

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
126
127
128
129
130
131
132
def logout(self) -> None:
    """Log out.

    All the pages derived from the POM have logged in!
    """
    get_element_by_css(self, ".btn").click()
    get_element_by_css(self, ".logo")
wait_until_loaded
wait_until_loaded(timeout: int | None = None) -> bool

Wait until the page is loaded completely.

Generic method to wait upto fluent_wait time or upto timeout specified by the user until the page is loaded and return the status

Parameters:

Name Type Description Default
timeout int | None

timeout in seconds to load the page, defaults to None

None

Returns:

Type Description
bool

True if page is loaded, Otherwise False

Source code in boardfarm3/lib/gui/prplos/pages/prplos_base_pom.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def wait_until_loaded(self, timeout: int | None = None) -> bool:
    """Wait until the page is loaded completely.

    Generic method to wait upto fluent_wait time or upto timeout specified
    by the user until the page is loaded and return the status

    :param timeout: timeout in seconds to load the page, defaults to None
    :type timeout: int | None, default None
    :return: True if page is loaded, Otherwise False
    :rtype: bool
    """
    return wait_until_loaded(self, timeout)

hal

Hardware abstraction layers for different CPE components.

Modules:

Name Description
cpe_wifi

CPE WiFi HAL class.

cpe_wifi

CPE WiFi HAL class.

Classes:

Name Description
WiFiHal

Contain CPE Wi-Fi software methods.

WiFiHal

Contain CPE Wi-Fi software methods.

Methods:

Name Description
enable_wifi

Use Wifi Hal API to enable the wifi if not already enabled.

get_bssid

Get the wifi Basic Service Set Identifier.

get_passphrase

Get the passphrase for a network on an interface.

get_ssid

Get the wifi ssid for the wlan client with specific network and band.

is_wifi_enabled

Check if specific wifi is enabled.

Attributes:

Name Type Description
wlan_ifaces dict[str, dict[str, str]]

Get all the wlan interfaces on board.

wlan_ifaces abstractmethod property
wlan_ifaces: dict[str, dict[str, str]]

Get all the wlan interfaces on board.

Returns:

Type Description
dict[str, dict[str, str]]

interfaces e.g. private/guest/community

enable_wifi abstractmethod
enable_wifi(network: str, band: str) -> tuple[str, str, str]

Use Wifi Hal API to enable the wifi if not already enabled.

Parameters:

Name Type Description Default
network
str

network type(private/guest/community)

required
band
str

wifi band(5/2.4 GHz)

required

Returns:

Type Description
tuple[str, str, str]

tuple of ssid,bssid,passphrase

Source code in boardfarm3/lib/hal/cpe_wifi.py
71
72
73
74
75
76
77
78
79
80
81
82
@abstractmethod
def enable_wifi(self, network: str, band: str) -> tuple[str, str, str]:
    """Use Wifi Hal API to enable the wifi if not already enabled.

    :param network: network type(private/guest/community)
    :type network: str
    :param band: wifi band(5/2.4 GHz)
    :type band: str
    :return: tuple of ssid,bssid,passphrase
    :rtype: tuple[str, str, str]
    """
    raise NotImplementedError
get_bssid abstractmethod
get_bssid(network: str, band: str) -> str | None

Get the wifi Basic Service Set Identifier.

Parameters:

Name Type Description Default
network
str

network type(private/guest/community)

required
band
str

wifi band(5/2.4 GHz)

required

Returns:

Type Description
Optional[str]

MAC physical address of the access point

Source code in boardfarm3/lib/hal/cpe_wifi.py
34
35
36
37
38
39
40
41
42
43
44
45
@abstractmethod
def get_bssid(self, network: str, band: str) -> str | None:
    """Get the wifi Basic Service Set Identifier.

    :param network: network type(private/guest/community)
    :type network: str
    :param band: wifi band(5/2.4 GHz)
    :type band: str
    :return: MAC physical address of the access point
    :rtype: Optional[str]
    """
    raise NotImplementedError
get_passphrase abstractmethod
get_passphrase(iface: str) -> str

Get the passphrase for a network on an interface.

Parameters:

Name Type Description Default
iface
str

name of the interface

required

Returns:

Type Description
str

encrypted password for a network

Source code in boardfarm3/lib/hal/cpe_wifi.py
47
48
49
50
51
52
53
54
55
56
@abstractmethod
def get_passphrase(self, iface: str) -> str:
    """Get the passphrase for a network on an interface.

    :param iface: name of the interface
    :type iface: str
    :return: encrypted password for a network
    :rtype: str
    """
    raise NotImplementedError
get_ssid abstractmethod
get_ssid(network: str, band: str) -> str | None

Get the wifi ssid for the wlan client with specific network and band.

Parameters:

Name Type Description Default
network
str

network type(private/guest/community)

required
band
str

wifi band(5/2.4 GHz)

required

Returns:

Type Description
Optional[str]

SSID of the WiFi for a given network type and band

Source code in boardfarm3/lib/hal/cpe_wifi.py
21
22
23
24
25
26
27
28
29
30
31
32
@abstractmethod
def get_ssid(self, network: str, band: str) -> str | None:
    """Get the wifi ssid for the wlan client with specific network and band.

    :param network: network type(private/guest/community)
    :type network: str
    :param band: wifi band(5/2.4 GHz)
    :type band: str
    :return: SSID of the WiFi for a given network type and band
    :rtype: Optional[str]
    """
    raise NotImplementedError
is_wifi_enabled abstractmethod
is_wifi_enabled(network_type: str, band: str) -> bool

Check if specific wifi is enabled.

Parameters:

Name Type Description Default
network_type
str

network type(private/guest/community)

required
band
str

wifi band(5/2.4 GHz)

required

Returns:

Type Description
bool

True if enabled, False otherwise

Source code in boardfarm3/lib/hal/cpe_wifi.py
58
59
60
61
62
63
64
65
66
67
68
69
@abstractmethod
def is_wifi_enabled(self, network_type: str, band: str) -> bool:
    """Check if specific wifi is enabled.

    :param network_type: network type(private/guest/community)
    :type network_type: str
    :param band: wifi band(5/2.4 GHz)
    :type band: str
    :return: True if enabled, False otherwise
    :rtype: bool
    """
    raise NotImplementedError

interactive_shell

Boardfarm interactive shell module.

Classes:

Name Description
OptionsTable

Boardfarm interactive console options table.

Functions:

Name Description
get_interactive_console_options

Return options table with all boardfarm interactive shell options.

OptionsTable

OptionsTable(title: str | None = None)

Boardfarm interactive console options table.

Initialize the OptionsTable.

Parameters:

Name Type Description Default

title

str | None

title of the table, defaults to None

None

Methods:

Name Description
add_column

Add a table column.

add_option

Add an action item to the table.

show_table

Show table and perform actions based on user input.

Source code in boardfarm3/lib/interactive_shell.py
36
37
38
39
40
41
42
43
44
45
46
47
48
def __init__(self, title: str | None = None) -> None:
    """Initialize the OptionsTable.

    :param title: title of the table, defaults to None
    :type title: Optional[str], optional
    """
    self._table = Table(
        title=title,
        title_justify="center",
        box=HORIZONTALS,
        show_lines=True,
    )
    self._actions: dict[str, tuple[Callable, tuple[Any, ...], dict[str, Any]]] = {}

add_column

add_column(
    name: str,
    justify: JustifyMethod | None = None,
    style: str | None = None,
    width: int | None = None,
) -> None

Add a table column.

Parameters:

Name Type Description Default
name
str

name of the column

required
justify
JustifyMethod | None

column content position, defaults to None

None
style
str | None

table content style, defaults to None

None
width
int | None

column width, defaults to None

None
Source code in boardfarm3/lib/interactive_shell.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def add_column(
    self,
    name: str,
    justify: JustifyMethod | None = None,
    style: str | None = None,
    width: int | None = None,
) -> None:
    """Add a table column.

    :param name: name of the column
    :type name: str
    :param justify: column content position, defaults to None
    :type justify: JustifyMethod, optional
    :param style: table content style, defaults to None
    :type style: str, optional
    :param width: column width, defaults to None
    :type width: Optional[int], optional
    """
    self._table.add_column(name, justify=justify, style=style, width=width)

add_option

add_option(
    column_data: tuple[str, ...],
    function: Callable,
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
) -> None

Add an action item to the table.

Parameters:

Name Type Description Default
column_data
tuple[str, ...]

table column data

required
function
Callable

function to be called for this option

required
args
tuple[Any, ...]

positional arguments to the function

required
kwargs
dict[str, Any]

keyword arguments to the function

required
Source code in boardfarm3/lib/interactive_shell.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def add_option(
    self,
    column_data: tuple[str, ...],
    function: Callable,
    args: tuple[Any, ...],
    kwargs: dict[str, Any],
) -> None:
    """Add an action item to the table.

    :param column_data: table column data
    :type column_data: tuple[str, ...]
    :param function: function to be called for this option
    :type function: Callable
    :param args: positional arguments to the function
    :type args: Tuple[Any, ...]
    :param kwargs: keyword arguments to the function
    :type kwargs: Dict[str, Any]
    """
    self._table.add_row(*column_data)
    self._actions[column_data[0]] = (function, args, kwargs)

show_table

show_table(exit_option: str, exit_option_description: str) -> None

Show table and perform actions based on user input.

Parameters:

Name Type Description Default
exit_option
str

exit option name

required
exit_option_description
str

exit option description

required
Source code in boardfarm3/lib/interactive_shell.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def show_table(self, exit_option: str, exit_option_description: str) -> None:
    """Show table and perform actions based on user input.

    :param exit_option: exit option name
    :type exit_option: str
    :param exit_option_description: exit option description
    :type exit_option_description: str
    """
    self._table.add_row(exit_option, exit_option_description)
    rich_print(self._table)
    while (
        option := Prompt.ask(
            "Enter your choice:",
            choices=[*list(self._actions.keys()), exit_option],
        )
    ) != exit_option:
        self._actions[option][0](
            *self._actions[option][1],
            **self._actions[option][2],
        )
        rich_print(self._table)

get_interactive_console_options

get_interactive_console_options(
    device_manager: DeviceManager, cmdline_args: Namespace
) -> OptionsTable

Return options table with all boardfarm interactive shell options.

Parameters:

Name Type Description Default

device_manager

DeviceManager

device manager instance

required

cmdline_args

Namespace

command line arguments

required

Returns:

Type Description
OptionsTable

option table with boardfarm interactive shell options

Source code in boardfarm3/lib/interactive_shell.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
def get_interactive_console_options(
    device_manager: DeviceManager,
    cmdline_args: Namespace,
) -> OptionsTable:
    """Return options table with all boardfarm interactive shell options.

    :param device_manager: device manager instance
    :type device_manager: DeviceManager
    :param cmdline_args: command line arguments
    :type cmdline_args: Namespace
    :return: option table with boardfarm interactive shell options
    :rtype: OptionsTable
    """
    table = OptionsTable("BOARDFARM INTERACTIVE SHELL")
    table.add_column("Choice", justify="center", style="cyan")
    table.add_column("Description", style="magenta")
    table.add_column("Consoles", justify="center")
    for option in _get_device_console_options(device_manager):
        table.add_option(*option)
    table.add_option(
        ("p", "python interactive shell (ptpython)"),
        _interactive_ptpython_shell,
        (cmdline_args, device_manager),
        {},
    )
    if entry_points(group="pytest11", name="pytest_boardfarm"):
        table.add_option(
            ("e", "execute boardfarm automated test(s)"),
            _run_boardfarm_tests,
            (),
            {},
        )
    if cmdline_args.save_console_logs:
        table.add_option(
            ("m", "add custom marker in console logs"),
            _add_session_marker_in_console_logs,
            (),
            {},
        )
    return table

mibs_compiler

MIBs to JSON compiler module.

Classes:

Name Description
MibsCompiler

MIBs to JSON compiler class.

MibsCompiler

MibsCompiler(mibs_dirs: list[str])

MIBs to JSON compiler class.

Initialize MIBs to JSON compiler.

Parameters:

Name Type Description Default

mibs_dirs

list[str]

mibs directories

required

Methods:

Name Description
get_mib_oid

Get OID of given MIB.

Source code in boardfarm3/lib/mibs_compiler.py
23
24
25
26
27
28
29
def __init__(self, mibs_dirs: list[str]):
    """Initialize MIBs to JSON compiler.

    :param mibs_dirs: mibs directories
    """
    self._mibs_dict: dict[str, dict] = {}
    self._compile_mibs(mibs_dirs)

get_mib_oid

get_mib_oid(mib_name: str) -> str

Get OID of given MIB.

Parameters:

Name Type Description Default
mib_name
str

MIB name

required

Returns:

Type Description
str

OID of the given MIB

Raises:

Type Description
ValueError

when unable to find given mib

Source code in boardfarm3/lib/mibs_compiler.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def get_mib_oid(self, mib_name: str) -> str:
    """Get OID of given MIB.

    :param mib_name: MIB name
    :returns: OID of the given MIB
    :raises ValueError: when unable to find given mib
    """
    if (
        mib_name in self._mibs_dict
        and self._mibs_dict.get(mib_name, None)
        and "oid" in self._mibs_dict[mib_name]
    ):
        return self._mibs_dict[mib_name]["oid"]
    msg = f"Unable to find OID of {mib_name!r} MIB"
    raise ValueError(msg)

multicast

Multicast library.

Classes:

Name Description
IPerfResult

Store results of IPerf server session.

IPerfSession

Store details of IPerf session.

IPerfStream

Store details of IPerf stream.

Multicast

Multicast device component.

MulticastGroupRecordType

IGMPv3 Record Types.

IPerfResult dataclass

IPerfResult(_data: DataFrame | None)

Store results of IPerf server session.

Attributes:

Name Type Description
bandwidth str | None

Return resultant bandwidth in Mbps.

result DataFrame | None

Return the entire result as a dataframe.

total_loss str | None

Return no. of datagrams lost.

bandwidth property

bandwidth: str | None

Return resultant bandwidth in Mbps.

Returns:

Type Description
Optional[str]

resultant bandwidth, None if iperf failed

result property

result: DataFrame | None

Return the entire result as a dataframe.

Returns:

Type Description
Optional[pandas.DataFrame]

iperf result in tablular format, None if iperf failed

total_loss property

total_loss: str | None

Return no. of datagrams lost.

Returns:

Type Description
Optional[str]

resultant total loss, None if iperf failed

IPerfSession dataclass

IPerfSession(device: IperfDevice, pid: str, address: str, port: int, output_file: str)

Store details of IPerf session.

IPerfStream dataclass

IPerfStream(
    device: IperfDevice,
    pid: str,
    address: str,
    port: int,
    output_file: str,
    time: int = 0,
)

Store details of IPerf stream.

Multicast

Multicast(
    device_name: str, iface_dut: str, console: BoardfarmPexpect, shell_prompt: list[str]
)

Multicast device component.

Initialize multicast component.

Parameters:

Name Type Description Default

device_name

str

device name

required

iface_dut

str

DUT interface name

required

console

BoardfarmPexpect

console instance of the device

required

shell_prompt

list[str]

shell prompt patterns

required

Methods:

Name Description
join_iperf_multicast_asm_group

Start an iperf server binding to a multicast address in background.

join_iperf_multicast_ssm_group

Start an iperf server binding to a multicast address in background.

kill_all_iperf_sessions

Kill all iperf sessions.

leave_iperf_multicast_group

Send IGMP leave to stop receiving multicast traffic.

parse_mcast_trace

Compare captured PCAP file against an expected sequence of packets.

send_igmpv3_report

Send an IGMPv3 report with desired multicast record.

send_mldv2_report

Send an MLDv2 report with desired multicast record.

start_iperf_multicast_stream

Start an iperf client sending data on multicast address in background.

wait_for_multicast_stream_to_end

Wait for all multicast streams to end.

Attributes:

Name Type Description
gateway_mac_addr str

Return the L2 address of DUT gateway from ARP table.

Source code in boardfarm3/lib/multicast.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def __init__(
    self,
    device_name: str,
    iface_dut: str,
    console: BoardfarmPexpect,
    shell_prompt: list[str],
) -> None:
    """Initialize multicast component.

    :param device_name: device name
    :type device_name: str
    :param iface_dut: DUT interface name
    :type iface_dut: str
    :param console: console instance of the device
    :type console: BoardfarmPexpect
    :param shell_prompt: shell prompt patterns
    :type shell_prompt: list[str]
    """
    self._console = console
    self._iface_dut = iface_dut
    self._shell_prompt = shell_prompt
    self._device_name = device_name

gateway_mac_addr property

gateway_mac_addr: str

Return the L2 address of DUT gateway from ARP table.

Returns:

Type Description
str

MAC address in string format.

join_iperf_multicast_asm_group

join_iperf_multicast_asm_group(multicast_group_addr: str, port: int) -> IPerfSession

Start an iperf server binding to a multicast address in background.

This use case is applicable for ASM (any-source multicast) channels (*,G) # noqa: RST213 - false positive

Session will have the following parameters by default: - 1s interval between periodic bandwidth, jitter, and loss reports.

The Use Case will return an Iperf Session object holding following info: - Target device class object on which iperf command is executed - PID of the iperf session - Multicast group address - Multicast port - CSV output file of the iperf session

.. note::

- CSV output file can only be accessed once you leave the multicast group.

.. hint:: This Use Case implements statements from the test suite such as:

- Start an Ethernet LAN client to request CPE for
  IPv4 ASM traffic from WAN multicast server
- Start client to join/subscribe any source multicast channel (S,G)
  by sending IGMPv3 report

Parameters:

Name Type Description Default
multicast_group_addr
str

multicast stream's group IP address to join

required
port
int

multicast stream's port number

required

Returns:

Type Description
IPerfSession

object holding data on the IPerf Session

Source code in boardfarm3/lib/multicast.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
def join_iperf_multicast_asm_group(
    self,
    multicast_group_addr: str,
    port: int,
) -> IPerfSession:
    """Start an iperf server binding to a multicast address in background.

    This use case is applicable for ASM (any-source multicast)
    channels (*,G)  # noqa: RST213 - false positive

    Session will have the following parameters by default:
        - 1s interval between periodic bandwidth, jitter,
          and loss reports.

    The Use Case will return an Iperf Session object holding
    following info:
    - Target device class object on which iperf command is executed
    - PID of the iperf session
    - Multicast group address
    - Multicast port
    - CSV output file of the iperf session

    .. note::

        - CSV output file can only be accessed once you leave the multicast group.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Start an Ethernet LAN client to request CPE for
          IPv4 ASM traffic from WAN multicast server
        - Start client to join/subscribe any source multicast channel (S,G)
          by sending IGMPv3 report

    :param multicast_group_addr: multicast stream's group IP address to join
    :type multicast_group_addr: str
    :param port: multicast stream's port number
    :type port: int
    :return: object holding data on the IPerf Session
    :rtype: IPerfSession
    """
    ipv6_flag = (
        "-V" if isinstance(ip_address(multicast_group_addr), IPv6Address) else ""
    )
    # Cannot have any iperf session running for the same mul
    self._iperf_session_check(multicast_group_addr, port)
    fname = f"mclient_{port}.txt"
    # run iperf, format result as CSV
    self._console.execute_command(
        f"iperf {ipv6_flag} -s -f m -u -U -p {port} "
        f"-B {multicast_group_addr} "
        f"-i 1 -y C > {fname} &",
    )
    pid = self._console.execute_command(
        f"pgrep iperf -a | grep {port} | awk '{{print$1}}'",
    )
    return IPerfSession(None, pid, multicast_group_addr, port, fname)

join_iperf_multicast_ssm_group

join_iperf_multicast_ssm_group(
    multicast_source_addr: str, multicast_group_addr: str, port: int
) -> IPerfSession

Start an iperf server binding to a multicast address in background.

This use case is applicable for SSM (source-specific multicast) channels (S,G)

Session will have the following parameters by default: - 1s interval between periodic bandwidth, jitter, and loss reports.

The Use Case will return an Iperf Session object holding following info: - Target device class object on which iperf command is executed - PID of the iperf session - Multicast group address - Multicast port - CSV output file of the iperf session

.. note::

- The multicast source will always be a WAN device.
- CSV output file can only be accessed once you leave the multicast group.

.. hint:: This Use Case implements statements from the test suite such as:

- Start an Ethernet LAN client to request CPE for
  IPv4 SSM traffic from WAN multicast server
- Start client to join/subscribe a specific source and Group (S,G)
  channel by sending IGMPv3 report

Parameters:

Name Type Description Default
multicast_source_addr
str

WAN IP address used to run the mcast stream

required
multicast_group_addr
str

multicast stream's group IP address to join

required
port
int

multicast stream's port number

required

Returns:

Type Description
IPerfSession

object holding data on the IPerf Session

Source code in boardfarm3/lib/multicast.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def join_iperf_multicast_ssm_group(
    self,
    multicast_source_addr: str,
    multicast_group_addr: str,
    port: int,
) -> IPerfSession:
    """Start an iperf server binding to a multicast address in background.

    This use case is applicable for SSM (source-specific multicast) channels (S,G)

    Session will have the following parameters by default:
        - 1s interval between periodic bandwidth, jitter,
          and loss reports.

    The Use Case will return an Iperf Session object holding
    following info:
    - Target device class object on which iperf command is executed
    - PID of the iperf session
    - Multicast group address
    - Multicast port
    - CSV output file of the iperf session

    .. note::

        - The multicast source will always be a WAN device.
        - CSV output file can only be accessed once you leave the multicast group.

    .. hint:: This Use Case implements statements from the test suite such as:

        - Start an Ethernet LAN client to request CPE for
          IPv4 SSM traffic from WAN multicast server
        - Start client to join/subscribe a specific source and Group (S,G)
          channel by sending IGMPv3 report

    :param multicast_source_addr: WAN IP address used to run the mcast stream
    :type multicast_source_addr: str
    :param multicast_group_addr: multicast stream's group IP address to join
    :type multicast_group_addr: str
    :param port: multicast stream's port number
    :type port: int
    :return: object holding data on the IPerf Session
    :rtype: IPerfSession
    """
    ipv6_flag = (
        "-V"
        if isinstance(ip_address(multicast_source_addr), IPv6Address)
        and isinstance(ip_address(multicast_group_addr), IPv6Address)
        else ""
    )
    # Cannot have any iperf session running for the same mul
    self._iperf_session_check(multicast_group_addr, port)
    fname = f"mclient_{port}.txt"
    # run iperf, format result as CSV
    self._console.execute_command(
        f"iperf {ipv6_flag} -s -f m -u -U -p {port} "
        f"-B {multicast_group_addr} --ssm-host {multicast_source_addr} "
        f"-i 1 -y C > {fname} &",
    )
    pid = self._console.execute_command(
        f"pgrep iperf -a | grep {port} | awk '{{print$1}}'",
    )
    return IPerfSession(None, pid, multicast_group_addr, port, fname)

kill_all_iperf_sessions

kill_all_iperf_sessions() -> None

Kill all iperf sessions.

Source code in boardfarm3/lib/multicast.py
134
135
136
137
138
def kill_all_iperf_sessions(self) -> None:
    """Kill all iperf sessions."""
    self._console.sendcontrol("c")
    self._console.expect(self._shell_prompt)
    self._console.execute_command("for i in $(pgrep iperf); do kill -9 $i; done")

leave_iperf_multicast_group

leave_iperf_multicast_group(session: IPerfSession) -> IPerfResult

Send IGMP leave to stop receiving multicast traffic.

This is achieved by stopping the iperf server bounded to a multicast channel ASM/SSM.

Executes a kill -15 on target device. In case of IGMPv3, will send a block old sources membership report.

Parameters:

Name Type Description Default
session
IPerfSession

session object created during the join

required

Returns:

Type Description
IPerfResult

IPerf results object

Raises:

Type Description
MulticastError

when iperf session deosn't exit

Source code in boardfarm3/lib/multicast.py
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
def leave_iperf_multicast_group(self, session: IPerfSession) -> IPerfResult:
    """Send IGMP leave to stop receiving multicast traffic.

    This is achieved by stopping the iperf server bounded
    to a multicast channel ASM/SSM.

    Executes a kill -15 <iperf session pid> on target device.
    In case of IGMPv3, will send a block old sources membership report.

    :param session: session object created during the join
    :type session: IPerfSession
    :raises MulticastError: when iperf session deosn't exit
    :return: IPerf results object
    :rtype: IPerfResult
    """
    if not self._console.execute_command(
        f"pgrep iperf -a | grep {session.port}| grep {session.address}",
    ):
        # Something is wrong, there should be a process ID always.
        msg = (
            f"iperf session with port {session.port} and {session.address} "
            f"multicast group does not exist on {self._device_name}"
        )
        raise MulticastError(
            msg,
        )
    # kill -15 iperf session
    self._console.execute_command(f"kill -15 {session.pid}")
    output = self._console.execute_command(f"cat {session.output_file}")
    # remove the file after reading results
    self._console.execute_command(f"rm {session.output_file}")
    if not output.strip():
        return IPerfResult(None)

    csv = pd.read_csv(StringIO(output.strip()))
    cols = [
        "timestamp",
        "source_address",
        "source_port",
        "destination_address",
        "destination_port",
        "id",
        "interval",
        "transferred_bytes",
        "bandwidth",
        "jitter",
        "lost",
        "total",
    ]
    return IPerfResult(pd.DataFrame(csv.iloc[:, :-2].values, columns=cols))

parse_mcast_trace

parse_mcast_trace(
    fname: str, expected_sequence: list[tuple[str, ...]], ip_version: int = 4
) -> list[tuple[str, ...]]

Compare captured PCAP file against an expected sequence of packets.

This returns a matched subset of the whole packet trace. The sequence of the matched packets must align with expected sequence. The length of the matched sequence is equal to expected sequence.

In case a packet is missing in captured sequence, an empty value is maintained in output at the same index as that of the expected sequence.

IP packets in expected sequence must follow the following order:

- IP source
- IP destination
- MAC source
- MAC destination
- IP protocol number (1 - ICMP, 2 - IGMP, 6 - TCP, 17 - UDP)
- IGMP version (v3 by default)
- IGMP Record Type number (5 - Allow new sources, 6 - Block old sources)
- IGMP Multicast Address (if provided in group records)
- IGMP Source Address (if provided in group records)

IPv6 packets will be parsed and following values are returned in a list:

- IPv6 source
- IPv6 destination
- MAC source
- MAC destination
- IPv6 Next Header (0 - ICMPv6, 6 - TCP, 17 - UDP)
- MLDv2 version (130 - MLDv2 Query, 143 - MLDv2 Report)
- MLDv2 Record Type number (5 - Allow new sources, 6 - Block old sources)
- MLDv2 Multicast Address (if provided in group records)
- MLDv2 Source Address (if provided in group records)

You can use * to mark a field as Any

.. hint:: This Use Case assists in validating statements from the test suite such as:

- Check IGMPv3 report to subscribe to (S,G) from LAN on eRouter
  LAN interface
- Check Multicast traffic from WAN multicast server is received
  on eRouter WAN interface and forwarded to Ethernet LAN client

Parameters:

Name Type Description Default
fname
str

name of the pcap file

required
expected_sequence
list[tuple[str, ...]]

expected sequence to match against captured sequence

required
ip_version
int

IP version, defaults to 4

4

Returns:

Type Description
list[tuple[str, ...]]

matched captured sequence against the expected sequence

Raises:

Type Description
ValueError

when given ip version is invalid

Source code in boardfarm3/lib/multicast.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def parse_mcast_trace(
    self,
    fname: str,
    expected_sequence: list[tuple[str, ...]],
    ip_version: int = 4,
) -> list[tuple[str, ...]]:
    """Compare captured PCAP file against an expected sequence of packets.

    This returns a matched subset of the whole packet trace.
    The sequence of the matched packets must align with expected sequence.
    The length of the matched sequence is equal to expected sequence.

    In case a packet is missing in captured sequence, an empty value is
    maintained in output at the same index as that of the expected sequence.

    IP packets in expected sequence must follow the following order:

        - IP source
        - IP destination
        - MAC source
        - MAC destination
        - IP protocol number (1 - ICMP, 2 - IGMP, 6 - TCP, 17 - UDP)
        - IGMP version (v3 by default)
        - IGMP Record Type number (5 - Allow new sources, 6 - Block old sources)
        - IGMP Multicast Address (if provided in group records)
        - IGMP Source Address (if provided in group records)

    IPv6 packets will be parsed and following values are returned in a list:

        - IPv6 source
        - IPv6 destination
        - MAC source
        - MAC destination
        - IPv6 Next Header (0 - ICMPv6, 6 - TCP, 17 - UDP)
        - MLDv2 version (130 - MLDv2 Query, 143 - MLDv2 Report)
        - MLDv2 Record Type number (5 - Allow new sources, 6 - Block old sources)
        - MLDv2 Multicast Address (if provided in group records)
        - MLDv2 Source Address (if provided in group records)

    You can use * to mark a field as Any

    .. hint:: This Use Case assists in validating statements from the
       test suite such as:

        - Check IGMPv3 report to subscribe to (S,G) from LAN on eRouter
          LAN interface
        - Check Multicast traffic from WAN multicast server is received
          on eRouter WAN interface and forwarded to Ethernet LAN client

    :param fname: name of the pcap file
    :type fname: str
    :param expected_sequence: expected sequence to match against captured sequence
    :type expected_sequence: list[tuple[str, ...]]
    :param ip_version: IP version, defaults to 4
    :type ip_version: int
    :raises ValueError: when given ip version is invalid
    :return: matched captured sequence against the expected sequence
    :rtype: list[tuple[str, ...]]
    """
    if ip_version == 4:  # noqa: PLR2004
        captured_sequence = self._read_mcast_ipv4_trace(fname)
    elif ip_version == 6:  # noqa: PLR2004
        captured_sequence = self._read_mcast_ipv6_trace(fname)
    else:
        msg = f"Invalid IP version: {ip_version}"
        raise ValueError(msg)
    last_check = 0
    final_result = []
    for packet in expected_sequence:
        for i in range(last_check, len(captured_sequence)):
            if all(
                expected == actual
                for expected, actual in zip(packet, captured_sequence[i])
                if expected != "*"
            ):
                last_check = i
                _LOGGER.debug(
                    "Verified IP Multicast: %s--->%s, MAC: %s--->%s",
                    packet[0],
                    packet[1],
                    packet[2],
                    packet[3],
                )
                final_result.append(captured_sequence[i])
                break
        else:
            _LOGGER.debug(
                "Failed IP Multicast verification: %s--->%s, MAC: %s--->%s",
                packet[0],
                packet[1],
                packet[2],
                packet[3],
            )
            final_result.append(())
    return final_result

send_igmpv3_report

send_igmpv3_report(mcast_group_record: MulticastGroupRecord, count: int) -> None

Send an IGMPv3 report with desired multicast record.

Multicast source and group must be IPv4 addresses. Multicast sources need to be non-multicast addresses and group address needs to be a multicast address.

Implementation relies on a custom send_igmp_report script based on scapy.

Parameters:

Name Type Description Default
mcast_group_record
MulticastGroupRecord

IGMPv3 multicast group record

required
count
int

num of packets to send in 1s interval

required

Raises:

Type Description
MulticastError

when failed to execute send_mld_report command

Source code in boardfarm3/lib/multicast.py
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
def send_igmpv3_report(
    self,
    mcast_group_record: MulticastGroupRecord,
    count: int,
) -> None:
    """Send an IGMPv3 report with desired multicast record.

    Multicast source and group must be IPv4 addresses.
    Multicast sources need to be non-multicast addresses and
    group address needs to be a multicast address.

    Implementation relies on a custom send_igmp_report
    script based on scapy.

    :param mcast_group_record: IGMPv3 multicast group record
    :type mcast_group_record: MulticastGroupRecord
    :param count: num of packets to send in 1s interval
    :type count: int
    :raises MulticastError: when failed to execute send_mld_report command
    """
    command = f"send_igmp_report -i {self._iface_dut} -c {count}"
    output = self._send_multicast_report(command, mcast_group_record)
    if f"Sent {count} packets" not in output:
        msg = f"Failed to execute send_mld_report command:\n{output}"
        raise MulticastError(
            msg,
        )

send_mldv2_report

send_mldv2_report(mcast_group_record: MulticastGroupRecord, count: int) -> None

Send an MLDv2 report with desired multicast record.

Multicast source and group must be IPv6 addresses. Multicast sources need to be non-multicast addresses and group address needs to be a multicast address.

Implementation relies on a custom send_mld_report script based on scapy.

Parameters:

Name Type Description Default
mcast_group_record
MulticastGroupRecord

MLDv2 multicast group record

required
count
int

num of packets to send in 1s interval

required

Raises:

Type Description
MulticastError

when failed to execute send_mld_report command

Source code in boardfarm3/lib/multicast.py
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
def send_mldv2_report(
    self,
    mcast_group_record: MulticastGroupRecord,
    count: int,
) -> None:
    """Send an MLDv2 report with desired multicast record.

    Multicast source and group must be IPv6 addresses.
    Multicast sources need to be non-multicast addresses and
    group address needs to be a multicast address.

    Implementation relies on a custom send_mld_report
    script based on scapy.

    :param mcast_group_record: MLDv2 multicast group record
    :type mcast_group_record: MulticastGroupRecord
    :param count: num of packets to send in 1s interval
    :type count: int
    :raises MulticastError: when failed to execute send_mld_report command
    """
    command = f"send_mld_report -i {self._iface_dut} -c {count}"
    output = self._send_multicast_report(command, mcast_group_record)
    if f"Sent {count} packets" not in output:
        msg = f"Failed to execute send_mld_report command:\n{output}"
        raise MulticastError(
            msg,
        )

start_iperf_multicast_stream

start_iperf_multicast_stream(
    multicast_group_addr: str, port: int, time: int, bit_rate: float
) -> IPerfStream

Start an iperf client sending data on multicast address in background.

Session will have the following parameters by default: - TTL value set to 5

.. hint:: This Use Case implements statements from the test suite such as:

- Start multicast server on WAN network to provide
  the multicast traffic in unreserved multicast group IP
  range 232.0.0.0/8
- Start multicast server on WAN network to provide
  the multicast traffic in unreserved multicast group IP
  range FF38::8000:0/96
- Start multicast stream on a specific Group channel
  by sending IGMPv3 report

Parameters:

Name Type Description Default
multicast_group_addr
str

multicast stream's group IP address

required
port
int

multicast stream's port number

required
time
int

total time the session should run for

required
bit_rate
float

bit_rate of data to be sent (in Mbps)

required

Returns:

Type Description
IPerfStream

object holding data on the IPerf stream.

Source code in boardfarm3/lib/multicast.py
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
def start_iperf_multicast_stream(
    self,
    multicast_group_addr: str,
    port: int,
    time: int,
    bit_rate: float,
) -> IPerfStream:
    """Start an iperf client sending data on multicast address in background.

    Session will have the following parameters by default:
        - TTL value set to 5

    .. hint:: This Use Case implements statements from the
       test suite such as:

        - Start multicast server on WAN network to provide
          the multicast traffic in unreserved multicast group IP
          range 232.0.0.0/8
        - Start multicast server on WAN network to provide
          the multicast traffic in unreserved multicast group IP
          range FF38::8000:0/96
        - Start multicast stream on a specific Group channel
          by sending IGMPv3 report

    :param multicast_group_addr: multicast stream's group IP address
    :type multicast_group_addr: str
    :param port: multicast stream's port number
    :type port: int
    :param time: total time the session should run for
    :type time: int
    :param bit_rate: bit_rate of data to be sent (in Mbps)
    :type bit_rate: float
    :return: object holding data on the IPerf stream.
    :rtype: IPerfStream
    """
    # Ensure there is no exisiting stream with same IP and port.
    self._iperf_session_check(multicast_group_addr, port)
    ipv6_flag = (
        "-V" if isinstance(ip_address(multicast_group_addr), IPv6Address) else ""
    )
    fname = f"mserver_{port}.txt"
    self._console.execute_command(
        f"iperf {ipv6_flag} -u -f m -c {multicast_group_addr} "
        f"-p {port} --ttl 5 "
        f"-t {time} -b {bit_rate}m > {fname} &",
    )
    pid = self._console.execute_command(
        f"pgrep iperf -a | grep {port} | awk '{{print$1}}'",
    )
    return IPerfStream(None, pid, multicast_group_addr, port, fname, time)

wait_for_multicast_stream_to_end

wait_for_multicast_stream_to_end(iperf_stream: IPerfStream) -> None

Wait for all multicast streams to end.

The Use Case will wait for a time equal to the stream with the highest wait time.

If a stream from the list does not exit within the max wait time, then throw an error.

.. hint:: To be used along with the Use Case start_iperf_multicast_stream

Parameters:

Name Type Description Default
iperf_stream
IPerfStream

iperf stream instance

required

Raises:

Type Description
MulticastError

when multicast stream doesn't exit within given time

Source code in boardfarm3/lib/multicast.py
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
def wait_for_multicast_stream_to_end(self, iperf_stream: IPerfStream) -> None:
    """Wait for all multicast streams to end.

    The Use Case will wait for a time equal to the stream with
    the highest wait time.

    If a stream from the list does not exit within the
    max wait time, then throw an error.

    .. hint:: To be used along with the Use Case start_iperf_multicast_stream

    :param iperf_stream: iperf stream instance
    :type iperf_stream: IPerfStream
    :raises MulticastError: when multicast stream doesn't exit within given time
    """
    is_stream_still_running = False
    for _ in range(2):
        if not self._console.execute_command(
            f"pgrep iperf -a | grep {iperf_stream.port}| grep"
            f" {iperf_stream.address}",
        ):
            break
        sleep(1)
    else:
        is_stream_still_running = True
        self._console.execute_command(f"kill -9 {iperf_stream.pid}")
    self._console.execute_command(f"rm {iperf_stream.output_file}")
    if is_stream_still_running:
        msg = (
            f"{iperf_stream.address}:{iperf_stream.port} did not exit "
            "within {iperf_stream.time}s"
        )
        raise MulticastError(
            msg,
        )

MulticastGroupRecordType

IGMPv3 Record Types.

network_utils

Network utilities module.

Classes:

Name Description
NetworkUtility

Network utilities.

NetworkUtility

NetworkUtility(console: BoardfarmPexpect)

Network utilities.

Initialize the network utility.

Parameters:

Name Type Description Default

console

BoardfarmPexpect

console instance which has network utilities

required

Methods:

Name Description
gen_uuid

Generate unique identifier.

netstat

Perform netstat with given options.

read_tcpdump

Read tcpdump packets and delete the pcap file afterwards.

scp

Copy file between this device and the remote host.

start_tcpdump

Start tcpdump capture on given interface.

stop_tcpdump

Stop tcpdump process with given process id.

tftp

Transfer file via tftp.

traceroute_host

Run traceroute to given host ip and return result.

Source code in boardfarm3/lib/network_utils.py
25
26
27
28
29
30
31
def __init__(self, console: BoardfarmPexpect) -> None:
    """Initialize the network utility.

    :param console: console instance which has network utilities
    :type console: BoardfarmPexpect
    """
    self._console = console

gen_uuid

gen_uuid() -> str

Generate unique identifier.

Returns:

Type Description
str

uuid

Source code in boardfarm3/lib/network_utils.py
181
182
183
184
185
186
187
def gen_uuid(self) -> str:
    """Generate unique identifier.

    :return: uuid
    :rtype: str
    """
    return self._console.execute_command("uuidgen")

netstat

netstat(opts: str = '', extra_opts: str = '') -> DataFrame

Perform netstat with given options.

Parameters:

Name Type Description Default
opts
str

command line options

''
extra_opts
str

extra command line options

''

Returns:

Type Description
DataFrame

parsed netstat output

Source code in boardfarm3/lib/network_utils.py
33
34
35
36
37
38
39
40
41
42
43
44
45
def netstat(self, opts: str = "", extra_opts: str = "") -> DataFrame:
    """Perform netstat with given options.

    :param opts: command line options
    :type opts: str
    :param extra_opts: extra command line options
    :type extra_opts: str
    :return: parsed netstat output
    :rtype: DataFrame
    """
    return NetstatParser().parse_netstat_output(
        self._console.execute_command(f"netstat {opts} {extra_opts}")
    )

read_tcpdump

read_tcpdump(
    capture_file: str,
    protocol: str = "",
    opts: str = "",
    timeout: int = 30,
    rm_pcap: bool = True,
) -> str

Read tcpdump packets and delete the pcap file afterwards.

Parameters:

Name Type Description Default
capture_file
str

tcpdump pcap file path

required
protocol
str

protocol to the filter

''
opts
str

command line options to tcpdump

''
timeout
int

timeout for reading the tcpdump output

30
rm_pcap
bool

remove pcap file after read

True

Returns:

Type Description
str

tcpdump output

Source code in boardfarm3/lib/network_utils.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def read_tcpdump(
    self,
    capture_file: str,
    protocol: str = "",
    opts: str = "",
    timeout: int = 30,
    rm_pcap: bool = True,
) -> str:
    """Read tcpdump packets and delete the pcap file afterwards.

    :param capture_file: tcpdump pcap file path
    :type capture_file: str
    :param protocol: protocol to the filter
    :type protocol: str
    :param opts: command line options to tcpdump
    :type opts: str
    :param timeout: timeout for reading the tcpdump output
    :type timeout: int
    :param rm_pcap: remove pcap file after read
    :type rm_pcap: bool
    :return: tcpdump output
    :rtype: str
    """
    return tcpdump_read(
        self._console,
        capture_file,
        protocol=protocol,
        opts=opts,
        timeout=timeout,
        rm_pcap=rm_pcap,
    )

scp

scp(
    ip: str,
    port: int | str,
    user: str,
    pwd: str,
    source_path: str,
    dest_path: str,
    action: Literal["download", "upload"],
) -> None

Copy file between this device and the remote host.

Parameters:

Name Type Description Default
ip
str

ip address of the remote host

required
port
int | str

port number of the remote host

required
user
str

username of the host

required
pwd
str

password of the host

required
source_path
str

source file path

required
dest_path
str

destination path

required
action
Literal['download', 'upload']

scp action(download/upload)

required
Source code in boardfarm3/lib/network_utils.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def scp(  # noqa: PLR0913
    self,
    ip: str,
    port: int | str,
    user: str,
    pwd: str,
    source_path: str,
    dest_path: str,
    action: Literal["download", "upload"],
) -> None:
    """Copy file between this device and the remote host.

    :param ip: ip address of the remote host
    :type ip: str
    :param port: port number of the remote host
    :type port: int | str
    :param user: username of the host
    :type user: str
    :param pwd: password of the host
    :type pwd: str
    :param source_path: source file path
    :type source_path: str
    :param dest_path: destination path
    :type dest_path: str
    :param action: scp action(download/upload)
    :type action: Literal["download", "upload"]
    """
    scp(
        self._console,
        host=ip,
        port=port,
        username=user,
        password=pwd,
        src_path=source_path,
        dst_path=dest_path,
        action=action,
    )

start_tcpdump

start_tcpdump(fname: str, interface: str, filters: dict | None = None) -> str

Start tcpdump capture on given interface.

Parameters:

Name Type Description Default
fname
str

tcpdump output file name

required
interface
str

interface name to be captured

required
filters
dict | None

filters as key value pair(eg: {"-v": "", "-c": "4"}) default to None

None

Returns:

Type Description
str

return the process id of the tcpdump capture

Source code in boardfarm3/lib/network_utils.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def start_tcpdump(
    self, fname: str, interface: str, filters: dict | None = None
) -> str:
    """Start tcpdump capture on given interface.

    :param fname: tcpdump output file name
    :type fname: str
    :param interface: interface name to be captured
    :type interface: str
    :param filters: filters as key value pair(eg: {"-v": "", "-c": "4"})
                    default to None
    :type filters: dict | None
    :return: return the process id of the tcpdump capture
    :rtype: str
    """
    return start_tcpdump(self._console, interface, None, fname, filters)

stop_tcpdump

stop_tcpdump(pid: str) -> None

Stop tcpdump process with given process id.

Parameters:

Name Type Description Default
pid
str

tcpdump process id

required
Source code in boardfarm3/lib/network_utils.py
64
65
66
67
68
69
70
def stop_tcpdump(self, pid: str) -> None:
    """Stop tcpdump process with given process id.

    :param pid: tcpdump process id
    :type pid: str
    """
    stop_tcpdump(self._console, pid)

tftp

tftp(tftp_server_ip: str, source_file: str, dest_file: str, timeout: int = 60) -> None

Transfer file via tftp.

Parameters:

Name Type Description Default
tftp_server_ip

tftp server ip

required

Raises:

Type Description
FileNotFoundError

Unable to find the source file

Source code in boardfarm3/lib/network_utils.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def tftp(
    self, tftp_server_ip: str, source_file: str, dest_file: str, timeout: int = 60
) -> None:
    """Transfer file via tftp.

    :param tftp_server_ip : tftp server ip
    :type tftp_server_ip : str
    :param source_file : source file name on device
    :type source_file : str
    :param dest_file : dest_file file name on device
    :type dest_file : str
    :param timeout : timeout for the tftp
    :type timeout : int
    :raises FileNotFoundError: Unable to find the source file
    """
    tftp_cmd = f"tftp -pl {source_file} {tftp_server_ip} -r {dest_file}"
    output = self._console.execute_command(tftp_cmd, timeout=timeout)

    if "can't open" in output or "No such file or directory" in output:
        msg = f"Unable to perform tftp {output}"
        raise FileNotFoundError(msg)

traceroute_host

traceroute_host(host_ip: str, version: str = '', options: str = '') -> str

Run traceroute to given host ip and return result.

Parameters:

Name Type Description Default
host_ip
str

ip address of the host

required
version
str

ip address version

''
options
str

command line options to traceroute

''

Returns:

Type Description
str

output of traceroute

Source code in boardfarm3/lib/network_utils.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def traceroute_host(
    self, host_ip: str, version: str = "", options: str = ""
) -> str:
    """Run traceroute to given host ip and return result.

    :param host_ip: ip address of the host
    :type host_ip: str
    :param version: ip address version
    :type version: str
    :param options: command line options to traceroute
    :type options: str
    :return: output of traceroute
    :rtype: str
    """
    return traceroute_host(self._console, host_ip, version=version, options=options)

networking

Boardfarm networking module.

Classes:

Name Description
DNS

Holds DNS names and their addresses.

HTTPResult

Class to save the object of parsed HTTP response.

IptablesFirewall

Linux iptables firewall.

NSLookup

NSLookup command line utility.

Functions:

Name Description
dns_lookup

Perform dig command in the devices to resolve DNS.

http_get

Peform http get (via curl) and return parsed result.

is_link_up

Check given interface is up or not.

nmap

Run an nmap scan from source to destination device.

scp

SCP file.

start_tcpdump

Start tcpdump capture on given interface.

stop_tcpdump

Stop tcpdump capture.

tcpdump_read

Read the given tcpdump and delete the file afterwards.

traceroute_host

Traceroute given host ip and return the details.

DNS

DNS(
    console: _LinuxConsole,
    device_name: str,
    ipv4_address: str | None = None,
    ipv6_address: str | None = None,
    ipv4_aux_address: IPv4Address | None = None,
    ipv6_aux_address: IPv6Address | None = None,
    aux_url: str | None = None,
)

Holds DNS names and their addresses.

Initialize DNS.

Parameters:

Name Type Description Default

console

_LinuxConsole

console or device instance

required

device_name

str

device name

required

ipv4_address

str | None

ipv4 address of the device

None

ipv6_address

str | None

ipv6 address of the device

None

ipv4_aux_address

IPv4Address | None

ipv4 aux address

None

ipv6_aux_address

IPv6Address | None

ipv6 aux address

None

aux_url

str | None

aux url

None

Methods:

Name Description
configure_hosts

Create the given number of reachable and unreachable ACS domain IP's.

Source code in boardfarm3/lib/networking.py
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
def __init__(  # noqa: PLR0913
    self,
    console: _LinuxConsole,
    device_name: str,
    ipv4_address: str | None = None,
    ipv6_address: str | None = None,
    ipv4_aux_address: IPv4Address | None = None,
    ipv6_aux_address: IPv6Address | None = None,
    aux_url: str | None = None,
) -> None:
    """Initialize DNS.

    :param console: console or device instance
    :type console: _LinuxConsole
    :param device_name: device name
    :type device_name: str
    :param ipv4_address: ipv4 address of the device
    :type ipv4_address: str
    :param ipv6_address: ipv6 address of the device
    :type ipv6_address: str
    :param ipv4_aux_address: ipv4 aux address
    :type ipv4_aux_address: IPv4Address
    :param ipv6_aux_address: ipv6 aux address
    :type ipv6_aux_address: IPv6Address
    :param aux_url: aux url
    :type aux_url: str
    """
    self.auxv4 = ipv4_aux_address
    self.auxv6 = ipv6_aux_address
    self.dnsv4: defaultdict = defaultdict(list)
    self.dnsv6: defaultdict = defaultdict(list)
    self.hosts_v4: defaultdict = defaultdict(list)
    self.hosts_v6: defaultdict = defaultdict(list)
    self.fqdn = f"{device_name}.boardfarm.com"
    self._add_dns_addresses(ipv4_address, ipv6_address)
    self._add_aux_dns_addresses(aux_url)
    self.hosts_v4.update(self.dnsv4)
    self.hosts_v6.update(self.dnsv6)
    self.nslookup = NSLookup(console)

configure_hosts

configure_hosts(
    reachable_ipv4: int,
    unreachable_ipv4: int,
    reachable_ipv6: int,
    unreachable_ipv6: int,
) -> None

Create the given number of reachable and unreachable ACS domain IP's.

Parameters:

Name Type Description Default
reachable_ipv4
int

no.of reachable IPv4 address for acs url

required
unreachable_ipv4
int

no.of unreachable IPv4 address for acs url

required
reachable_ipv6
int

no.of reachable IPv6 address for acs url

required
unreachable_ipv6
int

no.of unreachable IPv6 address for acs url

required
Source code in boardfarm3/lib/networking.py
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
def configure_hosts(
    self,
    reachable_ipv4: int,
    unreachable_ipv4: int,
    reachable_ipv6: int,
    unreachable_ipv6: int,
) -> None:
    """Create the given number of reachable and unreachable ACS domain IP's.

    :param reachable_ipv4: no.of reachable IPv4 address for acs url
    :type reachable_ipv4: int
    :param unreachable_ipv4: no.of unreachable IPv4 address for acs url
    :type unreachable_ipv4: int
    :param reachable_ipv6: no.of reachable IPv6 address for acs url
    :type reachable_ipv6: int
    :param unreachable_ipv6: no.of unreachable IPv6 address for acs url
    :type unreachable_ipv6: int
    """
    val_v4 = self.hosts_v4[self.fqdn][:reachable_ipv4]
    val_v6 = self.hosts_v6[self.fqdn][:reachable_ipv6]
    self.hosts_v4[self.fqdn] = val_v4
    self.hosts_v6[self.fqdn] = val_v6
    for val in range(unreachable_ipv4):
        ipv4 = self.auxv4 + (val + 1)
        self.hosts_v4[self.fqdn].append(str(ipv4))
    for val in range(unreachable_ipv6):
        ipv6 = self.auxv6 + (val + 1)
        self.hosts_v6[self.fqdn].append(str(ipv6))

HTTPResult

HTTPResult(response: str)

Class to save the object of parsed HTTP response.

Parse the response and save it as an instance.

Parameters:

Name Type Description Default

response

str

response from HTTP request

required
Source code in boardfarm3/lib/networking.py
565
566
567
568
569
570
571
572
def __init__(self, response: str) -> None:
    """Parse the response and save it as an instance.

    :param response: response from HTTP request
    :type response: str
    """
    self.response = response
    self.raw, self.code, self.beautified_text = self._parse_response(response)

IptablesFirewall

IptablesFirewall(console: _LinuxConsole)

Linux iptables firewall.

Initialize IptablesFirewall.

Parameters:

Name Type Description Default

console

_LinuxConsole

linux console or device instance

required

Methods:

Name Description
add_drop_rule_ip6tables

Add drop rule to ip6tables.

add_drop_rule_iptables

Add drop rule to iptables.

del_drop_rule_ip6tables

Delete drop rule from ip6tables.

del_drop_rule_iptables

Delete drop rule from iptables.

get_ip6tables_list

Return ip6tables rules as dictionary.

get_ip6tables_policy

Return ip6tables policies as dictionary.

get_iptables_list

Return iptables rules as dictionary.

get_iptables_policy

Return iptables policies as dictionary.

is_ip6table_empty

Return True if ip6tables is empty.

is_iptable_empty

Return True if iptables is empty.

Source code in boardfarm3/lib/networking.py
240
241
242
243
244
245
246
def __init__(self, console: _LinuxConsole) -> None:
    """Initialize IptablesFirewall.

    :param console: linux console or device instance
    :type console: _LinuxConsole
    """
    self._console = console

add_drop_rule_ip6tables

add_drop_rule_ip6tables(option: str, valid_ip: str) -> None

Add drop rule to ip6tables.

Parameters:

Name Type Description Default
option
str

ip6tables command line options

required
valid_ip
str

ip to be blocked from device

required

Raises:

Type Description
ValueError

on given ip6tables rule can't be added

Source code in boardfarm3/lib/networking.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def add_drop_rule_ip6tables(self, option: str, valid_ip: str) -> None:
    """Add drop rule to ip6tables.

    :param option: ip6tables command line options
    :type option: str
    :param valid_ip: ip to be blocked from device
    :type valid_ip: str
    :raises ValueError: on given ip6tables rule can't be added
    """
    ip6tables_output = self._console.execute_command(
        f"ip6tables -C INPUT {option} {valid_ip} -j DROP",
    )
    if "Bad rule" in ip6tables_output:
        self._console.execute_command(
            f"ip6tables -I INPUT 1 {option} {valid_ip} -j DROP",
        )
    if re.search(rf"host\/network.*{valid_ip}.*not found", ip6tables_output):
        msg = (
            "Firewall rule cannot be added as the ip address: "
            f"{valid_ip} could not be found"
        )
        raise ValueError(
            msg,
        )

add_drop_rule_iptables

add_drop_rule_iptables(option: str, valid_ip: str) -> None

Add drop rule to iptables.

Parameters:

Name Type Description Default
option
str

iptables command line options

required
valid_ip
str

ip to be blocked from device

required

Raises:

Type Description
ValueError

on given iptables rule can't be added

Source code in boardfarm3/lib/networking.py
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def add_drop_rule_iptables(self, option: str, valid_ip: str) -> None:
    """Add drop rule to iptables.

    :param option: iptables command line options
    :type option: str
    :param valid_ip: ip to be blocked from device
    :type valid_ip: str
    :raises ValueError: on given iptables rule can't be added
    """
    iptables_output = self._console.execute_command(
        f"iptables -C INPUT {option} {valid_ip} -j DROP",
    )
    if "Bad rule" in iptables_output:
        self._console.execute_command(
            f"iptables -I INPUT 1 {option} {valid_ip} -j DROP",
        )
    if re.search(rf"host\/network.*{valid_ip}.*not found", iptables_output):
        msg = (
            "Firewall rule cannot be added as the ip address: "
            f"{valid_ip} could not be found"
        )
        raise ValueError(
            msg,
        )

del_drop_rule_ip6tables

del_drop_rule_ip6tables(option: str, valid_ip: str) -> None

Delete drop rule from ip6tables.

Parameters:

Name Type Description Default
option
str

ip6tables command line options

required
valid_ip
str

ip to be unblocked

required
Source code in boardfarm3/lib/networking.py
404
405
406
407
408
409
410
411
412
def del_drop_rule_ip6tables(self, option: str, valid_ip: str) -> None:
    """Delete drop rule from ip6tables.

    :param option: ip6tables command line options
    :type option: str
    :param valid_ip: ip to be unblocked
    :type valid_ip: str
    """
    self._console.execute_command(f"ip6tables -D INPUT {option} {valid_ip} -j DROP")

del_drop_rule_iptables

del_drop_rule_iptables(option: str, valid_ip: str) -> None

Delete drop rule from iptables.

Parameters:

Name Type Description Default
option
str

iptables command line options

required
valid_ip
str

ip to be unblocked

required
Source code in boardfarm3/lib/networking.py
394
395
396
397
398
399
400
401
402
def del_drop_rule_iptables(self, option: str, valid_ip: str) -> None:
    """Delete drop rule from iptables.

    :param option: iptables command line options
    :type option: str
    :param valid_ip: ip to be unblocked
    :type valid_ip: str
    """
    self._console.execute_command(f"iptables -D INPUT {option} {valid_ip} -j DROP")

get_ip6tables_list

get_ip6tables_list(opts: str = '', extra_opts: str = '') -> dict[str, list[dict]]

Return ip6tables rules as dictionary.

Parameters:

Name Type Description Default
opts
str

command line arguments for ip6tables command

''
extra_opts
str

extra command line arguments for ip6tables command

''

Returns:

Type Description
Dict[str, List[Dict]]

ip6tables rules dictionary

Source code in boardfarm3/lib/networking.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def get_ip6tables_list(
    self,
    opts: str = "",
    extra_opts: str = "",
) -> dict[str, list[dict]]:
    """Return ip6tables rules as dictionary.

    :param opts: command line arguments for ip6tables command
    :type opts: str
    :param extra_opts: extra command line arguments for ip6tables command
    :type extra_opts: str
    :return: ip6tables rules dictionary
    :rtype: Dict[str, List[Dict]]
    """
    return IptablesParser().ip6tables(
        self._console.execute_command(f"ip6tables {opts} {extra_opts}"),
    )

get_ip6tables_policy

get_ip6tables_policy(
    opts: str = "", extra_opts: str = "-nvL --line-number"
) -> dict[str, str]

Return ip6tables policies as dictionary.

Parameters:

Name Type Description Default
opts
str

command line arguments for iptables command

''
extra_opts
str

options for iptables command, defaults to -nvL --line-number

'-nvL --line-number'

Returns:

Type Description
dict[str, str]

iptables policies dictionary

Source code in boardfarm3/lib/networking.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
def get_ip6tables_policy(
    self,
    opts: str = "",
    extra_opts: str = "-nvL --line-number",
) -> dict[str, str]:
    """Return ip6tables policies as dictionary.

    :param opts: command line arguments for iptables command
    :type opts: str
    :param extra_opts: options for iptables command, defaults to -nvL --line-number
    :type extra_opts: str
    :return: iptables policies dictionary
    :rtype: dict[str, str]
    """
    return IptablesParser().iptables_policy(
        self._console.execute_command(f"ip6tables {opts} {extra_opts}"),
    )

get_iptables_list

get_iptables_list(opts: str = '', extra_opts: str = '') -> dict[str, list[dict]]

Return iptables rules as dictionary.

Parameters:

Name Type Description Default
opts
str

command line arguments for iptables command

''
extra_opts
str

extra command line arguments for iptables command

''

Returns:

Type Description
Dict[str, List[Dict]]

iptables rules dictionary

Source code in boardfarm3/lib/networking.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def get_iptables_list(
    self,
    opts: str = "",
    extra_opts: str = "",
) -> dict[str, list[dict]]:
    """Return iptables rules as dictionary.

    :param opts: command line arguments for iptables command
    :type opts: str
    :param extra_opts: extra command line arguments for iptables command
    :type extra_opts: str
    :return: iptables rules dictionary
    :rtype: Dict[str, List[Dict]]
    """
    return IptablesParser().iptables(
        self._console.execute_command(f"iptables {opts} {extra_opts}"),
    )

get_iptables_policy

get_iptables_policy(
    opts: str = "", extra_opts: str = "-nvL --line-number"
) -> dict[str, str]

Return iptables policies as dictionary.

Parameters:

Name Type Description Default
opts
str

command line arguments for iptables command

''
extra_opts
str

options for iptables command, defaults to -nvL --line-number

'-nvL --line-number'

Returns:

Type Description
dict[str, str]

iptables policies dictionary

Source code in boardfarm3/lib/networking.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def get_iptables_policy(
    self,
    opts: str = "",
    extra_opts: str = "-nvL --line-number",
) -> dict[str, str]:
    """Return iptables policies as dictionary.

    :param opts: command line arguments for iptables command
    :type opts: str
    :param extra_opts: options for iptables command, defaults to -nvL --line-number
    :type extra_opts: str
    :return: iptables policies dictionary
    :rtype: dict[str, str]
    """
    return IptablesParser().iptables_policy(
        self._console.execute_command(f"iptables {opts} {extra_opts}"),
    )

is_ip6table_empty

is_ip6table_empty(opts: str = '', extra_opts: str = '') -> bool

Return True if ip6tables is empty.

Parameters:

Name Type Description Default
opts
str

command line arguments for ip6tables command

''
extra_opts
str

extra command line arguments for ip6tables command

''

Returns:

Type Description
bool

True if ip6tables is empty, False otherwise

Source code in boardfarm3/lib/networking.py
332
333
334
335
336
337
338
339
340
341
342
def is_ip6table_empty(self, opts: str = "", extra_opts: str = "") -> bool:
    """Return True if ip6tables is empty.

    :param opts: command line arguments for ip6tables command
    :type opts: str
    :param extra_opts: extra command line arguments for ip6tables command
    :type extra_opts: str
    :return: True if ip6tables is empty, False otherwise
    :rtype: bool
    """
    return not any(self.get_ip6tables_list(opts, extra_opts).values())

is_iptable_empty

is_iptable_empty(opts: str = '', extra_opts: str = '') -> bool

Return True if iptables is empty.

Parameters:

Name Type Description Default
opts
str

command line arguments for iptables command

''
extra_opts
str

extra command line arguments for iptables command

''

Returns:

Type Description
bool

True if iptables is empty, False otherwise

Source code in boardfarm3/lib/networking.py
266
267
268
269
270
271
272
273
274
275
276
def is_iptable_empty(self, opts: str = "", extra_opts: str = "") -> bool:
    """Return True if iptables is empty.

    :param opts: command line arguments for iptables command
    :type opts: str
    :param extra_opts: extra command line arguments for iptables command
    :type extra_opts: str
    :return: True if iptables is empty, False otherwise
    :rtype: bool
    """
    return not any(self.get_iptables_list(opts, extra_opts).values())

NSLookup

NSLookup(console: _LinuxConsole)

NSLookup command line utility.

Initialize NSLookup.

Parameters:

Name Type Description Default

console

_LinuxConsole

console or device instance

required

Methods:

Name Description
__call__

Run nslookup with given arguments and return the parsed results.

nslookup

Run nslookup with given arguments and return the parsed results.

Source code in boardfarm3/lib/networking.py
418
419
420
421
422
423
424
def __init__(self, console: _LinuxConsole) -> None:
    """Initialize NSLookup.

    :param console: console or device instance
    :type console: _LinuxConsole
    """
    self._hw = console

__call__

__call__(domain_name: str, opts: str = '', extra_opts: str = '') -> dict[str, Any]

Run nslookup with given arguments and return the parsed results.

Parameters:

Name Type Description Default
domain_name
str

domain name to perform nslookup on

required
opts
str

nslookup command line options

''
extra_opts
str

nslookup additional command line options

''

Returns:

Type Description
Dict[str, Any]

parsed nslookup results as dictionary

Source code in boardfarm3/lib/networking.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
def __call__(
    self,
    domain_name: str,
    opts: str = "",
    extra_opts: str = "",
) -> dict[str, Any]:
    """Run nslookup with given arguments and return the parsed results.

    :param domain_name: domain name to perform nslookup on
    :type domain_name: str
    :param opts: nslookup command line options
    :type opts: str
    :param extra_opts: nslookup additional command line options
    :type extra_opts: str
    :return: parsed nslookup results as dictionary
    :rtype: Dict[str, Any]
    """
    return self.nslookup(domain_name, opts, extra_opts)

nslookup

nslookup(domain_name: str, opts: str = '', extra_opts: str = '') -> dict[str, Any]

Run nslookup with given arguments and return the parsed results.

Parameters:

Name Type Description Default
domain_name
str

domain name to perform nslookup on

required
opts
str

nslookup command line options

''
extra_opts
str

nslookup additional command line options

''

Returns:

Type Description
Dict[str, Any]

parsed nslookup results as dictionary

Source code in boardfarm3/lib/networking.py
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
def nslookup(
    self,
    domain_name: str,
    opts: str = "",
    extra_opts: str = "",
) -> dict[str, Any]:
    """Run nslookup with given arguments and return the parsed results.

    :param domain_name: domain name to perform nslookup on
    :type domain_name: str
    :param opts: nslookup command line options
    :type opts: str
    :param extra_opts: nslookup additional command line options
    :type extra_opts: str
    :return: parsed nslookup results as dictionary
    :rtype: Dict[str, Any]
    """
    return NslookupParser().parse_nslookup_output(
        self._hw.execute_command(f"nslookup {opts} {domain_name} {extra_opts}"),
    )

dns_lookup

dns_lookup(
    console: _LinuxConsole, domain_name: str, record_type: str, opts: str = ""
) -> list[dict[str, Any]]

Perform dig command in the devices to resolve DNS.

Parameters:

Name Type Description Default

console

_LinuxConsole

console or device instance

required

domain_name

str

domain name which needs lookup

required

record_type

str

AAAA for ipv6 else A

required

opts

str

options to be provided to dig command, defaults to ""

''

Returns:

Type Description
List[Dict[str, Any]]

parsed dig command ouput

Source code in boardfarm3/lib/networking.py
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
def dns_lookup(
    console: _LinuxConsole, domain_name: str, record_type: str, opts: str = ""
) -> list[dict[str, Any]]:
    """Perform ``dig`` command in the devices to resolve DNS.

    :param console: console or device instance
    :type console: _LinuxConsole
    :param domain_name: domain name which needs lookup
    :type domain_name: str
    :param record_type: AAAA for ipv6 else A
    :type record_type: str
    :param opts: options to be provided to dig command, defaults to ""
    :type opts: str
    :return: parsed dig command ouput
    :rtype: List[Dict[str, Any]]
    """
    return dig.parse(
        console.execute_command(f"dig {opts} {record_type} {domain_name}").split(
            ";", 1
        )[-1]
    )

http_get

http_get(
    console: _LinuxConsole, url: str, timeout: int = 20, options: str = ""
) -> HTTPResult

Peform http get (via curl) and return parsed result.

Parameters:

Name Type Description Default

console

_LinuxConsole

console or device instance

required

url

str

url to get the response

required

timeout

int

connection timeout for the curl command in seconds

20

options

str

additional curl command line options, defaults to ""

''

Returns:

Type Description
HTTPResult

parsed http response

Source code in boardfarm3/lib/networking.py
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
def http_get(
    console: _LinuxConsole, url: str, timeout: int = 20, options: str = ""
) -> HTTPResult:
    """Peform http get (via curl) and return parsed result.

    :param console: console or device instance
    :type console: _LinuxConsole
    :param url: url to get the response
    :type url: str
    :param timeout: connection timeout for the curl command in seconds
    :type timeout: int
    :param options: additional curl command line options, defaults to ""
    :type options: str
    :return: parsed http response
    :rtype: HTTPResult
    """
    return HTTPResult(
        console.execute_command(f"curl -v {options} --connect-timeout {timeout} {url}"),
    )
is_link_up(
    console: _LinuxConsole, interface: str, pattern: str = "BROADCAST,MULTICAST,UP"
) -> bool

Check given interface is up or not.

Parameters:

Name Type Description Default
_LinuxConsole

console or device instance

required
str

interface name, defaults to "BROADCAST,MULTICAST,UP"

required
str

interface state

'BROADCAST,MULTICAST,UP'

Returns:

Type Description
bool

True if the link is up

Source code in boardfarm3/lib/networking.py
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
def is_link_up(
    console: _LinuxConsole,
    interface: str,
    pattern: str = "BROADCAST,MULTICAST,UP",
) -> bool:
    """Check given interface is up or not.

    :param console: console or device instance
    :type console: _LinuxConsole
    :param interface: interface name, defaults to "BROADCAST,MULTICAST,UP"
    :type interface: str
    :param pattern: interface state
    :type pattern: str
    :return: True if the link is up
    :rtype: bool
    """
    return pattern in console.execute_command(f"ip link show {interface}")

nmap

nmap(
    source_device: LAN | WLAN | WAN,
    destination_device: LAN | WLAN | WAN | CPE,
    ip_type: str,
    port: str | int | None = None,
    protocol: str | None = None,
    max_retries: int | None = None,
    min_rate: int | None = None,
    opts: str | None = None,
    timeout: int = 30,
) -> dict[str, str]

Run an nmap scan from source to destination device.

Parameters:

Name Type Description Default

source_device

LAN | WLAN | WAN

device initiating the scan

required

destination_device

LAN | WLAN | WAN | CPE

device to be scanned

required

ip_type

str

IP version to use in scan, must be "ipv4" or "ipv6"

required

port

str | int | None

port or port range to scan, optional

None

protocol

str | None

protocol to scan (e.g., tcp, udp), optional

None

max_retries

int | None

maximum number of retransmissions, optional

None

min_rate

int | None

minimum number of packets per second to send, optional

None

opts

str | None

additional nmap command-line options, optional

None

timeout

int

timeout value for the scan (in seconds), defaults to 30

30

Returns:

Type Description
dict[str, str]

parsed nmap scan result as a dictionary

Source code in boardfarm3/lib/networking.py
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
def nmap(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    source_device: LAN | WLAN | WAN,
    destination_device: LAN | WLAN | WAN | CPE,
    ip_type: str,
    port: str | int | None = None,
    protocol: str | None = None,
    max_retries: int | None = None,
    min_rate: int | None = None,
    opts: str | None = None,
    timeout: int = 30,
) -> dict[str, str]:
    """Run an nmap scan from source to destination device.

    :param source_device: device initiating the scan
    :type source_device: LAN | WLAN | WAN
    :param destination_device: device to be scanned
    :type destination_device: LAN | WLAN | WAN | CPE
    :param ip_type: IP version to use in scan, must be "ipv4" or "ipv6"
    :type ip_type: str
    :param port: port or port range to scan, optional
    :type port: str | int | None
    :param protocol: protocol to scan (e.g., tcp, udp), optional
    :type protocol: str | None
    :param max_retries: maximum number of retransmissions, optional
    :type max_retries: int | None
    :param min_rate: minimum number of packets per second to send, optional
    :type min_rate: int | None
    :param opts: additional nmap command-line options, optional
    :type opts: str | None
    :param timeout: timeout value for the scan (in seconds), defaults to 30
    :type timeout: int
    :return: parsed nmap scan result as a dictionary
    :rtype: dict[str, str]
    """
    iface: str = (
        destination_device.sw.erouter_iface
        if isinstance(destination_device, CPE)
        else destination_device.iface_dut
    )
    dest_device: LAN | WLAN | WAN | CPESW = (
        destination_device.sw
        if isinstance(destination_device, CPE)
        else destination_device
    )
    ipaddr: str = (
        dest_device.get_interface_ipv4addr(iface)
        if ip_type == "ipv4"
        else f"-6 {dest_device.get_interface_ipv6addr(iface)}"
    )
    return source_device.nmap(
        ipaddr, ip_type, port, protocol, max_retries, min_rate, opts, timeout=timeout
    )

scp

scp(
    console: _LinuxConsole,
    host: str,
    port: int | str,
    username: str,
    password: str,
    src_path: str,
    dst_path: str,
    action: Literal["download", "upload"] = "download",
    timeout: int = 30,
) -> None

SCP file.

Parameters:

Name Type Description Default

console

_LinuxConsole

linux device or console instance

required

host

str

remote ssh host ip address

required

port

int | str

remove ssh host port number

required

username

str

ssh username

required

password

str

ssh password

required

src_path

str

source file path

required

dst_path

str

destination path

required

action

Literal['download', 'upload']

scp action(download/upload), defaults to "download"

'download'

timeout

int

scp timeout in seconds, defaults to 30

30

Raises:

Type Description
SCPConnectionError

on failed to scp file

Source code in boardfarm3/lib/networking.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def scp(  # pylint: disable=too-many-arguments  # noqa: PLR0913
    console: _LinuxConsole,
    host: str,
    port: int | str,
    username: str,
    password: str,
    src_path: str,
    dst_path: str,
    action: Literal["download", "upload"] = "download",
    timeout: int = 30,
) -> None:
    """SCP file.

    :param console: linux device or console instance
    :type console: _LinuxConsole
    :param host: remote ssh host ip address
    :type host: str
    :param port: remove ssh host port number
    :type port: Union[int, str]
    :param username: ssh username
    :type username: str
    :param password: ssh password
    :type password: str
    :param src_path: source file path
    :type src_path: str
    :param dst_path: destination path
    :type dst_path: str
    :param action: scp action(download/upload), defaults to "download"
    :type action: Literal["download", "upload"], optional
    :param timeout: scp timeout in seconds, defaults to 30
    :type timeout: int
    :raises SCPConnectionError: on failed to scp file
    """
    host = host if isinstance(ip_address(host), IPv4Address) else f"[{host}]"
    if action == "download":
        command = (
            "scp -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"
            f" -P {port} {username}@{host}:{src_path} {dst_path}"
        )
    else:
        command = (
            "scp -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"
            f" -P {port} {src_path} {username}@{host}:{dst_path}"
        )
    console.sendline(command)
    if console.expect([pexpect.TIMEOUT, "continue connecting?"], timeout=10):
        console.sendline("y")
    if console.expect([pexpect.TIMEOUT, "assword:"], timeout=10):
        console.sendline(password)
    if console.expect_exact(["100%", pexpect.TIMEOUT], timeout=timeout):
        msg = f"Failed to scp from {src_path} to {dst_path}"
        raise SCPConnectionError(msg)

start_tcpdump

start_tcpdump(
    console: _LinuxConsole,
    interface: str,
    port: str | None,
    output_file: str = "pkt_capture.pcap",
    filters: dict | None = None,
    additional_filters: str | None = "",
) -> str

Start tcpdump capture on given interface.

Parameters:

Name Type Description Default

console

_LinuxConsole

console or device instance

required

interface

str

inteface name where packets to be captured

required

port

str | None

port number, can be a range of ports(eg: 443 or 433-443)

required

output_file

str

pcap file name, Defaults: pkt_capture.pcap

'pkt_capture.pcap'

filters

dict | None

filters as key value pair(eg: {"-v": "", "-c": "4"})

None

additional_filters

str | None

additional filters

''

Returns:

Type Description
str

console ouput and tcpdump process id

Raises:

Type Description
ValueError

on failed to start tcpdump

Source code in boardfarm3/lib/networking.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def start_tcpdump(  # noqa: PLR0913
    console: _LinuxConsole,
    interface: str,
    port: str | None,
    output_file: str = "pkt_capture.pcap",
    filters: dict | None = None,
    additional_filters: str | None = "",
) -> str:
    """Start tcpdump capture on given interface.

    :param console: console or device instance
    :type console: _LinuxConsole
    :param interface: inteface name where packets to be captured
    :type interface: str
    :param port: port number, can be a range of ports(eg: 443 or 433-443)
    :type port: str
    :param output_file: pcap file name, Defaults: pkt_capture.pcap
    :type output_file: str
    :param filters: filters as key value pair(eg: {"-v": "", "-c": "4"})
    :type filters: Optional[Dict]
    :param additional_filters: additional filters
    :type additional_filters: Optional[str]
    :raises ValueError: on failed to start tcpdump
    :return: console ouput and tcpdump process id
    :rtype: str
    """
    command = f"tcpdump -U -i {interface} -n -w {output_file} "
    filter_str = (
        " ".join([" ".join(i) for i in filters.items()]) if filters is not None else ""
    )
    filter_str += additional_filters
    if port:
        output = console.execute_command(f"{command} 'portrange {port}' {filter_str} &")
    else:
        output = console.execute_command(f"{command} {filter_str} &")
    if console.expect_exact([f"tcpdump: listening on {interface}", pexpect.TIMEOUT]):
        msg = f"Failed to start tcpdump on {interface}"
        raise ValueError(msg)
    return re.search(r"(\[\d+\]\s(\d+))", output)[2]

stop_tcpdump

stop_tcpdump(console: _LinuxConsole, process_id: str) -> None

Stop tcpdump capture.

Parameters:

Name Type Description Default

console

_LinuxConsole

linux console or device instance

required

process_id

str

tcpdump process id

required

Raises:

Type Description
ValueError

on failed to stop tcpdump process

Source code in boardfarm3/lib/networking.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def stop_tcpdump(console: _LinuxConsole, process_id: str) -> None:
    """Stop tcpdump capture.

    :param console: linux console or device instance
    :type console: _LinuxConsole
    :param process_id: tcpdump process id
    :type process_id: str
    :raises ValueError: on failed to stop tcpdump process
    """
    output = console.execute_command(f"kill {process_id}")
    if "packets captured" not in output:
        idx = console.expect_exact(["captured", pexpect.TIMEOUT])
        if idx:
            msg = f"Failed to stop tcpdump process with PID {process_id}"
            raise ValueError(msg)
    console.execute_command("sync")

tcpdump_read

tcpdump_read(
    console: _LinuxConsole,
    capture_file: str,
    protocol: str = "",
    opts: str = "",
    timeout: int = 30,
    rm_pcap: bool = True,
) -> str

Read the given tcpdump and delete the file afterwards.

Parameters:

Name Type Description Default

console

_LinuxConsole

linux device or console instance

required

capture_file

str

pcap file path

required

protocol

str

protocol to the filter

''

opts

str

command line options for reading pcap

''

timeout

int

timeout in seconds for reading pcap

30

rm_pcap

bool

romove pcap file afterwards

True

Returns:

Type Description
str

tcpdump output

Source code in boardfarm3/lib/networking.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def tcpdump_read(  # noqa: PLR0913
    console: _LinuxConsole,
    capture_file: str,
    protocol: str = "",
    opts: str = "",
    timeout: int = 30,
    rm_pcap: bool = True,
) -> str:
    """Read the given tcpdump and delete the file afterwards.

    :param console: linux device or console instance
    :type console: _LinuxConsole
    :param capture_file: pcap file path
    :type capture_file: str
    :param protocol: protocol to the filter
    :type protocol: str
    :param opts: command line options for reading pcap
    :type opts: str
    :param timeout: timeout in seconds for reading pcap
    :type timeout: int
    :param rm_pcap: romove pcap file afterwards
    :type rm_pcap: bool
    :return: tcpdump output
    :rtype: str
    """
    if opts:
        protocol = f"{protocol} and {opts}"
    tcpdump_output = console.execute_command(
        f"tcpdump -n -r {capture_file} {protocol}",
        timeout=timeout,
    )
    if rm_pcap:
        console.execute_command(f"rm {capture_file}")
    return tcpdump_output

traceroute_host

traceroute_host(
    console: _LinuxConsole, host_ip: str, version: str = "", options: str = ""
) -> str

Traceroute given host ip and return the details.

Parameters:

Name Type Description Default

console

_LinuxConsole

linux device or console instance

required

host_ip

str

host ip address

required

version

str

ip version

''

options

str

additional command line options

''

Returns:

Type Description
str

traceroute command output

Source code in boardfarm3/lib/networking.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def traceroute_host(
    console: _LinuxConsole,
    host_ip: str,
    version: str = "",
    options: str = "",
) -> str:
    """Traceroute given host ip and return the details.

    :param console: linux device or console instance
    :type console: _LinuxConsole
    :param host_ip: host ip address
    :type host_ip: str
    :param version: ip version
    :type version: str
    :param options: additional command line options
    :type options: str
    :return: traceroute command output
    :rtype: str
    """
    return console.execute_command(
        f"traceroute{version} {options} {host_ip}",
        timeout=90,
    )

odh

Boardfarm ODH client Package.

Modules:

Name Description
kafka_client

Kafka client implementation.

kafka_client

Kafka client implementation.

Classes:

Name Description
KafkaClient

Kafka client implementation.

KafkaClient

KafkaClient(bootstrap_server: str, topic_name: str)

Kafka client implementation.

Initialize the kafka client.

Parameters:

Name Type Description Default
bootstrap_server
str

Kafka server url

required
topic_name
str

the topic to subscribe to

required

Methods:

Name Description
close_connection_to_kafka

Gracefully close the connection to the kafka server.

consume_logs

Consume logs from the kafka queue for a particular duration.

consume_sw_update_logs

Consume logs from the kafka queue for given time using offsets.

read_kafka_messages

Read a given number of messages from the kafka queue.

subscribe_to_topic

Subscribe to a list topics.

Source code in boardfarm3/lib/odh/kafka_client.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def __init__(self, bootstrap_server: str, topic_name: str) -> None:
    """Initialize the kafka client.

    :param bootstrap_server: Kafka server url
    :type bootstrap_server: str
    :param topic_name: the topic to subscribe to
    :type topic_name: str
    """
    self.topic = topic_name
    self._disable_log_messages_from_libraries()
    self._consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=bootstrap_server,
        auto_offset_reset="latest",
        enable_auto_commit=True,
        value_deserializer=self._deserialize_json,
    )
close_connection_to_kafka
close_connection_to_kafka() -> None

Gracefully close the connection to the kafka server.

Source code in boardfarm3/lib/odh/kafka_client.py
132
133
134
def close_connection_to_kafka(self) -> None:
    """Gracefully close the connection to the kafka server."""
    self._consumer.close()
consume_logs
consume_logs(time_period: int) -> list[Any]

Consume logs from the kafka queue for a particular duration.

Parameters:

Name Type Description Default
time_period
int

time (in s) for which to consume logs from the queue.

required

Returns:

Type Description
list[Any]

kafka logs

Source code in boardfarm3/lib/odh/kafka_client.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def consume_logs(self, time_period: int) -> list[Any]:
    """Consume logs from the kafka queue for a particular duration.

    :param time_period: time (in s) for which to consume logs from the queue.
    :type time_period: int
    :return: kafka logs
    :rtype: list[Any]
    """
    logs = []
    start_time = monotonic()
    while monotonic() < time_period + start_time:
        for _, log in self._consumer.poll(timeout_ms=1000).items():  # noqa: PERF102
            logs.extend(log)
    return logs
consume_sw_update_logs
consume_sw_update_logs(start_time: float, end_time: float) -> list[Any]

Consume logs from the kafka queue for given time using offsets.

Parameters:

Name Type Description Default
start_time
float

start time to fetch logs

required
end_time
float

end time to fetch logs

required

Returns:

Type Description
list[Any]

kafka logs

Raises:

Type Description
OffsetOutOfRangeError

if offset does not exist

Source code in boardfarm3/lib/odh/kafka_client.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def consume_sw_update_logs(self, start_time: float, end_time: float) -> list[Any]:
    """Consume logs from the kafka queue for given time using offsets.

    :param start_time: start time to fetch logs
    :type start_time: float
    :param end_time: end time to fetch logs
    :type end_time: float
    :return: kafka logs
    :rtype: list[Any]
    :raises OffsetOutOfRangeError: if offset does not exist
    """
    logs = []
    time_duration = (end_time - start_time) + (
        60 * 1000  # a minute of extra buffer time
    )
    partitions = self._consumer.partitions_for_topic(topic=self.topic)
    for partition in partitions:
        topic_partition = TopicPartition(topic=self.topic, partition=partition)
        start_offset = self._consumer.offsets_for_times(
            {topic_partition: start_time}
        )
        self._consumer.seek(topic_partition, start_offset[topic_partition].offset)
        for _ in range(5):
            log_data = self._consumer.poll(timeout_ms=time_duration)
            if start_offset[topic_partition].offset in [
                record.offset for _, val in log_data.items() for record in val
            ]:
                break
        else:
            msg = "Given offset does not exist in records"
            raise OffsetOutOfRangeError(msg)
        for log in log_data.values():
            logs.extend(log)
    return logs
read_kafka_messages
read_kafka_messages(num_messages: int) -> list[Any]

Read a given number of messages from the kafka queue.

Parameters:

Name Type Description Default
num_messages
int

The number of messages to be read from the kafka queue

required

Returns:

Type Description
list[Any]

list of telemetry logs

Source code in boardfarm3/lib/odh/kafka_client.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def read_kafka_messages(self, num_messages: int) -> list[Any]:
    """Read a given number of messages from the kafka queue.

    :param num_messages: The number of messages to be read from the kafka queue
    :type num_messages: int
    :return: list of telemetry logs
    :rtype: list[Any]
    """
    messages = []
    for _ in range(num_messages):
        message = next(self._consumer)
        if message.value is not None:
            messages.append(message.value)
    return messages
subscribe_to_topic
subscribe_to_topic(topics: list[str]) -> None

Subscribe to a list topics.

Parameters:

Name Type Description Default
topics
list[str]

the list of topics to subscribe to

required

Raises:

Type Description
UnknownTopicOrPartitionError

if a topic does not exist

Source code in boardfarm3/lib/odh/kafka_client.py
54
55
56
57
58
59
60
61
62
63
64
65
def subscribe_to_topic(self, topics: list[str]) -> None:
    """Subscribe to a list topics.

    :param topics: the list of topics to subscribe to
    :type topics: list[str]
    :raises UnknownTopicOrPartitionError: if a topic does not exist
    """
    for topic in topics:
        if topic not in self._consumer.topics():
            msg = f"The topic {topic} does not exist"
            raise UnknownTopicOrPartitionError(msg)
        self._consumer.subscribe([topic])

parsers

Linux command output parsers.

Modules:

Name Description
iptables_parser

Parser module for iptables command output.

netstat_parser

Parse netstat command lines output into dataframes.

nslookup_parser

nslookup command line utility output parser module.

iptables_parser

Parser module for iptables command output.

Classes:

Name Description
IptablesParser

Parse the iptables from table format to dict.

IptablesParser

Parse the iptables from table format to dict.

Methods:

Name Description
ip6tables

Return parsed given ip6tables output.

iptables

Return parsed given iptables output.

iptables_policy

Return the iptables policy.

ip6tables
ip6tables(ip6_tables: str) -> dict[str, list[dict]]

Return parsed given ip6tables output.

Parameters:

Name Type Description Default
ip6_tables
str

ip6tables command output

required

Returns:

Type Description
Dict[str, List[Dict]]

parsed ip6tables output in dictionary

Source code in boardfarm3/lib/parsers/iptables_parser.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def ip6tables(self, ip6_tables: str) -> dict[str, list[dict]]:
    """Return parsed given ip6tables output.

    :param ip6_tables: ip6tables command output
    :type ip6_tables: str
    :return: parsed ip6tables output in dictionary
    :rtype: Dict[str, List[Dict]]
    """
    header = self._get_headers(ip6_tables)
    header.remove("opt")
    split_chain = re.split("Chain", ip6_tables)
    key = None
    table_rule: dict[str, list[dict]] = {}
    # pylint: disable=too-many-nested-blocks, consider-using-enumerate
    for i in range(len(split_chain)):
        rule_data: list[dict[str, str]] = []
        for rule in split_chain[i].splitlines():
            rule_details: dict[str, str] = {}
            if re.match(r"\s[A-Za-z]", rule) is not None:
                key = rule.split(" ")[1]
            elif rule[:1].isdigit():
                values = list(rule.split())
                for cnt, val in enumerate(header[1:], 1):
                    rule_details[val] = values[cnt]
                if len(values) > 9:
                    dest = " ".join(map(str, values[9:]))
                    rule_details["to-target"] = dest
                rule_data.append(rule_details)
        if key:
            table_rule[key] = rule_data
    assert table_rule, "Invalid table name or Table doesn't exist"
    return table_rule
iptables
iptables(ip_tables: str) -> dict[str, list[dict]]

Return parsed given iptables output.

Parameters:

Name Type Description Default
ip_tables
str

iptables command output

required

Returns:

Type Description
Dict[str, List[Dict]]

parsed iptables output in dictionary

Source code in boardfarm3/lib/parsers/iptables_parser.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def iptables(self, ip_tables: str) -> dict[str, list[dict]]:
    """Return parsed given iptables output.

    :param ip_tables: iptables command output
    :type ip_tables: str
    :return: parsed iptables output in dictionary
    :rtype: Dict[str, List[Dict]]
    """
    headers = self._get_headers(ip_tables)
    split_chain = re.split("Chain", ip_tables)
    key = None
    table_rule: dict[str, list[dict]] = {}
    # pylint: disable=too-many-nested-blocks, consider-using-enumerate
    for i in range(len(split_chain)):
        rule_data: list[dict] = []
        for rule in split_chain[i].splitlines():
            rule_details: dict[str, str] = {}
            if re.match(r"\s[A-Za-z]", rule):
                key = rule.split(" ")[1]
            if rule[:1].isdigit():
                values = list(rule.split())
                for cnt, val in enumerate(headers[1:], 1):
                    if val not in "opt":
                        rule_details[val] = values[cnt]
                if len(values) > 10:
                    dest = " ".join(map(str, values[10:]))
                    rule_details["to-target"] = dest
                rule_data.append(rule_details)
        if key:
            table_rule[key] = rule_data
    assert table_rule, "Invalid table name or Table doesn't exist"
    return table_rule
iptables_policy
iptables_policy(ip_tables: str) -> dict[str, str]

Return the iptables policy.

Parameters:

Name Type Description Default
ip_tables
str

output of iptables command

required

Returns:

Type Description
dict[str, str]

dict of iptable policy

Source code in boardfarm3/lib/parsers/iptables_parser.py
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def iptables_policy(self, ip_tables: str) -> dict[str, str]:
    """Return the iptables policy.

    :param ip_tables: output of iptables command
    :type ip_tables: str
    :return: dict of iptable policy
    :rtype: dict[str, str]
    """
    policy_dict = {}
    for policies in re.split(r"Chain", ip_tables):
        if len(lines := policies.strip().split("\n")) > 1:
            policy_key = lines[0].split()[0]
            policy_value = re.search(r"\((.*?)\)", lines[0])[1]
            policy_dict[policy_key] = policy_value
    return policy_dict

netstat_parser

Parse netstat command lines output into dataframes.

Classes:

Name Description
NetstatParser

Parse netstat command lines output into dataframes.

NetstatParser

NetstatParser()

Parse netstat command lines output into dataframes.

Initialize the NetstatParser.

Methods:

Name Description
parse_netstat_output

Parse given netstat output.

Source code in boardfarm3/lib/parsers/netstat_parser.py
16
17
18
def __init__(self) -> None:
    """Initialize the NetstatParser."""
    self._inet_connections: list[str] = []
parse_netstat_output
parse_netstat_output(output: str) -> DataFrame

Parse given netstat output.

Parameters:

Name Type Description Default
output
str

netstat output

required

Returns:

Type Description
DataFrame

parsed netstat output in pandas.DataFrame

Raises:

Type Description
ValueError

on invalid netstat output

Source code in boardfarm3/lib/parsers/netstat_parser.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def parse_netstat_output(self, output: str) -> DataFrame:
    """Parse given netstat output.

    :param output: netstat output
    :type output: str
    :raises ValueError: on invalid netstat output
    :return: parsed netstat output in pandas.DataFrame
    :rtype: DataFrame
    """
    sample_bytes = bytes(output, "utf-8")
    file_out = BytesIO(sample_bytes)
    value = file_out.readline()[:5].decode("utf-8")
    counter = 0
    while "Proto" not in value:
        header = file_out.readline()
        value = header[:5].decode("utf-8")
        counter += 1
        if counter == 20:
            msg = "Invalid netstat output"
            raise ValueError(msg)
    val = file_out.readlines()
    for line in val:
        if "Active UNIX domain sockets" in str(line):
            break
        inet_header, result = self._parse_inet_connection(line, header)
        if result:
            self._inet_connections.append(result)
    return DataFrame(self._inet_connections, columns=inet_header)

nslookup_parser

nslookup command line utility output parser module.

Classes:

Name Description
NslookupParser

nslookup command line utility output parser module.

NslookupParser

nslookup command line utility output parser module.

Methods:

Name Description
parse_nslookup_output

Parse the DNS query response into dict obj.

parse_nslookup_output
parse_nslookup_output(response: str) -> dict[str, Any]

Parse the DNS query response into dict obj.

Parameters:

Name Type Description Default
response
str

nslookup CLI output

required

Returns:

Type Description
Dict[str, str]

parsed nslookup output

Source code in boardfarm3/lib/parsers/nslookup_parser.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def parse_nslookup_output(self, response: str) -> dict[str, Any]:
    """Parse the DNS query response into dict obj.

    :param response: nslookup CLI output
    :type response: str
    :return: parsed nslookup output
    :rtype: Dict[str, str]
    """
    dns_dict_obj: dict[str, Any] = {}
    val = response.replace("\t\t", " ").replace("\t", " ")
    # pylint: disable-next=too-many-nested-blocks
    for i in val.split("\r\n\r\n"):
        if "Server" in i:
            for expr in [ValidIpv4AddressRegex, AllValidIpv6AddressesRegex]:
                if re.search(expr, i):
                    matches = re.search(expr, i)[0]
                    break
            dns_dict_obj["dns_server"] = matches
        elif "Name" in i:
            dns_dict_obj["domain_name"] = re.search(r"(?:[\da-z\._]+)\.(\w+)", i)[0]
            ips: list[str] = []
            for value in [
                ValidIpv4AddressRegex,
                AllValidIpv6AddressesRegex,
            ]:
                ips.extend(matches[0] for matches in re.finditer(value, i))
            dns_dict_obj["domain_ip_addr"] = ips
        elif "AAAA" in i:
            dns_dict_obj["domain_name"] = re.search(r"(?:[\da-z\.-]+)\.(\w+)", i)[0]
            dns_dict_obj["domain_ipv6_addr"] = re.findall(
                AllValidIpv6AddressesRegex,
                i,
            )
    assert dns_dict_obj, f"Error response: {response}"
    return dns_dict_obj

power

Power module.

Functions:

Name Description
get_pdu

Get a PDU object to drive the power of an outlet.

get_pdu

get_pdu(uri: str) -> PDU

Get a PDU object to drive the power of an outlet.

Examples of pdu uris: "type://ip[:port]; outlet"

"netio://10.64.40.34; 2"
"px2://10.71.10.53:23; 2",
"px3://10.71.10.53:23; 2",
"eaton://10.71.10.53:23; 2"
more to come....

Parameters:

Name Type Description Default

uri

str

a uri with the PDU details

required

Returns:

Type Description
PDU

a PDU templated object

Raises:

Type Description
ValueError

if the PDU URI is not recognised

Source code in boardfarm3/lib/power.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def get_pdu(uri: str) -> PDU:
    """Get a PDU object to drive the power of an outlet.

    Examples of pdu uris:
        "type://ip[:port]; outlet"

        "netio://10.64.40.34; 2"
        "px2://10.71.10.53:23; 2",
        "px3://10.71.10.53:23; 2",
        "eaton://10.71.10.53:23; 2"
        more to come....

    :param uri: a uri with the PDU details
    :type uri: str
    :raises ValueError: if the PDU URI is not recognised
    :return: a PDU templated object
    :rtype: PDU
    """
    for pdu_name, pdu_type in pdu_dict.items():
        if uri.startswith(pdu_name):
            return pdu_type(uri.replace(pdu_name, ""))
    msg = f"PDU uri: '{uri}' not recognised"
    raise ValueError(msg)

python_executor

Python executor module.

Classes:

Name Description
PythonExecutor

Helper to execute python commands over a given Linux console.

PythonExecutor

PythonExecutor(console: BoardfarmPexpect, shell_prompt: list[str])

Helper to execute python commands over a given Linux console.

Initialise PythonExecutor.

Parameters:

Name Type Description Default

console

BoardfarmPexpect

the console to execute the Python commands on

required

shell_prompt

list[str]

the console shell prompt (usually a list of regexs)

required

Methods:

Name Description
exit_python

Exit the Python prompt.

run

Run the given Python command.

Source code in boardfarm3/lib/python_executor.py
14
15
16
17
18
19
20
21
22
23
24
25
26
def __init__(self, console: BoardfarmPexpect, shell_prompt: list[str]) -> None:
    """Initialise PythonExecutor.

    :param console: the console to execute the Python commands on
    :type console: BoardfarmPexpect
    :param shell_prompt: the console shell prompt (usually a list of regexs)
    :type shell_prompt: list[str]
    """
    self._console: BoardfarmPexpect = console
    self._shell_prompt: list[str] = shell_prompt
    self._py_prompt = ">>>"
    self._console.execute_command("kill -9 $(pgrep python3)")
    self.run("python3")

exit_python

exit_python() -> None

Exit the Python prompt.

Source code in boardfarm3/lib/python_executor.py
57
58
59
60
61
62
63
def exit_python(self) -> None:
    """Exit the Python prompt."""
    with suppress(TIMEOUT):
        self._console.sendcontrol("c")
        self._console.expect(self._py_prompt)
        self._console.sendcontrol("d")
        self._console.expect(self._shell_prompt)

run

run(cmd: str, expect: str = '') -> str

Run the given Python command.

Parameters:

Name Type Description Default
cmd
str

the Python command

required
expect
str

what to expect as output from the command, defaults to ""

''

Returns:

Type Description
str

the output of the Python command

Raises:

Type Description
DeviceConnectionError

on execution/connectivity issues

Source code in boardfarm3/lib/python_executor.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def run(self, cmd: str, expect: str = "") -> str:
    """Run the given Python command.

    :param cmd: the Python command
    :type cmd: str
    :param expect: what to expect as output from the command, defaults to ""
    :type expect: str
    :raises DeviceConnectionError: on execution/connectivity issues
    :return: the output of the Python command
    :rtype: str
    """
    out: str = ""
    self._console.sendline(cmd)
    if not expect:
        self._console.expect(self._py_prompt)
        out = self._console.before
    else:
        self._console.expect(expect)
        out = self._console.before
        self._console.expect(self._py_prompt)

    if "Traceback" in out:
        msg = f"Failed to execute on python console command:\n{cmd}\nOutput:{out}"
        raise DeviceConnectionError(
            msg,
        )

    return out

regexlib

Useful regexes library.

shell_prompt

Boardfarm v3 base device shell prompts.

utils

Boardfarm common utilities module.

Functions:

Name Description
disable_logs

Disable logs for the logger with given name.

get_nth_mac_address

Get nth mac address from base mac address.

get_pytest_name

Get the test name from the test filename during runtime.

get_static_ipaddress

Return the static ip address of the device based on given ip version.

get_value_from_dict

Get value of given key from the dictionary recursively.

ip_pool_to_list

Generate ip address list based on ip pool boundaries.

retry

Retry a function if the output of the function is false.

retry_on_exception

Retry a method if any exception occurs.

disable_logs

disable_logs(logger_name: str | None = None) -> Generator

Disable logs for the logger with given name.

:yield: log with the given name

Parameters:

Name Type Description Default

logger_name

str | None

logger name, defaults to None

None
Source code in boardfarm3/lib/utils.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@contextmanager
def disable_logs(logger_name: str | None = None) -> Generator:
    """Disable logs for the logger with given name.

    :param logger_name: logger name, defaults to None
    :type logger_name: Optional[str], optional
    :yield: log with the given name
    """
    logger = logging.getLogger(logger_name)
    handlers = list(logger.handlers)
    list(map(logger.removeHandler, handlers))
    null_handler = logging.NullHandler()
    logger.addHandler(null_handler)
    try:
        yield
    finally:
        logger.removeHandler(null_handler)
        list(map(logger.addHandler, handlers))

get_nth_mac_address

get_nth_mac_address(mac_address: str, nth_number: int) -> str

Get nth mac address from base mac address.

Parameters:

Name Type Description Default

mac_address

str

base mac address

required

nth_number

int

n'th number from base mac address

required

Returns:

Type Description
str

n'th mac address

Source code in boardfarm3/lib/utils.py
20
21
22
23
24
25
26
27
def get_nth_mac_address(mac_address: str, nth_number: int) -> str:
    """Get nth mac address from base mac address.

    :param mac_address: base mac address
    :param nth_number: n'th number from base mac address
    :return: n'th mac address
    """
    return str(EUI(int(EUI(mac_address)) + nth_number, dialect=mac_unix_expanded))

get_pytest_name

get_pytest_name() -> str

Get the test name from the test filename during runtime.

Returns:

Type Description
str

current test name

Source code in boardfarm3/lib/utils.py
30
31
32
33
34
35
36
37
38
39
def get_pytest_name() -> str:
    """Get the test name from the test filename during runtime.

    :return: current test name
    """
    return (
        (os.environ.get("PYTEST_CURRENT_TEST", "::interact").split(" (setup)")[0])
        .split("::")[1]
        .replace(" ", "_")
    )

get_static_ipaddress

get_static_ipaddress(config: dict[str, Any], ip_version: str = 'ipv4') -> str | None

Return the static ip address of the device based on given ip version.

Parameters:

Name Type Description Default

config

dict[str, Any]

device config

required

ip_version

str

ip version, defaults to "ipv4"

'ipv4'

Returns:

Type Description
Optional[str]

the static ip address of the device

Source code in boardfarm3/lib/utils.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def get_static_ipaddress(
    config: dict[str, Any],
    ip_version: str = "ipv4",
) -> str | None:
    """Return the static ip address of the device based on given ip version.

    :param config: device config
    :type config: dict[str, Any]
    :param ip_version: ip version, defaults to "ipv4"
    :type ip_version: str
    :return: the static ip address of the device
    :rtype: Optional[str]
    """
    option_prefix: str = (
        "wan-static-ip:" if ip_version == "ipv4" else "wan-static-ipv6:"
    )
    if (options := config["options"].split(",")) and (
        match := next(
            (
                option.strip().split(option_prefix)[1]
                for option in options
                if option_prefix in option
            ),
            None,
        )
    ):
        return str(
            (
                IPv4Interface(match).ip
                if ip_version == "ipv4"
                else IPv6Interface(match).ip
            ),
        )
    return None

get_value_from_dict

get_value_from_dict(key: str, dictionary: dict) -> Any

Get value of given key from the dictionary recursively.

This method is used to avoid nested checks for None to get a value from dictionary without raising KeyError.

Parameters:

Name Type Description Default

key

str

name of the key

required

dictionary

dict

dictionary instance

required

Returns:

Type Description
Any

value of given key if exists, otherwise None

Source code in boardfarm3/lib/utils.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def get_value_from_dict(key: str, dictionary: dict) -> Any:  # noqa: ANN401
    """Get value of given key from the dictionary recursively.

    This method is used to avoid nested checks for None to get
    a value from dictionary without raising KeyError.

    :param key: name of the key
    :type key: str
    :param dictionary: dictionary instance
    :type dictionary: dict
    :return: value of given key if exists, otherwise None
    :rtype: Any
    """
    for name, value in dictionary.items():
        if name == key:
            return value
        if isinstance(value, dict):
            return_value = get_value_from_dict(key, value)
            if return_value is not None:
                return return_value
    return None

ip_pool_to_list

ip_pool_to_list(start_ip: IPv4Address, end_ip: IPv4Address) -> list[IPv4Address]

Generate ip address list based on ip pool boundaries.

Parameters:

Name Type Description Default

start_ip

IPv4Address

first ip of the pool

required

end_ip

IPv4Address

last ip of the pool

required

Returns:

Type Description
list[IPv4Address]

list of ip addresses based on min ip address and maximum ip address of the pool

Source code in boardfarm3/lib/utils.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def ip_pool_to_list(start_ip: IPv4Address, end_ip: IPv4Address) -> list[IPv4Address]:
    """Generate ip address list based on ip pool boundaries.

    :param start_ip: first ip of the pool
    :type start_ip: IPv4Address
    :param end_ip: last ip of the pool
    :type end_ip: IPv4Address
    :return: list of ip addresses based on min ip address and maximum
             ip address of the pool
    :rtype: list[IPv4Address]
    """
    ip_list = []
    while end_ip >= start_ip and start_ip != end_ip + 1:
        ip_list.append(start_ip)
        start_ip += 1
    return ip_list

retry

retry(func_name: Callable, max_retry: int, *args: Any) -> Any

Retry a function if the output of the function is false.

Parameters:

Name Type Description Default

func_name

Callable

name of the function to retry

required

max_retry

int

maximum number of times to be retried

required

args

Any

arguments to the function

()

Returns:

Type Description
Any

output of the function

Source code in boardfarm3/lib/utils.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def retry(func_name: Callable, max_retry: int, *args: Any) -> Any:  # noqa: ANN401
    """Retry a function if the output of the function is false.

    :param func_name: name of the function to retry
    :type func_name: Callable
    :param max_retry: maximum number of times to be retried
    :type max_retry: int
    :param args: arguments to the function
    :type args: Tuple[Any, ...]
    :return: output of the function
    :rtype: Any
    """
    for _ in range(max_retry - 1):
        output = func_name(*args)
        if output and output != "False":
            return output
        time.sleep(5)
    return func_name(*args)

retry_on_exception

retry_on_exception(
    method: Callable, args: list | tuple, retries: int = 10, tout: int = 5
) -> Any

Retry a method if any exception occurs.

Eventually, at last, throw the exception. NOTE: args must be a tuple, hence a 1 arg tuple is (,)

Parameters:

Name Type Description Default

method

Callable

name of the function to retry

required

args

list | tuple

arguments to the function

required

retries

int

maximum number of retries when a exception occur,defaults to 10. When negative, no retries are made.

10

tout

int

sleep time after every exception occur, defaults to 5

5

Returns:

Type Description
Any

output of the function

Source code in boardfarm3/lib/utils.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def retry_on_exception(
    method: Callable,
    args: list | tuple,
    retries: int = 10,
    tout: int = 5,
) -> Any:  # noqa: ANN401
    """Retry a method if any exception occurs.

    Eventually, at last, throw the exception.
    NOTE: args must be a tuple, hence a 1 arg tuple is (<arg>,)

    :param method: name of the function to retry
    :type method: Callable
    :param args: arguments to the function
    :type args: Union[List, Tuple]
    :param retries: maximum number of retries when a exception occur,defaults
                    to 10. When negative, no retries are made.
    :type retries: int
    :param tout: sleep time after every exception occur, defaults to 5
    :type tout: int
    :return: output of the function
    :rtype: Any
    """
    for re_try in range(1, retries):
        try:
            return method(*args)
        except (  # noqa: PERF203
            Exception  # noqa: BLE001  # pylint: disable=broad-except
        ) as exc:
            _LOGGER.debug("method failed %d time (%s)", re_try, exc)
            time.sleep(tout)
    return method(*args)

wrappers

Boardfarm decorators module.

Functions:

Name Description
singleton

Allow a class to become a decorator.

singleton

singleton(cls: type[AnyClass]) -> Callable[..., AnyClass]

Allow a class to become a decorator.

Parameters:

Name Type Description Default

cls

type[AnyClass]

class to become a decorator

required

Returns:

Type Description
Callable[..., AnyClass]

callable[...,AnyClass]

Source code in boardfarm3/lib/wrappers.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def singleton(cls: type[AnyClass]) -> Callable[..., AnyClass]:
    """Allow a class to become a decorator.

    :param cls: class to become a decorator
    :return: callable[...,AnyClass]
    """
    instances: dict[type[AnyClass], AnyClass] = {}

    def getinstance(*args: tuple, **kwargs: Any) -> AnyClass:  # noqa: ANN401
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)

        return instances[cls]

    return getinstance  # type: ignore[return-value]