1 year ago

#368457

test-img

Scrimpy

Closing MYSQL connection pool properly in Python

I am using mysql.connector.pooling to create a pooled group of connections to use in my multithreaded/multiprocessing based application. The following code creates the records as expect but I notice at the end of the run that the /var/log/mysql/error.log has an entry for each thread which is "2022-04-02T10:14:08.264395Z 2513 [Note] Aborted connection 2513 to db: 'my_db' user: 'my_user' host: 'localhost' (Got an error reading communication packets)"

After reading a number of articles I believe that the close I call in my thread to close a pooled connection is only returning the connection back to the pool and so the whole pool/connections are not actually closed properly.

Not sure how to close the pool properly. Looking for help.

import mysql.connector
from mysql.connector.pooling import MySQLConnectionPool
from mysql.connector import errorcode
from mysql.connector.errors import Error
import multiprocessing as mp
import random
import datetime
from timeit import default_timer as timer

my_db = mysql.connector.connect(host="localhost",
                                user="my_user",
                                password="my_password",
                                database="my_db")


def init_pools():
    global pool_for_db
    try:
        db_config = {
            "host": "localhost",
            "user": "my_user",
            "password": "my_password",
            "database": "my_db"
        }
        pool_for_my_db = MySQLConnectionPool(pool_name="pool_for_my_db",
                                               pool_size=1,
                                               **db_config)

    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Invalid password on main database")
            exit(0)
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database not initialised")
            exit(0)


def sql_select(item):
    connection = pool_for_my_db.get_connection()
    my_db_cursor = connection.cursor()
    my_db_cursor.execute("SELECT * FROM connection_test WHERE row2 = " + str(item))
    connection.close()


def clear_table():
    my_db_cursor = my_db.cursor()
    sql_statement = "SELECT COUNT(*) FROM connection_test"
    my_db_cursor.execute(sql_statement)
    count = my_db_cursor.fetchone()
    print('Count is', count)

    my_db_cursor = my_db.cursor()
    sql_statement = "DELETE FROM connection_test"
    my_db_cursor.execute(sql_statement)
    my_db.commit()
    print(my_db_cursor.rowcount, "record(s) deleted")


def table_insert(item):
    try:
        connection = pool_for_my_db.get_connection()
        my_db_cursor = connection.cursor()

        sql_statement = "INSERT INTO connection_test (row1, row2, row3) VALUES (%s,%s,%s)"
        values = (pool_for_my_db.pool_name, item, datetime.datetime.now())
        my_db_cursor.execute(sql_statement, values)
        connection.commit()
        connection.close()
    except mysql.connector.Error as e:
        print("Error code:", e.errno)  # error number
        print("SQLSTATE value:", e.sqlstate)  # SQLSTATE value
        print("Error message:", e.msg)  # error message
        print("Error:", e)  # errno, sqlstate, msg values
        s = str(e)
        print("Error:", s)

my_list = [*range(0, 100000)]
mp.set_start_method("fork")
#
# clear_table()
#
# start = timer()
# print('Starting 16 thread run')
# with mp.Pool(16, initializer=init_pools) as p:
#     p.map(table_insert, my_list)
# p.close()
# p.join()
# end = timer()
# print("Time for 16 run", end-start)

# clear_table()

# start = timer()
# print('Starting 32 thread run')
# with mp.Pool(32, initializer=init_pools) as p:
#     p.map(table_insert, my_list)
# p.close()
# p.join()
# end = timer()
# print("Time for 32 run", end-start)
#
# clear_table()
#
# start = timer()
# print('Starting 48 thread run')
# with mp.Pool(48, initializer=init_pools) as p:
#     p.map(table_insert, my_list)
# p.close()
# p.join()
# end = timer()
# print("Time for 48 run", end-start)
#
clear_table()

start = timer()
print('Starting 64 thread run')
with mp.Pool(64, initializer=init_pools) as p:
    p.map(table_insert, my_list)
p.close()
p.join()
end = timer()
print("Time for 64 run", end-start)

python

mysql

multithreading

connection-pooling

0 Answers

Your Answer

Accepted video resources