search
Home > Backend Development > Python Tutorial > Why is the SQLAlchemy database connection not closed correctly? How to solve this problem?

Why is the SQLAlchemy database connection not closed correctly? How to solve this problem?

Correct method of closing SQLAlchemy database connection and troubleshooting

When using Python's SQLAlchemy library for database operations, it is crucial to ensure that the database connection is properly closed to avoid resource leaks and performance issues. This article analyzes a common SQLAlchemy connection closure problem and provides solutions.

The following code snippet shows an example of possible connection closing problems:

import importlib
import re
from typing import Any

from flask import current_app, g
from sqlalchemy import create_engine, delete, exists, select, update
from sqlalchemy.engine import URL
from sqlalchemy.orm import scoped_session, sessionmaker

from core.database.base import base  # Assume this is your database base class
from lib.type import type  # Assume this is your type definition


class Database: # Change the class name to capitalize the initial letter, comply with Python specification env = None

    def set(self, key: str, value: Any):
        """
        Set the property value, set to g.application or g.platform according to the environment variable
        """
        if self.env == "application":
            g.application = self.container._replace(**{key: value})
        elif self.env == 'platform':
            g.platform = self.container._replace(**{key: value})

    @property
    def container(self):
        """
        Returns g.application or g.platform container """
        if self.env == "application":
            if "application" not in g:
                g.application = type.application(None, None, None)
            return g.application
        elif self.env == 'platform':
            if "platform" not in g:
                g.platform = type.platform(None, None)
            return g.platform

    @property
    def database_conf(self):
        """
        Get database configuration """
        return base.setting(current_app.config["database"])

    @property
    def __database_core(self):
        """
        Create a database session and cache it to instance attribute """
        if not hasattr(self, '_database_core'):
            self._database_core = self.__create_session(**self.database_conf)
        return self._database_core

    @property
    def __create_engine(self):
        """
        Get the database engine and cache it to the instance attribute """
        return self.__database_core.engine

    @property
    def __create_database(self):
        """
        Get the database session and cache it to the instance attribute """
        return self.__database_core.session

    def __create_session(self, **config):
        """
        Create a database session """
        engine = self.create_engine(**config)
        session = scoped_session(sessionmaker(bind=engine, autoflush=True))
        return type.database(engine=engine, session=session())

    @classmethod
    def create_engine(cls, **kwargs):
        """
        Create a database engine """
        return create_engine(url.create("mysql pymysql", **kwargs), echo=True, isolation_level="autocommit")

    @staticmethod
    def create_all(models: list, engine=None):
        """
        Create tables for all models """
        tables = [Database.get_model(model).__table__ for model in models]
        base.metadata.create_all(bind=engine, tables=tables)

    def create_table(self, tables: list):
        """
        Create a table for the specified model """
        Database.create_all(models=tables, engine=self.__create_engine)

    @staticmethod
    def get_model(model: str):
        """
        Get the model object """
        module = importlib.import_module(f"model.{model.split('_')[0]}.{model}")
        class_name = ''.join(re.findall(r"[a-za-z] ", model.split(".")[-1].title()))
        return getattr(module, class_name)()

    @property
    def database(self):
        """
        Get database session """
        return self.__create_database

    def table_data_query_all(self, model: Any, condition: list = None, order: list = None, limit: int = 500,
                             fields: list = None) -> list[dict]:
        """
        Query all data """
        query = select(model)
        if fields:
            query = query.with_only_columns(*fields)
        if condition:
            query = query.filter(*condition)
        if order:
            query = query.order_by(*order)
        results = [row.dict() for row in self.database.execute(query.limit(limit)).scalars()]
        Return results

    def table_data_query_one(self, model: Any, condition: list = None) -> dict:
        """
        Query a single piece of data """
        result = self.database.execute(select(model).filter(*condition).limit(1)).scalar_one_or_none()
        return None if result is None else result.dict()

    def table_data_query_exists(self, condition: list) -> bool:
        """
        Query whether the data exists """
        return self.database.query(exists().where(*condition)).scalar()

    def table_data_insert_all(self, models: list) -> None:
        """
        Batch insertion of data """
        with self.database as db:
            db.add_all(models)
            db.commit()

    def table_data_insert_one(self, model, data: bool = False) -> int | dict:
        """
        Insert a single piece of data """
        with self.database as db:
            db.add(model)
            db.commit()
            return model.dict() if data else model.id

    def table_data_update(self, model: Any, condition: list, data: dict) -> None:
        """
        Update data """
        with self.database as db:
            db.execute(update(model).where(*condition).values(**data))
            db.commit() # Def table_data_delete(self, model: Any, condition: list) -> None:
        """
        Delete data """
        with self.database as db:
            db.execute(delete(model).where(*condition))
            db.commit() # Def close(self):
        """
        Close the database connection """
        if hasattr(self, '_database_core'):
            self._database_core.session.close()
            self._database_core.engine.dispose()
            del self._database_core

    def __del__(self):
        """
        Destructor, ensure that the connection is closed """
        self.close()

Improvement instructions:

  1. Class naming: Rename database to Database to follow Python's naming conventions (PascalCase for classes).
  2. Cached property: Use @property together with the instance attribute _database_core as a cache, so the engine and session are created once instead of on every access.
  3. Explicit commit: Call db.commit() in table_data_update and table_data_delete so that the changes are actually committed.
  4. Resource release: The close() method explicitly calls session.close() and engine.dispose() to release the resources, and del self._database_core removes the cached engine/session pair so that a later access creates a fresh one.
  5. Exception handling: Consider adding try...except blocks to handle potential exceptions, such as database connection errors.
  6. Use of scoped_session: scoped_session is a registry that hands out one session per scope (per thread by default). In Flask applications it is usually paired with a request teardown hook that calls remove() on the registry, so each request gets an independent session that is closed when the request ends. The code above does not hook into Flask's request lifecycle, so it has to release resources itself. If the engine is shared for the lifetime of the application, dispose() is not needed on every request, but session.close() still is.

