"""
Includes dependencies of services to be used when calling
:func:`ourobori.apps.http.run_service`.
"""
from typing import NoReturn
from typing import Optional
from typing import Union
import urllib.parse
import attr
from aiohttp import BasicAuth
from aiohttp import ClientSession
from pydantic import BaseModel
from ourobori.services.errors import ServiceCallException
from ourobori.services.errors import ServiceNotAliveException
from ourobori.services.errors import prepare_exception_ctx
@attr.s(auto_attribs=True)
class RestService:
    """
    Represents a rest-service-dependency.

    It should be used as a dependency on service-startup in the ``app.py``
    by passing it to :func:`ourobori.apps.http.run_service`.

    Attributes:
        name: The name of the service.
        base_url: The url of the service including the hostname and port.
        alive_api: The route to the alive-api.
        alive_params: If the alive_api requires parameters to be called
            (like the ping-api) they should be defined here.
        auth: If a BasicAuthentication is required to access the service
            it should be defined here.
    """
    name: str
    base_url: str
    alive_api: str
    alive_params: Optional[BaseModel] = None
    auth: Optional[BasicAuth] = None

    async def call_api(self,
                       *,
                       api: str,
                       data: dict,
                       method: Optional[str] = 'post',
                       json_response: Optional[bool] = True
                       ) -> Union[str, dict]:
        """
        Calls the passed api with the passed data using the defined method.

        The url is built by joining ``base_url`` and ``api``. The ``data``
        is sent as json-body for both supported methods. Any ``method``
        other than 'post' results in a GET request.

        Parameters:
            api: The api-method to call.
            data: The data to use for the api-method-call.
            method: The method to use for the call. Can be either 'post' or
                'get'.
            json_response: Flag if the result of the api-method-call will be
                json or not. It is forwarded to the response extraction,
                which presumably yields a dict for json and a str
                otherwise (see ``_extract_http_response`` to confirm).

        Returns: The result for the api-call.

        Raises: ServiceCallException
            (presumably raised by ``_extract_http_response``; it is not
            raised directly in this method).
        """
        url = urllib.parse.urljoin(self.base_url, api)
        async with ClientSession(auth=self.auth) as session:
            # Select the request-method once so the response handling is
            # not duplicated; everything except 'post' falls back to GET.
            send = session.post if method == 'post' else session.get
            async with send(url, json=data) as response:
                result = await _extract_http_response(
                    json_response=json_response,
                    response=response,
                    servicename=self.name,
                    api=api,
                    data=data)
        return result

    async def is_alive(self) -> None:
        """
        Checks if the service is alive.

        Sends a POST request to the ``alive_api`` (with ``alive_params`` as
        json-body if they are defined and not empty) and expects the
        status-code 200. Returns ``None`` if the service is alive.

        Raises: ServiceNotAliveException
            if the response-status is anything other than 200.
        """
        url = urllib.parse.urljoin(self.base_url, self.alive_api)
        payload = dict(self.alive_params) if self.alive_params else None
        # Only pass a json-body if there is actually something to send.
        request_args = {'json': payload} if payload else {}
        async with ClientSession(auth=self.auth) as session:
            async with session.post(url, **request_args) as response:
                if response.status != 200:
                    raise prepare_exception_ctx(
                        exception_class=ServiceNotAliveException,
                        exception_params=dict(name=self.name))