Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fault tolerance on recv #46

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion amodem/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,24 @@ def create_parser(description, interface_factory):
'-i', '--input', help='input file (use "-" for stdin).')
receiver.add_argument(
'-o', '--output', help='output file (use "-" for stdout).')
receiver.add_argument(
'-f', '--fault-tolerance', help='fault tolerance level [1]')
receiver.add_argument(
'-d', '--dump', type=FileType('wb'),
help='Filename to save recorded audio')
receiver.add_argument(
'-s', '--stopcode', default=main.framing.StopOnFault.ALL_ERR,
type=int,
help="errors causing a decoding halt [%02x]\n%s" % (
main.framing.StopOnFault.ALL_ERR,
main.framing.StopOnFault.helpMsg))
receiver.add_argument(
'--plot', action='store_true', default=False,
help='plot results using pylab module')
receiver.set_defaults(
main=lambda config, args: main.recv(
config, src=args.src, dst=wrap(Decompressor, args.dst, args.zlib),
pylab=args.pylab, dump_audio=args.dump
pylab=args.pylab, dump_audio=args.dump, stopOnCode=args.stopcode,
),
calib=lambda config, args: calib.recv(
config=config, src=args.src, verbose=args.verbose,
Expand Down
145 changes: 113 additions & 32 deletions amodem/framing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,72 +10,151 @@


def _checksum_func(x):
''' The result will be unsigned on Python 2/3. '''
""" The result will be unsigned on Python 2/3. """
return binascii.crc32(bytes(x)) & 0xFFFFFFFF


def _short_checksum_func(x):
""" The result will be unsigned on Python 2/3. """
return binascii.crc32(bytes(x)) & 0xFF


class StopOnFault:
""" error codes for fault tolerance -
matching this code will raise error of this type """

PAYLOAD_ERR = 0x01
HEADER_ERR = 0x02
SEQUENCE_ERR = 0x04
ALL_ERR = 0xFF # halt on all error
helpMsg = """Payload_error: 0x01 / Header_err: 0x02 / sequence_err=0x04 """


class Checksum:
fmt = '>L' # unsigned longs (32-bit)
fmt = ">L" # unsigned longs (32-bit)
size = struct.calcsize(fmt)

def __init__(self, stopOnCode=0):
self.fTOL = stopOnCode

def encode(self, payload):
checksum = _checksum_func(payload)
return struct.pack(self.fmt, checksum) + payload

def decode(self, data):
received, = struct.unpack(self.fmt, bytes(data[:self.size]))
def decode(self, data, cnt=0):
(received,) = struct.unpack(self.fmt, bytes(data[: self.size]))
payload = data[self.size:]
expected = _checksum_func(payload)
if received != expected:
log.warning('Invalid checksum: %08x != %08x', received, expected)
raise ValueError('Invalid checksum')
log.debug('Good checksum: %08x', received)
errMSG = "Frame %d %s checksum: %02x != calc checksum: %02x " % (
cnt,
"Payload",
received,
expected,
)
log.warning(errMSG)
if bool(self.fTOL & StopOnFault.PAYLOAD_ERR):
raise ValueError(errMSG)
log.debug("Good checksum: %08x", received)
return payload


class Framer:
block_size = 250
prefix_fmt = '>B'
""" Class for handling data frames
packing, unpacking, checksums, etc """

prefix_fmt = ">LBHB"
prefix_len = struct.calcsize(prefix_fmt)
checksum = Checksum()

EOF = b''
EOF = b""

def __init__(self, flags=0, stopOnCode=StopOnFault.ALL_ERR,
block_size=250):
self.flags = flags
self.fTOL = stopOnCode
self.checksum = Checksum(stopOnCode=stopOnCode)
self.block_size = block_size

def _pack(self, block):
def _pack(self, block, flags=0, cnt=0):
frame = self.checksum.encode(block)
return bytearray(struct.pack(self.prefix_fmt, len(frame)) + frame)
prefix = struct.pack(self.prefix_fmt[:-1], cnt, flags, len(frame))
return bytearray(
prefix + struct.pack(">B", _short_checksum_func(prefix)) + frame
)

def encode(self, data):
for block in common.iterate(data=data, size=self.block_size,
func=bytearray, truncate=False):
yield self._pack(block=block)
yield self._pack(block=self.EOF)
idx = -1
for idx, block in enumerate(common.iterate(
data=data, size=self.block_size, func=bytearray,
truncate=False)):
yield self._pack(block=block, flags=self.flags, cnt=idx)
yield self._pack(block=self.EOF, flags=self.flags, cnt=idx + 1)

def decode(self, data):
data = iter(data)
local_cnt = 0
prior_cnt = -1
prior_len = -1
while True:
length, = _take_fmt(data, self.prefix_fmt)
frame = _take_len(data, length)
block = self.checksum.decode(frame)
cnt, flag, length, mychecksum = _take_fmt(
data, self.prefix_fmt, local_cnt, self.fTOL
)
pre_chk = _short_checksum_func(
struct.pack(self.prefix_fmt[:-1], cnt, flag, length)
)
if pre_chk != mychecksum:
errMSG = "Frame %d %s checksum:%02x != calc checksum:%02x" % (
cnt,
"Header",
mychecksum,
pre_chk,
)
log.warning(errMSG)
if bool(self.fTOL & StopOnFault.HEADER_ERR):
raise ValueError(errMSG)

if cnt != local_cnt or (prior_cnt >= 0 and cnt != prior_cnt + 1):
errMSG = "Frame %d %s error. Msg cnt %d, Prior Msg cnt %d" % (
local_cnt,
"Sequence counting",
cnt,
prior_cnt,
)
log.warning(errMSG)
if bool(self.fTOL & StopOnFault.SEQUENCE_ERR):
raise ValueError(errMSG)
if length != prior_len:
length = prior_len # guessing what length should be
frame = _take_len(data, length, local_cnt, self.fTOL)
block = self.checksum.decode(frame, local_cnt)
if block == self.EOF:
log.debug('EOF frame detected')
log.debug("EOF frame detected")
return

prior_cnt = cnt
prior_len = length
local_cnt += 1
yield block


def _take_fmt(data, fmt):
def _take_fmt(data, fmt, cnt=0, fTOL=StopOnFault.ALL_ERR):
length = struct.calcsize(fmt)
chunk = bytearray(itertools.islice(data, length))
if len(chunk) < length:
raise ValueError('missing prefix data')
errMSG = "Frame: %d - Data truncated in %s" % (cnt, "prefix")
log.error(errMSG)
if bool(fTOL & StopOnFault.HEADER_ERR):
raise ValueError(errMSG)
return struct.unpack(fmt, bytes([0]) * length)
return struct.unpack(fmt, bytes(chunk))


def _take_len(data, length):
def _take_len(data, length, cnt=0, fTOL=StopOnFault.ALL_ERR):
chunk = bytearray(itertools.islice(data, length))
if len(chunk) < length:
raise ValueError('missing payload data')
errMSG = "Frame: %d - Data truncated in %s" % (cnt, "payload")
log.warning(errMSG)
if bool(fTOL & StopOnFault.PAYLOAD_ERR):
raise ValueError(errMSG)
return chunk


Expand All @@ -84,6 +163,7 @@ def chain_wrapper(func):
def wrapped(*args, **kwargs):
result = func(*args, **kwargs)
return itertools.chain.from_iterable(result)

return wrapped


Expand All @@ -101,9 +181,9 @@ def __init__(self):


@chain_wrapper
def encode(data, framer=None):
def encode(data, framer=None, flags=0, block_size=250):
converter = BitPacker()
framer = framer or Framer()
framer = framer or Framer(flags=flags, block_size=block_size)
for frame in framer.encode(data):
for byte in frame:
yield converter.to_bits[byte]
Expand All @@ -112,12 +192,13 @@ def encode(data, framer=None):
@chain_wrapper
def _to_bytes(bits):
converter = BitPacker()
for chunk in common.iterate(data=bits, size=8,
func=tuple, truncate=True):
for chunk in common.iterate(data=bits, size=8, func=tuple, truncate=True):
yield [converter.to_byte[chunk]]


def decode_frames(bits, framer=None):
framer = framer or Framer()
def decode_frames(bits, framer=None, stopOnCode=StopOnFault.ALL_ERR,
block_size=250):
""" Decodes frames from bitstream """
framer = framer or Framer(stopOnCode=stopOnCode, block_size=block_size)
for frame in framer.decode(_to_bytes(bits)):
yield bytes(frame)
12 changes: 8 additions & 4 deletions amodem/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
log = logging.getLogger(__name__)


def send(config, src, dst, gain=1.0, extra_silence=0.0):
# pylint: disable=too-many-arguments
def send(config, src, dst, gain=1.0, extra_silence=0.0,
block_size=250, flags=0x00):
sender = _send.Sender(dst, config=config, gain=gain)
Fs = config.Fs

Expand All @@ -24,7 +26,7 @@ def send(config, src, dst, gain=1.0, extra_silence=0.0):

reader = stream.Reader(src, eof=True)
data = itertools.chain.from_iterable(reader)
bits = framing.encode(data)
bits = framing.encode(data, flags=flags, block_size=block_size)
log.info('Starting modulation')
sender.modulate(bits=bits)

Expand All @@ -37,7 +39,8 @@ def send(config, src, dst, gain=1.0, extra_silence=0.0):
return True


def recv(config, src, dst, dump_audio=None, pylab=None):
def recv(config, src, dst, stopOnCode=framing.StopOnFault.ALL_ERR,
dump_audio=None, pylab=None):
if dump_audio:
src = stream.Dumper(src, dump_audio)
reader = stream.Reader(src, data_type=common.loads)
Expand All @@ -61,7 +64,8 @@ def recv(config, src, dst, dump_audio=None, pylab=None):

sampler = sampling.Sampler(signal, sampling.defaultInterpolator,
freq=freq)
receiver.run(sampler, gain=1.0/amplitude, output=dst)
receiver.run(sampler, gain=1.0/amplitude, output=dst,
stopOnCode=stopOnCode)
return True
except BaseException: # pylint: disable=broad-except
log.exception('Decoding failed')
Expand Down
5 changes: 3 additions & 2 deletions amodem/recv.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ def _report_progress(self, noise, sampler):
(1.0 - sampler.freq) * 1e6
)

def run(self, sampler, gain, output):
def run(self, sampler, gain, output,
stopOnCode=framing.StopOnFault.ALL_ERR):
log.debug('Receiving')
symbols = dsp.Demux(sampler, omegas=self.omegas, Nsym=self.Nsym)
self._prefix(symbols, gain=gain)
Expand All @@ -168,7 +169,7 @@ def run(self, sampler, gain, output):
bitstream = self._demodulate(sampler, symbols)
bitstream = itertools.chain.from_iterable(bitstream)

for frame in framing.decode_frames(bitstream):
for frame in framing.decode_frames(bitstream, stopOnCode=stopOnCode):
output.write(frame)
self.output_size += len(frame)

Expand Down
Loading