chris.client.authed

  1import abc
  2import os
  3from pathlib import Path
  4from typing import Optional, Generic, Callable, Sequence
  5
  6import aiohttp
  7from async_property import async_cached_property
  8
  9from chris.client.base import BaseChrisClient
 10from chris.client.base import L, CSelf
 11from chris.link import http
 12from chris.link.linked import deserialize_res
 13from chris.models.logged_in import Plugin, File, User, PluginInstance, Feed
 14from chris.models.public import ComputeResource
 15from chris.models.types import ChrisURL, Username, Password
 16from chris.util.errors import IncorrectLoginError, raise_for_status
 17from chris.util.search import Search, acollect
 18
 19
 20class AuthenticatedClient(BaseChrisClient[L, CSelf], Generic[L, CSelf], abc.ABC):
 21    """
 22    An authenticated ChRIS client.
 23    """
 24
 25    @classmethod
 26    async def from_login(
 27        cls,
 28        url: str | ChrisURL,
 29        username: str | Username,
 30        password: str | Password,
 31        max_search_requests: int = 100,
 32        connector: Optional[aiohttp.TCPConnector] = None,
 33        connector_owner: bool = True,
 34    ) -> CSelf:
 35        """
 36        Get authentication token using username and password, then construct the client.
 37
 38        See `chris.client.base.BaseChrisClient.new` for parameter documentation.
 39        """
 40        async with aiohttp.ClientSession(
 41            connector=connector, connector_owner=False
 42        ) as session:
 43            try:
 44                c = await cls.__from_login_with(
 45                    url=url,
 46                    username=username,
 47                    password=password,
 48                    max_search_requests=max_search_requests,
 49                    session=session,
 50                    connector_owner=connector_owner,
 51                )
 52            except BaseException as e:
 53                if connector is None:
 54                    await session.connector.close()
 55                raise e
 56        return c
 57
 58    @classmethod
 59    async def __from_login_with(
 60        cls,
 61        url: str | ChrisURL,
 62        username: Username,
 63        password: Password,
 64        max_search_requests: int,
 65        session: aiohttp.ClientSession,
 66        connector_owner: bool,
 67    ) -> CSelf:
 68        """
 69        Get authentication token using the given session, and then construct the client.
 70        """
 71        payload = {"username": username, "password": password}
 72        login = await session.post(url + "auth-token/", json=payload)
 73        if login.status == 400:
 74            raise IncorrectLoginError(await login.text())
 75        await raise_for_status(login)
 76        data = await login.json()
 77        return await cls.from_token(
 78            url=url,
 79            token=data["token"],
 80            max_search_requests=max_search_requests,
 81            connector=session.connector,
 82            connector_owner=connector_owner,
 83        )
 84
 85    @classmethod
 86    async def from_token(
 87        cls,
 88        url: str | ChrisURL,
 89        token: str,
 90        max_search_requests: int,
 91        connector: Optional[aiohttp.TCPConnector] = None,
 92        connector_owner: Optional[bool] = True,
 93    ) -> CSelf:
 94        """
 95        Construct an authenticated client using the given token.
 96
 97        See `chris.client.base.BaseChrisClient.new` for parameter documentation.
 98        """
 99        return await cls.new(
100            url=url,
101            max_search_requests=max_search_requests,
102            connector=connector,
103            connector_owner=connector_owner,
104            session_modifier=cls.__curry_token(token),
105        )
106
107    @staticmethod
108    def __curry_token(token: str) -> Callable[[aiohttp.ClientSession], None]:
109        def add_token_to(session: aiohttp.ClientSession) -> None:
110            session.headers.update({"Authorization": "Token " + token})
111
112        return add_token_to
113
114    # ============================================================
115    # CUBE API methods
116    # ============================================================
117
118    @http.search(".")
119    def search_feeds(self, **query) -> Search[Feed]:
120        """
121        Search for feeds.
122        """
123        ...
124
125    @http.search("plugins")
126    def search_plugins(self, **query) -> Search[Plugin]:
127        """
128        Search for plugins.
129        """
130        ...
131
132    @http.search("plugin_instances")
133    def plugin_instances(self, **query) -> Search[PluginInstance]:
134        """
135        Search for plugin instances.
136        """
137        ...
138
139    async def upload_file(
140        self, local_file: str | os.PathLike, upload_path: str
141    ) -> File:
142        """
143        Upload a local file to *ChRIS*.
144
145        .. warning:: Uses non-async code.
146                     The file is read using non-async code.
147                     Performance will suffer with large files and hard drives.
148                     See [aiolibs/aiohttp#7174](https://github.com/aio-libs/aiohttp/issues/7174)
149
150        Examples
151        --------
152
153        Upload a single file:
154
155        ```python
156        chris = await ChrisClient.from_login(
157            username='chris',
158            password='chris1234',
159            url='https://cube.chrisproject.org/api/v1/'
160        )
161        file = await chris.upload_file("./my_data.dat", 'dir/my_data.dat')
162        assert file.fname == 'chris/uploads/dir/my_data.dat'
163        ```
164
165        Upload (in parallel) all `*.txt` files in a directory
166        `'incoming'` to `chris/uploads/big_folder`:
167
168        ```python
169        upload_jobs = (
170            chris.upload_file(p, f'big_folder/{p}')
171            for p in Path('incoming')
172        )
173        await asyncio.gather(upload_jobs)
174        ```
175
176        Parameters
177        ----------
178        local_file
179            Path of an existing local file to upload.
180        upload_path
181            A subpath of `{username}/uploads/` where to upload the file to in *CUBE*
182        """
183        upload_path = await self._add_upload_prefix(upload_path)
184        local_file = Path(local_file)
185        with local_file.open("rb") as f:
186            data = aiohttp.FormData()
187            data.add_field("upload_path", upload_path)
188            data.add_field("fname", f, filename=local_file.name)
189            sent = self.s.post(self.collection_links.uploadedfiles, data=data)
190            return await deserialize_res(
191                sent, self, {"fname": local_file, "upload_path": upload_path}, File
192            )
193
194        # read_stream = _file_sender(local_file, chunk_size)
195        # file_length = os.stat(local_file).st_size
196        # return await self.upload_stream(read_stream, upload_path, str(local_file), file_length)
197
198    # doesn't work: 411 Length Required
199    # async def upload_stream(self, read_stream: AsyncIterable[bytes], upload_path: str, fname: str, length: int
200    #                         ) -> File:
201    #     """
202    #     Stream a file upload to *ChRIS*. For a higher-level wrapper which accepts
203    #     a path argument instead, see `upload`.
204    #
205    #     Parameters
206    #     ----------
207    #     read_stream
208    #         bytes stream
209    #     upload_path
210    #         uploadedfiles path starting with `'{username}/uploads/`
211    #     fname
212    #         file name to use in the multipart POST request
213    #     length
214    #         content length
215    #     """
216    #     data = aiohttp.FormData()
217    #     data.add_field('upload_path', upload_path)
218    #     data.add_field('fname', read_stream, filename=fname)
219    #     async with self.s.post(self.collection_links.uploadedfiles, data=data) as res:
220    #         return serde.json.from_json(File, await res.text())
221    #
222    #     with aiohttp.MultipartWriter() as mpwriter:
223    #         mpwriter.append_form({'upload_path': upload_path})
224    #         mpwriter.append(read_stream, headers={
225    #             'Content-Disposition': 'form-data; name="fname"; filename="value_goes_here"'
226    #         })
227
228    async def _add_upload_prefix(self, upload_path: str) -> str:
229        upload_prefix = f"{await self.username()}/uploads/"
230        if str(upload_path).startswith(upload_prefix):
231            return upload_path
232        return f"{upload_prefix}{upload_path}"
233
234    @http.get("user")
235    async def user(self) -> User:
236        """Gets the user's information."""
237        ...
238
239    @async_cached_property
240    async def _user(self) -> User:
241        return await self.user()
242
243    async def username(self) -> Username:
244        """
245        Gets the username. In contrast to `self.user`, this method will use a cached API call.
246        """
247        user = await self._user  # this is weird
248        return user.username
249
250    @http.search("compute_resources")
251    def search_compute_resources(self, **query) -> Search[ComputeResource]:
252        """
253        Search for existing compute resources.
254
255        See also
256        --------
257        `get_all_compute_resources` :
258        """
259        ...
260
261    async def get_all_compute_resources(self) -> Sequence[ComputeResource]:
262        """
263        Get all compute resources.
264
265        This method exists for convenience.
266        The number of compute resources of a CUBE is typically small so it's ok.
267
268        See also
269        --------
270        `search_compute_resources` :
271        """
272        return await acollect(self.search_compute_resources())
273
274
275# async def _file_sender(file_name: str | os.PathLike, chunk_size: int):
276#     """
277#     Stream the reading of a file using an asynchronous generator.
278#
279#     https://docs.aiohttp.org/en/stable/client_quickstart.html#streaming-uploads
280#     """
281#     async with aiofiles.open(file_name, 'rb') as f:
282#         chunk = await f.read(chunk_size)
283#         while chunk:
284#             yield chunk
285#             chunk = await f.read(chunk_size)
class AuthenticatedClient(chris.client.base.BaseChrisClient[~L, ~CSelf], typing.Generic[~L, ~CSelf], abc.ABC):
 21class AuthenticatedClient(BaseChrisClient[L, CSelf], Generic[L, CSelf], abc.ABC):
 22    """
 23    An authenticated ChRIS client.
 24    """
 25
 26    @classmethod
 27    async def from_login(
 28        cls,
 29        url: str | ChrisURL,
 30        username: str | Username,
 31        password: str | Password,
 32        max_search_requests: int = 100,
 33        connector: Optional[aiohttp.TCPConnector] = None,
 34        connector_owner: bool = True,
 35    ) -> CSelf:
 36        """
 37        Get authentication token using username and password, then construct the client.
 38
 39        See `chris.client.base.BaseChrisClient.new` for parameter documentation.
 40        """
 41        async with aiohttp.ClientSession(
 42            connector=connector, connector_owner=False
 43        ) as session:
 44            try:
 45                c = await cls.__from_login_with(
 46                    url=url,
 47                    username=username,
 48                    password=password,
 49                    max_search_requests=max_search_requests,
 50                    session=session,
 51                    connector_owner=connector_owner,
 52                )
 53            except BaseException as e:
 54                if connector is None:
 55                    await session.connector.close()
 56                raise e
 57        return c
 58
 59    @classmethod
 60    async def __from_login_with(
 61        cls,
 62        url: str | ChrisURL,
 63        username: Username,
 64        password: Password,
 65        max_search_requests: int,
 66        session: aiohttp.ClientSession,
 67        connector_owner: bool,
 68    ) -> CSelf:
 69        """
 70        Get authentication token using the given session, and then construct the client.
 71        """
 72        payload = {"username": username, "password": password}
 73        login = await session.post(url + "auth-token/", json=payload)
 74        if login.status == 400:
 75            raise IncorrectLoginError(await login.text())
 76        await raise_for_status(login)
 77        data = await login.json()
 78        return await cls.from_token(
 79            url=url,
 80            token=data["token"],
 81            max_search_requests=max_search_requests,
 82            connector=session.connector,
 83            connector_owner=connector_owner,
 84        )
 85
 86    @classmethod
 87    async def from_token(
 88        cls,
 89        url: str | ChrisURL,
 90        token: str,
 91        max_search_requests: int,
 92        connector: Optional[aiohttp.TCPConnector] = None,
 93        connector_owner: Optional[bool] = True,
 94    ) -> CSelf:
 95        """
 96        Construct an authenticated client using the given token.
 97
 98        See `chris.client.base.BaseChrisClient.new` for parameter documentation.
 99        """
