How to create an asyncpg component for 5.x?


#1

Hi guys, simple question here, with what I'm sure is a not-so-simple answer. I am trying to jump into the async side of things (total noob), but I am struggling to even get an asyncpg connection pool set up and working as a component.

Have any of you already solved this problem? Could you share your solution, with an explanation of why it works and why it needs to be implemented that way?


#2

That’s more or less what we use in production, with a pooled connection:

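import asyncio
import inspect
import logging
import typing

import asyncpg
from apistar import Component

logger = logging.getLogger(__name__)

# Marker type: handler parameters annotated with DBPool receive the shared pool.
# POOL_CONN_ARGS (keyword arguments for asyncpg.create_pool()) is defined elsewhere.
DBPool = typing.TypeVar("DBPool")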

async def get_asyncpg_pool():
    """
    Returns global asyncpg pool
    """
    logger.info("Creating new client Pool")
    return await asyncpg.create_pool(**POOL_CONN_ARGS)

class DBPoolComponent(Component):
    _pool = None

    def __init__(self):
        self.pool_lock = asyncio.Lock()

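    async def resolve(self):
        # Fast path: the pool already exists, no locking needed.
        if self._pool is None:
            # Only one coroutine at a time may create the pool.
            async with self.pool_lock:
                # Re-check: another request may have created it while we waited.
                if self._pool is None:
                    self._pool = await get_asyncpg_pool()
        return self._pool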

    def can_handle_parameter(self, parameter: inspect.Parameter) -> bool:
        return parameter.annotation is DBPool

async def my_db_handler(pool: DBPool) -> str:
    # Pool.fetchval acquires a connection, runs the query and releases it.
    return await pool.fetchval("SELECT 'Gotcha'")

This example works well if you are fine with autocommit: it delegates the SQL queries to a global pool (the same for all requests). If you are less concerned about the overall number of connections, a simpler approach is to open one connection per request:

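# Marker type: handler parameters annotated with DBClient get their own connection.
DBClient = typing.TypeVar("DBClient")

class DBComponent(Component):

    async def resolve(self):
        # Opens a brand-new connection for the request; it has to be closed afterwards.
        return await asyncpg.connect('postgresql://postgres@localhost/test', **CONN_ARGS)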

    def can_handle_parameter(self, parameter: inspect.Parameter) -> bool:
        return parameter.annotation is DBClient
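
As written, nothing in DBComponent closes the connection it opens, so under load connections can pile up. Below is a minimal stdlib-only sketch of the lifecycle it needs, assuming hypothetical FakeConnection and fake_connect stand-ins for an asyncpg connection and asyncpg.connect(): open one connection per request and close it in a finally block, so it is released even when the handler raises. Wiring the close into API Star (for example in a response or error hook) is not shown.

```python
import asyncio
from contextlib import asynccontextmanager

opened = []  # every connection created, so we can check they were all closed


class FakeConnection:
    """Hypothetical stand-in for an asyncpg connection (no database needed)."""

    def __init__(self):
        self.closed = False

    async def close(self):
        # asyncpg connections are also closed with `await conn.close()`.
        self.closed = True


async def fake_connect():
    # Hypothetical stand-in for asyncpg.connect(...): a fresh connection each call.
    conn = FakeConnection()
    opened.append(conn)
    return conn


@asynccontextmanager
async def request_connection():
    # One connection per request, always closed, whether the handler
    # returns normally or raises.
    conn = await fake_connect()
    try:
        yield conn
    finally:
        await conn.close()


async def handle(fail: bool):
    async with request_connection() as conn:
        if fail:
            raise RuntimeError("handler failed")
        return conn


async def main():
    await handle(fail=False)
    try:
        await handle(fail=True)
    except RuntimeError:
        pass


asyncio.run(main())
print([c.closed for c in opened])  # [True, True]
```

The try/finally is the important part: without it, the failing request would leave its connection open.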

If you want to manage transactions, you can do it through hooks, Django style: open a transaction at the start of every request and commit or roll it back at the end. Just create a hook that starts a transaction in on_request and commits or rolls back in on_response:

class DBRequestHook:

    def __init__(self):
        self.tr = None

    async def on_request(self, connection: DBClient):
        # Start a transaction on this request's connection.
        self.tr = connection.transaction()
        await self.tr.start()

    async def on_response(self, exception: Exception):
        # Roll back if the handler raised, otherwise commit.
        if exception is not None:
            await self.tr.rollback()
        else:
            await self.tr.commit()
        self.tr = None
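
Below is a stdlib-only sketch of the commit/rollback flow, so it can be run without PostgreSQL or API Star. FakeConnection, FakeTransaction and run_request are hypothetical stand-ins. The fakes mirror the start(), commit() and rollback() methods of asyncpg's Transaction object, and run_request plays the framework's part by calling the hooks around a handler. The sketch assumes one hook instance per request. Because the hook keeps the transaction on self.tr, sharing one instance across concurrent requests would mix up their transactions, so check how your API Star version instantiates event_hooks.

```python
import asyncio


class FakeTransaction:
    """Hypothetical stand-in mirroring asyncpg Transaction.start/commit/rollback."""

    def __init__(self, log):
        self.log = log

    async def start(self):
        self.log.append("BEGIN")

    async def commit(self):
        self.log.append("COMMIT")

    async def rollback(self):
        self.log.append("ROLLBACK")


class FakeConnection:
    """Hypothetical stand-in for an asyncpg connection; records SQL-level events."""

    def __init__(self):
        self.log = []

    def transaction(self):
        # Like asyncpg's Connection.transaction(), this is a plain method that
        # returns a transaction object; it is started separately.
        return FakeTransaction(self.log)


class DBRequestHook:
    def __init__(self):
        self.tr = None

    async def on_request(self, connection):
        self.tr = connection.transaction()
        await self.tr.start()

    async def on_response(self, exception):
        if exception is not None:
            await self.tr.rollback()
        else:
            await self.tr.commit()
        self.tr = None


async def run_request(handler):
    # Hypothetical driver standing in for the framework: assumes a fresh
    # connection and a fresh hook instance for every request.
    conn = FakeConnection()
    hook = DBRequestHook()
    await hook.on_request(conn)
    exception = None
    try:
        await handler(conn)
    except Exception as exc:
        exception = exc
    await hook.on_response(exception)
    return conn.log


async def ok_handler(conn):
    return "Gotcha"


async def failing_handler(conn):
    raise ValueError("boom")


print(asyncio.run(run_request(ok_handler)))       # ['BEGIN', 'COMMIT']
print(asyncio.run(run_request(failing_handler)))  # ['BEGIN', 'ROLLBACK']
```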

#3

Thank you! I like reading your posts on this forum, I always learn a lot from your solutions.

After studying what you did, it looks like I was 80% of the way there; the asyncio.Lock() must be the missing piece. I think my pool creation was still being awaited when the web request tried to use the None object my component was initialized with as the pool.

For the other noobs who stumble on this post, I am leaving this article here from one of the Django Channels maintainers. It has a great explanation of Python's async model and is well worth the read if you are new to the paradigm.


#4

Without the lock you can end up with more than one pool dangling in the system, each one holding its min_size connections even though nothing uses it anymore. This happens in real environments under real load and is hard to replicate locally with dev settings. Consider that in asyncio almost everything is, by nature, NOT thread-safe (except the functions with threadsafe in their name, such as loop.call_soon_threadsafe), and class attributes are not protected either! The race doesn't even need threads: two requests can both see that _pool is None, both await create_pool(), and the second assignment silently replaces the first pool, which is left open.
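
The race can be reproduced without a database. This is a stdlib-only sketch in which PoolFactory is a hypothetical stand-in for asyncpg.create_pool() that only counts how many pools get created, and the two component classes mirror DBPoolComponent without API Star. Twenty concurrent "requests" resolve the pool at once; the unlocked version creates one pool per request, while the locked, double-checked version creates exactly one.

```python
import asyncio


class PoolFactory:
    """Hypothetical stand-in for asyncpg.create_pool(): counts pools created."""

    def __init__(self):
        self.created = 0

    async def create_pool(self):
        # Yield to the event loop, as real pool creation does while it
        # opens its min_size connections.
        await asyncio.sleep(0.01)
        self.created += 1
        return object()


class UnsafePoolComponent:
    def __init__(self, factory):
        self.factory = factory
        self._pool = None

    async def resolve(self):
        if self._pool is None:
            # Every coroutine that gets here before the first one finishes
            # creates its own pool; all but the last one are left dangling.
            self._pool = await self.factory.create_pool()
        return self._pool


class LockedPoolComponent:
    def __init__(self, factory):
        self.factory = factory
        self._pool = None
        self.pool_lock = asyncio.Lock()

    async def resolve(self):
        if self._pool is None:
            async with self.pool_lock:
                # Re-check after acquiring the lock: a waiter must not
                # create a second pool once the first one exists.
                if self._pool is None:
                    self._pool = await self.factory.create_pool()
        return self._pool


async def simulate(component_cls, requests=20):
    factory = PoolFactory()
    component = component_cls(factory)
    # Fire concurrent "requests" that all resolve the pool at the same time.
    await asyncio.gather(*(component.resolve() for _ in range(requests)))
    return factory.created


print(asyncio.run(simulate(UnsafePoolComponent)))  # 20
print(asyncio.run(simulate(LockedPoolComponent)))  # 1
```

Everything here runs on a single thread and a single event loop; the interleaving at the await is enough to cause the duplicate pools.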