Skip to content

Commit

Permalink
Merge pull request #2 from NLeSC-GO-common-infrastructure/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
fnattino authored Apr 9, 2021
2 parents b52b9b4 + 6be1abb commit b4d1f2e
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: Build

on: [push, pull_request]
on: push

jobs:

Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ Change Log
All notable changes to this project will be documented in this file.
This project adheres to `Semantic Versioning <http://semver.org/>`_.

[0.1.3]

Fixed
-----
* adapt to changes in AsyncFileSystem in fsspec version 0.9.0

[0.1.1]

Changed
Expand Down
4 changes: 2 additions & 2 deletions CITATION.cff
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ authors:
family-names: Nattino
given-names: Francesco
orcid: "https://orcid.org/0000-0003-3286-0139"
date-released: 2021-01-13
date-released: 2021-04-09
doi: "10.5281/zenodo.4436720"
version: "0.1.2"
version: "0.1.3"
repository-code: "https://github.com/NLeSC-GO-common-infrastructure/dcachefs"
keywords:
- "dCache"
Expand Down
2 changes: 1 addition & 1 deletion dcachefs/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.1.2'
__version__ = '0.1.3'
83 changes: 31 additions & 52 deletions dcachefs/dcachefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import weakref

from datetime import datetime
from fsspec.asyn import maybe_sync, sync, AsyncFileSystem
from fsspec.asyn import sync_wrapper, sync, AsyncFileSystem
from fsspec.implementations.http import get_client, HTTPFile, HTTPStreamFile
from fsspec.utils import DEFAULT_BLOCK_SIZE
from urllib.parse import quote
Expand Down Expand Up @@ -215,6 +215,8 @@ async def _ls(self, path, detail=True, limit=None, **kwargs):
:param kwargs: (dict) optional arguments passed on to requests
:return list of dictionaries or list of str
"""
path = self._strip_protocol(path)

info = await self._get_info(path, children=True, limit=limit,
**kwargs)
details = _get_details(path, info)
Expand All @@ -229,18 +231,11 @@ async def _ls(self, path, detail=True, limit=None, **kwargs):
else:
return [d.get('name') for d in details]

def ls(self, path, detail=True, limit=None, **kwargs):
path = self._strip_protocol(path)
return maybe_sync(
self._ls,
self,
path,
detail=detail,
limit=limit,
**kwargs
)
ls = sync_wrapper(_ls)

async def _cat_file(self, url, start=None, end=None, **kwargs):
self.webdav_url = self._get_webdav_url(url) or self.webdav_url

path = self._strip_protocol(url)
url = URL(self.webdav_url) / path
url = url.as_uri()
Expand All @@ -260,6 +255,8 @@ async def _cat_file(self, url, start=None, end=None, **kwargs):
return out

async def _get_file(self, rpath, lpath, chunk_size=5 * 2 ** 20, **kwargs):
self.webdav_url = self._get_webdav_url(rpath) or self.webdav_url

path = self._strip_protocol(rpath)
url = URL(self.webdav_url) / path
url = url.as_uri()
Expand All @@ -276,6 +273,8 @@ async def _get_file(self, rpath, lpath, chunk_size=5 * 2 ** 20, **kwargs):
fd.write(chunk)

async def _put_file(self, lpath, rpath, **kwargs):
self.webdav_url = self._get_webdav_url(rpath) or self.webdav_url

path = self._strip_protocol(rpath)
url = URL(self.webdav_url) / path
url = url.as_uri()
Expand All @@ -285,40 +284,23 @@ async def _put_file(self, lpath, rpath, **kwargs):
r = await self.session.put(url, data=fd, **self.kwargs)
r.raise_for_status()

def cat(self, path, recursive=False, on_error="raise", **kwargs):
self.webdav_url = self._get_webdav_url(path) or self.webdav_url
return super().cat(
path=path,
recursive=recursive,
on_error=on_error,
**kwargs
)

def get(self, rpath, lpath, recursive=False, **kwargs):
self.webdav_url = self._get_webdav_url(rpath) or self.webdav_url
super().get(
rpath=rpath,
lpath=lpath,
recursive=recursive,
**kwargs
)

def put(self, lpath, rpath, recursive=False, **kwargs):
self.webdav_url = self._get_webdav_url(rpath) or self.webdav_url
super().put(
lpath=lpath,
rpath=rpath,
recursive=recursive,
**kwargs
)

async def _cp_file(self, path1, path2, **kwargs):
raise NotImplementedError

async def _pipe_file(self, path1, path2, **kwargs):
raise NotImplementedError

async def _mv(self, path1, path2, **kwargs):
"""
Rename path1 to path2
:param path1: (str) source path
:param path2: (str) destination path
:param kwargs: (dict) optional arguments passed on to requests
"""
path1 = self._strip_protocol(path1)
path2 = self._strip_protocol(path2)

url = URL(self.api_url) / 'namespace' / _encode(path1)
url = url.as_uri()
data = dict(action='mv', destination=path2)
Expand All @@ -330,17 +312,7 @@ async def _mv(self, path1, path2, **kwargs):
r.raise_for_status()
return await r.json()

def mv(self, path1, path2, **kwargs):
"""
Rename path1 to path2
:param path1: (str) source path
:param path2: (str) destination path
:param kwargs: (dict) optional arguments passed on to requests
"""
path1 = self._strip_protocol(path1)
path2 = self._strip_protocol(path2)
maybe_sync(self._mv, self, path1, path2, **kwargs)
mv = sync_wrapper(_mv)

async def _rm_file(self, path, **kwargs):
"""
Expand All @@ -362,10 +334,13 @@ async def _rm(self, path, recursive=False, **kwargs):
Asynchronous remove method. Need to delete elements from branches
towards root, awaiting tasks to be completed.
"""
path = await self._expand_path(path, recursive=recursive)
for p in reversed(path):
await asyncio.gather(self._rm_file(p, **kwargs))

def info(self, path, **kwargs):
rm = sync_wrapper(_rm)

async def _info(self, path, **kwargs):
"""
Give details about a file or a directory
Expand All @@ -374,9 +349,11 @@ def info(self, path, **kwargs):
:return (dict)
"""
path = self._strip_protocol(path)
info = maybe_sync(self._get_info, self, path, **kwargs)
info = await self._get_info(path, **kwargs)
return _get_details(path, info)

info = sync_wrapper(_info)

def created(self, path):
"""
Date and time in which the path was created
Expand Down Expand Up @@ -530,7 +507,7 @@ def flush(self, force=False):
if force and self.forced:
raise ValueError("Force flush cannot be called more than once")
if force:
maybe_sync(self._write_chunked, self)
self.write_chunked()
self.forced = True

async def _write_chunked(self):
Expand All @@ -539,6 +516,8 @@ async def _write_chunked(self):
r.raise_for_status()
return False

write_chunked = sync_wrapper(_write_chunked)

def close(self):
super(HTTPFile, self).close()

Expand Down
2 changes: 1 addition & 1 deletion tests/test_dcachefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def test_read_remote_file(test_fs):
with test_fs.open(remote_path) as f:
assert isinstance(f, dCacheFile)
assert f.read(5) == b'Hello'
assert f.cache.data == bytes(_file_content, 'utf-8')
assert f.cache.cache == bytes(_file_content, 'utf-8')
f.read(1)
assert f.read(5) == b'world'

Expand Down

0 comments on commit b4d1f2e

Please sign in to comment.