Add implementation and tests for Sequence Number Extension.
One implementation is based on an IETF draft: https://datatracker.ietf.org/doc/draft-touch-sne/
The Linux implementation is simpler and doesn't require additional flags; it just relies on the standard before/after macros.
Signed-off-by: Leonard Crestez <cdleonard@gmail.com>
---
 .../tcp_authopt/tcp_authopt_test/sne_alg.py   | 111 ++++++++++++++++++
 .../tcp_authopt_test/test_sne_alg.py          |  96 +++++++++++++++
 2 files changed, 207 insertions(+)
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/sne_alg.py
 create mode 100644 tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sne_alg.py
# ---------------------------------------------------------------------------
# New file: tools/testing/selftests/tcp_authopt/tcp_authopt_test/sne_alg.py
# ---------------------------------------------------------------------------
# SPDX-License-Identifier: GPL-2.0
"""Python implementation of SNE algorithms"""


def distance(x, y):
    """Return the absolute difference between ``x`` and ``y``."""
    return abs(x - y)


class SequenceNumberExtender:
    """Based on https://datatracker.ietf.org/doc/draft-touch-sne/"""

    # Current sequence number extension; incremented when a wrap is detected.
    sne: int = 0
    # Cleared when SEQ moves from below 0x80000000 to at or above it, and set
    # again when the SNE is incremented, so each wrap is counted only once.
    sne_flag: int = 1
    # Reference SEQ that each new SEQ is compared against.
    prev_seq: int = 0

    def calc(self, seq):
        """Update internal state and return SNE for certain SEQ"""
        # use current SNE to start
        result = self.sne

        # both in same SNE range?
        if distance(seq, self.prev_seq) < 0x80000000:
            # jumps fwd over N/2?
            if seq >= 0x80000000 and self.prev_seq < 0x80000000:
                self.sne_flag = 0
            # move prev forward if needed
            self.prev_seq = max(seq, self.prev_seq)
        # both in diff SNE ranges?
        else:
            # jumps forward over zero?
            if seq < 0x80000000:
                # update prev
                self.prev_seq = seq
                # first jump over zero? (wrap)
                if self.sne_flag == 0:
                    # set flag so we increment once
                    self.sne_flag = 1
                    # increment window
                    self.sne = self.sne + 1
                # use updated SNE value
                result = self.sne
            # jump backward over zero?
            else:
                # use pre-rollover SNE value
                result = self.sne - 1

        return result


class SequenceNumberExtenderRFC:
    """Based on sample code in original RFC5925 document"""

    # Current sequence number extension.
    sne: int = 0
    # Set when the SNE has been incremented; cleared again once SEQ moves
    # from below 0x7FFFFFFF to above it.
    sne_flag: int = 1
    # SEQ seen on the previous call to calc().
    prev_seq: int = 0

    def calc(self, seq):
        """Update internal state and return SNE for certain SEQ"""
        # set the flag when the SEG.SEQ first rolls over
        if self.sne_flag == 0 and self.prev_seq > 0x7FFFFFFF and seq < 0x7FFFFFFF:
            self.sne = self.sne + 1
            self.sne_flag = 1
        # decide which SNE to use after incremented
        if self.sne_flag and seq > 0x7FFFFFFF:
            # use the pre-increment value
            sne = self.sne - 1
        else:
            # use the current value
            sne = self.sne
        # reset the flag in the *middle* of the window
        if self.prev_seq < 0x7FFFFFFF and seq > 0x7FFFFFFF:
            self.sne_flag = 0
        # save the current SEQ for the next time through the code
        self.prev_seq = seq

        return sne


def tcp_seq_before(a, b) -> bool:
    """Return True if ``a`` precedes ``b`` in 32-bit wrapping sequence space.

    The comparison is strict: when the two values are exactly 0x80000000
    apart, neither is considered to be before the other.
    """
    return ((a - b) & 0xFFFFFFFF) > 0x80000000


def tcp_seq_after(a, b) -> bool:
    """Return True if ``a`` follows ``b`` in 32-bit wrapping sequence space."""
    # "a after b" is "b before a": the arguments must be swapped.
    return tcp_seq_before(b, a)


class SequenceNumberExtenderLinux:
    """Based on linux implementation and with no extra flags"""

    # SNE that corresponds to prev_seq.
    sne: int = 0
    # Most advanced SEQ recorded so far (only ever moved forward by calc()).
    prev_seq: int = 0

    def reset(self, seq, sne=0):
        """Overwrite the stored reference SEQ and its SNE."""
        self.prev_seq = seq
        self.sne = sne

    def calc(self, seq, update=True):
        """Return the SNE for ``seq``.

        When ``update`` is true and ``seq`` is ahead of the stored reference
        SEQ, the reference SEQ and SNE are advanced to the new values.
        """
        sne = self.sne
        if tcp_seq_before(seq, self.prev_seq):
            # Older than the reference but numerically larger: seq lies on
            # the other side of zero, in the previous SNE.
            if seq > self.prev_seq:
                sne -= 1
        else:
            # Not older than the reference but numerically smaller: seq has
            # wrapped past zero into the next SNE.
            if seq < self.prev_seq:
                sne += 1
        # Only move the reference point forward, never backward.
        if update and tcp_seq_before(self.prev_seq, seq):
            self.prev_seq = seq
            self.sne = sne
        return sne


