Server IP : 195.201.23.43 / Your IP : 13.58.214.82 Web Server : Apache System : Linux webserver2.vercom.be 5.4.0-192-generic #212-Ubuntu SMP Fri Jul 5 09:47:39 UTC 2024 x86_64 User : kdecoratie ( 1041) PHP Version : 7.1.33-63+ubuntu20.04.1+deb.sury.org+1 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals, MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /lib/python3/dist-packages/twisted/pair/test/ |
Upload File : |
# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. from twisted.trial import unittest from twisted.python import components from twisted.pair import ip, raw from zope import interface @interface.implementer(raw.IRawDatagramProtocol) class MyProtocol: def __init__(self, expecting): self.expecting = list(expecting) def datagramReceived(self, data, **kw): assert self.expecting, 'Got a packet when not expecting anymore.' expectData, expectKw = self.expecting.pop(0) expectKwKeys = expectKw.keys() expectKwKeys = list(sorted(expectKwKeys)) kwKeys = kw.keys() kwKeys = list(sorted(kwKeys)) assert expectKwKeys == kwKeys, "Expected %r, got %r" % (expectKwKeys, kwKeys) for k in expectKwKeys: assert expectKw[k] == kw[k], "Expected %s=%r, got %r" % (k, expectKw[k], kw[k]) assert expectKw == kw, "Expected %r, got %r" % (expectKw, kw) assert expectData == data, "Expected %r, got %r" % (expectData, data) class IPTests(unittest.TestCase): def testPacketParsing(self): proto = ip.IPProtocol() p1 = MyProtocol([ (b'foobar', { 'partial': 0, 'dest': '1.2.3.4', 'source': '5.6.7.8', 'protocol': 0x0F, 'version': 4, 'ihl': 20, 'tos': 7, 'tot_len': 20+6, 'fragment_id': 0xDEAD, 'fragment_offset': 0x1EEF, 'dont_fragment': 0, 'more_fragments': 1, 'ttl': 0xC0, }), ]) proto.addProto(0x0F, p1) proto.datagramReceived(b"\x54" #ihl version + b"\x07" #tos + b"\x00\x1a" #tot_len + b"\xDE\xAD" #id + b"\xBE\xEF" #frag_off + b"\xC0" #ttl + b"\x0F" #protocol + b"FE" #checksum + b"\x05\x06\x07\x08" + b"\x01\x02\x03\x04" + b"foobar", partial=0, dest='dummy', source='dummy', protocol='dummy', ) assert not p1.expecting, \ 'Should not expect any more packets, but still want %r' % p1.expecting def testMultiplePackets(self): proto = ip.IPProtocol() p1 = MyProtocol([ (b'foobar', { 'partial': 0, 'dest': '1.2.3.4', 'source': '5.6.7.8', 'protocol': 0x0F, 'version': 4, 'ihl': 20, 'tos': 7, 'tot_len': 20+6, 'fragment_id': 0xDEAD, 'fragment_offset': 0x1EEF, 'dont_fragment': 0, 'more_fragments': 1, 'ttl': 0xC0, }), (b'quux', { 'partial': 1, 'dest': '5.4.3.2', 'source': '6.7.8.9', 'protocol': 0x0F, 'version': 4, 'ihl': 20, 'tos': 7, 'tot_len': 20+6, 'fragment_id': 0xDEAD, 'fragment_offset': 0x1EEF, 'dont_fragment': 0, 'more_fragments': 1, 'ttl': 0xC0, }), ]) proto.addProto(0x0F, p1) proto.datagramReceived(b"\x54" #ihl version + b"\x07" #tos + b"\x00\x1a" #tot_len + b"\xDE\xAD" #id + b"\xBE\xEF" #frag_off + b"\xC0" #ttl + b"\x0F" #protocol + b"FE" #checksum + b"\x05\x06\x07\x08" + b"\x01\x02\x03\x04" + b"foobar", partial=0, dest='dummy', source='dummy', protocol='dummy', ) proto.datagramReceived(b"\x54" #ihl version + b"\x07" #tos + b"\x00\x1a" #tot_len + b"\xDE\xAD" #id + b"\xBE\xEF" #frag_off + b"\xC0" #ttl + b"\x0F" #protocol + b"FE" #checksum + b"\x06\x07\x08\x09" + b"\x05\x04\x03\x02" + b"quux", partial=1, dest='dummy', source='dummy', protocol='dummy', ) assert not p1.expecting, \ 'Should not expect any more packets, but still want %r' % p1.expecting def testMultipleSameProtos(self): proto = ip.IPProtocol() p1 = MyProtocol([ (b'foobar', { 'partial': 0, 'dest': '1.2.3.4', 'source': '5.6.7.8', 'protocol': 0x0F, 'version': 4, 'ihl': 20, 'tos': 7, 'tot_len': 20+6, 'fragment_id': 0xDEAD, 'fragment_offset': 0x1EEF, 'dont_fragment': 0, 'more_fragments': 1, 'ttl': 0xC0, }), ]) p2 = MyProtocol([ (b'foobar', { 'partial': 0, 'dest': '1.2.3.4', 'source': '5.6.7.8', 'protocol': 0x0F, 'version': 4, 'ihl': 20, 'tos': 7, 'tot_len': 20+6, 'fragment_id': 0xDEAD, 'fragment_offset': 0x1EEF, 'dont_fragment': 0, 'more_fragments': 1, 'ttl': 0xC0, }), ]) proto.addProto(0x0F, p1) proto.addProto(0x0F, p2) proto.datagramReceived(b"\x54" #ihl version + b"\x07" #tos + b"\x00\x1a" #tot_len + b"\xDE\xAD" #id + b"\xBE\xEF" #frag_off + b"\xC0" #ttl + b"\x0F" #protocol + b"FE" #checksum + b"\x05\x06\x07\x08" + b"\x01\x02\x03\x04" + b"foobar", partial=0, dest='dummy', source='dummy', protocol='dummy', ) assert not p1.expecting, \ 'Should not expect any more packets, but still want %r' % p1.expecting assert not p2.expecting, \ 'Should not expect any more packets, but still want %r' % p2.expecting def testWrongProtoNotSeen(self): proto = ip.IPProtocol() p1 = MyProtocol([]) proto.addProto(1, p1) proto.datagramReceived(b"\x54" #ihl version + b"\x07" #tos + b"\x00\x1a" #tot_len + b"\xDE\xAD" #id + b"\xBE\xEF" #frag_off + b"\xC0" #ttl + b"\x0F" #protocol + b"FE" #checksum + b"\x05\x06\x07\x08" + b"\x01\x02\x03\x04" + b"foobar", partial=0, dest='dummy', source='dummy', protocol='dummy', ) def testDemuxing(self): proto = ip.IPProtocol() p1 = MyProtocol([ (b'foobar', { 'partial': 0, 'dest': '1.2.3.4', 'source': '5.6.7.8', 'protocol': 0x0F, 'version': 4, 'ihl': 20, 'tos': 7, 'tot_len': 20+6, 'fragment_id': 0xDEAD, 'fragment_offset': 0x1EEF, 'dont_fragment': 0, 'more_fragments': 1, 'ttl': 0xC0, }), (b'quux', { 'partial': 1, 'dest': '5.4.3.2', 'source': '6.7.8.9', 'protocol': 0x0F, 'version': 4, 'ihl': 20, 'tos': 7, 'tot_len': 20+6, 'fragment_id': 0xDEAD, 'fragment_offset': 0x1EEF, 'dont_fragment': 0, 'more_fragments': 1, 'ttl': 0xC0, }), ]) proto.addProto(0x0F, p1) p2 = MyProtocol([ (b'quux', { 'partial': 1, 'dest': '5.4.3.2', 'source': '6.7.8.9', 'protocol': 0x0A, 'version': 4, 'ihl': 20, 'tos': 7, 'tot_len': 20+6, 'fragment_id': 0xDEAD, 'fragment_offset': 0x1EEF, 'dont_fragment': 0, 'more_fragments': 1, 'ttl': 0xC0, }), (b'foobar', { 'partial': 0, 'dest': '1.2.3.4', 'source': '5.6.7.8', 'protocol': 0x0A, 'version': 4, 'ihl': 20, 'tos': 7, 'tot_len': 20+6, 'fragment_id': 0xDEAD, 'fragment_offset': 0x1EEF, 'dont_fragment': 0, 'more_fragments': 1, 'ttl': 0xC0, }), ]) proto.addProto(0x0A, p2) proto.datagramReceived(b"\x54" #ihl version + b"\x07" #tos + b"\x00\x1a" #tot_len + b"\xDE\xAD" #id + b"\xBE\xEF" #frag_off + b"\xC0" #ttl + b"\x0A" #protocol + b"FE" #checksum + b"\x06\x07\x08\x09" + b"\x05\x04\x03\x02" + b"quux", partial=1, dest='dummy', source='dummy', protocol='dummy', ) proto.datagramReceived(b"\x54" #ihl version + b"\x07" #tos + b"\x00\x1a" #tot_len + b"\xDE\xAD" #id + b"\xBE\xEF" #frag_off + b"\xC0" #ttl + b"\x0F" #protocol + b"FE" #checksum + b"\x05\x06\x07\x08" + b"\x01\x02\x03\x04" + b"foobar", partial=0, dest='dummy', source='dummy', protocol='dummy', ) proto.datagramReceived(b"\x54" #ihl version + b"\x07" #tos + b"\x00\x1a" #tot_len + b"\xDE\xAD" #id + b"\xBE\xEF" #frag_off + b"\xC0" #ttl + b"\x0F" #protocol + b"FE" #checksum + b"\x06\x07\x08\x09" + b"\x05\x04\x03\x02" + b"quux", partial=1, dest='dummy', source='dummy', protocol='dummy', ) proto.datagramReceived(b"\x54" #ihl version + b"\x07" #tos + b"\x00\x1a" #tot_len + b"\xDE\xAD" #id + b"\xBE\xEF" #frag_off + b"\xC0" #ttl + b"\x0A" #protocol + b"FE" #checksum + b"\x05\x06\x07\x08" + b"\x01\x02\x03\x04" + b"foobar", partial=0, dest='dummy', source='dummy', protocol='dummy', ) assert not p1.expecting, \ 'Should not expect any more packets, but still want %r' % p1.expecting assert not p2.expecting, \ 'Should not expect any more packets, but still want %r' % p2.expecting def testAddingBadProtos_WrongLevel(self): """Adding a wrong level protocol raises an exception.""" e = ip.IPProtocol() try: e.addProto(42, "silliness") except components.CannotAdapt: pass else: raise AssertionError('addProto must raise an exception for bad protocols') def testAddingBadProtos_TooSmall(self): """Adding a protocol with a negative number raises an exception.""" e = ip.IPProtocol() try: e.addProto(-1, MyProtocol([])) except TypeError as e: if e.args == ('Added protocol must be positive or zero',): pass else: raise else: raise AssertionError('addProto must raise an exception for bad protocols') def testAddingBadProtos_TooBig(self): """Adding a protocol with a number >=2**32 raises an exception.""" e = ip.IPProtocol() try: e.addProto(2**32, MyProtocol([])) except TypeError as e: if e.args == ('Added protocol must fit in 32 bits',): pass else: raise else: raise AssertionError('addProto must raise an exception for bad protocols') def testAddingBadProtos_TooBig2(self): """Adding a protocol with a number >=2**32 raises an exception.""" e = ip.IPProtocol() try: e.addProto(2**32+1, MyProtocol([])) except TypeError as e: if e.args == ('Added protocol must fit in 32 bits',): pass else: raise else: raise AssertionError('addProto must raise an exception for bad protocols')Private