100        return await cls.new(
101            url=url,
102            max_search_requests=max_search_requests,
103            connector=connector,
104            connector_owner=connector_owner,
105            session_modifier=cls.__curry_token(token),
106        )
107
108    @staticmethod
109    def __curry_token(token: str) -> Callable[[aiohttp.ClientSession], None]:
110        def add_token_to(session: aiohttp.ClientSession) -> None:
111            session.headers.update({"Authorization": "Token " + token})
112
113        return add_token_to
114
115    # ============================================================
116    # CUBE API methods
117    # ============================================================
118
119    @http.search(".")
120    def search_feeds(self, **query) -> Search[Feed]:
121        """
122        Search for feeds.
123        """
124        ...
125
126    @http.search("plugins")
127    def search_plugins(self, **query) -> Search[Plugin]:
128        """
129        Search for plugins.
130        """
131        ...
132
133    @http.search("plugin_instances")
134    def plugin_instances(self, **query) -> Search[PluginInstance]:
135        """
136        Search for plugin instances.
137        """
138        ...
139
140    async def upload_file(
141        self, local_file: str | os.PathLike, upload_path: str
142    ) -> File:
143        """
144        Upload a local file to *ChRIS*.
145
146        .. warning:: Uses non-async code.
147                     The file is read using non-async code.
148                     Performance will suffer with large files and hard drives.
149                     See [aiolibs/aiohttp#7174](https://github.com/aio-libs/aiohttp/issues/7174)
150
151        Examples
152        --------
153
154        Upload a single file:
155
156        ```python
157        chris = await ChrisClient.from_login(
158            username='chris',
159            password='chris1234',
160            url='https://cube.chrisproject.org/api/v1/'
161        )
162        file = await chris.upload_file("./my_data.dat", 'dir/my_data.dat')
163        assert file.fname == 'chris/uploads/dir/my_data.dat'
164        ```
165
166        Upload (in parallel) all `*.txt` files in a directory
167        `'incoming'` to `chris/uploads/big_folder`:
168
169        ```python
170        upload_jobs = (
171            chris.upload_file(p, f'big_folder/{p}')
172            for p in Path('incoming')
173        )
174        await asyncio.gather(upload_jobs)
175        ```
176
177        Parameters
178        ----------
179        local_file
180            Path of an existing local file to upload.
181        upload_path
182            A subpath of `{username}/uploads/` where to upload the file to in *CUBE*
183        """
184        upload_path = await self._add_upload_prefix(upload_path)
185        local_file = Path(local_file)
186        with local_file.open("rb") as f:
187            data = aiohttp.FormData()
188            data.add_field("upload_path", upload_path)
189            data.add_field("fname", f, filename=local_file.name)
190            sent = self.s.post(self.collection_links.uploadedfiles, data=data)
191            return await deserialize_res(
192                sent, self, {"fname": local_file, "upload_path": upload_path}, File
193            )
194
195        # read_stream = _file_sender(local_file, chunk_size)
196        # file_length = os.stat(local_file).st_size
197        # return await self.upload_stream(read_stream, upload_path, str(local_file), file_length)
198
199    # doesn't work: 411 Length Required
200    # async def upload_stream(self, read_stream: AsyncIterable[bytes], upload_path: str, fname: str, length: int
201    #                         ) -> File:
202    #     """
203    #     Stream a file upload to *ChRIS*. For a higher-level wrapper which accepts
204    #     a path argument instead, see `upload`.
205    #
206    #     Parameters
207    #     ----------
208    #     read_stream
209    #         bytes stream
210    #     upload_path
211    #         uploadedfiles path starting with `'{username}/uploads/`
212    #     fname
213    #         file name to use in the multipart POST request
214    #     length
215    #         content length
216    #     """
217    #     data = aiohttp.FormData()
218    #     data.add_field('upload_path', upload_path)
219    #     data.add_field('fname', read_stream, filename=fname)
220    #     async with self.s.post(self.collection_links.uploadedfiles, data=data) as res:
221    #         return serde.json.from_json(File, await res.text())
222    #
223    #     with aiohttp.MultipartWriter() as mpwriter:
224    #         mpwriter.append_form({'upload_path': upload_path})
225    #         mpwriter.append(read_stream, headers={
226    #             'Content-Disposition': 'form-data; name="fname"; filename="value_goes_here"'
227    #         })
228
229    async def _add_upload_prefix(self, upload_path: str) -> str:
230        upload_prefix = f"{await self.username()}/uploads/"
231        if str(upload_path).startswith(upload_prefix):
232            return upload_path
233        return f"{upload_prefix}{upload_path}"
234
235    @http.get("user")
236    async def user(self) -> User:
237        """Gets the user's information."""
238        ...
239
240    @async_cached_property
241    async def _user(self) -> User:
242        return await self.user()
243
244    async def username(self) -> Username:
245        """
246        Gets the username. In contrast to `self.user`, this method will use a cached API call.
247        """
248        user = await self._user  # this is weird
249        return user.username
250
251    @http.search("compute_resources")
252    def search_compute_resources(self, **query) -> Search[ComputeResource]:
253        """
254        Search for existing compute resources.
255
256        See also
257        --------
258        `get_all_compute_resources` :
259        """
260        ...
261
262    async def get_all_compute_resources(self) -> Sequence[ComputeResource]:
263        """
264        Get all compute resources.
265
266        This method exists for convenience.
267        The number of compute resources of a CUBE is typically small so it's ok.
268
269        See also
270        --------
271        `search_compute_resources` :
272        """
273        return await acollect(self.search_compute_resources())