# ---------------------------------------------------------------------------
# New file: tools/testing/selftests/tcp_authopt/tcp_authopt_test/test_sne_alg.py
# ---------------------------------------------------------------------------
# SPDX-License-Identifier: GPL-2.0
"""Test SNE algorithm implementations"""

import logging

import pytest

from .sne_alg import (
    SequenceNumberExtender,
    SequenceNumberExtenderLinux,
    SequenceNumberExtenderRFC,
    tcp_seq_after,
    tcp_seq_before,
)

logger = logging.getLogger(__name__)


# Data from https://datatracker.ietf.org/doc/draft-touch-sne/
# Each entry is (expected SNE, SEQ); entries are fed to calc() in order.
_SNE_TEST_DATA = [
    (0x00000000, 0x00000000),
    (0x00000000, 0x30000000),
    (0x00000000, 0x90000000),
    (0x00000000, 0x70000000),
    (0x00000000, 0xA0000000),
    (0x00000001, 0x00000001),
    (0x00000000, 0xE0000000),
    (0x00000001, 0x00000000),
    (0x00000001, 0x7FFFFFFF),
    (0x00000001, 0x00000000),
    (0x00000001, 0x50000000),
    (0x00000001, 0x80000000),
    (0x00000001, 0x00000001),
    (0x00000001, 0x40000000),
    (0x00000001, 0x90000000),
    (0x00000001, 0xB0000000),
    (0x00000002, 0x0FFFFFFF),
    (0x00000002, 0x20000000),
    (0x00000002, 0x90000000),
    (0x00000002, 0x70000000),
    (0x00000002, 0xA0000000),
    (0x00000003, 0x00004000),
    (0x00000002, 0xD0000000),
    (0x00000003, 0x20000000),
    (0x00000003, 0x90000000),
    (0x00000003, 0x70000000),
    (0x00000003, 0xA0000000),
    (0x00000004, 0x00004000),
    (0x00000003, 0xD0000000),
]


# Easier test data with small jumps <= 0x30000000
# Same (expected SNE, SEQ) layout as _SNE_TEST_DATA.
SNE_DATA_EASY = [
    (0x00000000, 0x00000000),
    (0x00000000, 0x30000000),
    (0x00000000, 0x60000000),
    (0x00000000, 0x80000000),
    (0x00000000, 0x90000000),
    (0x00000000, 0xC0000000),
    (0x00000000, 0xF0000000),
    (0x00000001, 0x10000000),
    (0x00000000, 0xF0030000),
    (0x00000001, 0x00030000),
    (0x00000001, 0x10030000),
]


def check_sne_alg(alg, data):
    """Feed every SEQ in ``data`` to ``alg.calc`` and assert the returned SNE."""
    for sne, seq in data:
        observed_sne = alg.calc(seq)
        logger.info(
            "seq %08x expected sne %08x observed sne %08x", seq, sne, observed_sne
        )
        assert observed_sne == sne


def test_sne_alg():
    check_sne_alg(SequenceNumberExtender(), _SNE_TEST_DATA)


def test_sne_alg_easy():
    check_sne_alg(SequenceNumberExtender(), SNE_DATA_EASY)


@pytest.mark.xfail
def test_sne_alg_rfc():
    check_sne_alg(SequenceNumberExtenderRFC(), _SNE_TEST_DATA)


@pytest.mark.xfail
def test_sne_alg_rfc_easy():
    check_sne_alg(SequenceNumberExtenderRFC(), SNE_DATA_EASY)


def test_sne_alg_linux():
    # A fresh instance per data set: each table starts from SNE 0 / SEQ 0.
    check_sne_alg(SequenceNumberExtenderLinux(), _SNE_TEST_DATA)
    check_sne_alg(SequenceNumberExtenderLinux(), SNE_DATA_EASY)


def test_tcp_seq_before_after():
    """before/after must be mirror images of each other, including across wrap."""
    assert tcp_seq_before(0x10, 0x20)
    assert not tcp_seq_before(0x20, 0x10)
    assert tcp_seq_after(0x20, 0x10)
    assert not tcp_seq_after(0x10, 0x20)
    # ordering holds across the 32-bit wrap
    assert tcp_seq_before(0xFFFFFFF0, 0x10)
    assert tcp_seq_after(0x10, 0xFFFFFFF0)