chris.util.search

import copy
import logging
from dataclasses import dataclass
from typing import (
    Optional,
    TypeVar,
    AsyncGenerator,
    Type,
    AsyncIterable,
    Any,
    Generic,
    AsyncIterator,
)

import aiohttp
import yarl
from serde import deserialize
from serde.json import from_json

from chris.link.linked import deserialize_linked, Linked
from chris.util.errors import BaseClientError, raise_for_status, NonsenseResponseError

logger = logging.getLogger(__name__)

T = TypeVar("T")


@deserialize
class _Paginated:
    """
    Response from a paginated endpoint.
    """

    count: int
    next: Optional[str]
    previous: Optional[str]
    results: list[Any]


@dataclass
class Search(Generic[T], AsyncIterable[T]):
    """
    Abstraction over paginated collection responses from *CUBE*.
    `Search` objects are returned by methods for search endpoints of the *CUBE* API.
    It is an [asynchronous iterable](https://docs.python.org/3/glossary.html#term-asynchronous-iterable)
    which produces items from responses that return multiple results.
    HTTP requests are fired as needed; they happen in the background during iteration.
    No requests are made before the first time a `Search` object is iterated.

    .. note:: Pagination is handled internally and automatically.
             The query parameters `limit` and `offset` can be given explicitly,
             but they shouldn't be needed.

    Examples
    --------

    Use an `async for` loop to print the name of every feed:

    ```python
    all_feeds = chris.search_feeds()  # returns a Search[Feed]
    async for feed in all_feeds:
        print(feed.name)
    ```
    """

    base_url: str
    params: dict[str, Any]
    client: Linked
    Item: Type[T]
    max_requests: int = 100

    def __aiter__(self) -> AsyncIterator[T]:
        return self._paginate(self.url)

    async def first(self) -> Optional[T]:
        """
        Get the first item.

        See also
        --------
        `get_only` : similar use, but more strict
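
        Examples
        --------

        Get the first feed of the search, or `None` if the search is empty
        (reusing the `search_feeds` method from the class example above):

        ```python
        feed = await chris.search_feeds().first()
        if feed is not None:
            print(feed.name)
        ```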
        """
        return await anext(self._first_aiter(), None)

    async def get_only(self, allow_multiple: bool = False) -> T:
        """
        Get the *only* item from a search with one result.

        Examples
        --------

        This method is very commonly used for getting "one thing" from CUBE.

        ```python
        await chris.search_plugins(name_exact="pl-dircopy", version="2.1.1").get_only()
        ```

        In the example above, a search for plugins given (`name_exact`, `version`)
        is guaranteed to return either 0 or 1 result.
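
        If more than one result is acceptable, pass `allow_multiple=True` to take
        the first result without raising `ManySearchError` (the query shown here
        is a hypothetical example):

        ```python
        await chris.search_plugins(name="dircopy").get_only(allow_multiple=True)
        ```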

        Raises
        ------
        chris.util.search.NoneSearchError
            If this search is empty.
        chris.util.search.ManySearchError
            If this search has more than one item and `allow_multiple` is `False`.

        See also
        --------
        `first` : similar use, but without the checks.

        Parameters
        ----------
        allow_multiple: bool
            If `True`, do not raise `ManySearchError` when `count > 1`.
        """
        one = await self._get_one()
        if one.count == 0:
            raise NoneSearchError(self.url)
        if not allow_multiple and one.count > 1:
            raise ManySearchError(self.url)
        if len(one.results) < 1:
            raise NonsenseResponseError(
                f"Response has count={one.count} but the results are empty.", one
            )
        return deserialize_linked(self.client, self.Item, one.results[0])

    async def count(self) -> int:
        """
        Get the number of items in this collection search.

        Examples
        --------

        `count` is useful for rendering a progress bar.
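
        A sketch of the idea, assuming a hypothetical `search_files` method and
        the third-party `tqdm` progress bar (neither is part of this module):

        ```python
        from tqdm import tqdm

        files = chris.search_files(fname_icontains="data")  # hypothetical method
        with tqdm(total=await files.count()) as progress:
            async for file in files:
                progress.update()
        ```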
        """
        one = await self._get_one()
        return one.count

    async def _get_one(self) -> _Paginated:
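        # Request a single result (limit=1, offset=0); only the pagination
        # metadata (e.g. count) and at most the first item are needed.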
        async with self.client.s.get(self._first_url) as res:
            await raise_for_status(res)
            return from_json(_Paginated, await res.text())

    def _paginate(self, url: yarl.URL) -> AsyncIterator[T]:
        return _get_paginated(
            client=self.client,
            url=url,
            item_type=self.Item,
            max_requests=self.max_requests,
        )

    @property
    def url(self) -> yarl.URL:
        return self._search_url_with(self.params)

    def _first_aiter(self) -> AsyncIterator[T]:
        return self._paginate(self._first_url)

    @property
    def _first_url(self) -> yarl.URL:
        params = copy.copy(self.params)
        params["limit"] = 1
        params["offset"] = 0
        return self._search_url_with(params)

    @property
    def _search_url(self) -> yarl.URL:
        return yarl.URL(self.base_url) / "search/"

    def _search_url_with(self, query: dict[str, Any]) -> yarl.URL:
        return yarl.URL(self._search_url).with_query(query)


async def _get_paginated(
    client: Linked,
    url: yarl.URL | str,
    item_type: Type[T],
    max_requests: int,
) -> AsyncGenerator[T, None]:
    """
    Make HTTP GET requests to a paginated endpoint. Further requests to the
    "next" URL are made in the background as needed.
    """
    logger.debug("GET, max_requests=%d, --> %s", max_requests, url)
    if max_requests == 0:  # a value of -1 disables this limit entirely
        raise TooMuchPaginationError(
            f"too many requests made to {url}. "
            f"If this is expected, then pass the argument max_search_requests=-1 to "
            f"the client constructor classmethod."
        )
    async with client.s.get(url) as res:  # N.B. not checking for 4XX, 5XX statuses
        data: _Paginated = from_json(_Paginated, await res.text())
        for element in data.results:
            yield deserialize_linked(client, item_type, element)
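    # Lazily recurse into the next page (if any), decrementing the request budget.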
    if data.next is not None:
        next_results = _get_paginated(client, data.next, item_type, max_requests - 1)
        async for next_element in next_results:
            yield next_element


async def acollect(async_iterable: AsyncIterable[T]) -> list[T]:
    """
    Simple helper to convert a `Search` to a [`list`](https://docs.python.org/3/library/stdtypes.html#list).

    Using this function is not recommended unless you can assume the collection is small.
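
    Examples
    --------

    Collect a (presumably small) search into a list, reusing the
    `search_plugins` method from the examples above:

    ```python
    plugins = await acollect(chris.search_plugins(name_exact="pl-dircopy"))
    ```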
    """
    # nb: using tuple here causes
    #     TypeError: 'async_generator' object is not iterable
    # return tuple(e async for e in async_iterable)
    return [e async for e in async_iterable]


class TooMuchPaginationError(BaseClientError):
    """
    Specified maximum number of requests exceeded while retrieving results from a paginated resource.
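
    To intentionally paginate without a limit, disable the cap when constructing
    the client. A sketch, assuming a constructor classmethod such as
    `ChrisClient.from_login` (defined outside this module):

    ```python
    chris = await ChrisClient.from_login(
        url="https://example.com/api/v1/",
        username="chris",
        password="chris1234",
        max_search_requests=-1,  # -1 disables the pagination request cap
    )
    ```
    """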

    pass


class GetOnlyError(BaseClientError):
    """Search does not have exactly one result."""

    pass


class NoneSearchError(GetOnlyError):
    """A search expected to have at least one element, has none."""

    pass


class ManySearchError(GetOnlyError):
    """A search expected to have only one result, has several."""

    pass