Skip to content

Commit

Permalink
Fixes for python2
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelarnauts committed Jan 29, 2021
1 parent a4de270 commit d46b3aa
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 86 deletions.
46 changes: 17 additions & 29 deletions .github/workflows/addon-check.yml
Original file line number Diff line number Diff line change
@@ -1,39 +1,27 @@
name: Kodi
on:
- pull_request
- push
# Run action when pushed to master, or for commits in a pull request.
push:
branches:
- master
pull_request:
branches:
- master
jobs:
tests:
kodi-addon-checker:
name: Addon checker
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
kodi-branch: [leia, matrix]
kodi-version: [ leia, matrix ]
steps:
- uses: actions/checkout@v2
with:
path: ${{ github.repository }}
- name: Set up Python 3.8
uses: actions/setup-python@v1
with:
python-version: 3.8
- name: Install dependencies
run: |
sudo apt-get install xmlstarlet
python -m pip install --upgrade pip
# FIXME: Requires changes from xbmc/addon-check#217
#pip install kodi-addon-checker
pip install git+git://github.com/xbmc/addon-check.git@master
- name: Remove unwanted files
run: awk '/export-ignore/ { print $1 }' .gitattributes | xargs rm -rf --
working-directory: ${{ github.repository }}
- name: Rewrite addon.xml for Matrix
run: |
xmlstarlet ed -L -u '/addon/requires/import[@addon="xbmc.python"]/@version' -v "3.0.0" addon.xml
version=$(xmlstarlet sel -t -v 'string(/addon/@version)' addon.xml)
xmlstarlet ed -L -u '/addon/@version' -v "${version}+matrix.99" addon.xml
working-directory: ${{ github.repository }}
if: matrix.kodi-branch == 'matrix'
- name: Check out ${{ github.sha }} from repository ${{ github.repository }}
uses: actions/checkout@v2

- name: Run kodi-addon-checker
run: kodi-addon-checker --branch=${{ matrix.kodi-branch }} ${{ github.repository }}/
uses: xbmc/action-kodi-addon-checker@v1.2
with:
kodi-version: ${{ matrix.kodi-version }}
rewrite-for-matrix: true
addon-id: ${{ github.event.repository.name }}
1 change: 1 addition & 0 deletions resources/lib/modules/iptvsimple.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def check(cls):
try:
addon = kodiutils.get_addon(IPTV_SIMPLE_ID)
except Exception as exc: # pylint: disable=broad-except
_LOGGER.exception(exc)
return True # It might be restarting

# Validate IPTV Simple configuration
Expand Down
31 changes: 21 additions & 10 deletions resources/lib/modules/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
class Sources:
"""Helper class for Source updating"""

def __init__(self):
""" Initialise object """

@classmethod
def refresh(cls, show_progress=False):
"""Update channels and EPG data"""
Expand Down Expand Up @@ -88,9 +91,12 @@ def refresh(cls, show_progress=False):
progress.close()


class Source:
class Source(object): # pylint: disable=useless-object-inheritance
""" Base class for a Source """

def __init__(self):
""" Initialise object """

@staticmethod
def detect_sources():
""" Detect available sources. """
Expand All @@ -117,31 +123,30 @@ def _load_url(self, url):
""" Load the specified URL. """
response = requests.get(url)
response.raise_for_status()
data = response.content

if url.lower().endswith('.gz'):
return self._decompress_gz(data)
return self._decompress_gz(response.content)
if url.lower().endswith('.bz2'):
return self._decompress_bz2(data)
return self._decompress_bz2(response.content)

return data
return response.text

def _load_file(self, filename):
""" Load the specified file. """
with open(filename, 'r') as fdesc:
with open(filename, 'rb') as fdesc:
data = fdesc.read()

if filename.lower().endswith('.gz'):
return self._decompress_gz(data)
if filename.lower().endswith('.bz2'):
return self._decompress_bz2(data)

return data
return data.decode()

@staticmethod
def _extract_m3u(data):
""" Extract the m3u content """
return data.replace('#EXTM3U\n', '')
return data.replace('#EXTM3U', '').strip()

@staticmethod
def _extract_xmltv(data):
Expand All @@ -151,8 +156,14 @@ def _extract_xmltv(data):
@staticmethod
def _decompress_gz(data):
""" Decompress gzip data. """
from gzip import decompress
return decompress(data).decode()
try: # Python 3
from gzip import decompress
return decompress(data).decode()
except ImportError: # Python 2
from gzip import GzipFile
from StringIO import StringIO
with GzipFile(fileobj=StringIO(data)) as fdesc:
return fdesc.read().decode()

