Skip to content

Commit

Permalink
Test for device selection and catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
electronicayciencia committed Jul 28, 2024
1 parent e248138 commit f227c56
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 37 deletions.
45 changes: 26 additions & 19 deletions EasyMCP2221/MCP2221.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ class Device:
Hint:
Multiple :class:`EasyMCP2221.Device` instances pointing to the same physical device can cause conflict.
This happens when one instance is created by some imported library via :class:`EasyMCP2221.SMBus` class,
and a second one is created elsewhere in the main program to control GPIO.
To prevent this, :class:`EasyMCP2221.Device` will return the same object when identical
initialization parameters are used.
EasyMCP2221 keeps an internal catalog of devices initialized in the same program. It tries to detect when
double initialization happens and return the same object to prevent conflicts.
This usually happens when one instance is created by some imported library via :class:`EasyMCP2221.SMBus` class;
and a second one is created elsewhere in the main program to control GPIO, or by another library also using SMBus class.
Example:
>>> import EasyMCP2221
Expand All @@ -54,9 +55,9 @@ class Device:
}
"""

_cache = {}
_catalog = {}
"""
Keep a cache of all initialized devices with its USB serial value.
Keep a catalog of all initialized devices with its USB serial value.
This will return the same object and not two different objects driving the same physical device.
"""

Expand All @@ -74,21 +75,17 @@ def __new__(cls,

## Check if this is one of the already initialized devices.
if usbserial is not None:
cache_id = (VID, PID, usbserial)
catalog_id = (VID, PID, usbserial)
else:
cache_id = (VID, PID, devnum)
catalog_id = (VID, PID, devnum)

if cache_id in Device._cache:
if catalog_id in Device._catalog:
# Re-use object.
if debug_messages: print("Re-using cached device:", cache_id)
return Device._cache[cache_id]
if debug_messages: print("Cataloged device found:", catalog_id)
return Device._catalog[catalog_id]

else:
# Create a new device and cache it
if debug_messages: print("New device cached:", cache_id)
newdev = super().__new__(cls)
Device._cache[cache_id] = newdev
return newdev
# Create a new device, init will catalog it
return super().__new__(cls)


def __init__(self,
Expand Down Expand Up @@ -144,12 +141,14 @@ def __init__(self,
# Fix Issue #8: Select by USB Serial
if usbserial is not None:
found = False
for dev in hid.enumerate(self.VID, self.PID):
for idx, dev in enumerate(hid.enumerate(self.VID, self.PID)):
try:
self.hidhandler.open_path(dev["path"])

if usbserial == self.read_flash_info()['USB_SERIAL']:
found = True
self.usbserial = usbserial # for caching
self.devnum = idx
break
else:
self.hidhandler.close()
Expand All @@ -158,7 +157,7 @@ def __init__(self,
pass

if not found:
raise ValueError("No device found with serial number %s or already in use." % usbserial)
raise RuntimeError("No device found with serial number %s or already in use." % usbserial)
else:
break

Expand All @@ -174,11 +173,15 @@ def __init__(self,
(self.VID, self.PID, self.devnum))

self.hidhandler.open_path(devices[devnum]["path"])
self.usbserial = self.read_flash_info()['USB_SERIAL']
self.devnum = devnum
break

# Default to the first device found
else:
self.hidhandler.open_path(devices[0]["path"])
self.usbserial = self.read_flash_info()['USB_SERIAL']
self.devnum = 0
break

# Ignore any exceptions and keep trying until the timeout
Expand All @@ -188,6 +191,10 @@ def __init__(self,
else:
continue

# Device selected, catalog it by index and by serial
if debug_messages: print("New device cataloged:", catalog_id)
Device._catalog[(VID, PID, self.usbserial)] = self
Device._catalog[(VID, PID, self.devnum)] = self

# Initialize current GPIO settings
settings = self.send_cmd([CMD_GET_SRAM_SETTINGS])
Expand Down
12 changes: 5 additions & 7 deletions EasyMCP2221/smbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ class SMBus(object):
SMBus object.
Example:
.. code-block:: python
from EasyMCP2221 import SMBus
bus = SMBus()
Expand All @@ -37,15 +37,13 @@ class SMBus(object):
"""

def __init__(self, bus=None, force=False, VID=0x04D8, PID=0x00DD, usbserial=None, clock=100_000, mcp=None):
def __init__(self, bus=1, force=False, VID=0x04D8, PID=0x00DD, usbserial=None, clock=100_000, mcp=None):

if mcp:
self.mcp = mcp

else:
if bus is not None:
bus = bus - 1 # first device should be 0