An authenticated ChRIS client.

@classmethod
async def from_login( cls, url: Union[str, chris.models.types.ChrisURL], username: Union[str, chris.models.types.Username], password: Union[str, chris.models.types.Password], max_search_requests: int = 100, connector: Optional[aiohttp.connector.TCPConnector] = None, connector_owner: bool = True) -> ~CSelf:
26    @classmethod
27    async def from_login(
28        cls,
29        url: str | ChrisURL,
30        username: str | Username,
31        password: str | Password,
32        max_search_requests: int = 100,
33        connector: Optional[aiohttp.TCPConnector] = None,
34        connector_owner: bool = True,
35    ) -> CSelf:
36        """
37        Get authentication token using username and password, then construct the client.
38
39        See `chris.client.base.BaseChrisClient.new` for parameter documentation.
40        """
41        async with aiohttp.ClientSession(
42            connector=connector, connector_owner=False
43        ) as session:
44            try:
45                c = await cls.__from_login_with(
46                    url=url,
47                    username=username,
48                    password=password,
49                    max_search_requests=max_search_requests,
50                    session=session,
51                    connector_owner=connector_owner,
52                )
53            except BaseException as e:
54                if connector is None:
55                    await session.connector.close()
56                raise e
57        return c

Get authentication token using username and password, then construct the client.

