|
14 | 14 | assert not len(w), 'expected vanetza stub imports to pass without any error'
|
15 | 15 |
|
16 | 16 |
|
17 |
| -from vanetza.net import ByteBufferConvertible, CohesivePacket, OsiLayer |
| 17 | +from vanetza.net import ByteBufferConvertible, CohesivePacket, ChunkPacket, OsiLayer, \ |
| 18 | + osi_layer_iterable, max_osi_layer, min_osi_layer, num_osi_layers, osi_layer_list |
18 | 19 |
|
19 | 20 |
|
20 | 21 | def test_buffer_convertable():
|
@@ -69,3 +70,50 @@ def test_cohesive_packet():
|
69 | 70 |
|
70 | 71 | packet.trim(OsiLayer.Link, 0)
|
71 | 72 | assert packet.size() == part1
|
| 73 | + |
| 74 | + |
| 75 | +def test_osi_layer_aux(): |
| 76 | + start, end = OsiLayer.Presentation, OsiLayer.Application |
| 77 | + diff = 2 |
| 78 | + |
| 79 | + assert len(list(osi_layer_iterable(start, end))) == diff |
| 80 | + assert len(osi_layer_list(start, end)) == diff |
| 81 | + |
| 82 | + assert max_osi_layer() == OsiLayer.Application |
| 83 | + assert min_osi_layer() == OsiLayer.Physical |
| 84 | + |
| 85 | + assert num_osi_layers(start, end) == diff |
| 86 | + |
| 87 | + |
| 88 | +def test_chunk_packet(): |
| 89 | + def yield_size(iterable): |
| 90 | + return map(lambda p: len(p), iterable) |
| 91 | + |
| 92 | + def yield_encoded(iterable): |
| 93 | + return map(lambda p: p.encode(), iterable) |
| 94 | + |
| 95 | + part1, part2, part3 = yield_encoded(('hello', ' ', 'world!')) |
| 96 | + |
| 97 | + packet = ChunkPacket() |
| 98 | + for layer, buffer in zip( |
| 99 | + osi_layer_list(OsiLayer.Physical, OsiLayer.Network), (part1, part2, part3) |
| 100 | + ): |
| 101 | + packet[layer] = ByteBufferConvertible(buffer) |
| 102 | + |
| 103 | + assert packet.size() == sum(yield_size((part1, part2, part3))) |
| 104 | + assert packet.size_range(OsiLayer.Physical, OsiLayer.Link) == sum(yield_size((part1, part2))) |
| 105 | + |
| 106 | + new_part3 = part3 + '!'.encode() |
| 107 | + packet.set_layer(OsiLayer.Network, ByteBufferConvertible(new_part3)) |
| 108 | + |
| 109 | + assert packet[OsiLayer.Network].convert() == new_part3 |
| 110 | + |
| 111 | + new_packet = packet.extract(OsiLayer.Physical, OsiLayer.Link) |
| 112 | + assert new_packet.size() == sum(yield_size((part1, part2))) |
| 113 | + |
| 114 | + part4, part5 = yield_encoded(('!', '!')) |
| 115 | + new_packet[OsiLayer.Presentation] = ByteBufferConvertible(part4) |
| 116 | + new_packet[OsiLayer.Application] = ByteBufferConvertible(part5) |
| 117 | + |
| 118 | + packet = packet.merge(new_packet, OsiLayer.Presentation, OsiLayer.Application) |
| 119 | + assert packet.size() == sum(yield_size((new_part3, part4, part5))) |
0 commit comments