# Source code for ourobori.apps.dependencies.dbs

"""
Includes dependencies of databases to be used when calling
:func:`ourobori.apps.http.run_service`.
"""


from typing import NoReturn
from typing import Optional

import attr
import asyncpg
from aiohttp import web

from ourobori.services.errors import DatabaseNotAliveException
from ourobori.services.errors import DatabaseTransactionException


@attr.s(auto_attribs=True)
class PSQLDataBase:
    """
    This class represents a postgresql-connection using asyncpg.

    It should be used as a dependency on service-startup in the ``app.py``
    passing to the :func:`ourobori.apps.http.run_service`.

    Attributes:
        dsn: The dsn to use for this database
        name: The name of the database
        pool_name: The name to access the connection-pool from the ``app``.
            Defaults to ``'<name>_pool'``.
        app: The application holding the connection-pool. ``None`` until
            :func:`prepare` has been awaited.
    """

    dsn: str
    name: str
    pool_name: str = attr.ib()
    app: Optional[web.Application] = None

    @pool_name.default
    def set_pool_name(self) -> str:
        """
        Sets the default of the attribute ``pool_name``.

        Returns:
            the pool_name, built as ``'<name>_pool'``
        """
        return f'{self.name}_pool'

    @staticmethod
    def _build_error(exc_type, exc_params: dict):
        """
        Builds an instance of ``exc_type`` whose ``description`` and
        ``data['msg']`` templates are filled with ``exc_params``.

        Parameters:
            exc_type: The exception class to instantiate. It is expected
                to expose a ``description`` template and a ``data`` mapping
                with a ``'msg'`` template.
            exc_params: The values to fill into the templates.

        Returns:
            the prepared exception instance (not raised here)
        """
        err_to_raise = exc_type(exc_type.description.format(**exc_params))
        # NOTE(review): the formatting sequence below is kept exactly as it
        # was. Whether ``description``/``data`` live on the instance or on
        # the class depends on the exception classes defined in
        # ``ourobori.services.errors`` -- verify there before changing it.
        err_to_raise.description = err_to_raise.description.format(
            **exc_params)
        err_to_raise.data['msg'] = err_to_raise.data['msg'].format(
            **exc_params)
        return err_to_raise

    async def prepare(self, *, app: web.Application) -> None:
        """
        Prepares the connection pool used in the :func:`run_sql` and saves
        it inside the app.

        Parameters:
            app: The app to save the connection-pool.
        """
        app[self.pool_name] = await asyncpg.create_pool(self.dsn)
        self.app = app

    async def run_sql(self, *, sql_cmd: str,
                      action: Optional[str] = 'select'):
        """
        Runs the passed ``sql_cmd`` inside a transaction and either returns
        the selected data if ``action = 'select'`` or executes the sql_cmd
        if ``action = 'execute'``.

        Parameters:
            sql_cmd: The sql-command to run
            action: The action of the sql-command.
                Can be 'select' or 'execute'.

        Returns:
            For ``'select'`` the rows returned by asyncpg's
            ``Connection.fetch``, for ``'execute'`` the result of
            asyncpg's ``Connection.execute``.

        Raises:
            AssertionError: If ``action`` is neither 'select' nor 'execute'.
            DatabaseTransactionException: If acquiring the connection or
                running the command fails.
        """
        if action not in ('select', 'execute'):
            # Raised explicitly instead of using ``assert`` so the check
            # is not stripped under ``python -O``; the exception type is
            # the same as before.
            raise AssertionError(
                f"action must be 'select' or 'execute', got {action!r}")

        pool = self.app[self.pool_name]
        try:
            # Take a connection from the pool.
            async with pool.acquire() as connection:
                # Open a transaction.
                async with connection.transaction():
                    if action == 'select':
                        # asyncpg has no ``fetchall``; ``fetch`` returns
                        # all rows of the query.
                        result = await connection.fetch(sql_cmd)
                    elif action == 'execute':
                        result = await connection.execute(sql_cmd)
        except Exception as err:
            exc_params = dict(name=self.name, transaction=sql_cmd, err=err)
            # Chain the original error so its traceback is not lost.
            raise self._build_error(
                DatabaseTransactionException, exc_params) from err
        return result

    async def is_alive(self) -> None:
        """
        Checks if the database can be accessed.

        Only a wrapper around a custom select and the :func:`run_sql`.
        Returns nothing when the select succeeds.

        Raises:
            DatabaseNotAliveException: If the select could not be run.
        """
        sql_cmd = """
        SELECT datname FROM pg_database
        WHERE datistemplate = false;
        """
        try:
            await self.run_sql(sql_cmd=sql_cmd)
        except Exception as err:
            exc_params = dict(name=self.name, dsn=self.dsn)
            raise self._build_error(
                DatabaseNotAliveException, exc_params) from err