@staticmethod
def _decompress_bz2(data):
Expand Down
2 changes: 2 additions & 0 deletions resources/lib/modules/sources/addon.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class AddonSource(Source):
EPG_VERSION = 1

def __init__(self, addon_id, enabled=False, channels_uri=None, epg_uri=None):
""" Initialise object """
super(AddonSource, self).__init__()
self.addon_id = addon_id
self.enabled = enabled
self.channels_uri = channels_uri
Expand Down
15 changes: 11 additions & 4 deletions resources/lib/modules/sources/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class CustomSource(Source):
TYPE_FILE = 2

def __init__(self, uuid, name, enabled, playlist_uri=None, playlist_type=TYPE_NONE, epg_uri=None, epg_type=TYPE_NONE):
""" Initialise object """
super(CustomSource, self).__init__()
self.uuid = uuid
self.name = name
self.enabled = enabled
Expand Down Expand Up @@ -106,28 +108,33 @@ def get_epg(self):

def save(self):
""" Save this source. """
output_path = kodiutils.addon_profile()
try:
with open(os.path.join(kodiutils.addon_profile(), CustomSource.SOURCES_FILE), 'r') as fdesc:
if not os.path.exists(output_path):
os.mkdir(output_path)

with open(os.path.join(output_path, CustomSource.SOURCES_FILE), 'r') as fdesc:
sources = json.loads(fdesc.read())
except (IOError, TypeError, ValueError):
sources = {}

# Update the element with my uuid
sources[self.uuid] = self.__dict__

with open(os.path.join(kodiutils.addon_profile(), CustomSource.SOURCES_FILE), 'w') as fdesc:
with open(os.path.join(output_path, CustomSource.SOURCES_FILE), 'w') as fdesc:
json.dump(sources, fdesc)

def delete(self):
""" Delete this source. """
output_path = kodiutils.addon_profile()
try:
with open(os.path.join(kodiutils.addon_profile(), CustomSource.SOURCES_FILE), 'r') as fdesc:
with open(os.path.join(output_path, CustomSource.SOURCES_FILE), 'r') as fdesc:
sources = json.loads(fdesc.read())
except (IOError, TypeError, ValueError):
sources = {}

# Remove the element with my uuid
sources.pop(self.uuid)

with open(os.path.join(kodiutils.addon_profile(), CustomSource.SOURCES_FILE), 'w') as fdesc:
with open(os.path.join(output_path, CustomSource.SOURCES_FILE), 'w') as fdesc:
json.dump(sources, fdesc)
Binary file added tests/data/custom_playlist.m3u.gz
Binary file not shown.
8 changes: 4 additions & 4 deletions tests/home/addons/plugin.video.example.raw/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ def send(self):
@via_socket
def send_channels(): # pylint: disable=no-method-argument
"""Return JSON-STREAMS formatted information to IPTV Manager"""
with open(os.path.dirname(__file__) + '/resources/raw_playlist.m3u', 'r') as fdesc:
with open(os.path.dirname(__file__) + '/resources/raw_playlist.m3u', 'rb') as fdesc:
channels = fdesc.read()
return channels
return channels.decode()

@via_socket
def send_epg(): # pylint: disable=no-method-argument
"""Return JSON-EPG formatted information to IPTV Manager"""
with open(os.path.dirname(__file__) + '/resources/raw_epg.xml', 'r') as fdesc:
with open(os.path.dirname(__file__) + '/resources/raw_epg.xml', 'rb') as fdesc:
epg = fdesc.read()
return epg
return epg.decode()


if __name__ == "__main__":
Expand Down
102 changes: 63 additions & 39 deletions tests/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ def test_create(self):
self.assertNotIn(key, [source.uuid for source in sources])

def test_fetch_none(self):
source = CustomSource(uuid=str(uuid4()),
name='Test Source',
enabled=True,
playlist_uri=None,
playlist_type=CustomSource.TYPE_NONE,
epg_uri=None,
epg_type=CustomSource.TYPE_NONE,
)
source = CustomSource(
uuid=str(uuid4()),
name='Test Source',
enabled=True,
playlist_uri=None,
playlist_type=CustomSource.TYPE_NONE,
epg_uri=None,
epg_type=CustomSource.TYPE_NONE,
)

channels = source.get_channels()
self.assertEqual(channels, '')
Expand All @@ -69,20 +70,30 @@ def test_fetch_none(self):
self.assertEqual(epg, '')

