** Background ** Currently, OVS supports several packet sampling mechanisms (sFlow, per-bridge IPFIX, per-flow IPFIX). These end up being translated into a userspace action that needs to be handled by ovs-vswitchd's handler threads, only to be forwarded to some third-party application that processes the sample and provides observability on the datapath.
A particularly interesting use case is controller-driven per-flow IPFIX sampling, where the OpenFlow controller can add metadata to samples (via two 32-bit integers) and this metadata is then available to the sample-collecting system for correlation.
** Problem ** The fact that sampled traffic shares netlink sockets and handler thread time with upcalls is a performance bottleneck in the sample extraction itself and, beyond that, can severely compromise the datapath, rendering this solution unfit for highly loaded production systems.
Users are left with few options other than guessing which sampling rate will be acceptable for their traffic pattern and system load, and living with the resulting loss of accuracy.
Looking at the available infrastructure, an obvious candidate would be psample. However, its current state does not help with the use case at hand because sampled packets do not contain user-defined metadata.
** Proposal ** This series is an attempt to fix this situation by extending the existing psample infrastructure to carry a variable length user-defined cookie.
The main existing user of psample is tc's act_sample. It is also extended to forward the action's cookie to psample.
Finally, the OVS sample action is extended with a couple of attributes (OVS_SAMPLE_ATTR_PSAMPLE_{GROUP,COOKIE}) that contain a 32-bit group_id and a variable-length cookie. When they are provided, OVS sends the packet to psample for observability.
In order to make it easier for users to receive samples coming from a specific source, group_id filtering is added to psample as well as a tracepoint for troubleshooting.
-- rfc_v2 -> v1: - Accommodate Ilya's comments. - Split OVS's attribute in two attributes and simplify internal handling of psample arguments. - Extend psample and tc with a user-defined cookie. - Add a tracepoint to psample to facilitate troubleshooting.
rfc_v1 -> rfc_v2: - Use psample instead of a new OVS-only multicast group. - Extend psample and tc with a user-defined cookie.
Adrian Moreno (8): net: netlink: export genl private pointer getters net: psample: add multicast filtering on group_id net: psample: add user cookie net: psample: add tracepoint net: sched: act_sample: add action cookie to sample net:openvswitch: add psample support selftests: openvswitch: add sample action. selftests: openvswitch: add psample test
Documentation/netlink/specs/ovs_flow.yaml | 6 + include/net/psample.h | 2 + include/uapi/linux/openvswitch.h | 49 ++++- include/uapi/linux/psample.h | 2 + net/netlink/genetlink.c | 2 + net/openvswitch/actions.c | 51 ++++- net/openvswitch/flow_netlink.c | 80 +++++-- net/psample/psample.c | 131 ++++++++++- net/psample/trace.h | 62 ++++++ net/sched/act_sample.c | 12 + .../selftests/net/openvswitch/openvswitch.sh | 97 +++++++- .../selftests/net/openvswitch/ovs-dpctl.py | 207 +++++++++++++++++- 12 files changed, 655 insertions(+), 46 deletions(-) create mode 100644 net/psample/trace.h
Add sample action support to ovs-dpctl.py.
Signed-off-by: Adrian Moreno amorenoz@redhat.com --- .../selftests/net/openvswitch/ovs-dpctl.py | 96 ++++++++++++++++++- 1 file changed, 95 insertions(+), 1 deletion(-)
diff --git a/tools/testing/selftests/net/openvswitch/ovs-dpctl.py b/tools/testing/selftests/net/openvswitch/ovs-dpctl.py index 1dd057afd3fb..3a2dddc57e42 100644 --- a/tools/testing/selftests/net/openvswitch/ovs-dpctl.py +++ b/tools/testing/selftests/net/openvswitch/ovs-dpctl.py @@ -8,6 +8,7 @@ import argparse import errno import ipaddress import logging +import math import multiprocessing import re import struct @@ -58,6 +59,7 @@ OVS_FLOW_CMD_DEL = 2 OVS_FLOW_CMD_GET = 3 OVS_FLOW_CMD_SET = 4
+UINT32_MAX = 0xFFFFFFFF
def macstr(mac): outstr = ":".join(["%02X" % i for i in mac]) @@ -285,7 +287,7 @@ class ovsactions(nla): ("OVS_ACTION_ATTR_SET", "none"), ("OVS_ACTION_ATTR_PUSH_VLAN", "none"), ("OVS_ACTION_ATTR_POP_VLAN", "flag"), - ("OVS_ACTION_ATTR_SAMPLE", "none"), + ("OVS_ACTION_ATTR_SAMPLE", "sample"), ("OVS_ACTION_ATTR_RECIRC", "uint32"), ("OVS_ACTION_ATTR_HASH", "none"), ("OVS_ACTION_ATTR_PUSH_MPLS", "none"), @@ -306,6 +308,91 @@ class ovsactions(nla): ("OVS_ACTION_ATTR_DROP", "uint32"), )
+ class sample(nla): + nla_flags = NLA_F_NESTED + + nla_map = ( + ("OVS_SAMPLE_ATTR_UNSPEC", "none"), + ("OVS_SAMPLE_ATTR_PROBABILITY", "uint32"), + ("OVS_SAMPLE_ATTR_ACTIONS", "ovsactions"), + ("OVS_SAMPLE_ATTR_PSAMPLE_GROUP", "uint32"), + ("OVS_SAMPLE_ATTR_PSAMPLE_COOKIE", "array(uint8)"), + ) + + def dpstr(self, more=False): + args = [] + + args.append("sample={:.2f}%".format( + 100 * self.get_attr("OVS_SAMPLE_ATTR_PROBABILITY") / + UINT32_MAX)) + + group = self.get_attr("OVS_SAMPLE_ATTR_PSAMPLE_GROUP") + cookie = self.get_attr("OVS_SAMPLE_ATTR_PSAMPLE_COOKIE") + actions = self.get_attr("OVS_SAMPLE_ATTR_ACTIONS") + + if group: + args.append("group_id=%d" % group) + if cookie: + args.append("cookie=%s" % + "".join(format(x, "02x") for x in cookie)) + if actions: + args.append("actions(%s)" % actions.dpstr(more)) + + return "sample(%s)" % ",".join(args) + + def parse(self, actstr): + """ Parses the input action string and populates the internal + attributes. The input string must start with "sample(" + + Returns the remaining action string. + Raises ValueError if the action string has invalid content. + """ + + def parse_nested_actions(actstr): + subacts = ovsactions() + parsed_len = subacts.parse(actstr) + return subacts, parsed_len + + def percent_to_rate(percent): + percent = float(percent.strip('%')) + return int(math.floor(UINT32_MAX * (percent / 100.0) + .5)) + + for (key, attr, func) in ( + ("sample", "OVS_SAMPLE_ATTR_PROBABILITY", percent_to_rate), + ("group_id", "OVS_SAMPLE_ATTR_PSAMPLE_GROUP", int), + ("cookie", "OVS_SAMPLE_ATTR_PSAMPLE_COOKIE", + lambda x: list(bytearray.fromhex(x))), + ("actions", "OVS_SAMPLE_ATTR_ACTIONS", parse_nested_actions), + ): + if not actstr.startswith(key): + continue + + actstr = actstr[len(key) :] + + if not func: + self["attrs"].append([attr, None]) + continue + + # The length of complex attributes cannot be determined + # beforehand and must be reported by the parsing func. 
+ delim = actstr[0] + actstr = actstr[1:] + if delim == "=": + pos = strcspn(actstr, ",)") + datum = func(actstr[:pos]) + elif delim == "(": + datum, pos = func(actstr) + + self["attrs"].append([attr, datum]) + actstr = actstr[pos:] + actstr = actstr[strspn(actstr, ", ") :] + + if actstr[0] != ")": + raise ValueError("Action str: '%s' unbalanced" % actstr) + + return actstr[1:] + + class ctact(nla): nla_flags = NLA_F_NESTED
@@ -637,6 +724,13 @@ class ovsactions(nla): self["attrs"].append(["OVS_ACTION_ATTR_CT", ctact]) parsed = True
+ elif parse_starts_block(actstr, "sample(", False): + sampleact = self.sample() + actstr = sampleact.parse(actstr[len("sample(") : ]) + self["attrs"].append(["OVS_ACTION_ATTR_SAMPLE", sampleact]) + parsed = True + + actstr = actstr[strspn(actstr, ", ") :] while parencount > 0: parencount -= 1
Add a test to verify that sampling packets via psample works.
In order to do that, create a subcommand in ovs-dpctl.py that listens on the psample multicast group and prints samples.
In order to also test simultaneous sFlow and psample actions, add the missing parsing support for the "userspace" action (by refactoring the attribute-parsing logic used by the sample action into a shared helper).
Signed-off-by: Adrian Moreno amorenoz@redhat.com --- .../selftests/net/openvswitch/openvswitch.sh | 97 +++++++++- .../selftests/net/openvswitch/ovs-dpctl.py | 167 ++++++++++++++---- 2 files changed, 231 insertions(+), 33 deletions(-)
diff --git a/tools/testing/selftests/net/openvswitch/openvswitch.sh b/tools/testing/selftests/net/openvswitch/openvswitch.sh index 5cae53543849..7a2307a384a9 100755 --- a/tools/testing/selftests/net/openvswitch/openvswitch.sh +++ b/tools/testing/selftests/net/openvswitch/openvswitch.sh @@ -20,7 +20,8 @@ tests=" nat_related_v4 ip4-nat-related: ICMP related matches work with SNAT netlink_checks ovsnl: validate netlink attrs and settings upcall_interfaces ovs: test the upcall interfaces - drop_reason drop: test drop reasons are emitted" + drop_reason drop: test drop reasons are emitted + psample psample: Sampling packets with psample"
info() { [ $VERBOSE = 0 ] || echo $* @@ -170,6 +171,19 @@ ovs_drop_reason_count() return `echo "$perf_output" | grep "$pattern" | wc -l` }
+ovs_test_flow_fails () { + ERR_MSG="Flow actions may not be safe on all matching packets" + + PRE_TEST=$(dmesg | grep -c "${ERR_MSG}") + ovs_add_flow $@ &> /dev/null $@ && return 1 + POST_TEST=$(dmesg | grep -c "${ERR_MSG}") + + if [ "$PRE_TEST" == "$POST_TEST" ]; then + return 1 + fi + return 0 +} + usage() { echo echo "$0 [OPTIONS] [TEST]..." @@ -184,6 +198,87 @@ usage() { exit 1 }
+ +# psample test +# - samples packets with psample +test_psample() { + sbx_add "test_psample" || return $? + + # Add a datapath with per-vport dispatching. + ovs_add_dp "test_psample" psample -V 2:1 || return 1 + + info "create namespaces" + ovs_add_netns_and_veths "test_psample" "psample" \ + client c0 c1 172.31.110.10/24 -u || return 1 + ovs_add_netns_and_veths "test_psample" "psample" \ + server s0 s1 172.31.110.20/24 -u || return 1 + + # Check if psample actions can be configured. + ovs_add_flow "test_psample" psample \ + 'in_port(1),eth(),eth_type(0x0806),arp()' 'sample(sample=100%,group_id=1,cookie=0102)' + if [ $? == 1 ]; then + info "no support for psample - skipping" + ovs_exit_sig + return $ksft_skip + fi + + ovs_del_flows "test_psample" psample + + # Allow ARP + ovs_add_flow "test_psample" psample \ + 'in_port(1),eth(),eth_type(0x0806),arp()' '2' || return 1 + ovs_add_flow "test_psample" psample \ + 'in_port(2),eth(),eth_type(0x0806),arp()' '1' || return 1 + + # Test action verification. + OLDIFS=$IFS + IFS='*' + min_key='in_port(1),eth(),eth_type(0x800),ipv4()' + for testcase in \ + "cookie to large"*"sample(sample=100%,group_id=1,cookie=1615141312111009080706050403020100)" \ + "no group or action"*"sample(sample=100%)" \ + "no group or action with cookie"*"sample(sample=100%,cookie=deadbeef)"; + do + set -- $testcase; + ovs_test_flow_fails "test_psample" psample $min_key $2 + if [ $? == 1 ]; then + info "failed - $1" + return 1 + fi + done + IFS=$OLDIFS + + # Sample all traffic. In this case the sample action only has psample + # arguments. + ovs_add_flow "test_psample" psample \ + "in_port(1),eth(),eth_type(0x0800),ipv4(src=172.31.110.10,proto=1),icmp()" "sample(sample=100%,group_id=1,cookie=c0ffee),2" + + # Sample all traffic. In this case the sample action has both psample + # arguments and an upcall emulating simultaneous psample and + # sFlow / IPFIX. 
+ nlpid=$(grep -E "listening on upcall packet handler" $ovs_dir/s0.out | cut -d ":" -f 2 | tr -d ' ') + ovs_add_flow "test_psample" psample \ + "in_port(2),eth(),eth_type(0x0800),ipv4(src=172.31.110.20,proto=1),icmp()" "sample(sample=100%,group_id=2,cookie=eeff0c,actions(userspace(pid=${nlpid},userdata=eeff0c))),1" + + # Record psample data. + python3 $ovs_base/ovs-dpctl.py psample >$ovs_dir/psample.out 2>$ovs_dir/psample.err & + pid=$! + on_exit "ovs_sbx test_psample kill -TERM $pid 2>/dev/null" + + # Send a single ping. + sleep 1 + ovs_sbx "test_psample" ip netns exec client ping -I c1 172.31.110.20 -c 1 || return 1 + sleep 1 + + # We should have received one userspace action upcall and 2 psample packets. + grep -E "userspace action command" $ovs_dir/s0.out >/dev/null 2>&1 || return 1 + + grep -E "rate:1,group:1,cookie:c0ffee" $ovs_dir/psample.out >/dev/null 2>&1 || return 1 + grep -E "rate:1,group:2,cookie:eeff0c" $ovs_dir/psample.out >/dev/null 2>&1 || return 1 + + return 0 +} + # drop_reason test # - drop packets and verify the right drop reason is reported test_drop_reason() { diff --git a/tools/testing/selftests/net/openvswitch/ovs-dpctl.py b/tools/testing/selftests/net/openvswitch/ovs-dpctl.py index 3a2dddc57e42..2fb5bcfe9c36 100644 --- a/tools/testing/selftests/net/openvswitch/ovs-dpctl.py +++ b/tools/testing/selftests/net/openvswitch/ovs-dpctl.py @@ -27,8 +27,10 @@ try: from pyroute2.netlink import genlmsg from pyroute2.netlink import nla from pyroute2.netlink import nlmsg_atoms - from pyroute2.netlink.exceptions import NetlinkError + from pyroute2.netlink.event import EventSocket from pyroute2.netlink.generic import GenericNetlinkSocket + from pyroute2.netlink.nlsocket import Marshal + from pyroute2.netlink.exceptions import NetlinkError import pyroute2
except ModuleNotFoundError: @@ -269,6 +271,47 @@ def parse_extract_field( return str_skipped, data
+def parse_attributes(actstr, attributes): + """Parses actstr according to attribute description. attributes must be + a list of tuples (name, attribute, parse_func), e.g: + ("pid", OVS_USERSPACE_ATTR_PID, int) + + Returns a list of parsed attributes followed by the remaining string. + """ + attrs = [] + for (key, attr, func) in attributes: + if not actstr.startswith(key): + continue + + actstr = actstr[len(key) :] + + if not func: + attrs.append([attr]) + continue + + # The length of complex attributes cannot be determined + # beforehand and must be reported by the parsing func. + delim = actstr[0] + actstr = actstr[1:] + if delim == "=": + pos = strcspn(actstr, ",)") + datum = func(actstr[:pos]) + elif delim == "(": + datum, pos = func(actstr) + + attrs.append([attr, datum]) + actstr = actstr[pos:] + + if delim == "(": + actstr = actstr[1:] + + actstr = actstr[strspn(actstr, ", ") :] + + if actstr[0] != ")": + raise ValueError("Action str: '%s' unbalanced" % actstr) + + return attrs, actstr[1:] + class ovs_dp_msg(genlmsg): # include the OVS version # We need a custom header rather than just being able to rely on @@ -357,41 +400,19 @@ class ovsactions(nla): percent = float(percent.strip('%')) return int(math.floor(UINT32_MAX * (percent / 100.0) + .5))
- for (key, attr, func) in ( + attrs_desc = ( ("sample", "OVS_SAMPLE_ATTR_PROBABILITY", percent_to_rate), ("group_id", "OVS_SAMPLE_ATTR_PSAMPLE_GROUP", int), ("cookie", "OVS_SAMPLE_ATTR_PSAMPLE_COOKIE", lambda x: list(bytearray.fromhex(x))), ("actions", "OVS_SAMPLE_ATTR_ACTIONS", parse_nested_actions), - ): - if not actstr.startswith(key): - continue - - actstr = actstr[len(key) :] - - if not func: - self["attrs"].append([attr, None]) - continue - - # The length of complex attributes cannot be determined - # beforehand and must be reported by the parsing func. - delim = actstr[0] - actstr = actstr[1:] - if delim == "=": - pos = strcspn(actstr, ",)") - datum = func(actstr[:pos]) - elif delim == "(": - datum, pos = func(actstr) - - self["attrs"].append([attr, datum]) - actstr = actstr[pos:] - actstr = actstr[strspn(actstr, ", ") :] - - if actstr[0] != ")": - raise ValueError("Action str: '%s' unbalanced" % actstr) + )
- return actstr[1:] + attrs, actstr = parse_attributes(actstr, attrs_desc) + for attr in attrs: + self["attrs"].append(attr)
+ return actstr
class ctact(nla): nla_flags = NLA_F_NESTED @@ -521,6 +542,18 @@ class ovsactions(nla): print_str += ")" return print_str
+ def parse(self, actstr): + attrs_desc = ( + ("pid", "OVS_USERSPACE_ATTR_PID", int), + ("userdata", "OVS_USERSPACE_ATTR_USERDATA", + lambda x: list(bytearray.fromhex(x))), + ("egress_tun_port", "OVS_USERSPACE_ATTR_EGRESS_TUN_PORT", int) + ) + attrs, actstr = parse_attributes(actstr, attrs_desc) + for attr in attrs: + self["attrs"].append(attr) + return actstr + def dpstr(self, more=False): print_str = ""
@@ -730,6 +763,11 @@ class ovsactions(nla): self["attrs"].append(["OVS_ACTION_ATTR_SAMPLE", sampleact]) parsed = True
+ elif parse_starts_block(actstr, "userspace(", False): + uact = self.userspace() + actstr = uact.parse(actstr[len("userpsace(") : ]) + self["attrs"].append(["OVS_ACTION_ATTR_USERSPACE", uact]) + parsed = True
actstr = actstr[strspn(actstr, ", ") :] while parencount > 0: @@ -2112,10 +2150,70 @@ class OvsFlow(GenericNetlinkSocket): print("MISS upcall[%d/%s]: %s" % (seq, pktpres, keystr), flush=True)
def execute(self, packetmsg): - print("userspace execute command") + print("userspace execute command", flush=True)
def action(self, packetmsg): - print("userspace action command") + print("userspace action command", flush=True) + + +class psample_sample(genlmsg): + nla_map = ( + ("PSAMPLE_ATTR_IIFINDEX", "none"), + ("PSAMPLE_ATTR_OIFINDEX", "none"), + ("PSAMPLE_ATTR_ORIGSIZE", "none"), + ("PSAMPLE_ATTR_SAMPLE_GROUP", "uint32"), + ("PSAMPLE_ATTR_GROUP_SEQ", "none"), + ("PSAMPLE_ATTR_SAMPLE_RATE", "uint32"), + ("PSAMPLE_ATTR_DATA", "array(uint8)"), + ("PSAMPLE_ATTR_GROUP_REFCOUNT", "none"), + ("PSAMPLE_ATTR_TUNNEL", "none"), + ("PSAMPLE_ATTR_PAD", "none"), + ("PSAMPLE_ATTR_OUT_TC", "none"), + ("PSAMPLE_ATTR_OUT_TC_OCC", "none"), + ("PSAMPLE_ATTR_LATENCY", "none"), + ("PSAMPLE_ATTR_TIMESTAMP", "none"), + ("PSAMPLE_ATTR_PROTO", "none"), + ("PSAMPLE_ATTR_USER_COOKIE", "array(uint8)"), + ) + + def dpstr(self): + fields = [] + data = "" + for (attr, value) in self["attrs"]: + if attr == "PSAMPLE_ATTR_SAMPLE_GROUP": + fields.append("group:%d" % value) + if attr == "PSAMPLE_ATTR_SAMPLE_RATE": + fields.append("rate:%d" % value) + if attr == "PSAMPLE_ATTR_USER_COOKIE": + value = "".join(format(x, "02x") for x in value) + fields.append("cookie:%s" % value) + if attr == "PSAMPLE_ATTR_DATA" and len(value) > 0: + data = "data:%s" % "".join(format(x, "02x") for x in value) + + return ("%s %s" % (",".join(fields), data)).strip() + + +class psample_msg(Marshal): + PSAMPLE_CMD_SAMPLE = 0 + PSAMPLE_CMD_GET_GROUP = 1 + PSAMPLE_CMD_NEW_GROUP = 2 + PSAMPLE_CMD_DEL_GROUP = 3 + PSAMPLE_CMD_SET_FILTER = 4 + msg_map = {PSAMPLE_CMD_SAMPLE: psample_sample} + + +class Psample(EventSocket): + genl_family = "psample" + mcast_groups = ["packets"] + marshal_class = psample_msg + + def read_samples(self): + while True: + try: + for msg in self.get(): + print(msg.dpstr(), flush=True) + except NetlinkError as ne: + raise ne
def print_ovsdp_full(dp_lookup_rep, ifindex, ndb=NDB(), vpl=OvsVport()): @@ -2175,7 +2273,7 @@ def main(argv): help="Increment 'verbose' output counter.", default=0, ) - subparsers = parser.add_subparsers() + subparsers = parser.add_subparsers(dest="subcommand")
showdpcmd = subparsers.add_parser("show") showdpcmd.add_argument( @@ -2232,6 +2330,8 @@ def main(argv): delfscmd = subparsers.add_parser("del-flows") delfscmd.add_argument("flsbr", help="Datapath name")
+ subparsers.add_parser("psample") + args = parser.parse_args()
if args.verbose > 0: @@ -2246,6 +2346,9 @@ def main(argv):
sys.setrecursionlimit(100000)
+ if args.subcommand == "psample": + Psample().read_samples() + if hasattr(args, "showdp"): found = False for iface in ndb.interfaces:
linux-kselftest-mirror@lists.linaro.org