bus = bus - 1 # first device should be 0
self.mcp = EasyMCP2221.Device(VID, PID, devnum=bus, usbserial=usbserial)
self.mcp.I2C_speed(clock)

Expand Down
7 changes: 3 additions & 4 deletions docs/source/history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ Latest (unreleased)
-------------------

Misc:
* New feature: Device instance reuse.
* New feature: Device instance catalog.
Multiple :class:`EasyMCP2221.Device` instances pointing to the same physical device can cause conflict.
This happens when one instance is created by some imported library via :class:`EasyMCP2221.SMBus` class,
and a second one is created elsewhere in the main program to control GPIO.
This will be detected and the same object will be returned for both.
EasyMCP2221 keeps an internal catalog of devices initialized in the same program. It tries to detect when
double initialization happens and return the same object to prevent conflicts.
* Improved device selection flow.

I2C:
Expand Down
212 changes: 212 additions & 0 deletions test/test_device_selection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import unittest

import EasyMCP2221
from EasyMCP2221.exceptions import *
from EasyMCP2221 import SMBus

SERIAL_OK="0002596888"
SERIAL_WRONG="9999999999"
DEV_DEFAULT_VID = 0x04D8
DEV_DEFAULT_PID = 0x00DD

class DevSelect(unittest.TestCase):

def setUp(self):
pass

def tearDown(self):
pass

#----- single selection ---------------

def test_select_by_default(self):
"""Select the default device"""
mcp = EasyMCP2221.Device()

serial = mcp.read_flash_info()['USB_SERIAL']
self.assertEqual(serial, SERIAL_OK)

def test_select_by_index_0(self):
"""Select the first device found"""
mcp = EasyMCP2221.Device(devnum=0)

serial = mcp.read_flash_info()['USB_SERIAL']
self.assertEqual(serial, SERIAL_OK)

def test_VID_PID_index_pos(self):
"""Forcing the default parameters, positional"""
mcp = EasyMCP2221.Device(DEV_DEFAULT_VID, DEV_DEFAULT_PID, 0)

serial = mcp.read_flash_info()['USB_SERIAL']
self.assertEqual(serial, SERIAL_OK)

def test_VID_PID_index_named(self):
"""Forcing the default parameters, named"""
mcp = EasyMCP2221.Device(VID=DEV_DEFAULT_VID, PID=DEV_DEFAULT_PID, devnum=0)

serial = mcp.read_flash_info()['USB_SERIAL']
self.assertEqual(serial, SERIAL_OK)

def test_wrong_VID(self):
"""Select wrong VID"""
with self.assertRaises(RuntimeError):
mcp = EasyMCP2221.Device(VID=DEV_DEFAULT_VID+1)

def test_wrong_PID(self):
"""Select wrong PID"""
with self.assertRaises(RuntimeError):
mcp = EasyMCP2221.Device(PID=DEV_DEFAULT_PID+1)

def test_select_by_serial(self):
"""Select a device by its serial number"""
mcp = EasyMCP2221.Device(usbserial=SERIAL_OK)

serial = mcp.read_flash_info()['USB_SERIAL']
self.assertEqual(serial, SERIAL_OK)

def test_select_by_wrong_serial(self):
"""Select a wrong serial number"""
with self.assertRaises(RuntimeError):
mcp = EasyMCP2221.Device(usbserial=SERIAL_WRONG)

def test_select_by_serial_and_index(self):
"""Select with serial and index, serial must take precedence"""
mcp = EasyMCP2221.Device(devnum=1,usbserial=SERIAL_OK)

serial = mcp.read_flash_info()['USB_SERIAL']
self.assertEqual(serial, SERIAL_OK)

def test_select_by_wrong_index(self):
"""Select with a wrong index"""
with self.assertRaises(RuntimeError):
mcp = EasyMCP2221.Device(devnum=10)

#----- double selections ---------------

def test_double_select_by_default(self):
"""Double select the default device"""
mcp1 = EasyMCP2221.Device()
mcp2 = EasyMCP2221.Device()

self.assertTrue(mcp1 is mcp2)

def test_double_select_default_pos(self):
"""Double select the default device. Explicit positional parameters."""
mcp1 = EasyMCP2221.Device(DEV_DEFAULT_VID, DEV_DEFAULT_PID, 0)
mcp2 = EasyMCP2221.Device(DEV_DEFAULT_VID, DEV_DEFAULT_PID, 0)

self.assertTrue(mcp1 is mcp2)