See chris.client.base.BaseChrisClient.new for parameter documentation.

@classmethod
async def from_token( cls, url: Union[str, chris.models.types.ChrisURL], token: str, max_search_requests: int, connector: Optional[aiohttp.connector.TCPConnector] = None, connector_owner: Optional[bool] = True) -> ~CSelf:
 86    @classmethod
 87    async def from_token(
 88        cls,
 89        url: str | ChrisURL,
 90        token: str,
 91        max_search_requests: int,
 92        connector: Optional[aiohttp.TCPConnector] = None,
 93        connector_owner: Optional[bool] = True,
 94    ) -> CSelf:
 95        """
 96        Construct an authenticated client using the given token.
 97
 98        See `chris.client.base.BaseChrisClient.new` for parameter documentation.
 99        """
100        return await cls.new(
101            url=url,
102            max_search_requests=max_search_requests,
103            connector=connector,
104            connector_owner=connector_owner,
105            session_modifier=cls.__curry_token(token),
106        )

Construct an authenticated client using the given token.

See chris.client.base.BaseChrisClient.new for parameter documentation.

@http.search('.')
def search_feeds(self, **query) -> chris.util.search.Search[chris.models.logged_in.Feed]:
119    @http.search(".")
120    def search_feeds(self, **query) -> Search[Feed]:
121        """
122        Search for feeds.
123        """
124        ...

