More complex tests often have to spawn a background process, like a server which will respond to requests or tcpdump.
Add support for creating such processes using the with keyword:
with bkg("my-daemon", ..): # my-daemon is alive in this block
My initial thought was to add this support to cmd() directly but it runs the command in the constructor, so by the time we __enter__ it's too late to make sure we used "background=True".
Second useful helper transplanted from net_helper.sh is wait_port_listen().
The test itself uses socat, which insists on v6 addresses being wrapped in [], it's not the only command which requires this format, so add the wrapped address to env. The hope is to save test code from checking if address is v6.
Signed-off-by: Jakub Kicinski kuba@kernel.org --- .../selftests/drivers/net/lib/py/env.py | 5 +++ tools/testing/selftests/drivers/net/ping.py | 21 ++++++++- tools/testing/selftests/net/lib/py/utils.py | 43 +++++++++++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-)
diff --git a/tools/testing/selftests/drivers/net/lib/py/env.py b/tools/testing/selftests/drivers/net/lib/py/env.py index 579c5b34e6fd..dd5cb0226a31 100644 --- a/tools/testing/selftests/drivers/net/lib/py/env.py +++ b/tools/testing/selftests/drivers/net/lib/py/env.py @@ -110,6 +110,11 @@ from .remote import Remote self.addr = self.v6 if self.v6 else self.v4 self.remote_addr = self.remote_v6 if self.remote_v6 else self.remote_v4
+ self.addr_ipver = "6" if self.v6 else "4" + # Bracketed addresses, some commands need IPv6 to be inside [] + self.baddr = f"[{self.v6}]" if self.v6 else self.v4 + self.remote_baddr = f"[{self.remote_v6}]" if self.remote_v6 else self.remote_v4 + self.ifname = self.dev['ifname'] self.ifindex = self.dev['ifindex']
diff --git a/tools/testing/selftests/drivers/net/ping.py b/tools/testing/selftests/drivers/net/ping.py index 9f65a0764aab..4b49de59231c 100755 --- a/tools/testing/selftests/drivers/net/ping.py +++ b/tools/testing/selftests/drivers/net/ping.py @@ -2,8 +2,9 @@ # SPDX-License-Identifier: GPL-2.0
from lib.py import ksft_run, ksft_exit +from lib.py import ksft_eq from lib.py import NetDrvEpEnv -from lib.py import cmd +from lib.py import bkg, cmd, wait_port_listen, rand_port
def test_v4(cfg) -> None: @@ -16,6 +17,24 @@ from lib.py import cmd cmd(f"ping -c 1 -W0.5 {cfg.v6}", host=cfg.remote)
+def test_tcp(cfg) -> None: + port = rand_port() + listen_cmd = f"socat -{cfg.addr_ipver} -t 2 -u TCP-LISTEN:{port},reuseport STDOUT" + + with bkg(listen_cmd, exit_wait=True) as nc: + wait_port_listen(port) + + cmd(f"echo ping | socat -t 2 -u STDIN TCP:{cfg.baddr}:{port}", + shell=True, host=cfg.remote) + ksft_eq(nc.stdout.strip(), "ping") + + with bkg(listen_cmd, host=cfg.remote, exit_wait=True) as nc: + wait_port_listen(port, host=cfg.remote) + + cmd(f"echo ping | socat -t 2 -u STDIN TCP:{cfg.remote_baddr}:{port}", shell=True) + ksft_eq(nc.stdout.strip(), "ping") + + def main() -> None: with NetDrvEpEnv(__file__) as cfg: ksft_run(globs=globals(), case_pfx={"test_"}, args=(cfg, )) diff --git a/tools/testing/selftests/net/lib/py/utils.py b/tools/testing/selftests/net/lib/py/utils.py index 7347d0c0ff05..d3715e6c21f2 100644 --- a/tools/testing/selftests/net/lib/py/utils.py +++ b/tools/testing/selftests/net/lib/py/utils.py @@ -1,7 +1,11 @@ # SPDX-License-Identifier: GPL-2.0
import json as _json +import random +import re import subprocess +import time +
class cmd: def __init__(self, comm, shell=True, fail=True, ns=None, background=False, host=None): @@ -38,6 +42,20 @@ import subprocess (self.proc.args, stdout, stderr))
+class bkg(cmd): + def __init__(self, comm, shell=True, fail=True, ns=None, host=None, + exit_wait=False): + super().__init__(comm, background=True, + shell=shell, fail=fail, ns=ns, host=host) + self.terminate = not exit_wait + + def __enter__(self): + return self + + def __exit__(self, ex_type, ex_value, ex_tb): + return self.process(terminate=self.terminate) + + def ip(args, json=None, ns=None, host=None): cmd_str = "ip " if json: @@ -47,3 +65,28 @@ import subprocess if json: return _json.loads(cmd_obj.stdout) return cmd_obj + + +def rand_port(): + """ + Get unprivileged port, for now just random, one day we may decide to check if used. + """ + return random.randint(1024, 65535) + + +def wait_port_listen(port, proto="tcp", ns=None, host=None, sleep=0.005, deadline=5): + end = time.monotonic() + deadline + + pattern = f":{port:04X} .* " + if proto == "tcp": # for tcp protocol additionally check the socket state + pattern += "0A" + pattern = re.compile(pattern) + + while True: + data = cmd(f'cat /proc/net/{proto}*', ns=ns, host=host, shell=True).stdout + for row in data.split("\n"): + if pattern.search(row): + return + if time.monotonic() > end: + raise Exception("Waiting for port listen timed out") + time.sleep(sleep)