"""
Includes dependencies of databases to be used when calling
:func:`ourobori.apps.http.run_service`.
"""
from typing import NoReturn
from typing import Optional
import attr
import asyncpg
from aiohttp import web
from ourobori.services.errors import DatabaseNotAliveException
from ourobori.services.errors import DatabaseTransactionException
@attr.s(auto_attribs=True)
class PSQLDataBase:
    """
    This class represents a postgresql-connection using asyncpg.

    It should be used as a dependency on service-startup in the ``app.py``
    passing to the :func:`ourobori.apps.http.run_service`.

    Attributes:
        dsn: The dsn to use for this database
        name: The name of the database
        pool_name: The name to access the connection-pool from the ``app``.
            Defaults to ``'<name>_pool'``.
        app: The application holding the connection-pool. Stays ``None``
            until :func:`prepare` has been awaited.
    """
    dsn: str
    name: str
    pool_name: str = attr.ib()
    app: Optional[web.Application] = None

    @pool_name.default
    def set_pool_name(self) -> str:
        """
        Sets the attribute pool_name .

        Returns: the pool_name, built as ``'<name>_pool'``
        """
        return f'{self.name}_pool'

    @staticmethod
    def _build_error(exc_cls, **exc_params):
        """
        Creates an instance of ``exc_cls`` with its message templates
        filled in.

        Parameters:
            exc_cls: The exception class to instantiate. It is expected to
                provide a ``description`` template and a ``data['msg']``
                template.
            exc_params: The values to insert into the templates.

        Returns: the prepared exception instance (not raised here)
        """
        # NOTE(review): ``description`` is formatted both before and after
        # instantiation, mirroring the original behaviour; whether the
        # second pass is required depends on the exception classes, which
        # are defined outside this module.
        err_to_raise = exc_cls(exc_cls.description.format(**exc_params))
        err_to_raise.description = err_to_raise.description.format(
            **exc_params)
        err_to_raise.data['msg'] = err_to_raise.data['msg'].format(
            **exc_params)
        return err_to_raise

    async def prepare(self, *, app: web.Application) -> None:
        """
        Prepares the connection pool used in the :func:`run_sql` and saves
        it inside the app.

        Parameters:
            app: The app to save the connection-pool.
        """
        app[self.pool_name] = await asyncpg.create_pool(self.dsn)
        # Keep a reference so run_sql can look the pool up again later.
        self.app = app

    async def run_sql(self,
                      *,
                      sql_cmd: str,
                      action: str = 'select'):
        """
        Runs the passed ``sql_cmd`` inside a transaction and either returns
        the selected data if ``action = 'select'`` or executes the sql_cmd
        if ``action = 'execute'``.

        The ``sql_cmd`` is handed to the database verbatim, so callers must
        not build it from untrusted input.

        Parameters:
            sql_cmd: The sql-command to run
            action: The action of the sql-command. Can be 'select' or
                'execute'.

        Returns: the result of ``connection.fetch`` for 'select' or of
            ``connection.execute`` for 'execute'

        Raises:
            ValueError: if ``action`` is neither 'select' nor 'execute'
            DatabaseTransactionException: if acquiring the connection or
                running the command fails
        """
        # Explicit check instead of ``assert`` so the validation survives
        # running python with ``-O``.
        if action not in ('select', 'execute'):
            raise ValueError(
                f"action must be 'select' or 'execute', got {action!r}")

        pool = self.app[self.pool_name]
        try:
            # Take a connection from the pool.
            async with pool.acquire() as connection:
                # Open a transaction.
                async with connection.transaction():
                    if action == 'select':
                        # asyncpg exposes ``fetch`` (there is no
                        # ``fetchall``) to retrieve all resulting rows.
                        result = await connection.fetch(sql_cmd)
                    else:
                        result = await connection.execute(sql_cmd)
        except Exception as err:
            # Chain the original error so the root cause stays visible.
            raise self._build_error(DatabaseTransactionException,
                                    name=self.name,
                                    transaction=sql_cmd,
                                    err=err) from err
        return result

    async def is_alive(self) -> None:
        """
        Checks if the database can be accessed.

        Only a wrapper around a custom select and the :func:`self.run_sql`.

        Raises: DatabaseNotAliveException
        """
        sql_cmd = """
        SELECT datname FROM pg_database
        WHERE datistemplate = false;
        """
        try:
            await self.run_sql(sql_cmd=sql_cmd)
        except Exception as err:
            # Any failure (including a missing pool) counts as "not alive".
            raise self._build_error(DatabaseNotAliveException,
                                    name=self.name,
                                    dsn=self.dsn) from err