chris.client.authed
import abc
import os
from pathlib import Path
from typing import Optional, Generic, Callable, Sequence

import aiohttp
from async_property import async_cached_property

from chris.client.base import BaseChrisClient
from chris.client.base import L, CSelf
from chris.link import http
from chris.link.linked import deserialize_res
from chris.models.logged_in import Plugin, File, User, PluginInstance, Feed
from chris.models.public import ComputeResource
from chris.models.types import ChrisURL, Username, Password
from chris.util.errors import IncorrectLoginError, raise_for_status
from chris.util.search import Search, acollect


class AuthenticatedClient(BaseChrisClient[L, CSelf], Generic[L, CSelf], abc.ABC):
    """
    An authenticated ChRIS client.

    Construct one with `from_login` (username and password) or `from_token`
    (an existing token). Both paths end in `from_token`, which hands `new` a
    session modifier that sets the ``Authorization: Token <token>`` header.
    """

    @classmethod
    async def from_login(
        cls,
        url: str | ChrisURL,
        username: str | Username,
        password: str | Password,
        max_search_requests: int = 100,
        connector: Optional[aiohttp.TCPConnector] = None,
        connector_owner: bool = True,
    ) -> CSelf:
        """
        Get authentication token using username and password, then construct the client.

        A temporary session is opened for the login request only. That session
        never owns its connector (``connector_owner=False``), so the connector
        survives the session and is passed on to the constructed client.

        See `chris.client.base.BaseChrisClient.new` for parameter documentation.
        """
        async with aiohttp.ClientSession(
            connector=connector, connector_owner=False
        ) as session:
            try:
                return await cls.__from_login_with(
                    url=url,
                    username=username,
                    password=password,
                    max_search_requests=max_search_requests,
                    session=session,
                    connector_owner=connector_owner,
                )
            except BaseException:
                # BaseException on purpose: cleanup must also happen on
                # cancellation. When the caller gave no connector, the one in
                # use was created by the temporary session, which does not own
                # it, so it has to be closed here.
                if connector is None:
                    await session.connector.close()
                raise

    @classmethod
    async def __from_login_with(
        cls,
        url: str | ChrisURL,
        username: str | Username,
        password: str | Password,
        max_search_requests: int,
        session: aiohttp.ClientSession,
        connector_owner: bool,
    ) -> CSelf:
        """
        Get authentication token using the given session, and then construct the client.

        Raises
        ------
        IncorrectLoginError
            If the login request is answered with HTTP status 400.
        """
        payload = {"username": username, "password": password}
        # `async with` releases the response even if an error is raised
        # before its body has been read.
        async with session.post(url + "auth-token/", json=payload) as login:
            if login.status == 400:
                raise IncorrectLoginError(await login.text())
            await raise_for_status(login)
            data = await login.json()
        return await cls.from_token(
            url=url,
            token=data["token"],
            max_search_requests=max_search_requests,
            connector=session.connector,
            connector_owner=connector_owner,
        )

    @classmethod
    async def from_token(
        cls,
        url: str | ChrisURL,
        token: str,
        max_search_requests: int,
        connector: Optional[aiohttp.TCPConnector] = None,
        connector_owner: Optional[bool] = True,
    ) -> CSelf:
        """
        Construct an authenticated client using the given token.

        See `chris.client.base.BaseChrisClient.new` for parameter documentation.
        """
        return await cls.new(
            url=url,
            max_search_requests=max_search_requests,
            connector=connector,
            connector_owner=connector_owner,
            session_modifier=cls.__curry_token(token),
        )

    @staticmethod
    def __curry_token(token: str) -> Callable[[aiohttp.ClientSession], None]:
        """
        Create a function which adds the ``Authorization`` header for
        ``token`` to the headers of the session it is given.
        """

        def add_token_to(session: aiohttp.ClientSession) -> None:
            session.headers.update({"Authorization": "Token " + token})

        return add_token_to

    # ============================================================
    # CUBE API methods
    # ============================================================

    @http.search(".")
    def search_feeds(self, **query) -> Search[Feed]:
        """
        Search for feeds.
        """
        ...

    @http.search("plugins")
    def search_plugins(self, **query) -> Search[Plugin]:
        """
        Search for plugins.
        """
        ...

    @http.search("plugin_instances")
    def plugin_instances(self, **query) -> Search[PluginInstance]:
        """
        Search for plugin instances.
        """
        ...

    async def upload_file(
        self, local_file: str | os.PathLike, upload_path: str
    ) -> File:
        """
        Upload a local file to *ChRIS*.

        .. warning:: Uses non-async code.
                     The file is read using non-async code.
                     Performance will suffer with large files and hard drives.
                     See [aio-libs/aiohttp#7174](https://github.com/aio-libs/aiohttp/issues/7174)

        Examples
        --------

        Upload a single file:

        ```python
        chris = await ChrisClient.from_login(
            username='chris',
            password='chris1234',
            url='https://cube.chrisproject.org/api/v1/'
        )
        file = await chris.upload_file("./my_data.dat", 'dir/my_data.dat')
        assert file.fname == 'chris/uploads/dir/my_data.dat'
        ```

        Upload (in parallel) all `*.txt` files in a directory
        `'incoming'` to `chris/uploads/big_folder`:

        ```python
        upload_jobs = (
            chris.upload_file(p, f'big_folder/{p.name}')
            for p in Path('incoming').glob('*.txt')
        )
        await asyncio.gather(*upload_jobs)
        ```

        Parameters
        ----------
        local_file
            Path of an existing local file to upload.
        upload_path
            A subpath of `{username}/uploads/` where to upload the file to in *CUBE*
        """
        upload_path = await self._add_upload_prefix(upload_path)
        local_file = Path(local_file)
        with local_file.open("rb") as f:
            data = aiohttp.FormData()
            data.add_field("upload_path", upload_path)
            data.add_field("fname", f, filename=local_file.name)
            # The request is not awaited here; presumably `deserialize_res`
            # awaits it, so the file must stay open until that call returns.
            sent = self.s.post(self.collection_links.uploadedfiles, data=data)
            return await deserialize_res(
                sent, self, {"fname": local_file, "upload_path": upload_path}, File
            )

    # NOTE: a streaming `upload_stream` variant (an async byte stream passed to
    # `aiohttp.FormData` or `aiohttp.MultipartWriter`) was tried and removed:
    # it doesn't work, the request fails with "411 Length Required".

    async def _add_upload_prefix(self, upload_path: str) -> str:
        """
        Prepend `{username}/uploads/` to ``upload_path`` unless it already
        starts with that prefix.
        """
        upload_prefix = f"{await self.username()}/uploads/"
        if str(upload_path).startswith(upload_prefix):
            return upload_path
        return f"{upload_prefix}{upload_path}"

    @http.get("user")
    async def user(self) -> User:
        """Gets the user's information."""
        ...

    @async_cached_property
    async def _user(self) -> User:
        """Result of `user`, cached by `async_cached_property`."""
        return await self.user()

    async def username(self) -> Username:
        """
        Gets the username. In contrast to `self.user`, this method will use a cached API call.
        """
        # `_user` is an async cached property: it is awaited as an attribute,
        # without being called.
        user = await self._user
        return user.username

    @http.search("compute_resources")
    def search_compute_resources(self, **query) -> Search[ComputeResource]:
        """
        Search for existing compute resources.

        See also
        --------
        `get_all_compute_resources` :
        """
        ...

    async def get_all_compute_resources(self) -> Sequence[ComputeResource]:
        """
        Get all compute resources.

        This method exists for convenience.
        The number of compute resources of a CUBE is typically small so it's ok.

        See also
        --------
        `search_compute_resources` :
        """
        return await acollect(self.search_compute_resources())


