aiochris
ChRIS Python client library built on aiohttp (async HTTP client) and pyserde (dataclasses deserializer).
Installation
Requires Python 3.11 or 3.12.
pip install aiochris
# or
rye add aiochris
Brief Example
from aiochris import ChrisClient
chris = await ChrisClient.from_login(
username='chris',
password='chris1234',
url='https://cube.chrisproject.org/api/v1/'
)
dircopy = await chris.search_plugins(name_exact='pl-brainmgz', version='2.0.3').get_only()
plinst = await dircopy.create_instance(compute_resource_name='host')
await plinst.set(title="copies brain image files into feed")
Introduction
aiochris
provides three core classes: AnonChrisClient
, ChrisClient
, and ChrisAdminClient
.
These clients differ in permissions.
Methods are only defined for what the client has permission to see or do.
anon_client = await AnonChrisClient.from_url('https://cube.chrisproject.org/api/v1/')
# ok: can search for plugins without logging in...
plugin = await anon_client.search_plugins(name_exact='pl-mri10yr06mo01da_normal').first()
# IMPOSSIBLE! AnonChrisClient.create_instance not defined...
await plugin.create_instance()
# IMPOSSIBLE! authentication required for ChrisClient
authed_client = await ChrisClient.from_url('https://cube.chrisproject.org/api/v1/')
authed_client = await ChrisClient.from_login(
url='https://cube.chrisproject.org/api/v1/',
username='chris',
password='chris1234'
)
# authenticated client can also search for plugins
plugin = await authed_client.search_plugins(name_exact='pl-mri10yr06mo01da_normal').first()
await plugin.create_instance() # works!
Client Constructors
AnonChrisClient.from_url
: create a CUBE client without logging in.ChrisClient.from_login
: create a CUBE client using a username and password.ChrisClient.from_token
: create a CUBE client using a token from/api/v1/auth-token/
.ChrisClient.from_chrs
: create a CUBE client using logins saved bychrs
.ChrisAdminClient.from_login
: create an admin client using a username and password.ChrisAdminClient.from_token
: create an admin client using a token from/api/v1/auth-token/
.
aiochris in Jupyter Notebook
Jupyter and IPython support top-level await
. This, in conjunction with ChrisClient.from_chrs
,
make aiochris
a great way to use _ChRIS_ interactively with code.
For a walkthrough, see https://github.com/FNNDSC/aiochris/blob/master/examples/aiochris_as_a_shell.ipynb
Working with aiohttp
aiochris
hides the implementation detail that it is built upon aiohttp
,
however one thing is important to keep in mind:
be sure to call ChrisClient.close
at the end of your program.
chris = await ChrisClient.from_login(...)
# -- snip --
await chris.close()
You can also use an asynchronous context manager.
async with (await ChrisClient.from_login(...)) as chris:
await chris.upload_file('./something.dat', 'something.dat')
...
Efficiency with Multiple Clients
If using more than one aiohttp
client in an application, it's more efficient
to use the same
connector.
One connector instance should be shared among every client object,
including all aiochris
clients and other aiohttp
clients.
Example: efficiently using multiple aiohttp clients
import aiohttp
from aiochris import ChrisClient
with aiohttp.TCPConnector() as connector:
chris_client1 = await ChrisClient.from_login(
url='https://example.com/cube/api/v1/',
username='user1',
password='user1234',
connector=connector,
connector_owner=False
)
chris_client2 = await ChrisClient.from_login(
url='https://example.com/cube/api/v1/',
username='user2',
password='user4321',
connector=connector,
connector_owner=False
)
plain_http_client = aiohttp.ClientSession(connector=connector, connector_owner=False)
Advice for Getting Started
Searching for things (plugins, plugin instances, files) in CUBE is a common task,
and CUBE often returns multiple items per response.
Hence, it is important to understand how the Search
helper class works.
It simplifies how we interact with paginated collection responses from CUBE.
When performing batch operations, use
asyncio.gather
to run async functions concurrently.
aiochris
uses many generic types, so it is recommended you use an IDE
with good support for type hints, such as
PyCharm
or VSCodium with
Pylance configured.
Examples
Create a client given username and password
from aiochris import ChrisClient
chris = await ChrisClient.from_login(
url='https://cube.chrisproject.org/api/v1/',
username='chris',
password='chris1234'
)
Search for a plugin
# it's recommended to specify plugin version
plugin = await chris.search_plugins(name_exact="pl-dcm2niix", version="0.1.0").get_only()
# but if you don't care about plugin version...
plugin = await chris.search_plugins(name_exact="pl-dcm2niix").first()
Create a feed by uploading a file
uploaded_file = await chris.upload_file('./brain.nii', 'my_experiment/brain.nii')
dircopy = await chris.search_plugins(name_exact='pl-dircopy', version="2.1.1").get_only()
plinst = await dircopy.create_instance(dir=uploaded_file.parent)
feed = await plinst.get_feed()
await feed.set(name="An experiment on uploaded file brain.nii")
Run a ds-type ChRIS plugin
# search for plugin to run
plugin = await chris.search_plugins(name_exact="pl-dcm2niix", version="0.1.0").get_only()
# search for parent node
previous = await chris.plugin_instances(id=44).get_only()
await plugin.create_instance(
previous=previous, # required. alternatively, specify previous_id
title="convert DICOM to NIFTI", # optional
compute_resource_name="galena", # optional
memory_limit="2000Mi", # optional
d=9, # optional parameter of pl-dcm2niix
)
Search for plugin instances
finished_freesurfers = chris.plugin_instances(
plugin_name_exact='pl-fshack',
status='finishedSuccessfully'
)
async for p in finished_freesurfers:
print(f'"{p.title}" finished on date: {p.end_date}')
Delete all plugin instances with a given title, in parallel
import asyncio
from aiochris import ChrisClient, acollect
chris = ChrisClient.from_login(...)
search = chris.plugin_instances(title="delete me")
plugin_instances = await acollect(search)
await asyncio.gather(*(p.delete() for p in plugin_instances))
Enable Debug Logging
A log message will be printed to stderr before every HTTP request is sent.
import logging
logging.basicConfig(level=logging.DEBUG)
1""" 2.. include:: ./home.md 3 4.. include:: ./examples.md 5""" 6 7import aiochris.client 8import aiochris.models 9import aiochris.util 10from aiochris.util.search import Search, acollect 11from aiochris.client.normal import ChrisClient 12from aiochris.client.anon import AnonChrisClient 13from aiochris.client.admin import ChrisAdminClient 14from aiochris.enums import Status, ParameterTypeName 15 16__all__ = [ 17 "AnonChrisClient", 18 "ChrisClient", 19 "ChrisAdminClient", 20 "Search", 21 "acollect", 22 "Status", 23 "ParameterTypeName", 24 "client", 25 "models", 26 "util", 27 "errors", 28 "types", 29]
13class AnonChrisClient(BaseChrisClient[AnonymousCollectionLinks, "AnonChrisClient"]): 14 """ 15 An anonymous ChRIS client. It has access to read-only GET resources, 16 such as being able to search for plugins. 17 """ 18 19 @classmethod 20 async def from_url( 21 cls, 22 url: str, 23 max_search_requests: int = 100, 24 connector: Optional[aiohttp.BaseConnector] = None, 25 connector_owner: bool = True, 26 ) -> "AnonChrisClient": 27 """ 28 Create an anonymous client. 29 30 See `aiochris.client.base.BaseChrisClient.new` for parameter documentation. 31 """ 32 return await cls.new( 33 url=url, 34 max_search_requests=max_search_requests, 35 connector=connector, 36 connector_owner=connector_owner, 37 ) 38 39 @http.search("plugins") 40 def search_plugins(self, **query) -> Search[PublicPlugin]: 41 """ 42 Search for plugins. 43 44 Since this client is not logged in, you cannot create plugin instances using this client. 45 """ 46 ...
An anonymous ChRIS client. It has access to read-only GET resources, such as being able to search for plugins.
19 @classmethod 20 async def from_url( 21 cls, 22 url: str, 23 max_search_requests: int = 100, 24 connector: Optional[aiohttp.BaseConnector] = None, 25 connector_owner: bool = True, 26 ) -> "AnonChrisClient": 27 """ 28 Create an anonymous client. 29 30 See `aiochris.client.base.BaseChrisClient.new` for parameter documentation. 31 """ 32 return await cls.new( 33 url=url, 34 max_search_requests=max_search_requests, 35 connector=connector, 36 connector_owner=connector_owner, 37 )
Create an anonymous client.
See aiochris.client.base.BaseChrisClient.new
for parameter documentation.
39 @http.search("plugins") 40 def search_plugins(self, **query) -> Search[PublicPlugin]: 41 """ 42 Search for plugins. 43 44 Since this client is not logged in, you cannot create plugin instances using this client. 45 """ 46 ...
Search for plugins.
Since this client is not logged in, you cannot create plugin instances using this client.
Inherited Members
- aiochris.link.collection_client.CollectionJsonApiClient
- CollectionJsonApiClient
- url
- collection_links
- aiochris.link.linked.Linked
- s
- max_search_requests
15class ChrisClient(AuthenticatedClient[CollectionLinks, "ChrisClient"]): 16 """ 17 A normal user *ChRIS* client, who may upload files and create plugin instances. 18 """ 19 20 @classmethod 21 async def create_user( 22 cls, 23 url: ChrisURL | str, 24 username: Username | str, 25 password: Password | str, 26 email: str, 27 session: Optional[aiohttp.ClientSession] = None, 28 ) -> UserData: 29 payload = { 30 "template": { 31 "data": [ 32 {"name": "email", "value": email}, 33 {"name": "username", "value": username}, 34 {"name": "password", "value": password}, 35 ] 36 } 37 } 38 headers = { 39 "Content-Type": "application/vnd.collection+json", 40 "Accept": "application/json", 41 } 42 async with _optional_session(session) as session: 43 res = await session.post(url + "users/", json=payload, headers=headers) 44 await raise_for_status(res) 45 return from_json(UserData, await res.text())
A normal user ChRIS client, who may upload files and create plugin instances.
20 @classmethod 21 async def create_user( 22 cls, 23 url: ChrisURL | str, 24 username: Username | str, 25 password: Password | str, 26 email: str, 27 session: Optional[aiohttp.ClientSession] = None, 28 ) -> UserData: 29 payload = { 30 "template": { 31 "data": [ 32 {"name": "email", "value": email}, 33 {"name": "username", "value": username}, 34 {"name": "password", "value": password}, 35 ] 36 } 37 } 38 headers = { 39 "Content-Type": "application/vnd.collection+json", 40 "Accept": "application/json", 41 } 42 async with _optional_session(session) as session: 43 res = await session.post(url + "users/", json=payload, headers=headers) 44 await raise_for_status(res) 45 return from_json(UserData, await res.text())
Inherited Members
- aiochris.link.collection_client.CollectionJsonApiClient
- CollectionJsonApiClient
- url
- collection_links
- aiochris.client.authed.AuthenticatedClient
- from_login
- from_token
- from_chrs
- search_feeds
- search_plugins
- plugin_instances
- upload_file
- user
- username
- search_compute_resources
- get_all_compute_resources
- search_pacsfiles
- aiochris.link.linked.Linked
- s
- max_search_requests
33class ChrisAdminClient(AuthenticatedClient[AdminCollectionLinks, "ChrisAdminClient"]): 34 """ 35 A client who has access to `/chris-admin/`. Admins can register new plugins and 36 add new compute resources. 37 """ 38 39 @http.post("admin") 40 async def _register_plugin_from_store_raw( 41 self, plugin_store_url: str, compute_names: str 42 ) -> Plugin: ... 43 44 async def register_plugin_from_store( 45 self, plugin_store_url: PluginUrl, compute_names: Iterable[ComputeResourceName] 46 ) -> Plugin: 47 """ 48 Register a plugin from a ChRIS Store. 49 """ 50 return await self._register_plugin_from_store_raw( 51 plugin_store_url=plugin_store_url, compute_names=",".join(compute_names) 52 ) 53 54 async def add_plugin( 55 self, 56 plugin_description: str | dict, 57 compute_resources: str 58 | ComputeResource 59 | Iterable[ComputeResource | ComputeResourceName], 60 ) -> Plugin: 61 """ 62 Add a plugin to *CUBE*. 63 64 Examples 65 -------- 66 67 ```python 68 cmd = ['docker', 'run', '--rm', 'fnndsc/pl-mri-preview', 'chris_plugin_info'] 69 output = subprocess.check_output(cmd, text=True) 70 desc = json.loads(output) 71 desc['name'] = 'pl-mri-preview' 72 desc['public_repo'] = 'https://github.com/FNNDSC/pl-mri-preview' 73 desc['dock_image'] = 'fnndsc/pl-mri-preview' 74 75 await chris_admin.add_plugin(plugin_description=desc, compute_resources='host') 76 ``` 77 78 The example above is just for show. It's not a good example for several reasons: 79 80 - Calls blocking function `subprocess.check_output` in asynchronous context 81 - It is preferred to use a versioned string for `dock_image` 82 - `host` compute environment is not guaranteed to exist. Instead, you could 83 call `aiochris.client.authed.AuthenticatedClient.search_compute_resources` 84 or `aiochris.client.authed.AuthenticatedClient.get_all_compute_resources`: 85 86 ```python 87 all_computes = await chris_admin.get_all_compute_resources() 88 await chris_admin.add_plugin(plugin_description=desc, compute_resources=all_computes) 89 ``` 90 91 Parameters 92 ---------- 93 plugin_description: str | dict 94 JSON description of a plugin. 95 [spec](https://github.com/FNNDSC/CHRIS_docs/blob/5078aaf934bdbe313e85367f88aff7c14730a1d4/specs/ChRIS_Plugins.adoc#descriptor_file) 96 compute_resources 97 Compute resources to register the plugin to. Value can be either a comma-separated `str` of names, 98 a `aiochris.models.public.ComputeResource`, a sequence of `aiochris.models.public.ComputeResource`, 99 or a sequence of compute resource names as `str`. 100 """ 101 compute_names = _serialize_crs(compute_resources) 102 if not isinstance(plugin_description, str): 103 plugin_description = json.dumps(plugin_description) 104 data = aiohttp.FormData() 105 data.add_field( 106 "fname", 107 io.StringIO(plugin_description), 108 filename="aiochris_add_plugin.json", 109 ) 110 data.add_field("compute_names", compute_names) 111 async with self.s.post(self.collection_links.admin, data=data) as res: 112 await raise_for_status(res) 113 return deserialize_linked(self, Plugin, await res.json()) 114 115 async def create_compute_resource( 116 self, 117 name: str | ComputeResourceName, 118 compute_url: str | PfconUrl, 119 compute_user: str, 120 compute_password: str, 121 compute_innetwork: bool = None, 122 description: str = None, 123 compute_auth_url: str = None, 124 compute_auth_token: str = None, 125 max_job_exec_seconds: str = None, 126 ) -> ComputeResource: 127 """ 128 Define a new compute resource. 129 """ 130 return await (await self._admin).create_compute_resource( 131 name=name, 132 compute_url=compute_url, 133 compute_user=compute_user, 134 compute_password=compute_password, 135 compute_innetwork=compute_innetwork, 136 description=description, 137 compute_auth_url=compute_auth_url, 138 compute_auth_token=compute_auth_token, 139 max_job_exec_seconds=max_job_exec_seconds, 140 ) 141 142 @async_cached_property 143 async def _admin(self) -> _AdminApiClient: 144 """ 145 Get a (sub-)client for `/chris-admin/api/v1/` 146 """ 147 res = await self.s.get(self.collection_links.admin) 148 body = await res.json() 149 links = from_dict(AdminApiCollectionLinks, body["collection_links"]) 150 return _AdminApiClient( 151 url=self.collection_links.admin, 152 s=self.s, 153 collection_links=links, 154 max_search_requests=self.max_search_requests, 155 )
A client who has access to /chris-admin/
. Admins can register new plugins and
add new compute resources.
44 async def register_plugin_from_store( 45 self, plugin_store_url: PluginUrl, compute_names: Iterable[ComputeResourceName] 46 ) -> Plugin: 47 """ 48 Register a plugin from a ChRIS Store. 49 """ 50 return await self._register_plugin_from_store_raw( 51 plugin_store_url=plugin_store_url, compute_names=",".join(compute_names) 52 )
Register a plugin from a ChRIS Store.
54 async def add_plugin( 55 self, 56 plugin_description: str | dict, 57 compute_resources: str 58 | ComputeResource 59 | Iterable[ComputeResource | ComputeResourceName], 60 ) -> Plugin: 61 """ 62 Add a plugin to *CUBE*. 63 64 Examples 65 -------- 66 67 ```python 68 cmd = ['docker', 'run', '--rm', 'fnndsc/pl-mri-preview', 'chris_plugin_info'] 69 output = subprocess.check_output(cmd, text=True) 70 desc = json.loads(output) 71 desc['name'] = 'pl-mri-preview' 72 desc['public_repo'] = 'https://github.com/FNNDSC/pl-mri-preview' 73 desc['dock_image'] = 'fnndsc/pl-mri-preview' 74 75 await chris_admin.add_plugin(plugin_description=desc, compute_resources='host') 76 ``` 77 78 The example above is just for show. It's not a good example for several reasons: 79 80 - Calls blocking function `subprocess.check_output` in asynchronous context 81 - It is preferred to use a versioned string for `dock_image` 82 - `host` compute environment is not guaranteed to exist. Instead, you could 83 call `aiochris.client.authed.AuthenticatedClient.search_compute_resources` 84 or `aiochris.client.authed.AuthenticatedClient.get_all_compute_resources`: 85 86 ```python 87 all_computes = await chris_admin.get_all_compute_resources() 88 await chris_admin.add_plugin(plugin_description=desc, compute_resources=all_computes) 89 ``` 90 91 Parameters 92 ---------- 93 plugin_description: str | dict 94 JSON description of a plugin. 95 [spec](https://github.com/FNNDSC/CHRIS_docs/blob/5078aaf934bdbe313e85367f88aff7c14730a1d4/specs/ChRIS_Plugins.adoc#descriptor_file) 96 compute_resources 97 Compute resources to register the plugin to. Value can be either a comma-separated `str` of names, 98 a `aiochris.models.public.ComputeResource`, a sequence of `aiochris.models.public.ComputeResource`, 99 or a sequence of compute resource names as `str`. 100 """ 101 compute_names = _serialize_crs(compute_resources) 102 if not isinstance(plugin_description, str): 103 plugin_description = json.dumps(plugin_description) 104 data = aiohttp.FormData() 105 data.add_field( 106 "fname", 107 io.StringIO(plugin_description), 108 filename="aiochris_add_plugin.json", 109 ) 110 data.add_field("compute_names", compute_names) 111 async with self.s.post(self.collection_links.admin, data=data) as res: 112 await raise_for_status(res) 113 return deserialize_linked(self, Plugin, await res.json())
Add a plugin to CUBE.
Examples
cmd = ['docker', 'run', '--rm', 'fnndsc/pl-mri-preview', 'chris_plugin_info']
output = subprocess.check_output(cmd, text=True)
desc = json.loads(output)
desc['name'] = 'pl-mri-preview'
desc['public_repo'] = 'https://github.com/FNNDSC/pl-mri-preview'
desc['dock_image'] = 'fnndsc/pl-mri-preview'
await chris_admin.add_plugin(plugin_description=desc, compute_resources='host')
The example above is just for show. It's not a good example for several reasons:
- Calls blocking function
subprocess.check_output
in asynchronous context - It is preferred to use a versioned string for
dock_image
host
compute environment is not guaranteed to exist. Instead, you could callaiochris.client.authed.AuthenticatedClient.search_compute_resources
oraiochris.client.authed.AuthenticatedClient.get_all_compute_resources
:
all_computes = await chris_admin.get_all_compute_resources()
await chris_admin.add_plugin(plugin_description=desc, compute_resources=all_computes)
Parameters
- plugin_description (str | dict): JSON description of a plugin. spec
- compute_resources: Compute resources to register the plugin to. Value can be either a comma-separated
str
of names, aaiochris.models.public.ComputeResource
, a sequence ofaiochris.models.public.ComputeResource
, or a sequence of compute resource names asstr
.
115 async def create_compute_resource( 116 self, 117 name: str | ComputeResourceName, 118 compute_url: str | PfconUrl, 119 compute_user: str, 120 compute_password: str, 121 compute_innetwork: bool = None, 122 description: str = None, 123 compute_auth_url: str = None, 124 compute_auth_token: str = None, 125 max_job_exec_seconds: str = None, 126 ) -> ComputeResource: 127 """ 128 Define a new compute resource. 129 """ 130 return await (await self._admin).create_compute_resource( 131 name=name, 132 compute_url=compute_url, 133 compute_user=compute_user, 134 compute_password=compute_password, 135 compute_innetwork=compute_innetwork, 136 description=description, 137 compute_auth_url=compute_auth_url, 138 compute_auth_token=compute_auth_token, 139 max_job_exec_seconds=max_job_exec_seconds, 140 )
Define a new compute resource.
Inherited Members
- aiochris.link.collection_client.CollectionJsonApiClient
- CollectionJsonApiClient
- url
- collection_links
- aiochris.client.authed.AuthenticatedClient
- from_login
- from_token
- from_chrs
- search_feeds
- search_plugins
- plugin_instances
- upload_file
- user
- username
- search_compute_resources
- get_all_compute_resources
- search_pacsfiles
- aiochris.link.linked.Linked
- s
- max_search_requests
42@dataclass 43class Search(Generic[T], AsyncIterable[T]): 44 """ 45 Abstraction over paginated collection responses from *CUBE*. 46 `Search` objects are returned by methods for search endpoints of the *CUBE* API. 47 It is an [asynchronous iterable](https://docs.python.org/3/glossary.html#term-asynchronous-iterable) 48 which produces items from responses that return multiple results. 49 HTTP requests are fired as-neede, they happen in the background during iteration. 50 No request is made before the first time a `Search` object is called. 51 52 .. note:: Pagination is handled internally and automatically. 53 The query parameters `limit` and `offset` can be explicitly given, but they shouldn't. 54 55 Examples 56 -------- 57 58 Use an `async for` loop to print the name of every feed: 59 60 ```python 61 all_feeds = chris.search_feeds() # returns a Search[Feed] 62 async for feed in all_feeds: 63 print(feed.name) 64 ``` 65 """ 66 67 base_url: str 68 params: dict[str, Any] 69 client: Linked 70 Item: Type[T] 71 max_requests: int = 100 72 subpath: str = "search/" 73 74 def __aiter__(self) -> AsyncIterator[T]: 75 return self._paginate(self.url) 76 77 async def first(self) -> Optional[T]: 78 """ 79 Get the first item. 80 81 See also 82 -------- 83 `get_only` : similar use, but more strict 84 """ 85 return await anext(self._first_aiter(), None) 86 87 async def get_only(self, allow_multiple=False) -> T: 88 """ 89 Get the *only* item from a search with one result. 90 91 Examples 92 -------- 93 94 This method is very commonly used for getting "one thing" from CUBE. 95 96 ```python 97 await chris.search_plugins(name_exact="pl-dircopy", version="2.1.1").get_only() 98 ``` 99 100 In the example above, a search for plugins given (`name_exact`, `version`) 101 is guaranteed to return either 0 or 1 result. 102 103 Raises 104 ------ 105 aiochris.util.search.NoneSearchError 106 If this search is empty. 107 aiochris.util.search.ManySearchError 108 If this search has more than one item and `allow_multiple` is `False` 109 110 See also 111 -------- 112 `first` : does the same thing but without checks. 113 114 Parameters 115 ---------- 116 allow_multiple: bool 117 if `True`, do not raise `ManySearchError` if `count > 1` 118 """ 119 one = await self._get_one() 120 if one.count == 0: 121 raise NoneSearchError(self.url) 122 if not allow_multiple and one.count > 1: 123 raise ManySearchError(self.url) 124 if len(one.results) < 1: 125 raise NonsenseResponseError( 126 f"Response has count={one.count} but the results are empty.", one 127 ) 128 return deserialize_linked(self.client, self.Item, one.results[0]) 129 130 async def count(self) -> int: 131 """ 132 Get the number of items in this collection search. 133 134 Examples 135 -------- 136 137 `count` is useful for rendering a progress bar. TODO example with files 138 """ 139 one = await self._get_one() 140 return one.count 141 142 async def _get_one(self) -> _Paginated: 143 async with self.client.s.get(self._first_url) as res: 144 await raise_for_status(res) 145 return from_json(_Paginated, await res.text()) 146 147 def _paginate(self, url: yarl.URL) -> AsyncIterator[T]: 148 return _get_paginated( 149 client=self.client, 150 url=url, 151 item_type=self.Item, 152 max_requests=self.max_requests, 153 ) 154 155 @property 156 def url(self) -> yarl.URL: 157 return self._search_url_with(self.params) 158 159 def _first_aiter(self) -> AsyncIterator[T]: 160 return self._paginate(self._first_url) 161 162 @property 163 def _first_url(self) -> yarl.URL: 164 params = copy.copy(self.params) 165 params["limit"] = 1 166 params["offset"] = 0 167 return self._search_url_with(params) 168 169 @property 170 def _search_url(self) -> yarl.URL: 171 return yarl.URL(self.base_url) / self.subpath 172 173 def _search_url_with(self, query: dict[str, Any]): 174 return yarl.URL(self._search_url).with_query(query)
Abstraction over paginated collection responses from CUBE.
Search
objects are returned by methods for search endpoints of the CUBE API.
It is an asynchronous iterable
which produces items from responses that return multiple results.
HTTP requests are fired as-neede, they happen in the background during iteration.
No request is made before the first time a Search
object is called.
Pagination is handled internally and automatically.
The query parameters limit
and offset
can be explicitly given, but they shouldn't.
Examples
Use an async for
loop to print the name of every feed:
all_feeds = chris.search_feeds() # returns a Search[Feed]
async for feed in all_feeds:
print(feed.name)
87 async def get_only(self, allow_multiple=False) -> T: 88 """ 89 Get the *only* item from a search with one result. 90 91 Examples 92 -------- 93 94 This method is very commonly used for getting "one thing" from CUBE. 95 96 ```python 97 await chris.search_plugins(name_exact="pl-dircopy", version="2.1.1").get_only() 98 ``` 99 100 In the example above, a search for plugins given (`name_exact`, `version`) 101 is guaranteed to return either 0 or 1 result. 102 103 Raises 104 ------ 105 aiochris.util.search.NoneSearchError 106 If this search is empty. 107 aiochris.util.search.ManySearchError 108 If this search has more than one item and `allow_multiple` is `False` 109 110 See also 111 -------- 112 `first` : does the same thing but without checks. 113 114 Parameters 115 ---------- 116 allow_multiple: bool 117 if `True`, do not raise `ManySearchError` if `count > 1` 118 """ 119 one = await self._get_one() 120 if one.count == 0: 121 raise NoneSearchError(self.url) 122 if not allow_multiple and one.count > 1: 123 raise ManySearchError(self.url) 124 if len(one.results) < 1: 125 raise NonsenseResponseError( 126 f"Response has count={one.count} but the results are empty.", one 127 ) 128 return deserialize_linked(self.client, self.Item, one.results[0])
Get the only item from a search with one result.
Examples
This method is very commonly used for getting "one thing" from CUBE.
await chris.search_plugins(name_exact="pl-dircopy", version="2.1.1").get_only()
In the example above, a search for plugins given (name_exact
, version
)
is guaranteed to return either 0 or 1 result.
Raises
- aiochris.util.search.NoneSearchError: If this search is empty.
- aiochris.util.search.ManySearchError: If this search has more than one item and
allow_multiple
isFalse
See also
first
: does the same thing but without checks.
Parameters
- allow_multiple (bool):
if
True
, do not raiseManySearchError
ifcount > 1
130 async def count(self) -> int: 131 """ 132 Get the number of items in this collection search. 133 134 Examples 135 -------- 136 137 `count` is useful for rendering a progress bar. TODO example with files 138 """ 139 one = await self._get_one() 140 return one.count
Get the number of items in this collection search.
Examples
count
is useful for rendering a progress bar. TODO example with files
204async def acollect(async_iterable: AsyncIterable[T]) -> list[T]: 205 """ 206 Simple helper to convert a `Search` to a [`list`](https://docs.python.org/3/library/stdtypes.html#list). 207 208 Using this function is not recommended unless you can assume the collection is small. 209 """ 210 # nb: using tuple here causes 211 # TypeError: 'async_generator' object is not iterable 212 # return tuple(e async for e in async_iterable) 213 return [e async for e in async_iterable]
5class Status(enum.Enum): 6 """ 7 Possible statuses of a plugin instance. 8 """ 9 10 created = "created" 11 waiting = "waiting" 12 scheduled = "scheduled" 13 started = "started" 14 registeringFiles = "registeringFiles" 15 finishedSuccessfully = "finishedSuccessfully" 16 finishedWithError = "finishedWithError" 17 cancelled = "cancelled"
Possible statuses of a plugin instance.
Inherited Members
- enum.Enum
- name
- value
20class ParameterTypeName(enum.Enum): 21 """ 22 Plugin parameter types. 23 """ 24 25 string = "string" 26 integer = "integer" 27 float = "float" 28 boolean = "boolean"
Plugin parameter types.
Inherited Members
- enum.Enum
- name
- value