Source code for ourobori.apps.dependencies.dbs

Includes dependencies of databases to be used when calling

from typing import NoReturn
from typing import Optional

import attr
import asyncpg
from aiohttp import web

from import DatabaseNotAliveException
from import DatabaseTransactionException

[docs]@attr.s(auto_attribs=True) class PSQLDataBase: """ This class represents a postgresql-connection using asyncpg. It should be used as a dependency on service-startup in the ```` passing to the :func:`ourobori.apps.http.run_service`. Attributes: dsn: The dsn to use for this database name: The name of the database pool_name: The name to access the connection-pool from the ``app`` """ dsn: str name: str pool_name: str = attr.ib() app: web.Application = None
[docs] @pool_name.default def set_pool_name(self) -> str: """ Sets the attribute pool_name . Returns: the pool_name """ return f'{}_pool'
[docs] async def prepare(self, *, app: web.Application) -> NoReturn: """ Prepares the connection pool used in the :func:`run_sql` and saves it inside the app. Parameters: app: The app to save the connection-pool. """ app[self.pool_name] = await asyncpg.create_pool(self.dsn) = app
[docs] async def run_sql(self, *, sql_cmd: str, action: Optional[str] = 'select'): """ Runs the passed ``sql_cmd`` and either returns the selected data if ``action = 'select'`` or executes the sql_cmd if ``action = 'execute'``. Parameters: sql_cmd: The sql-command to run action: The action of the sql-command. Can be 'select' or 'execute'. """ assert action in ('select', 'execute') pool =[self.pool_name] # Take a connection from the pool. try: async with pool.acquire() as connection: # Open a transaction. async with connection.transaction(): # Run the query passing the request argument. if action == 'select': result = await connection.fetchall(sql_cmd) elif action == 'execute': result = await connection.execute(sql_cmd) except Exception as err: exc_params = dict(, transaction=sql_cmd, err=err) err_to_raise = DatabaseTransactionException( DatabaseTransactionException.description.format(**exc_params)) err_to_raise.description = err_to_raise.description.format( **exc_params)['msg'] =['msg'].format( **exc_params) raise err_to_raise return result
[docs] async def is_alive(self) -> NoReturn: """ Checks if the database can be accessed. Only a wrapper around a custom select and the :func:`self.run_sql`. Raises: DatabaseNotAliveException """ sql_cmd = """ SELECT datname FROM pg_database WHERE datistemplate = false; """ try: await self.run_sql(sql_cmd=sql_cmd) except Exception as err: exc_params = dict(, dsn=self.dsn) err_to_raise = DatabaseNotAliveException( DatabaseNotAliveException.description.format(**exc_params)) err_to_raise.description = err_to_raise.description.format( **exc_params)['msg'] =['msg'].format( **exc_params) raise err_to_raise