# NOTE: an `aiofiles`-based `_file_sender(file_name, chunk_size)` async
# generator for streaming uploads used to live here, see
# https://docs.aiohttp.org/en/stable/client_quickstart.html#streaming-uploads
# It is unused because streamed uploads fail with "411 Length Required".
class AuthenticatedClient(BaseChrisClient[L, CSelf], Generic[L, CSelf], abc.ABC):
    """
    An authenticated ChRIS client.

    Construct one with `from_login` (username and password) or `from_token`
    (an existing token). Both paths end in `from_token`, which hands `new` a
    session modifier that sets the ``Authorization: Token <token>`` header.
    """

    @classmethod
    async def from_login(
        cls,
        url: str | ChrisURL,
        username: str | Username,
        password: str | Password,
        max_search_requests: int = 100,
        connector: Optional[aiohttp.TCPConnector] = None,
        connector_owner: bool = True,
    ) -> CSelf:
        """
        Get authentication token using username and password, then construct the client.

        A temporary session is opened for the login request only. That session
        never owns its connector (``connector_owner=False``), so the connector
        survives the session and is passed on to the constructed client.

        See `chris.client.base.BaseChrisClient.new` for parameter documentation.
        """
        async with aiohttp.ClientSession(
            connector=connector, connector_owner=False
        ) as session:
            try:
                return await cls.__from_login_with(
                    url=url,
                    username=username,
                    password=password,
                    max_search_requests=max_search_requests,
                    session=session,
                    connector_owner=connector_owner,
                )
            except BaseException:
                # BaseException on purpose: cleanup must also happen on
                # cancellation. When the caller gave no connector, the one in
                # use was created by the temporary session, which does not own
                # it, so it has to be closed here.
                if connector is None:
                    await session.connector.close()
                raise

    @classmethod
    async def __from_login_with(
        cls,
        url: str | ChrisURL,
        username: str | Username,
        password: str | Password,
        max_search_requests: int,
        session: aiohttp.ClientSession,
        connector_owner: bool,
    ) -> CSelf:
        """
        Get authentication token using the given session, and then construct the client.

        Raises
        ------
        IncorrectLoginError
            If the login request is answered with HTTP status 400.
        """
        payload = {"username": username, "password": password}
        # `async with` releases the response even if an error is raised
        # before its body has been read.
        async with session.post(url + "auth-token/", json=payload) as login:
            if login.status == 400:
                raise IncorrectLoginError(await login.text())
            await raise_for_status(login)
            data = await login.json()
        return await cls.from_token(
            url=url,
            token=data["token"],
            max_search_requests=max_search_requests,
            connector=session.connector,
            connector_owner=connector_owner,
        )

    @classmethod
    async def from_token(
        cls,
        url: str | ChrisURL,
        token: str,
        max_search_requests: int,
        connector: Optional[aiohttp.TCPConnector] = None,
        connector_owner: Optional[bool] = True,
    ) -> CSelf:
        """
        Construct an authenticated client using the given token.

        See `chris.client.base.BaseChrisClient.new` for parameter documentation.
        """
        return await cls.new(
            url=url,
            max_search_requests=max_search_requests,
            connector=connector,
            connector_owner=connector_owner,
            session_modifier=cls.__curry_token(token),
        )

    @staticmethod
    def __curry_token(token: str) -> Callable[[aiohttp.ClientSession], None]:
        """
        Create a function which adds the ``Authorization`` header for
        ``token`` to the headers of the session it is given.
        """

        def add_token_to(session: aiohttp.ClientSession) -> None:
            session.headers.update({"Authorization": "Token " + token})

        return add_token_to

    # ============================================================
    # CUBE API methods
    # ============================================================

    @http.search(".")
    def search_feeds(self, **query) -> Search[Feed]:
        """
        Search for feeds.
        """
        ...

    @http.search("plugins")
    def search_plugins(self, **query) -> Search[Plugin]:
        """
        Search for plugins.
        """
        ...

    @http.search("plugin_instances")
    def plugin_instances(self, **query) -> Search[PluginInstance]:
        """
        Search for plugin instances.
        """
        ...

    async def upload_file(
        self, local_file: str | os.PathLike, upload_path: str
    ) -> File:
        """
        Upload a local file to *ChRIS*.

        .. warning:: Uses non-async code.
                     The file is read using non-async code.
                     Performance will suffer with large files and hard drives.
                     See [aio-libs/aiohttp#7174](https://github.com/aio-libs/aiohttp/issues/7174)

        Examples
        --------

        Upload a single file:

        ```python
        chris = await ChrisClient.from_login(
            username='chris',
            password='chris1234',
            url='https://cube.chrisproject.org/api/v1/'
        )
        file = await chris.upload_file("./my_data.dat", 'dir/my_data.dat')
        assert file.fname == 'chris/uploads/dir/my_data.dat'
        ```

        Upload (in parallel) all `*.txt` files in a directory
        `'incoming'` to `chris/uploads/big_folder`:

        ```python
        upload_jobs = (
            chris.upload_file(p, f'big_folder/{p.name}')
            for p in Path('incoming').glob('*.txt')
        )
        await asyncio.gather(*upload_jobs)
        ```

        Parameters
        ----------
        local_file
            Path of an existing local file to upload.
        upload_path
            A subpath of `{username}/uploads/` where to upload the file to in *CUBE*
        """
        upload_path = await self._add_upload_prefix(upload_path)
        local_file = Path(local_file)
        with local_file.open("rb") as f:
            data = aiohttp.FormData()
            data.add_field("upload_path", upload_path)
            data.add_field("fname", f, filename=local_file.name)
            # The request is not awaited here; presumably `deserialize_res`
            # awaits it, so the file must stay open until that call returns.
            sent = self.s.post(self.collection_links.uploadedfiles, data=data)
            return await deserialize_res(
                sent, self, {"fname": local_file, "upload_path": upload_path}, File
            )

    # NOTE: a streaming `upload_stream` variant (an async byte stream passed to
    # `aiohttp.FormData` or `aiohttp.MultipartWriter`) was tried and removed:
    # it doesn't work, the request fails with "411 Length Required".

    async def _add_upload_prefix(self, upload_path: str) -> str:
        """
        Prepend `{username}/uploads/` to ``upload_path`` unless it already
        starts with that prefix.
        """
        upload_prefix = f"{await self.username()}/uploads/"
        if str(upload_path).startswith(upload_prefix):
            return upload_path
        return f"{upload_prefix}{upload_path}"

    @http.get("user")
    async def user(self) -> User:
        """Gets the user's information."""
        ...

    @async_cached_property
    async def _user(self) -> User:
        """Result of `user`, cached by `async_cached_property`."""
        return await self.user()

    async def username(self) -> Username:
        """
        Gets the username. In contrast to `self.user`, this method will use a cached API call.
        """
        # `_user` is an async cached property: it is awaited as an attribute,
        # without being called.
        user = await self._user
        return user.username

    @http.search("compute_resources")
    def search_compute_resources(self, **query) -> Search[ComputeResource]:
        """
        Search for existing compute resources.

        See also
        --------
        `get_all_compute_resources` :
        """
        ...

    async def get_all_compute_resources(self) -> Sequence[ComputeResource]:
        """
        Get all compute resources.

        This method exists for convenience.
        The number of compute resources of a CUBE is typically small so it's ok.

        See also
        --------
        `search_compute_resources` :
        """
        return await acollect(self.search_compute_resources())
