# Source code for ourobori.apps.dependencies.services

"""
Includes dependencies of services to be used when calling
:func:`ourobori.apps.http.run_service`.
"""


from typing import NoReturn
from typing import Optional
from typing import Union
import urllib.parse

import attr
from aiohttp import BasicAuth
from aiohttp import ClientSession
from pydantic import BaseModel

from ourobori.services.errors import ServiceCallException
from ourobori.services.errors import ServiceNotAliveException
from ourobori.services.errors import prepare_exception_ctx


async def _extract_http_response(*,
                                 json_response: bool,
                                 response,
                                 servicename: str,
                                 api: str,
                                 data: dict) -> Union[str, dict]:
    """
    Extracts the json-data or the text of the response, depending on the
    parameter json_response.

    Parameters:
        json_response: If True ``response.json()`` is awaited, otherwise
            ``response.text()`` is awaited.
        response: The response of the api-call. Only its awaitable
            ``json()`` and ``text()`` methods are used here.
        servicename: The name of the service called (only used for the
            error context).
        api: The name of the called api (only used for the error context).
        data: The original data passed to the api-call (only used for the
            error context).

    Returns:
        result: The extracted content of the response.

    Raises:
        ServiceCallException: Built via ``prepare_exception_ctx`` whenever
            reading or decoding the response body fails. The original
            error is attached as the explicit cause.
    """
    try:
        if json_response:
            result = await response.json()
        else:
            result = await response.text()
    except Exception as err:
        # Deliberately broad: every failure while reading the body is
        # translated into the service-level exception type.
        exc_params = dict(name=servicename, api=api, params=data, err=err)
        exc = prepare_exception_ctx(exception_class=ServiceCallException,
                                    exception_params=exc_params)
        # Chain explicitly so the traceback shows the root cause as the
        # direct cause instead of "during handling of ... another
        # exception occurred".
        raise exc from err
    return result
@attr.s(auto_attribs=True)
class RestService:
    """
    This class represents a rest-service-dependency.

    It should be used as a dependency on service-startup in the ``app.py``
    passing to the :func:`ourobori.apps.http.run_service`.

    Attributes:
        name: The name of the service.
        base_url: The url of the service including the hostname and port.
        alive_api: The route to the alive-api.
        alive_params: If the alive_api requires parameters to be called
            (like the ping-api) they should be defined here.
        auth: If a BasicAuthentication is required to access the service
            it should be defined here.
    """

    name: str
    base_url: str
    alive_api: str
    alive_params: Optional[BaseModel] = None
    auth: Optional[BasicAuth] = None

    async def call_api(self, *,
                       api: str,
                       data: dict,
                       method: Optional[str] = 'post',
                       json_response: Optional[bool] = True
                       ) -> Union[str, dict]:
        """
        Calls the passed api with the passed data using the defined method.

        If json_response is True the response body is parsed as json,
        otherwise the body is returned as plain text (``str``).

        Parameters:
            api: The api-method to call. It is joined onto ``base_url``
                using :func:`urllib.parse.urljoin`.
            data: The data to use for the api-method-call. It is sent as
                json body for both methods.
            method: The method to use for the call. Can be either 'post'
                or 'get' (case-insensitive). Every value other than
                'post' results in a get-request.
            json_response: Flag if the result of the api-method-call will
                be json or not.

        Returns:
            The result for the api-call.

        Raises:
            ServiceCallException: If the response body can not be
                extracted (see ``_extract_http_response``).
        """
        url = urllib.parse.urljoin(self.base_url, api)

        # Compare case-insensitively so that 'POST' is not silently sent
        # as a get-request. Non-string values (e.g. None) keep falling
        # back to get, as before.
        use_post = isinstance(method, str) and method.lower() == 'post'

        async with ClientSession(auth=self.auth) as session:
            send = session.post if use_post else session.get
            async with send(url, json=data) as response:
                # The body has to be read while the response is still
                # open, i.e. inside of the context manager.
                result = await _extract_http_response(
                    json_response=json_response,
                    response=response,
                    servicename=self.name,
                    api=api,
                    data=data)
        return result

    async def is_alive(self) -> None:
        """
        Checks if the service is alive.

        Sends a post-request to ``alive_api`` (with ``alive_params`` as
        json body if they are defined and not empty) and expects the
        status-code 200.

        Raises:
            ServiceNotAliveException: Built via ``prepare_exception_ctx``
                if the status-code of the response is not 200.
        """
        url = urllib.parse.urljoin(self.base_url, self.alive_api)

        # Only pass a json body if there is something to send.
        json_data = dict(self.alive_params) if self.alive_params else None
        request_args = dict(json=json_data) if json_data else dict()

        async with ClientSession(auth=self.auth) as session:
            async with session.post(url, **request_args) as response:
                if response.status != 200:
                    exc_params = dict(name=self.name)
                    exc = prepare_exception_ctx(
                        exception_class=ServiceNotAliveException,
                        exception_params=exc_params)
                    raise exc