Connection Pool thread safety, secure connection, SSL, gather? #86

Open
devMarlee opened this issue Nov 17, 2023 · 2 comments


devMarlee commented Nov 17, 2023

Hello @long2ice and contributors,
I am implementing a connection pool in my Flask application. I insert a lot of files into ClickHouse in one burst and then the app sits idle for a long time, so I create a connection pool in each request and close it when the request is done.

My Flask app is running in Gunicorn with multiple threads.

However, I am getting an error like this:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/sslproto.py", line 685, in _process_write_backlog
    self._transport.write(chunk)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 916, in write
    self._fatal_error(exc, 'Fatal write error on socket transport')
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 711, in _fatal_error
    self._force_close(exc)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 723, in _force_close
    self._loop.call_soon(self._call_connection_lost, exc)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2023-11-17 08:27:46,675 - ERROR - Fatal error on SSL transport
protocol: <asyncio.sslproto.SSLProtocol object at 0x4036c5c8b0>
transport: <_SelectorSocketTransport closing fd=26>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 910, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/sslproto.py", line 685, in _process_write_backlog
    self._transport.write(chunk)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 916, in write
    self._fatal_error(exc, 'Fatal write error on socket transport')
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 711, in _fatal_error
    self._force_close(exc)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 723, in _force_close
    self._loop.call_soon(self._call_connection_lost, exc)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

I only get this error from the second request onwards (not on the first request). I am wondering whether the problem is related to thread safety, SSL connections, or perhaps the use of asyncio.gather with the connection pool.

My route is like this:

@some_route("/route_name", methods=["POST"])
async def insert():
    response_list = []  # plus some other inits here
    list_of_files = [...]  # some list of files
    async with create_pool(minSize=50, maxSize=100, host=host, port=port, user=username, password=pw, secure=True) as pool:
        tasks = []
        for file in list_of_files:
            tasks.append(execute_with_pool(pool, file))
        responses = await asyncio.gather(*tasks)
        response_list.append(responses)

    pool.close()
    await pool.wait_closed()
    return response_list

async def execute_with_pool(pool, file):
    cmd = ...  # I create some CH command with the filename
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                response = await cursor.execute(cmd)
            return Response(response)
    except Exception:
        ...  # some exception handling

Is this an incorrect way of using the connection pool?

Environment: Python 3.8, Linux, asynch==0.2.2
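
For reference, the fan-out pattern in the route above can be reduced to a standard-library sketch. This is only an illustration under assumptions: the semaphore stands in for the pool's connection limit, and `insert_file`, `insert_all`, and the `.bad` suffix are hypothetical names that are not part of asynch. It shows `asyncio.gather` with `return_exceptions=True`, so one failing file does not discard the results of the others.

```python
import asyncio


async def insert_file(sem, name):
    # The semaphore plays the role of pool.acquire(): at most `limit`
    # inserts are in flight at the same time.
    async with sem:
        await asyncio.sleep(0)  # placeholder for the real network round trip
        if name.endswith(".bad"):
            raise ValueError(f"cannot insert {name}")
        return f"inserted {name}"


async def insert_all(files, limit=2):
    sem = asyncio.Semaphore(limit)
    # return_exceptions=True hands back exceptions as values, in input order.
    results = await asyncio.gather(
        *(insert_file(sem, f) for f in files), return_exceptions=True
    )
    return [f"error: {r}" if isinstance(r, Exception) else r for r in results]


results = asyncio.run(insert_all(["a.csv", "b.bad", "c.csv"]))
print(results)
```

Everything here is created and awaited on a single event loop, the one started by `asyncio.run`, which is the property the per-request pool in the route is trying to keep.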


ljluestc commented Sep 2, 2024

import asyncio

# Note: asyncpg is a PostgreSQL driver; adapt the pool calls for asynch/ClickHouse
from asyncpg import create_pool
from flask import Flask, jsonify, request

app = Flask(__name__)


async def execute_with_pool(pool, file):
    cmd = f"YOUR SQL COMMAND FOR {file}"  # Adjust this to your actual command
    try:
        async with pool.acquire() as conn:
            async with conn.transaction():
                return await conn.execute(cmd)
    except Exception as e:
        print(f"Exception occurred: {e}")
        return str(e)


@app.route('/route_name', methods=['POST'])
async def insert():
    list_of_files = request.json.get('files', [])

    # Flask runs each async view in its own event loop, so the pool is
    # created and closed inside the request rather than shared globally
    async with create_pool(
        min_size=50, max_size=100,
        host='your_host', port='your_port',
        user='your_username', password='your_password',
        ssl=True,
    ) as pool:
        tasks = [execute_with_pool(pool, file) for file in list_of_files]
        responses = await asyncio.gather(*tasks)
    return jsonify(responses)


if __name__ == '__main__':
    app.run(threaded=True)  # Not used under Gunicorn, which manages its own workers and threads
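
One plausible reading of the `Event loop is closed` error in this thread, sketched with the standard library only. The assumption, not confirmed for asynch itself, is that each request runs on a fresh event loop that is closed afterwards (which is how Flask describes its async views), and that some object created during the first request is still used during the second. `LoopBoundResource`, `handle_request`, and `run_request` are hypothetical stand-ins, not part of Flask, asynch, or asyncpg.

```python
import asyncio


class LoopBoundResource:
    """Stand-in for a pool or connection that remembers its creating loop."""

    def __init__(self):
        self.loop = asyncio.get_running_loop()

    def schedule_callback(self):
        # Transports schedule callbacks on the loop they were created on,
        # as in the `self._loop.call_soon(...)` frame of the traceback.
        self.loop.call_soon(lambda: None)


shared = None  # survives across requests, like a module-level pool


async def handle_request():
    global shared
    if shared is None:
        shared = LoopBoundResource()
    shared.schedule_callback()
    return "ok"


def run_request():
    # asyncio.run starts a new event loop and closes it when the call returns.
    return asyncio.run(handle_request())


first = run_request()
try:
    second = run_request()
except RuntimeError as exc:
    second = str(exc)

print(first, "|", second)
```

The first call succeeds because the resource and the running loop match. The second call fails because the resource still points at the loop that was closed when the first call finished, which matches the "second request onwards" pattern reported above.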


Rama3an commented Sep 5, 2024

Hello!! Did you manage to solve the problem? I just encountered exactly the same thing :)
@devMarlee