An authenticated ChRIS client.
@classmethod
async def from_login(
    cls,
    url: str | ChrisURL,
    username: str | Username,
    password: str | Password,
    max_search_requests: int = 100,
    connector: Optional[aiohttp.TCPConnector] = None,
    connector_owner: bool = True,
) -> CSelf:
    """
    Log in with a username and password, then build the client from the
    token that was obtained.

    The login request runs in a short-lived session which does not own its
    connector, so the connector stays usable for the client built afterwards.

    See `chris.client.base.BaseChrisClient.new` for parameter documentation.
    """
    login_session = aiohttp.ClientSession(connector=connector, connector_owner=False)
    async with login_session as session:
        try:
            return await cls.__from_login_with(
                url=url,
                username=username,
                password=password,
                max_search_requests=max_search_requests,
                session=session,
                connector_owner=connector_owner,
            )
        except BaseException as err:
            # Without a caller-supplied connector, the one in use was created
            # by the session above and has no other owner: close it here.
            caller_gave_connector = connector is not None
            if not caller_gave_connector:
                await session.connector.close()
            raise err
Get authentication token using username and password, then construct the client.
See chris.client.base.BaseChrisClient.new
for parameter documentation.
@classmethod
async def from_token(
    cls,
    url: str | ChrisURL,
    token: str,
    max_search_requests: int,
    connector: Optional[aiohttp.TCPConnector] = None,
    connector_owner: Optional[bool] = True,
) -> CSelf:
    """
    Build an authenticated client from an already-known token.

    The token is wrapped by `__curry_token` and given to `new` as its
    ``session_modifier``.

    See `chris.client.base.BaseChrisClient.new` for parameter documentation.
    """
    add_auth_header = cls.__curry_token(token)
    client = await cls.new(
        url=url,
        max_search_requests=max_search_requests,
        connector=connector,
        connector_owner=connector_owner,
        session_modifier=add_auth_header,
    )
    return client
