chris.client.normal
from contextlib import asynccontextmanager
from typing import Optional

import aiohttp
from serde.json import from_json

from chris.client.authed import AuthenticatedClient
from chris.util.errors import raise_for_status
from chris.models.collection_links import CollectionLinks
from chris.models.data import UserData
from chris.models.types import ChrisURL, Username, Password


class ChrisClient(AuthenticatedClient[CollectionLinks, "ChrisClient"]):
    """
    A normal user *ChRIS* client, who may upload files and create plugin instances.
    """

    @classmethod
    async def create_user(
        cls,
        url: ChrisURL | str,
        username: Username | str,
        password: Password | str,
        email: str,
        session: Optional[aiohttp.ClientSession],
    ) -> UserData:
        """
        Create a new user by POSTing a Collection+JSON template to ``url + "users/"``.

        :param url: base URL; ``"users/"`` is appended verbatim, so presumably
            it must already end with a slash -- verify against callers
        :param username: value sent as the ``username`` field
        :param password: value sent as the ``password`` field
        :param email: value sent as the ``email`` field
        :param session: session to send the request with; if ``None``, a
            temporary session is opened for this one request and closed afterwards
        :return: the response body deserialized as ``UserData``
        :raises: whatever ``raise_for_status`` raises for the response
        """
        payload = {
            "template": {
                "data": [
                    {"name": "email", "value": email},
                    {"name": "username", "value": username},
                    {"name": "password", "value": password},
                ]
            }
        }
        headers = {
            "Content-Type": "application/vnd.collection+json",
            "Accept": "application/json",
        }
        async with _optional_session(session) as active_session:
            # Hold the response in a context manager so its connection is
            # released even when raise_for_status raises; this matters when
            # the session belongs to the caller and outlives this call.
            async with active_session.post(
                url + "users/", json=payload, headers=headers
            ) as res:
                await raise_for_status(res)
                return from_json(UserData, await res.text())


@asynccontextmanager
async def _optional_session(session: Optional[aiohttp.ClientSession]):
    """
    Yield ``session`` if one was given, otherwise a temporary session.

    A session passed in by the caller is yielded as-is and is not closed here.
    A session created here is closed when the context exits.
    """
    if session is not None:
        yield session
        return
    async with aiohttp.ClientSession() as temporary_session:
        yield temporary_session
class
ChrisClient(chris.client.authed.AuthenticatedClient[chris.models.collection_links.CollectionLinks, 'ChrisClient']):
class ChrisClient(AuthenticatedClient[CollectionLinks, "ChrisClient"]):
    """
    A normal user *ChRIS* client, who may upload files and create plugin instances.
    """

    @classmethod
    async def create_user(
        cls,
        url: ChrisURL | str,
        username: Username | str,
        password: Password | str,
        email: str,
        session: Optional[aiohttp.ClientSession],
    ) -> UserData:
        """
        Create a new user by POSTing a Collection+JSON template to ``url + "users/"``.

        :param url: base URL; ``"users/"`` is appended verbatim, so presumably
            it must already end with a slash -- verify against callers
        :param username: value sent as the ``username`` field
        :param password: value sent as the ``password`` field
        :param email: value sent as the ``email`` field
        :param session: session to send the request with; if ``None``, a
            temporary session is opened for this one request and closed afterwards
        :return: the response body deserialized as ``UserData``
        :raises: whatever ``raise_for_status`` raises for the response
        """
        payload = {
            "template": {
                "data": [
                    {"name": "email", "value": email},
                    {"name": "username", "value": username},
                    {"name": "password", "value": password},
                ]
            }
        }
        headers = {
            "Content-Type": "application/vnd.collection+json",
            "Accept": "application/json",
        }
        async with _optional_session(session) as active_session:
            # Hold the response in a context manager so its connection is
            # released even when raise_for_status raises; this matters when
            # the session belongs to the caller and outlives this call.
            async with active_session.post(
                url + "users/", json=payload, headers=headers
            ) as res:
                await raise_for_status(res)
                return from_json(UserData, await res.text())
A normal user ChRIS client, who may upload files and create plugin instances.
@classmethod
async def
create_user( cls, url: Union[chris.models.types.ChrisURL, str], username: Union[chris.models.types.Username, str], password: Union[chris.models.types.Password, str], email: str, session: Optional[aiohttp.client.ClientSession]) -> chris.models.data.UserData:
@classmethod
async def create_user(
    cls,
    url: ChrisURL | str,
    username: Username | str,
    password: Password | str,
    email: str,
    session: Optional[aiohttp.ClientSession],
) -> UserData:
    """
    Create a new user by POSTing a Collection+JSON template to ``url + "users/"``.

    :param url: base URL; ``"users/"`` is appended verbatim, so presumably
        it must already end with a slash -- verify against callers
    :param username: value sent as the ``username`` field
    :param password: value sent as the ``password`` field
    :param email: value sent as the ``email`` field
    :param session: session to send the request with; if ``None``, a
        temporary session is opened for this one request and closed afterwards
    :return: the response body deserialized as ``UserData``
    :raises: whatever ``raise_for_status`` raises for the response
    """
    payload = {
        "template": {
            "data": [
                {"name": "email", "value": email},
                {"name": "username", "value": username},
                {"name": "password", "value": password},
            ]
        }
    }
    headers = {
        "Content-Type": "application/vnd.collection+json",
        "Accept": "application/json",
    }
    async with _optional_session(session) as active_session:
        # Hold the response in a context manager so its connection is
        # released even when raise_for_status raises; this matters when
        # the session belongs to the caller and outlives this call.
        async with active_session.post(
            url + "users/", json=payload, headers=headers
        ) as res:
            await raise_for_status(res)
            return from_json(UserData, await res.text())
Inherited Members
- chris.link.collection_client.CollectionJsonApiClient
- CollectionJsonApiClient
- url
- collection_links
- chris.client.authed.AuthenticatedClient
- from_login
- from_token
- search_feeds
- search_plugins
- plugin_instances
- upload_file
- user
- username
- search_compute_resources
- get_all_compute_resources
- chris.link.linked.Linked
- max_search_requests