Flask剖析(二):Flask-Sqlalchemy与多线程、多历程
2019-11-18杂谈搜奇网93°c
A+ A-原创作者:flowell,转载请标明出处:https://www.cnblogs.com/flowell/p/multiprocessing_flask_sqlalchemy.html
Sqlalchemy
flask-sqlalchemy的session是线程平安的,但在多历程环境下,要确保派生子历程时,父历程不存在任何的数据库衔接,能够经由过程挪用db.get_engine(app=app).dispose()来手动烧毁已建立的engine,然后再派生子历程。
近来线上的项目总是会报出数据库衔接相干的毛病,比方“Command out of Sync”,“Mysql server has gone away”,“Lost databse connection”,“Package sequence out of order”等等,终究处理下来,发明以上毛病能够分为两种,一种是和衔接丧失有关的,一种是和衔接被多个线程(历程)同时运用了有关。
我们项目基于flask,有多线程的场景,也有多历程的场景。orm用的是flask的拓展flask-sqlalchemy。flask-sqlalchemy的运用必需基于flask的app实例,也就是说要在app高低文中才运用flask-sqlalchemy,所以在某些离线(非web)场景下,我们也用到了原生的Sqlalchemy。
原生的Sqlalchemy的运用体式格局是
engine = create_engine(db_url) Session = sessionmaker(bind=engine) session = Session()
session.query(xxx)
首先要建立一个engine,engine望文生义就是和数据库衔接的引擎。在现实提议查询前,是不会建立任何connection的。建立engine时能够经由过程指定poolclass参数来指定engine运用的衔接池。默许是QueuePool,也能够设置为NullPool(不运用衔接池)。为了轻易明白,能够把engine视为治理衔接池的对象。
sqlalchemy中session和我们日常平凡数据库里说的session是两个差别的观点,在日常平凡数据库中,session的生命周期从衔接上数据库最先,到断开和数据库的衔接位置。然则sqlalchemy中的session更多的是一种治理衔接的对象,它从衔接池掏出一个衔接,运用衔接,然后开释衔接,而本身也跟随着烧毁。sqlalchemy中的Connection对象是治理真正数据库衔接的对象,真正的数据库衔接在sqlalchemy中是DBAPI。
默许地,假如不传入poolclass,则运用QueuePool(具有肯定数目的衔接池),假如不指定pool_recycle参数,则默许数据库衔接不会革新。也就是说衔接假如不实用,则一向不去革新它。然则题目来了,在Mysql中,输入“show variables like "%timeout%"; ” ,能够看到有一个waittimeout,另有interacttimeout,默许值为28800(8小时),这两个值代表着,假如8个小时内某个数据库衔接都不和mysql联络,那末就会断掉这个衔接。所以,8个小时过去了,Mysql把衔接断掉了,然则sqlalchemy客户端这边却还保持着这个衔接。当某个时刻该衔接从衔接池被掏出运用时,就会抛出“Mysql server has gone away”等衔接丧失的信息。
处理这个题目的要领很简单,只需传入pool_recycle参数即可。特别地,在flask-sqlalchemy中不会涌现这类题目,由于falsk-sqlalchemy拓展自动地帮我们注入了pool_recycle参数,默许为7200秒。
def apply_driver_hacks(self, app, sa_url, options): """This method is called before engine creation and used to inject driver specific hacks into the options. The `options` parameter is a dictionary of keyword arguments that will then be used to call the :func:`sqlalchemy.create_engine` function. The default implementation provides some saner defaults for things like pool sizes for MySQL and sqlite. Also it injects the setting of `SQLALCHEMY_NATIVE_UNICODE`. """ if sa_url.drivername.startswith('mysql'): sa_url.query.setdefault('charset', 'utf8') if sa_url.drivername != 'mysql+gaerdbms': options.setdefault('pool_size', 10) options.setdefault('pool_recycle', 7200) # 默许7200秒革新衔接 elif sa_url.drivername == 'sqlite': pool_size = options.get('pool_size') detected_in_memory = False if sa_url.database in (None, '', ':memory:'): detected_in_memory = True from sqlalchemy.pool import StaticPool options['poolclass'] = StaticPool if 'connect_args' not in options: options['connect_args'] = {} options['connect_args']['check_same_thread'] = False # we go to memory and the pool size was explicitly set # to 0 which is fail. Let the user know that if pool_size == 0: raise RuntimeError('SQLite in memory database with an ' 'empty queue not possible due to data ' 'loss.') # if pool size is None or explicitly set to 0 we assume the # user did not want a queue for this sqlite connection and # hook in the null pool. elif not pool_size: from sqlalchemy.pool import NullPool options['poolclass'] = NullPool # if it's not an in memory database we make the path absolute. if not detected_in_memory: sa_url.database = os.path.join(app.root_path, sa_url.database) unu = app.config['SQLALCHEMY_NATIVE_UNICODE'] if unu is None: unu = self.use_native_unicode if not unu: options['use_native_unicode'] = False if app.config['SQLALCHEMY_NATIVE_UNICODE'] is not None: warnings.warn( "The 'SQLALCHEMY_NATIVE_UNICODE' config option is deprecated and will be removed in" " v3.0. Use 'SQLALCHEMY_ENGINE_OPTIONS' instead.", DeprecationWarning ) if not self.use_native_unicode: warnings.warn( "'use_native_unicode' is deprecated and will be removed in v3.0." " Use the 'engine_options' parameter instead.", DeprecationWarning )
sessionmaker是Session定制要领,我们把engine传入sessionmaker中,就能够获得一个session工场,经由过程工场来生产真正的session对象。然则这类生产出来的session是线程不平安的,sqlalchemy供应了scoped_session来协助我们生产线程平安的session,道理类似于Local,就是代办session,经由过程线程的id来找到真正属于本线程的session。
flask-sqlalchemy就是运用了scoped_session来保证线程平安,详细的代码能够在Sqlalchemy中看到,组织session时,运用了scoped_session。
def create_scoped_session(self, options=None): """Create a :class:`~sqlalchemy.orm.scoping.scoped_session` on the factory from :meth:`create_session`. An extra key ``'scopefunc'`` can be set on the ``options`` dict to specify a custom scope function. If it's not provided, Flask's app context stack identity is used. This will ensure that sessions are created and removed with the request/response cycle, and should be fine in most cases. :param options: dict of keyword arguments passed to session class in ``create_session`` """ if options is None: options = {} scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__) options.setdefault('query_cls', self.Query) return orm.scoped_session( self.create_session(options), scopefunc=scopefunc ) def create_session(self, options): """Create the session factory used by :meth:`create_scoped_session`. The factory **must** return an object that SQLAlchemy recognizes as a session, or registering session events may raise an exception. Valid factories include a :class:`~sqlalchemy.orm.session.Session` class or a :class:`~sqlalchemy.orm.session.sessionmaker`. The default implementation creates a ``sessionmaker`` for :class:`SignallingSession`. :param options: dict of keyword arguments passed to session class """ return orm.sessionmaker(class_=SignallingSession, db=self, **options)
多历程和数据库衔接
多历程环境下,要注意和数据库衔接相干的操纵。
说到多历程,python里最经常使用的就是multiprocessing。multiprocessing在windows下和linux的表现有所区别,在此只议论linux下的表现。linux下多历程经由过程fork()来派生,要明白我下面说的必需先弄懂fork()是什么东西。粗略地说,每一个历程都有本身的一个空间,称为历程空间,每一个历程的历程空间都是自力的,历程与历程之间互不滋扰。fork()的作用,就是将一个历程的历程空间,完完全全地copy一份,copy出来的就是子历程了,所以我们说子历程和父历程有着如出一辙的地点空间。地点空间就是历程运转的空间,这空间里会有历程已翻开的文件描述符,文件描述符会间接地指向历程已翻开的文件。也就是说,fork()以后,父历程,子历程会有雷同的文件描述符,指向雷同的一个文件。为何?由于文件是存在硬盘里的,fork()时copy的内存中的历程空间,并没有把文件也copy一份。这就致使了,父历程,子历程,同时指向同一个文件,他们恣意一个都能够对这个文件举行操纵。这和本文说的数据库有啥关联?顺着这个思绪想,数据库衔接是否是一个TCP衔接?TCP衔接是否是一个socket?socket在linux下是什么,就是一个文件。所以说,假如父历程在fork()之前翻开了数据库衔接,那末子历程也会具有这个翻开的衔接。
两个历程同时写一个衔接会致使数据杂沓,所以会涌现“Command out of sync”的毛病,两个历程同时读一个衔接,会致使一个历程读到了,另一个没读到,就是“No result”。一个历程封闭了衔接,另一个历程并不知道,它试图去操纵衔接时,就会涌现“Lost database connection”的毛病。
在此议论的场景是,父历程在派生子历程之前,父历程具有已翻开的数据库衔接。派生出子历程以后,子历程也就具有了响应的衔接。假如在fork()之前父历程没有翻开数据库衔接,那末也不必忧郁这个题目。比方Celery运用的prefork池,虽然是多历程模子,然则celery在派子历程前时不会翻开数据库衔接的,所以不必忧郁在celery使命中会涌现数据库衔接杂沓的题目。
我做的项目里的多历程的场景之一就是运用tornado来跑web运用,在派生多个web运用实例时,确保此前建立的数据库衔接被烧毁。
app = Flask() db = Sqlalchemy() db.init_app(app) ... ... db.get_engine(app=app).dispose() # 先烧毁已有的engine,确保父历程没有数据库衔接 ... ... fork() # 派生子历程
# 比方
tornado.start() # 启动多个web实例历程