Search for feeds.

@http.search('plugins')
def search_plugins(self, **query) -> chris.util.search.Search[chris.models.logged_in.Plugin]:
126    @http.search("plugins")
127    def search_plugins(self, **query) -> Search[Plugin]:
128        """
129        Search for plugins.
130        """
131        ...

Search for plugins.

@http.search('plugin_instances')
def plugin_instances( self, **query) -> chris.util.search.Search[chris.models.logged_in.PluginInstance]:
133    @http.search("plugin_instances")
134    def plugin_instances(self, **query) -> Search[PluginInstance]:
135        """
136        Search for plugin instances.
137        """
138        ...

Search for plugin instances.

async def upload_file( self, local_file: str | os.PathLike, upload_path: str) -> chris.models.logged_in.File:
140    async def upload_file(
141        self, local_file: str | os.PathLike, upload_path: str
142    ) -> File:
143        """
144        Upload a local file to *ChRIS*.
145
146        .. warning:: Uses non-async code.
147                     The file is read using non-async code.
148                     Performance will suffer with large files and hard drives.
149                     See [aiolibs/aiohttp#7174](https://github.com/aio-libs/aiohttp/issues/7174)
150
151        Examples
152        --------
153
154        Upload a single file:
155
156        ```python
157        chris = await ChrisClient.from_login(
158            username='chris',
159            password='chris1234',
160            url='https://cube.chrisproject.org/api/v1/'
161        )
162        file = await chris.upload_file("./my_data.dat", 'dir/my_data.dat')
163        assert file.fname == 'chris/uploads/dir/my_data.dat'
164        ```
165
166        Upload (in parallel) all `*.txt` files in a directory
167        `'incoming'` to `chris/uploads/big_folder`:
168
169        ```python
170        upload_jobs = (
171            chris.upload_file(p, f'big_folder/{p}')
172            for p in Path('incoming')
173        )
174        await asyncio.gather(upload_jobs)
175        ```
176
177        Parameters
178        ----------
179        local_file
180            Path of an existing local file to upload.
181        upload_path
182            A subpath of `{username}/uploads/` where to upload the file to in *CUBE*
183        """
184        upload_path = await self._add_upload_prefix(upload_path)
185        local_file = Path(local_file)
186        with local_file.open("rb") as f:
187            data = aiohttp.FormData()
188            data.add_field("upload_path", upload_path)
189            data.add_field("fname", f, filename=local_file.name)
190            sent = self.s.post(self.collection_links.uploadedfiles, data=data)
191            return await deserialize_res(
192                sent, self, {"fname": local_file, "upload_path": upload_path}, File
193            )
194
195        # read_stream = _file_sender(local_file, chunk_size)
196        # file_length = os.stat(local_file).st_size
197        # return await self.upload_stream(read_stream, upload_path, str(local_file), file_length)