Construct an authenticated client using the given token.
See chris.client.base.BaseChrisClient.new
for parameter documentation.
@http.search(".")
def search_feeds(self, **query) -> Search[Feed]:
    """
    Search for feeds.

    Parameters
    ----------
    query
        Keyword arguments describing the search.
        NOTE(review): the body is a stub; the `http.search` decorator is
        presumably what implements the request -- confirm in `chris.link.http`.
    """
    ...
Search for feeds.
@http.search("plugins")
def search_plugins(self, **query) -> Search[Plugin]:
    """
    Search for plugins.

    Parameters
    ----------
    query
        Keyword arguments describing the search.
        NOTE(review): the body is a stub; the `http.search` decorator is
        presumably what implements the request -- confirm in `chris.link.http`.
    """
    ...
Search for plugins.
@http.search("plugin_instances")
def plugin_instances(self, **query) -> Search[PluginInstance]:
    """
    Search for plugin instances.

    Parameters
    ----------
    query
        Keyword arguments describing the search.
        NOTE(review): the body is a stub; the `http.search` decorator is
        presumably what implements the request -- confirm in `chris.link.http`.
    """
    ...
Search for plugin instances.
async def upload_file(
    self, local_file: str | os.PathLike, upload_path: str
) -> File:
    """
    Upload a local file to *ChRIS*.

    .. warning:: Uses non-async code.
                 The file is read using non-async code.
                 Performance will suffer with large files and hard drives.
                 See [aio-libs/aiohttp#7174](https://github.com/aio-libs/aiohttp/issues/7174)

    Examples
    --------

    Upload a single file:

    ```python
    chris = await ChrisClient.from_login(
        username='chris',
        password='chris1234',
        url='https://cube.chrisproject.org/api/v1/'
    )
    file = await chris.upload_file("./my_data.dat", 'dir/my_data.dat')
    assert file.fname == 'chris/uploads/dir/my_data.dat'
    ```

    Upload (in parallel) all `*.txt` files in a directory
    `'incoming'` to `chris/uploads/big_folder`:

    ```python
    upload_jobs = (
        chris.upload_file(p, f'big_folder/{p.name}')
        for p in Path('incoming').glob('*.txt')
    )
    await asyncio.gather(*upload_jobs)
    ```

    Parameters
    ----------
    local_file
        Path of an existing local file to upload.
    upload_path
        A subpath of `{username}/uploads/` where to upload the file to in *CUBE*
    """
    upload_path = await self._add_upload_prefix(upload_path)
    local_file = Path(local_file)
    with local_file.open("rb") as f:
        data = aiohttp.FormData()
        data.add_field("upload_path", upload_path)
        data.add_field("fname", f, filename=local_file.name)
        # The request is not awaited here; presumably `deserialize_res`
        # awaits it, so the file must stay open until that call returns.
        sent = self.s.post(self.collection_links.uploadedfiles, data=data)
        return await deserialize_res(
            sent, self, {"fname": local_file, "upload_path": upload_path}, File
        )

    # NOTE: an earlier draft streamed the file through `_file_sender` and
    # `self.upload_stream`; that path is disabled (unreachable and removed).
