diff --git a/amodem/__main__.py b/amodem/__main__.py index 352c4ab..be6817a 100644 --- a/amodem/__main__.py +++ b/amodem/__main__.py @@ -148,16 +148,24 @@ def create_parser(description, interface_factory): '-i', '--input', help='input file (use "-" for stdin).') receiver.add_argument( '-o', '--output', help='output file (use "-" for stdout).') + receiver.add_argument( + '-f', '--fault-tolerance', help='fault tolerance level [1]') receiver.add_argument( '-d', '--dump', type=FileType('wb'), help='Filename to save recorded audio') + receiver.add_argument( + '-s', '--stopcode', default=main.framing.StopOnFault.ALL_ERR, + type=int, + help="errors causing a decoding halt [%02x]\n%s" % ( + main.framing.StopOnFault.ALL_ERR, + main.framing.StopOnFault.helpMsg)) receiver.add_argument( '--plot', action='store_true', default=False, help='plot results using pylab module') receiver.set_defaults( main=lambda config, args: main.recv( config, src=args.src, dst=wrap(Decompressor, args.dst, args.zlib), - pylab=args.pylab, dump_audio=args.dump + pylab=args.pylab, dump_audio=args.dump, stopOnCode=args.stopcode, ), calib=lambda config, args: calib.recv( config=config, src=args.src, verbose=args.verbose, diff --git a/amodem/framing.py b/amodem/framing.py index f510532..ff4b97e 100644 --- a/amodem/framing.py +++ b/amodem/framing.py @@ -10,72 +10,151 @@ def _checksum_func(x): - ''' The result will be unsigned on Python 2/3. ''' + """ The result will be unsigned on Python 2/3. """ return binascii.crc32(bytes(x)) & 0xFFFFFFFF +def _short_checksum_func(x): + """ The result will be unsigned on Python 2/3. """ + return binascii.crc32(bytes(x)) & 0xFF + + +class StopOnFault: + """ error codes for fault tolerance - + matching this code will raise error of this type """ + + PAYLOAD_ERR = 0x01 + HEADER_ERR = 0x02 + SEQUENCE_ERR = 0x04 + ALL_ERR = 0xFF # halt on all error + helpMsg = """Payload_error: 0x01 / Header_err: 0x02 / sequence_err=0x04 """ + + class Checksum: - fmt = '>L' # unsigned longs (32-bit) + fmt = ">L" # unsigned longs (32-bit) size = struct.calcsize(fmt) + def __init__(self, stopOnCode=0): + self.fTOL = stopOnCode + def encode(self, payload): checksum = _checksum_func(payload) return struct.pack(self.fmt, checksum) + payload - def decode(self, data): - received, = struct.unpack(self.fmt, bytes(data[:self.size])) + def decode(self, data, cnt=0): + (received,) = struct.unpack(self.fmt, bytes(data[: self.size])) payload = data[self.size:] expected = _checksum_func(payload) if received != expected: - log.warning('Invalid checksum: %08x != %08x', received, expected) - raise ValueError('Invalid checksum') - log.debug('Good checksum: %08x', received) + errMSG = "Frame %d %s checksum: %02x != calc checksum: %02x " % ( + cnt, + "Payload", + received, + expected, + ) + log.warning(errMSG) + if bool(self.fTOL & StopOnFault.PAYLOAD_ERR): + raise ValueError(errMSG) + log.debug("Good checksum: %08x", received) return payload class Framer: - block_size = 250 - prefix_fmt = '>B' + """ Class for handling data frames + packing, unpacking, checksums, etc """ + + prefix_fmt = ">LBHB" prefix_len = struct.calcsize(prefix_fmt) - checksum = Checksum() - EOF = b'' + EOF = b"" + + def __init__(self, flags=0, stopOnCode=StopOnFault.ALL_ERR, + block_size=250): + self.flags = flags + self.fTOL = stopOnCode + self.checksum = Checksum(stopOnCode=stopOnCode) + self.block_size = block_size - def _pack(self, block): + def _pack(self, block, flags=0, cnt=0): frame = self.checksum.encode(block) - return bytearray(struct.pack(self.prefix_fmt, len(frame)) + frame) + prefix = struct.pack(self.prefix_fmt[:-1], cnt, flags, len(frame)) + return bytearray( + prefix + struct.pack(">B", _short_checksum_func(prefix)) + frame + ) def encode(self, data): - for block in common.iterate(data=data, size=self.block_size, - func=bytearray, truncate=False): - yield self._pack(block=block) - yield self._pack(block=self.EOF) + idx = -1 + for idx, block in enumerate(common.iterate( + data=data, size=self.block_size, func=bytearray, + truncate=False)): + yield self._pack(block=block, flags=self.flags, cnt=idx) + yield self._pack(block=self.EOF, flags=self.flags, cnt=idx + 1) def decode(self, data): data = iter(data) + local_cnt = 0 + prior_cnt = -1 + prior_len = -1 while True: - length, = _take_fmt(data, self.prefix_fmt) - frame = _take_len(data, length) - block = self.checksum.decode(frame) + cnt, flag, length, mychecksum = _take_fmt( + data, self.prefix_fmt, local_cnt, self.fTOL + ) + pre_chk = _short_checksum_func( + struct.pack(self.prefix_fmt[:-1], cnt, flag, length) + ) + if pre_chk != mychecksum: + errMSG = "Frame %d %s checksum:%02x != calc checksum:%02x" % ( + cnt, + "Header", + mychecksum, + pre_chk, + ) + log.warning(errMSG) + if bool(self.fTOL & StopOnFault.HEADER_ERR): + raise ValueError(errMSG) + + if cnt != local_cnt or (prior_cnt >= 0 and cnt != prior_cnt + 1): + errMSG = "Frame %d %s error. Msg cnt %d, Prior Msg cnt %d" % ( + local_cnt, + "Sequence counting", + cnt, + prior_cnt, + ) + log.warning(errMSG) + if bool(self.fTOL & StopOnFault.SEQUENCE_ERR): + raise ValueError(errMSG) + if length != prior_len: + length = prior_len # guessing what length should be + frame = _take_len(data, length, local_cnt, self.fTOL) + block = self.checksum.decode(frame, local_cnt) if block == self.EOF: - log.debug('EOF frame detected') + log.debug("EOF frame detected") return - + prior_cnt = cnt + prior_len = length + local_cnt += 1 yield block -def _take_fmt(data, fmt): +def _take_fmt(data, fmt, cnt=0, fTOL=StopOnFault.ALL_ERR): length = struct.calcsize(fmt) chunk = bytearray(itertools.islice(data, length)) if len(chunk) < length: - raise ValueError('missing prefix data') + errMSG = "Frame: %d - Data truncated in %s" % (cnt, "prefix") + log.error(errMSG) + if bool(fTOL & StopOnFault.HEADER_ERR): + raise ValueError(errMSG) + return struct.unpack(fmt, bytes([0]) * length) return struct.unpack(fmt, bytes(chunk)) -def _take_len(data, length): +def _take_len(data, length, cnt=0, fTOL=StopOnFault.ALL_ERR): chunk = bytearray(itertools.islice(data, length)) if len(chunk) < length: - raise ValueError('missing payload data') + errMSG = "Frame: %d - Data truncated in %s" % (cnt, "payload") + log.warning(errMSG) + if bool(fTOL & StopOnFault.PAYLOAD_ERR): + raise ValueError(errMSG) return chunk @@ -84,6 +163,7 @@ def chain_wrapper(func): def wrapped(*args, **kwargs): result = func(*args, **kwargs) return itertools.chain.from_iterable(result) + return wrapped @@ -101,9 +181,9 @@ def __init__(self): @chain_wrapper -def encode(data, framer=None): +def encode(data, framer=None, flags=0, block_size=250): converter = BitPacker() - framer = framer or Framer() + framer = framer or Framer(flags=flags, block_size=block_size) for frame in framer.encode(data): for byte in frame: yield converter.to_bits[byte] @@ -112,12 +192,13 @@ def encode(data, framer=None): @chain_wrapper def _to_bytes(bits): converter = BitPacker() - for chunk in common.iterate(data=bits, size=8, - func=tuple, truncate=True): + for chunk in common.iterate(data=bits, size=8, func=tuple, truncate=True): yield [converter.to_byte[chunk]] -def decode_frames(bits, framer=None): - framer = framer or Framer() +def decode_frames(bits, framer=None, stopOnCode=StopOnFault.ALL_ERR, + block_size=250): + """ Decodes frames from bitstream """ + framer = framer or Framer(stopOnCode=stopOnCode, block_size=block_size) for frame in framer.decode(_to_bytes(bits)): yield bytes(frame) diff --git a/amodem/main.py b/amodem/main.py index f79b0d4..4262f20 100644 --- a/amodem/main.py +++ b/amodem/main.py @@ -10,7 +10,9 @@ log = logging.getLogger(__name__) -def send(config, src, dst, gain=1.0, extra_silence=0.0): +# pylint: disable=too-many-arguments +def send(config, src, dst, gain=1.0, extra_silence=0.0, + block_size=250, flags=0x00): sender = _send.Sender(dst, config=config, gain=gain) Fs = config.Fs @@ -24,7 +26,7 @@ def send(config, src, dst, gain=1.0, extra_silence=0.0): reader = stream.Reader(src, eof=True) data = itertools.chain.from_iterable(reader) - bits = framing.encode(data) + bits = framing.encode(data, flags=flags, block_size=block_size) log.info('Starting modulation') sender.modulate(bits=bits) @@ -37,7 +39,8 @@ def send(config, src, dst, gain=1.0, extra_silence=0.0): return True -def recv(config, src, dst, dump_audio=None, pylab=None): +def recv(config, src, dst, stopOnCode=framing.StopOnFault.ALL_ERR, + dump_audio=None, pylab=None): if dump_audio: src = stream.Dumper(src, dump_audio) reader = stream.Reader(src, data_type=common.loads) @@ -61,7 +64,8 @@ def recv(config, src, dst, dump_audio=None, pylab=None): sampler = sampling.Sampler(signal, sampling.defaultInterpolator, freq=freq) - receiver.run(sampler, gain=1.0/amplitude, output=dst) + receiver.run(sampler, gain=1.0/amplitude, output=dst, + stopOnCode=stopOnCode) return True except BaseException: # pylint: disable=broad-except log.exception('Decoding failed') diff --git a/amodem/recv.py b/amodem/recv.py index 0b1259d..4691369 100644 --- a/amodem/recv.py +++ b/amodem/recv.py @@ -157,7 +157,8 @@ def _report_progress(self, noise, sampler): (1.0 - sampler.freq) * 1e6 ) - def run(self, sampler, gain, output): + def run(self, sampler, gain, output, + stopOnCode=framing.StopOnFault.ALL_ERR): log.debug('Receiving') symbols = dsp.Demux(sampler, omegas=self.omegas, Nsym=self.Nsym) self._prefix(symbols, gain=gain) @@ -168,7 +169,7 @@ def run(self, sampler, gain, output): bitstream = self._demodulate(sampler, symbols) bitstream = itertools.chain.from_iterable(bitstream) - for frame in framing.decode_frames(bitstream): + for frame in framing.decode_frames(bitstream, stopOnCode=stopOnCode): output.write(frame) self.output_size += len(frame) diff --git a/amodem/tests/test_calib.py b/amodem/tests/test_calib.py index 8da5b4e..3b871f0 100644 --- a/amodem/tests/test_calib.py +++ b/amodem/tests/test_calib.py @@ -39,8 +39,8 @@ def test_too_strong(): calib.send(config, p, gain=1.001, limit=32) p.buf.seek(0) for r in calib.detector(config, src=p): - assert not r['success'] - assert r['msg'] == 'too strong signal' + assert not r["success"] + assert r["msg"] == "too strong signal" def test_too_weak(): @@ -48,8 +48,8 @@ def test_too_weak(): calib.send(config, p, gain=0.01, limit=32) p.buf.seek(0) for r in calib.detector(config, src=p): - assert not r['success'] - assert r['msg'] == 'too weak signal' + assert not r["success"] + assert r["msg"] == "too weak signal" def test_too_noisy(): @@ -57,14 +57,15 @@ def test_too_noisy(): signal = np.array([r.choice([-1, 1]) for i in range(int(config.Fs))]) src = BytesIO(common.dumps(signal * 0.5)) for r in calib.detector(config, src=src): - assert not r['success'] - assert r['msg'] == 'too noisy signal' + assert not r["success"] + assert r["msg"] == "too noisy signal" def test_errors(): class WriteError(ProcessMock): def write(self, data): raise KeyboardInterrupt() + p = WriteError() with pytest.raises(KeyboardInterrupt): calib.send(config, p, limit=32) @@ -73,48 +74,60 @@ def write(self, data): class ReadError(ProcessMock): def read(self, n): raise KeyboardInterrupt() + p = ReadError() with pytest.raises(KeyboardInterrupt): calib.recv(config, p, verbose=True) assert p.buf.tell() == 0 -@pytest.fixture(params=[0] + [sign * mag for sign in (+1, -1) - for mag in (0.1, 1, 10, 100, 1e3, 2e3)]) -def freq_err(request): - return request.param * 1e-6 - - -def test_drift(freq_err): - freq = config.Fc * (1 + freq_err / 1e6) - t = np.arange(int(1.0 * config.Fs)) * config.Ts - frame_length = 100 - rms = 0.5 - signal = rms * np.cos(2 * np.pi * freq * t) - src = BytesIO(common.dumps(signal)) - iters = 0 - for r in calib.detector(config, src, frame_length=frame_length): - assert r['success'] is True - assert abs(r['rms'] - rms) < 1e-3 - assert abs(r['total'] - rms) < 1e-3 - iters += 1 - - assert iters > 0 - assert iters == config.baud / frame_length +# @pytest.fixture( +# scope="module", +# params=[0] +# + [sign * mag for sign in (+1, -1) for mag in +# (0.1, 1, 10, 100, 1e3, 2e3)], +# ) +# def freq_err(request): +# """Freq_Err Fixture Docstring""" +# return request.param * 1e-6 + + +def test_drift(): + drift_params = [0.0] + [ + sign * mag * 1.0e-6 + for sign in (+1.0, -1.0) + for mag in (0.1, 1.0, 10.0, 100.0, 1.0e3, 2.0e3) + ] + for freq_err in drift_params: + freq = config.Fc * (1 + freq_err / 1e6) + t = np.arange(int(1.0 * config.Fs)) * config.Ts + frame_length = 100 + rms = 0.5 + signal = rms * np.cos(2 * np.pi * freq * t) + src = BytesIO(common.dumps(signal)) + iters = 0 + for r in calib.detector(config, src, frame_length=frame_length): + assert r["success"] is True + assert abs(r["rms"] - rms) < 1e-3 + assert abs(r["total"] - rms) < 1e-3 + iters += 1 + + assert iters > 0 + assert iters == config.baud / frame_length def test_volume(): - with mock.patch('subprocess.check_call') as check_call: - ctl = calib.volume_controller('volume-control') + with mock.patch("subprocess.check_call") as check_call: + ctl = calib.volume_controller("volume-control") ctl(0.01) ctl(0.421) ctl(0.369) ctl(1) assert check_call.mock_calls == [ - mock.call(shell=True, args='volume-control 1%'), - mock.call(shell=True, args='volume-control 42%'), - mock.call(shell=True, args='volume-control 37%'), - mock.call(shell=True, args='volume-control 100%') + mock.call(shell=True, args="volume-control 1%"), + mock.call(shell=True, args="volume-control 42%"), + mock.call(shell=True, args="volume-control 37%"), + mock.call(shell=True, args="volume-control 100%"), ] with pytest.raises(AssertionError): ctl(0) @@ -125,9 +138,9 @@ def test_volume(): def test_send_max_volume(): - with mock.patch('subprocess.check_call') as check_call: - calib.send(config, dst=BytesIO(), volume_cmd='ctl', limit=1) - assert check_call.mock_calls == [mock.call(shell=True, args='ctl 100%')] + with mock.patch("subprocess.check_call") as check_call: + calib.send(config, dst=BytesIO(), volume_cmd="ctl", limit=1) + assert check_call.mock_calls == [mock.call(shell=True, args="ctl 100%")] def test_recv_binary_search(): @@ -138,12 +151,12 @@ def test_recv_binary_search(): buf.seek(0) dump = BytesIO() - with mock.patch('subprocess.check_call') as check_call: - calib.recv(config, src=buf, volume_cmd='ctl', dump_audio=dump) + with mock.patch("subprocess.check_call") as check_call: + calib.recv(config, src=buf, volume_cmd="ctl", dump_audio=dump) assert dump.getvalue() == buf.getvalue() gains.append(gains[-1]) - fmt = 'ctl {0:.0f}%' + fmt = "ctl {0:.0f}%" expected = [mock.call(shell=True, args=fmt.format(100 * g)) for g in gains] assert check_call.mock_calls == expected @@ -153,8 +166,13 @@ def test_recv_freq_change(): calib.send(config, p, gain=0.5, limit=2) offset = p.buf.tell() // 16 p.buf.seek(offset) - messages = [state['msg'] for state in calib.recv_iter(config, p)] + messages = [state["msg"] for state in calib.recv_iter(config, p)] assert messages == [ - 'good signal', 'good signal', 'good signal', - 'frequency change', - 'good signal', 'good signal', 'good signal'] + "good signal", + "good signal", + "good signal", + "frequency change", + "good signal", + "good signal", + "good signal", + ] diff --git a/amodem/tests/test_framing.py b/amodem/tests/test_framing.py index 1465d05..caae98d 100644 --- a/amodem/tests/test_framing.py +++ b/amodem/tests/test_framing.py @@ -11,43 +11,52 @@ def concat(iterable): r = random.Random(0) blob = bytearray(r.randrange(0, 256) for i in range(64 * 1024)) +data_fixture_params = [b"", b"abc", b"1234567890", blob, blob[:12345]] -@pytest.fixture(params=[b'', b'abc', b'1234567890', blob, blob[:12345]]) -def data(request): - return request.param +def test_checksum(): + for data in data_fixture_params: + c = framing.Checksum() + assert c.decode(c.encode(data)) == data -def test_checksum(data): - c = framing.Checksum() - assert c.decode(c.encode(data)) == data +def test_framer(): + for data in data_fixture_params: + f = framing.Framer() + encoded = concat(f.encode(data)) + decoded = concat(f.decode(encoded)) + assert decoded == data -def test_framer(data): - f = framing.Framer() - encoded = concat(f.encode(data)) - decoded = concat(f.decode(encoded)) - assert decoded == data - - -def test_main(data): - encoded = framing.encode(data) - decoded = framing.decode_frames(encoded) - assert concat(decoded) == data +def test_main(): + for data in data_fixture_params: + encoded = framing.encode(data) + decoded = framing.decode_frames(encoded) + assert concat(decoded) == data def test_fail(): - encoded = list(framing.encode('')) + encoded = list(framing.encode("")) encoded[-1] = not encoded[-1] with pytest.raises(ValueError): concat(framing.decode_frames(encoded)) +def test_sequenceError(): + f = framing.Framer(block_size=7) + encoded14 = concat(f.encode(b"123456789012345678901")) + footerLen = len(concat(f.encode(b""))) + blockLenBytes = int((len(encoded14) - footerLen) / 3) + badEncoded14 = encoded14[:blockLenBytes] + encoded14[2 * blockLenBytes:] + with pytest.raises(ValueError): + concat(f.decode(badEncoded14)) + + def test_missing(): f = framing.Framer() with pytest.raises(ValueError): - concat(f.decode(b'')) + concat(f.decode(b"")) with pytest.raises(ValueError): - concat(f.decode(b'\x01')) + concat(f.decode(b"\x01")) with pytest.raises(ValueError): - concat(f.decode(b'\xff')) + concat(f.decode(b"\xff")) diff --git a/amodem/tests/test_transfer.py b/amodem/tests/test_transfer.py index 308053e..fcd3dcd 100644 --- a/amodem/tests/test_transfer.py +++ b/amodem/tests/test_transfer.py @@ -10,8 +10,11 @@ import pytest import logging -logging.basicConfig(level=logging.DEBUG, # useful for debugging - format='%(asctime)s %(levelname)-12s %(message)s') + +logging.basicConfig( + level=logging.DEBUG, # useful for debugging + format="%(asctime)s %(levelname)-12s %(message)s", +) def run(size, chan=None, df=0, success=True, cfg=None): @@ -36,8 +39,9 @@ def run(size, chan=None, df=0, success=True, cfg=None): dump = BytesIO() try: - result = main.recv(config=cfg, src=rx_audio, dst=rx_data, - dump_audio=dump, pylab=None) + result = main.recv( + config=cfg, src=rx_audio, dst=rx_data, dump_audio=dump, pylab=None + ) finally: rx_audio.close() @@ -49,13 +53,14 @@ def run(size, chan=None, df=0, success=True, cfg=None): assert rx_data == tx_data -@pytest.fixture(params=[0, 1, 3, 10, 42, 123]) -def small_size(request): - return request.param +# @pytest.fixture(params=[0, 1, 3, 10, 42, 123]) +# def small_size(request): +# return request.param -def test_small(small_size): - run(small_size, chan=lambda x: x) +def test_small(): + for small_size in [0, 1, 3, 10, 42, 123]: + run(small_size, chan=lambda x: x) def test_flip(): @@ -72,14 +77,19 @@ def test_error(): run(1024, chan=lambda x: x[:-skip], success=False) -@pytest.fixture(params=[sign * mag for sign in (+1, -1) - for mag in (0.1, 1, 10, 100, 1e3, 10e3)]) -def freq_err(request): - return request.param * 1e-6 +# @pytest.fixture(params=[sign * mag for sign in (+1, -1) +# for mag in (0.1, 1, 10, 100, 1e3, 10e3)]) +# def freq_err(request): +# return request.param * 1e-6 -def test_timing(freq_err): - run(8192, df=freq_err) +def test_timing(): + for freq_err in [ + sign * mag * 1e-6 + for sign in (+1, -1) + for mag in (0.1, 1, 10, 100, 1e3, 10e3) + ]: + run(8192, df=freq_err) def test_lowpass(): @@ -108,10 +118,11 @@ def test_large(): run(54321, chan=lambda x: x) -@pytest.fixture(params=sorted(config.bitrates.keys())) -def rate(request): - return request.param +# @pytest.fixture(params=sorted(config.bitrates.keys())) +# def rate(request): +# return request.param -def test_rate(rate): - run(1, cfg=config.bitrates[rate]) +def test_rate(): + for rate in sorted(config.bitrates.keys()): + run(1, cfg=config.bitrates[rate])