Skip to content
This repository has been archived by the owner on Jul 14, 2022. It is now read-only.

Added usbtin interface #1

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions can/interfaces/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"canalystii": ("can.interfaces.canalystii", "CANalystIIBus"),
"systec": ("can.interfaces.systec", "UcanBus"),
"seeedstudio": ("can.interfaces.seeedstudio", "SeeedBus"),
"usbtin": ("can.interfaces.usbtin", "USBtinBus"),
}

BACKENDS.update(
Expand Down
77 changes: 77 additions & 0 deletions can/interfaces/usbtin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""
Interface for the USBtin module

Implementation references:
* Uses the pyUSBTin API (https://github.com/fishpepper/pyUSBtin)
* Interfaces with the USBtin module (https://www.fischl.de/usbtin/)
"""

from typing import List, Optional, Tuple, Any

import logging
import pyusbtin # type: ignore
import can

logger = logging.getLogger(__name__)


class USBtinBus(can.BusABC):
"""
usbtin interface
"""

def __init__(self, channel: str, baudrate: int):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you capture all other arguments (*args, **kwargs) and forward them to super?

super().__init__(channel, baudrate=baudrate)

self.usbtin = pyusbtin.usbtin.USBtin()
self.baudrate = baudrate
self.usbtin.connect(channel)
self.rx_fifo: List[Any] = []
self.channel_info = "usbtin"

self.usbtin.open_can_channel(self.baudrate, pyusbtin.usbtin.USBtin.ACTIVE)
self.usbtin.add_message_listener(self._bridge_cb)

def _bridge_cb(self, msg: Any) -> None:
self.rx_fifo.append(msg)

def _recv_internal(
self, timeout: Optional[float]
) -> Tuple[Optional[can.Message], bool]:
if len(self.rx_fifo) == 0:
return None, False

canmsg = self.rx_fifo.pop(0)
data = [

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use bytearray() or similar to cast it? What are the different types?

canmsg[0],
canmsg[1],
canmsg[2],
canmsg[3],
canmsg[4],
canmsg[5],
canmsg[6],
canmsg[7],
]
msg = can.Message(
arbitration_id=canmsg.mid,
dlc=canmsg.dlc,
data=data[: canmsg.dlc],
is_remote_frame=canmsg.rtr,
)

return msg, False

def send(self, msg: can.Message, timeout: Optional[float] = None) -> None:
data = list(msg.data)
pymsg = pyusbtin.canmessage.CANMessage(
mid=msg.arbitration_id, dlc=msg.dlc, data=data
)
self.usbtin.send(pymsg)

def shutdown(self) -> None:
self.usbtin.close_can_channel()
self.usbtin.disconnect()

@staticmethod
def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where/how is this used?

raise NotImplementedError()
1 change: 1 addition & 0 deletions doc/interfaces.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The available interfaces are:
interfaces/canalystii
interfaces/systec
interfaces/seeedstudio
interfaces/usbtin

Additional interfaces can be added via a plugin interface. An external package
can register a new interface by using the ``can.interface`` entry point in its setup.py.
Expand Down
12 changes: 12 additions & 0 deletions doc/interfaces/usbtin.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
USBtin
=====

Interface for `Thomas Fischl's`_ USBtin device.


Bus
---

.. autoclass:: can.interfaces.usbtin.USBtinBus

.. _`Thomas Fischl's`: https://www.fischl.de/usbtin/
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
"filelock",
"mypy_extensions >= 0.4.0, < 0.5.0",
'pywin32;platform_system=="Windows"',
"pyusbtin @ git+https://github.com/fishpepper/pyUSBtin@master#egg=pyusbtin",
Copy link

@LouisBrunner LouisBrunner May 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not released on PyPi right?

],
setup_requires=pytest_runner,
extras_require=extras_require,
Expand Down
83 changes: 83 additions & 0 deletions test/test_usbtin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/usr/bin/env python
# coding: utf-8

"""
Test for usbtin interface
"""

from unittest import TestCase, mock

import can
import pyusbtin
from can.interfaces.usbtin import USBtinBus


class TestUSBtinBus(TestCase):
@mock.patch("can.interfaces.usbtin.pyusbtin.usbtin")
def setUp(self, pyusbtin_usbtin):
# setup the bus
test_port = "test_port"
test_baudrate = 100
self.bus = bus = can.Bus(
bustype="usbtin", channel=test_port, baudrate=test_baudrate
)

# test the bus
self.assertEqual(bus.baudrate, test_baudrate)
bus.usbtin.connect.assert_called_once_with(test_port)
self.assertEqual(bus.channel_info, "usbtin")
bus.usbtin.open_can_channel.assert_called_once_with(
test_baudrate, pyusbtin.usbtin.USBtin.ACTIVE
)
bus.usbtin.add_message_listener.assert_called_once_with(bus._bridge_cb)

# setup some test messages
test_id = 1
test_rtr = False
test_dlc = 2
test_data = [3, 4]
self.test_can_message = can.Message(
arbitration_id=test_id,
is_remote_frame=test_rtr,
dlc=test_dlc,
data=bytearray(test_data),
)
self.test_pyusbtin_can_message = pyusbtin.canmessage.CANMessage(
mid=test_id, dlc=test_dlc, data=test_data
)

def test_send(self):
self.bus.send(self.test_can_message)

# TODO: add addTypeEqualityFunc() to make things cleaner
self.assertEqual(
self.test_pyusbtin_can_message.mid, self.bus.usbtin.send.call_args[0][0].mid
)
self.assertEqual(
self.test_pyusbtin_can_message.dlc, self.bus.usbtin.send.call_args[0][0].dlc
)
self.assertEqual(
self.test_pyusbtin_can_message.rtr, self.bus.usbtin.send.call_args[0][0].rtr
)
self.assertEqual(
self.test_pyusbtin_can_message._data,
self.bus.usbtin.send.call_args[0][0]._data,
)

def test_shutdown(self):
self.bus.shutdown()
self.bus.usbtin.close_can_channel.assert_called_once()
self.bus.usbtin.disconnect.assert_called_once()

def test__recv_internal_empty_fifo(self):
self.assertEqual(self.bus._recv_internal(0), (None, False))

def test__recv_internal_fifo_with_entries(self):
self.bus.rx_fifo = [self.test_pyusbtin_can_message]
resp = self.bus._recv_internal(0)

self.assertEqual(resp[0].arbitration_id, self.test_can_message.arbitration_id)
self.assertEqual(resp[0].dlc, self.test_can_message.dlc)
self.assertEqual(resp[0].is_remote_frame, self.test_can_message.is_remote_frame)
self.assertEqual(resp[0].data, self.test_can_message.data)
self.assertEqual(resp[1], False)