chris.client.normal

 1from contextlib import asynccontextmanager
 2from typing import Optional
 3
 4import aiohttp
 5from serde.json import from_json
 6
 7from chris.client.authed import AuthenticatedClient
 8from chris.util.errors import raise_for_status
 9from chris.models.collection_links import CollectionLinks
10from chris.models.data import UserData
11from chris.models.types import ChrisURL, Username, Password
12
13
14class ChrisClient(AuthenticatedClient[CollectionLinks, "ChrisClient"]):
15    """
16    A normal user *ChRIS* client, who may upload files and create plugin instances.
17    """
18
19    @classmethod
20    async def create_user(
21        cls,
22        url: ChrisURL | str,
23        username: Username | str,
24        password: Password | str,
25        email: str,
26        session: Optional[aiohttp.ClientSession],
27    ) -> UserData:
28        payload = {
29            "template": {
30                "data": [
31                    {"name": "email", "value": email},
32                    {"name": "username", "value": username},
33                    {"name": "password", "value": password},
34                ]
35            }
36        }
37        headers = {
38            "Content-Type": "application/vnd.collection+json",
39            "Accept": "application/json",
40        }
41        async with _optional_session(session) as session:
42            res = await session.post(url + "users/", json=payload, headers=headers)
43            await raise_for_status(res)
44            return from_json(UserData, await res.text())
45
46
47@asynccontextmanager
48async def _optional_session(session: Optional[aiohttp.ClientSession]):
49    if session is not None:
50        yield session
51        return
52    async with aiohttp.ClientSession() as session:
53        yield session
15class ChrisClient(AuthenticatedClient[CollectionLinks, "ChrisClient"]):
16    """
17    A normal user *ChRIS* client, who may upload files and create plugin instances.
18    """
19
20    @classmethod
21    async def create_user(
22        cls,
23        url: ChrisURL | str,
24        username: Username | str,
25        password: Password | str,
26        email: str,
27        session: Optional[aiohttp.ClientSession],
28    ) -> UserData:
29        payload = {
30            "template": {
31                "data": [
32                    {"name": "email", "value": email},
33                    {"name": "username", "value": username},
34                    {"name": "password", "value": password},
35                ]
36            }
37        }
38        headers = {
39            "Content-Type": "application/vnd.collection+json",
40            "Accept": "application/json",
41        }
42        async with _optional_session(session) as session:
43            res = await session.post(url + "users/", json=payload, headers=headers)
44            await raise_for_status(res)
45            return from_json(UserData, await res.text())

A normal user ChRIS client, who may upload files and create plugin instances.

@classmethod
async def create_user( cls, url: Union[chris.models.types.ChrisURL, str], username: Union[chris.models.types.Username, str], password: Union[chris.models.types.Password, str], email: str, session: Optional[aiohttp.client.ClientSession]) -> chris.models.data.UserData:
20    @classmethod
21    async def create_user(
22        cls,
23        url: ChrisURL | str,
24        username: Username | str,
25        password: Password | str,
26        email: str,
27        session: Optional[aiohttp.ClientSession],
28    ) -> UserData:
29        payload = {
30            "template": {
31                "data": [
32                    {"name": "email", "value": email},
33                    {"name": "username", "value": username},
34                    {"name": "password", "value": password},
35                ]
36            }
37        }
38        headers = {
39            "Content-Type": "application/vnd.collection+json",
40            "Accept": "application/json",
41        }
42        async with _optional_session(session) as session:
43            res = await session.post(url + "users/", json=payload, headers=headers)
44            await raise_for_status(res)
45            return from_json(UserData, await res.text())
Inherited Members
chris.link.collection_client.CollectionJsonApiClient
CollectionJsonApiClient
url
chris.client.authed.AuthenticatedClient
from_login
from_token
search_feeds
search_plugins
plugin_instances
upload_file
user
username
search_compute_resources
get_all_compute_resources
chris.client.base.BaseChrisClient
new
close
chris.link.linked.Linked
max_search_requests