def test_fetch_file(self):
source = CustomSource(uuid=str(uuid4()),
name='Test Source',
enabled=True,
playlist_uri=os.path.realpath('tests/data/custom_playlist.m3u'),
playlist_type=CustomSource.TYPE_FILE,
epg_uri=os.path.realpath('tests/data/custom_epg.xml'),
epg_type=CustomSource.TYPE_FILE,
)
source = CustomSource(
uuid=str(uuid4()),
name='Test Source',
enabled=True,
playlist_uri=os.path.realpath('tests/data/custom_playlist.m3u'),
playlist_type=CustomSource.TYPE_FILE,
epg_uri=os.path.realpath('tests/data/custom_epg.xml'),
epg_type=CustomSource.TYPE_FILE,
)
expected_channels = Source._extract_m3u(open('tests/data/custom_playlist.m3u', 'r').read())
expected_epg = Source._extract_xmltv(open('tests/data/custom_epg.xml', 'r').read())

# Test channels
channels = source.get_channels()
self.assertEqual(channels.replace('\r\n', '\n'), expected_channels)

# Test channels (gzip)
source.playlist_uri = os.path.realpath('tests/data/custom_playlist.m3u.gz')
channels = source.get_channels()
self.assertEqual(channels, Source._extract_m3u(open('tests/data/custom_playlist.m3u').read()))
self.assertEqual(channels, expected_channels)

# Test EPG
epg = source.get_epg()
self.assertEqual(epg, Source._extract_xmltv(open('tests/data/custom_epg.xml').read()))
self.assertEqual(epg.replace('\r\n', '\n'), expected_epg)

def test_fetch_url(self):

Expand All @@ -99,55 +110,68 @@ def raise_for_status(self):
def content(self):
return self.data

@property
def text(self):
return self.data.decode()

if args[0].endswith('m3u'):
data = open('tests/data/custom_playlist.m3u', 'r').read()
data = open('tests/data/custom_playlist.m3u', 'rb').read()
return MockResponse(data, 200)

if args[0].endswith('m3u.gz'):
from gzip import compress
data = open('tests/data/custom_playlist.m3u', 'rb').read()
return MockResponse(compress(data), 200)
try: # Python 3
from gzip import compress
return MockResponse(compress(data), 200)
except ImportError: # Python 2
from gzip import GzipFile
from StringIO import StringIO
buf = StringIO()
with GzipFile(fileobj=buf, mode='wb') as f:
f.write(data)
return MockResponse(buf.getvalue(), 200)

if args[0].endswith('m3u.bz2'):
from bz2 import compress
data = open('tests/data/custom_playlist.m3u', 'rb').read()
return MockResponse(compress(data), 200)

if args[0].endswith('xml'):
data = open('tests/data/custom_epg.xml', 'r').read()
data = open('tests/data/custom_epg.xml', 'rb').read()
return MockResponse(data, 200)

return MockResponse(None, 404)

with patch('requests.get', side_effect=mocked_requests_get):
source = CustomSource(uuid=str(uuid4()),
name='Test Source',
enabled=True,
playlist_uri='https://example.com/playlist.m3u',
playlist_type=CustomSource.TYPE_URL,
epg_uri='https://example.com/xmltv.xml',
epg_type=CustomSource.TYPE_URL,
)
source = CustomSource(
uuid=str(uuid4()),
name='Test Source',
enabled=True,
playlist_uri='https://example.com/playlist.m3u',
playlist_type=CustomSource.TYPE_URL,
epg_uri='https://example.com/xmltv.xml',
epg_type=CustomSource.TYPE_URL,
)
expected_channels = Source._extract_m3u(open('tests/data/custom_playlist.m3u', 'r').read())
expected_epg = Source._extract_xmltv(open('tests/data/custom_epg.xml', 'r').read())

with patch('requests.get', side_effect=mocked_requests_get):
# Test channels
channels = source.get_channels()
self.assertEqual(channels, Source._extract_m3u(open('tests/data/custom_playlist.m3u').read()))

# Test EPG
epg = source.get_epg()
self.assertEqual(epg, Source._extract_xmltv(open('tests/data/custom_epg.xml').read()))
self.assertEqual(channels.replace('\r\n', '\n'), expected_channels)

# Test channels (gzip)
source.playlist_uri = 'https://example.com/playlist.m3u.gz'
channels = source.get_channels()
self.assertEqual(channels, Source._extract_m3u(open('tests/data/custom_playlist.m3u').read()))
self.assertEqual(channels, expected_channels)

# Test channels (bzip2)
source.playlist_uri = 'https://example.com/playlist.m3u.bz2'
channels = source.get_channels()
self.assertEqual(channels, Source._extract_m3u(open('tests/data/custom_playlist.m3u').read()))

self.assertEqual(channels, expected_channels)

# Test EPG
epg = source.get_epg()
self.assertEqual(epg.replace('\r\n', '\n'), expected_epg)


if __name__ == '__main__':
Expand Down

0 comments on commit d46b3aa

Please sign in to comment.