MySQL - asyncio.Lock get error #118

Open
watsy0007 opened this issue Jul 2, 2019 · 9 comments

watsy0007 commented Jul 2, 2019

When using databases with aiomysql, I get this error: Task <Task pending coro=<RequestResponseCycle.run_asgi() running at /usr/local/lib/python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py:370> cb=[set.discard()]> got Future <Future pending> attached to a different loop

detail:

Traceback (most recent call last):
  File "/app/services/currency.py", line 120, in update_currency_ticker_by_id
    return await database.execute(q)
  File "/usr/local/lib/python3.7/site-packages/databases/core.py", line 122, in execute
    return await connection.execute(query, values)
  File "/usr/local/lib/python3.7/site-packages/databases/core.py", line 209, in execute
    async with self._query_lock:
  File "/usr/local/lib/python3.7/asyncio/locks.py", line 92, in __aenter__
    await self.acquire()
  File "/usr/local/lib/python3.7/asyncio/locks.py", line 192, in acquire
    await fut
RuntimeError: Task <Task pending coro=<RequestResponseCycle.run_asgi() running at /usr/local/lib/python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py:370> cb=[set.discard()]> got Future <Future pending> attached to a different loop

packages:

aiomysql==0.0.20
databases==0.2.3
alembic==1.0.11
fastapi==0.31.0

db.py

from databases import Database
database = Database(DB_URL, min_size=5, max_size=20)

The code that executes the query:

async def update_currency_ticker_by_id(currency_id: int, ticker: CurrencyTickerUpdateRequest):
    tbl_c = currency
    values = {k: v for k, v in ticker.dict().items() if v is not None}
    if values:
        q = tbl_c.update().where(tbl_c.c.id == currency_id).values(values)
        try:
            return await database.execute(q)
        except Exception as e:
            print('update error', e, "detail\n", traceback.format_exc())

This runs in Docker with the base image python 3.7.3-alpine.

Why does this happen, and how can I fix it? It's urgent.
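
For anyone landing here from a search: the message in the traceback is raised by asyncio itself when a task running on one event loop awaits a future that belongs to another loop. Below is a minimal, stdlib-only sketch of that situation. It does not use databases or aiomysql, and the names `reproduce`, `loop_a`, `loop_b`, and `waiter` are made up for illustration; it says nothing about where the second loop comes from in the report above.

```python
import asyncio


def reproduce():
    """Await a future owned by loop_a from a task running on loop_b."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    fut = loop_a.create_future()  # this future is bound to loop_a

    async def waiter():
        # The task driving this coroutine runs on loop_b, so asyncio
        # rejects the await because the future belongs to loop_a.
        await fut

    try:
        loop_b.run_until_complete(waiter())
    except RuntimeError as exc:
        return str(exc)
    finally:
        loop_a.close()
        loop_b.close()
    return None


message = reproduce()
print(message)
```

The printed message has the same shape as the one in the traceback: a task that "got Future ... attached to a different loop".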


lexee commented Jul 3, 2019

I got this error too. Is there any update here?

tomchristie (Member) commented

Can either of you reduce this to a really simple, reproducible example case?
@lexee Are you also seeing this using FastAPI same as @watsy0007, or not?


watsy0007 commented Jul 10, 2019

from fastapi import FastAPI
from databases import Database
from os import environ as env

database = Database(env.get('DB_URL', 'mysql://root:@127.0.0.1:3306/db'), min_size=5, max_size=10)

app = FastAPI()

@database.transaction()
async def blocked():
    return {}

@app.get('/')
async def index():
    return await blocked()

@app.on_event('startup')
async def setup_db():
    await database.connect()

test.py

import threading
import requests


# Fire five requests at the server concurrently, one per thread.
for _ in range(5):
    threading.Thread(target=requests.get, args=("http://127.0.0.1:8000",)).start()

When I comment out @database.transaction(), it works.

@lexee @tomchristie
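
A side note on the reproduction above: `@database.transaction()` is a call, so it is evaluated when the module is imported, before the server has started its event loop. Whether that is what triggers the error here is not confirmed in this thread. The sketch below only demonstrates the timing, using a hypothetical stand-in `transaction()` factory and an `events` list, not the real databases API.

```python
import asyncio

events = []


def transaction():
    """Hypothetical stand-in for a decorator factory such as database.transaction()."""
    # This body runs when the decorated function is defined (import time).
    try:
        asyncio.get_running_loop()
        events.append("factory: loop running")
    except RuntimeError:
        events.append("factory: no running loop")

    def decorator(func):
        async def wrapper(*args, **kwargs):
            # This body runs per call, inside whatever loop is serving the request.
            asyncio.get_running_loop()
            events.append("call: loop running")
            return await func(*args, **kwargs)

        return wrapper

    return decorator


@transaction()  # evaluated here, at definition time
async def blocked():
    return {}


result = asyncio.run(blocked())
print(events)
```

Anything the factory builds eagerly is therefore created outside the loop that later handles requests, while work done inside `wrapper` happens on the serving loop.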

tomchristie (Member) commented

Any reason the issue got closed?

watsy0007 reopened this Jul 11, 2019

ningzio commented May 13, 2020

I have the same problem. Is there a solution?

tomchristie (Member) commented

@NingziSlay Can you confirm if this is specific to MySQL?


ningzio commented May 14, 2020

> @NingziSlay Can you confirm if this is specific to MySQL?

Sorry about my poor English; I hope you can understand.
Yes, I only use MySQL in my project.

I use FastAPI too.
If requests come in one by one, it works fine,
but it fails when requests arrive concurrently.

Here is main.py:

from fastapi import FastAPI
from apps import apis
from libs.db_tools import session

app = FastAPI()

app.include_router(apis, prefix="/api")


@app.on_event("startup")
async def startup():
    await session.connect()


@app.on_event("shutdown")
async def shutdown():
    await session.disconnect()


if __name__ == '__main__':
    import uvicorn

    uvicorn.run(
        app,
        host='0.0.0.0',
        port=8080,
    )

libs/db_tools.py

import databases

session = databases.Database("mysql+pymysql://root:[email protected]/beat_dev?charset=utf8&use_unicode=1")

I wrote this handler:

from libs.db_tools import session
...
@router.get("/test")
async def test():
    query = Track.select()
    return await session.fetch_all(query)

and ran ab against it:

ab -n 4 -c 2 http://127.0.0.1:8000/test

which raises this exception:

...
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/middleware/errors.py", line 178, in __call__
    raise exc from None
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/middleware/errors.py", line 156, in __call__
    await self.app(scope, receive, _send)
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/exceptions.py", line 73, in __call__
    raise exc from None
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/exceptions.py", line 62, in __call__
    await self.app(scope, receive, sender)
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/routing.py", line 590, in __call__
    await route(scope, receive, send)
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/routing.py", line 208, in __call__
    await self.app(scope, receive, send)
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/starlette/routing.py", line 41, in app
    response = await func(request)
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/fastapi/routing.py", line 133, in app
    raw_response = await dependant.call(**values)
  File "/Users/ningzi/workspace/beats/apps/access.py", line 68, in test
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/databases/core.py", line 131, in fetch_all
    return await connection.fetch_all(query, values)
  File "/Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/databases/core.py", line 217, in fetch_all
    async with self._query_lock:
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/locks.py", line 92, in __aenter__
    await self.acquire()
  File "/usr/local/Cellar/python/3.7.7/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/locks.py", line 192, in acquire
    await fut
RuntimeError: Task <Task pending coro=<RequestResponseCycle.run_asgi() running at /Users/ningzi/workspace/beats/.beats/lib/python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py:385> cb=[set.discard()]> got Future <Future pending> attached to a different loop

tomchristie changed the title from "asyncio.Lock get error" to "MySQL - asyncio.Lock get error" on May 14, 2020

ariesdevil commented Aug 27, 2020

Is there any progress on this issue?


ningzio commented Sep 16, 2020

I changed my code like this, and it works.

class SessionMaker(object):
    _session = None

    @classmethod
    def get_session(cls):
        if not cls._session:
            cls._session = databases.Database(settings.DB_URL)
        return cls._session

In main.py:

@app.on_event("startup")
async def startup():
    session = SessionMaker.get_session()
    await session.connect()


@app.on_event("shutdown")
async def shutdown():
    session = SessionMaker.get_session()
    await session.disconnect()
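
Why this change helps is not spelled out above. One possible reading is that the `Database` object is no longer built at import time but on first use inside the startup handler, when the server's event loop is already running. The stdlib-only sketch below shows the same lazy-creation idea with a plain `asyncio.Lock`; `LazyLock`, `shared`, `worker`, and `main` are hypothetical names, and this is an illustration of the pattern under that assumption, not the databases implementation.

```python
import asyncio


class LazyLock:
    """Hypothetical helper: create the lock on first use, inside the running loop."""

    def __init__(self):
        # Nothing loop-related is created here, so this is safe at import time.
        self._lock = None
        self._loop = None

    def get(self):
        loop = asyncio.get_running_loop()
        # Rebuild the lock if we are now running on a different loop.
        if self._lock is None or self._loop is not loop:
            self._lock = asyncio.Lock()
            self._loop = loop
        return self._lock


shared = LazyLock()  # module-level, like a module-level Database object


async def worker(results, i):
    async with shared.get():
        await asyncio.sleep(0)  # yield so the other workers contend for the lock
        results.append(i)


async def main():
    results = []
    await asyncio.gather(*(worker(results, i) for i in range(5)))
    return results


# Each asyncio.run() call starts a fresh event loop; the lock follows it.
first = asyncio.run(main())
second = asyncio.run(main())
print(sorted(first), sorted(second))
```

The trade-off is that the shared object must only be fetched from code that is already running on the loop, which is what the startup and shutdown handlers above do.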