Upload a local file to ChRIS.

Uses non-async code.

The file is read using non-async code. Performance will suffer with large files and hard drives. See aiolibs/aiohttp#7174

Examples

Upload a single file:

chris = await ChrisClient.from_login(
    username='chris',
    password='chris1234',
    url='https://cube.chrisproject.org/api/v1/'
)
file = await chris.upload_file("./my_data.dat", 'dir/my_data.dat')
assert file.fname == 'chris/uploads/dir/my_data.dat'

Upload (in parallel) all *.txt files in a directory 'incoming' to chris/uploads/big_folder:

upload_jobs = (
    chris.upload_file(p, f'big_folder/{p}')
    for p in Path('incoming')
)
await asyncio.gather(upload_jobs)
Parameters
  • local_file: Path of an existing local file to upload.
  • upload_path: A subpath of {username}/uploads/ where to upload the file to in CUBE
@http.get('user')
async def user(self) -> chris.models.logged_in.User:
235    @http.get("user")
236    async def user(self) -> User:
237        """Gets the user's information."""
238        ...

Gets the user's information.

async def username(self) -> chris.models.types.Username:
244    async def username(self) -> Username:
245        """
246        Gets the username. In contrast to `self.user`, this method will use a cached API call.
247        """
248        user = await self._user  # this is weird
249        return user.username

Gets the username. In contrast to self.user, this method will use a cached API call.

@http.search('compute_resources')
def search_compute_resources( self, **query) -> chris.util.search.Search[chris.models.public.ComputeResource]:
251    @http.search("compute_resources")
252    def search_compute_resources(self, **query) -> Search[ComputeResource]:
253        """
254        Search for existing compute resources.
255
256        See also
257        --------
258        `get_all_compute_resources` :
259        """
260        ...

Search for existing compute resources.

See also

get_all_compute_resources :

async def get_all_compute_resources(self) -> Sequence[chris.models.public.ComputeResource]:
262    async def get_all_compute_resources(self) -> Sequence[ComputeResource]:
263        """
264        Get all compute resources.
265
266        This method exists for convenience.
267        The number of compute resources of a CUBE is typically small so it's ok.
268
269        See also
270        --------
271        `search_compute_resources` :
272        """
273        return await acollect(self.search_compute_resources())

Get all compute resources.

This method exists for convenience. The number of compute resources of a CUBE is typically small so it's ok.

See also

search_compute_resources :

Inherited Members
chris.link.collection_client.CollectionJsonApiClient
CollectionJsonApiClient
url
chris.client.base.BaseChrisClient
new
close
chris.link.linked.Linked
max_search_requests