def test_double_select_default_named(self):
"""Double select the default device. Explicit named parameters."""
mcp1 = EasyMCP2221.Device(VID=DEV_DEFAULT_VID, PID=DEV_DEFAULT_PID, devnum=0)
mcp2 = EasyMCP2221.Device(VID=DEV_DEFAULT_VID, PID=DEV_DEFAULT_PID, devnum=0)

self.assertTrue(mcp1 is mcp2)

def test_double_select_default_and_pos(self):
"""Double select the default device. Default implicit and explicit positional parameters."""
mcp1 = EasyMCP2221.Device()
mcp2 = EasyMCP2221.Device(DEV_DEFAULT_VID, DEV_DEFAULT_PID, 0)

self.assertTrue(mcp1 is mcp2)

def test_double_select_default_and_named(self):
"""Double select the default device. Default implicit and explicit named parameters."""
mcp1 = EasyMCP2221.Device()
mcp2 = EasyMCP2221.Device(VID=DEV_DEFAULT_VID, PID=DEV_DEFAULT_PID, devnum=0)

self.assertTrue(mcp1 is mcp2)

def test_double_select_serial(self):
"""Double select the device by serial."""
mcp1 = EasyMCP2221.Device(usbserial=SERIAL_OK)
mcp2 = EasyMCP2221.Device(usbserial=SERIAL_OK)

self.assertTrue(mcp1 is mcp2)

def test_double_select_serial_devnum(self):
"""Double select the device, both by serial and devnum."""
mcp1 = EasyMCP2221.Device(devnum=0)
mcp2 = EasyMCP2221.Device(usbserial=SERIAL_OK)

self.assertTrue(mcp1 is mcp2)

def test_double_select_serial_devnum2(self):
"""Double select the device, both by serial and devnum, reverse order."""
mcp1 = EasyMCP2221.Device(usbserial=SERIAL_OK)
mcp2 = EasyMCP2221.Device(devnum=0)

self.assertTrue(mcp1 is mcp2)

#----- double selections SMBus default ---------------

def test_double_select_smbus_default(self):
"""Double select the device via SMBus. Default parameters."""
bus1 = SMBus()
bus2 = SMBus()

mcp1 = bus1.mcp
mcp2 = bus2.mcp

self.assertTrue(mcp1 is mcp2)

def test_double_select_smbus_default_forced(self):
"""Double select the device via SMBus. Forced default parameters."""
bus1 = SMBus(1)
bus2 = SMBus(1)

mcp1 = bus1.mcp
mcp2 = bus2.mcp

self.assertTrue(mcp1 is mcp2)

def test_double_select_smbus_default_implicit_forced(self):
"""Double select the device via SMBus. Implicit and forced default parameters."""
bus1 = SMBus()
bus2 = SMBus(1)

mcp1 = bus1.mcp
mcp2 = bus2.mcp

self.assertTrue(mcp1 is mcp2)

def test_double_select_smbus_main(self):
"""Double select the device. Once via SMBus. The other with main library."""
bus = SMBus()
mcp2 = EasyMCP2221.Device()

mcp1 = bus.mcp

self.assertTrue(mcp1 is mcp2)

def test_double_select_smbus_main_serial(self):
"""Double select the device. Once via SMBus. The other with main library. Usign serial."""
bus = SMBus(usbserial = SERIAL_OK)
mcp2 = EasyMCP2221.Device(usbserial = SERIAL_OK)

mcp1 = bus.mcp

self.assertTrue(mcp1 is mcp2)

def test_double_select_smbus_main_serial_idx(self):
"""Double select the device. Once via SMBus by index. The other with main library by serial."""
bus = SMBus(1)
mcp2 = EasyMCP2221.Device(usbserial = SERIAL_OK)

mcp1 = bus.mcp

self.assertTrue(mcp1 is mcp2)

def test_double_select_smbus_main_serial_idx_2(self):
"""Double select the device. Once via SMBus by index. The other with main library by serial. Reverse."""
mcp2 = EasyMCP2221.Device(usbserial = SERIAL_OK)
bus = SMBus(1)

mcp1 = bus.mcp

self.assertTrue(mcp1 is mcp2)


if __name__ == '__main__':
unittest.main()
7 changes: 0 additions & 7 deletions test/test_gpio.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,5 @@ def test_gpio_sram_preserve_gpio(self):
self.assertFalse(self.mcp.GPIO_read()[3])


def test_double_initialization(self):
"""Double-initialization prevention. Both mcp objects must be the same."""
mcp2 = EasyMCP2221.Device()
self.assertTrue(mcp2 is self.mcp)



if __name__ == '__main__':
unittest.main()

0 comments on commit f227c56

Please sign in to comment.