Upload a local file to ChRIS.
Uses non-async code.
The file is read using non-async code. Performance will suffer with large files and slow disks (e.g. hard drives). See aio-libs/aiohttp#7174
Examples
Upload a single file:
chris = await ChrisClient.from_login(
username='chris',
password='chris1234',
url='https://cube.chrisproject.org/api/v1/'
)
file = await chris.upload_file("./my_data.dat", 'dir/my_data.dat')
assert file.fname == 'chris/uploads/dir/my_data.dat'
Upload (in parallel) all *.txt
files in a directory
'incoming'
to chris/uploads/big_folder
:
upload_jobs = (
    chris.upload_file(p, f'big_folder/{p.name}')
    for p in Path('incoming').glob('*.txt')
)
await asyncio.gather(*upload_jobs)
Parameters
- local_file: Path of an existing local file to upload.
- upload_path: A subpath of
{username}/uploads/
where to upload the file to in CUBE
@http.get("user")
async def user(self) -> User:
    """
    Gets the user's information.

    NOTE(review): the body is a stub; the `http.get` decorator is presumably
    what implements the request -- confirm in `chris.link.http`.
    """
    ...
Gets the user's information.
async def username(self) -> Username:
    """
    Gets the username.

    Unlike `self.user`, the user object is read from `self._user`, so this
    method will use a cached API call.
    """
    # `_user` is awaited as an attribute, not called like a method.
    return (await self._user).username
Gets the username. In contrast to self.user
, this method will use a cached API call.
async def get_all_compute_resources(self) -> Sequence[ComputeResource]:
    """
    Get all compute resources.

    Convenience wrapper which collects an unfiltered
    `search_compute_resources` search with `acollect`. The number of compute
    resources of a CUBE is typically small so it's ok.

    See also
    --------
    `search_compute_resources` :
    """
    unfiltered_search = self.search_compute_resources()
    return await acollect(unfiltered_search)
Get all compute resources.
This method exists for convenience. The number of compute resources of a CUBE is typically small so it's ok.
See also
Inherited Members
- chris.link.collection_client.CollectionJsonApiClient
- CollectionJsonApiClient
- url
- collection_links
- chris.link.linked.Linked
- max_search_requests