Solution:

The main problem is the use of scoped_session and the timing of resource release. scoped_session does not close connections automatically; it only manages the scope in which a session is shared. Calling self.database.get_bind().dispose() on its own is not enough either: dispose() closes the connections that are currently checked in to the pool, so a connection still held by an open session stays open until that session is closed.

Therefore, it is necessary to call the close() method where appropriate, or call close() method in the class's destructor __del__ to ensure that the connection is closed correctly. However, relying on __del__ is not a best practice, because Python's garbage collection mechanism is unpredictable. It is recommended to explicitly call instance.close() after using Database instance.

Best Practices:

  • Use a context manager ( with statement) to manage database sessions: This ensures that the session is automatically closed after the code block is executed.
  • In Flask applications, using extension libraries such as Flask-SQLAlchemy, it is possible to manage database connections and sessions more easily. These libraries usually handle the closing and release of connections automatically.

With the above improvements, the problem of SQLAlchemy database connections not being closed correctly can be solved effectively, and the code becomes more robust and maintainable. Remember: closing connections explicitly is the best practice; do not rely on the garbage collector to do it for you.

The above is the detailed content of "Why is the SQLAlchemy database connection not closed correctly? How to solve this problem?". For more information, please follow other related articles on the PHP Chinese website!

Statement
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
图文详解mysql架构原理图文详解mysql架构原理May 17, 2022 pm 05:54 PM

本篇文章给大家带来了关于mysql的相关知识,其中主要介绍了关于架构原理的相关内容,MySQL Server架构自顶向下大致可以分网络连接层、服务层、存储引擎层和系统文件层,下面一起来看一下,希望对大家有帮助。

mysql的msi与zip版本有什么区别mysql的msi与zip版本有什么区别May 16, 2022 pm 04:33 PM

mysql的msi与zip版本的区别:1、zip包含的安装程序是一种主动安装,而msi包含的是被installer所用的安装文件以提交请求的方式安装;2、zip是一种数据压缩和文档存储的文件格式,msi是微软格式的安装包。

mysql怎么去掉第一个字符mysql怎么去掉第一个字符May 19, 2022 am 10:21 AM

方法:1、利用right函数,语法为“update 表名 set 指定字段 = right(指定字段, length(指定字段)-1)...”;2、利用substring函数,语法为“select substring(指定字段,2)..”。

mysql怎么替换换行符mysql怎么替换换行符Apr 18, 2022 pm 03:14 PM

在mysql中,可以利用char()和REPLACE()函数来替换换行符;REPLACE()函数可以用新字符串替换列中的换行符,而换行符可使用“char(13)”来表示,语法为“replace(字段名,char(13),'新字符串') ”。

mysql怎么将varchar转换为int类型mysql怎么将varchar转换为int类型May 12, 2022 pm 04:51 PM

转换方法:1、利用cast函数,语法“select * from 表名 order by cast(字段名 as SIGNED)”;2、利用“select * from 表名 order by CONVERT(字段名,SIGNED)”语句。

MySQL复制技术之异步复制和半同步复制MySQL复制技术之异步复制和半同步复制Apr 25, 2022 pm 07:21 PM

本篇文章给大家带来了关于mysql的相关知识,其中主要介绍了关于MySQL复制技术的相关问题,包括了异步复制、半同步复制等等内容,下面一起来看一下,希望对大家有帮助。

mysql怎么判断是否是数字类型mysql怎么判断是否是数字类型May 16, 2022 am 10:09 AM

在mysql中,可以利用REGEXP运算符判断数据是否是数字类型,语法为“String REGEXP '[^0-9.]'”;该运算符是正则表达式的缩写,若数据字符中含有数字时,返回的结果是true,反之返回的结果是false。

带你把MySQL索引吃透了带你把MySQL索引吃透了Apr 22, 2022 am 11:48 AM

本篇文章给大家带来了关于mysql的相关知识,其中主要介绍了mysql高级篇的一些问题,包括了索引是什么、索引底层实现等等问题,下面一起来看一下,希望对大家有帮助。

See all articles

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

AI Hentai Generator

AI Hentai Generator

Generate AI Hentai for free.

Hot Article

R.E.P.O. Energy Crystals Explained and What They Do (Yellow Crystal)
2 weeks agoBy尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Best Graphic Settings
2 weeks agoBy尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. How to Fix Audio if You Can't Hear Anyone
3 weeks agoBy尊渡假赌尊渡假赌尊渡假赌

Hot Tools

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

Integrate Eclipse with SAP NetWeaver application server.

Dreamweaver Mac version

Dreamweaver Mac version

Visual web development tools

SecLists

SecLists

SecLists is the ultimate security tester's companion. It is a collection of various types of lists that are frequently used during security assessments, all in one place. SecLists helps make security testing more efficient and productive by conveniently providing all the lists a security tester might need. List types include usernames, passwords, URLs, fuzzing payloads, sensitive data patterns, web shells, and more. The tester can simply pull this repository onto a new test machine and he will have access to every type of list he needs.

SublimeText3 Linux new version

SublimeText3 Linux new version

SublimeText3 Linux latest version

EditPlus Chinese cracked version

EditPlus Chinese cracked version

Small size, syntax highlighting, does not support code prompt function