From f22e5656b1e9f261ad17448ec33745bfc5409e8c Mon Sep 17 00:00:00 2001 From: CACode Date: Thu, 6 May 2021 16:31:49 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E9=80=82=E9=85=8D=E5=A4=A7=E9=83=A8?= =?UTF-8?q?=E5=88=86=E6=95=B0=E6=8D=AE=E5=BA=93=E5=88=9B=E5=BB=BA=E8=80=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CACodeFramework/MainWork/CACodeRepository.py | 65 +- CACodeFramework/exception/e_fields.py | 4 + CACodeFramework/field/MySqlDefault.py | 10 + CACodeFramework/opera/op_db.py | 29 +- CACodeFramework/util/Config.py | 8 +- CACodeFramework/util/DBPool/__init__.py | 6 + CACodeFramework/util/DBPool/persistent_db.py | 92 +++ CACodeFramework/util/DBPool/persistent_pg.py | 57 ++ CACodeFramework/util/DBPool/pooled_db.py | 408 ++++++++++++ CACodeFramework/util/DBPool/pooled_pg.py | 186 ++++++ .../util/DBPool/simple_pooled_db.py | 221 +++++++ .../util/DBPool/simple_pooled_pg.py | 137 ++++ CACodeFramework/util/DBPool/steady_db.py | 609 ++++++++++++++++++ CACodeFramework/util/DBPool/steady_pg.py | 328 ++++++++++ CACodeFramework/util/DbUtil.py | 30 +- CACodeFramework/util/Log.py | 12 +- CACodeFramework/util/ParseUtil.py | 75 ++- test/modules/BaseData.py | 8 - test/modules/DatabaseConf.py | 18 +- test/modules/DemoTable.py | 19 + test/testFunc.py | 24 +- 21 files changed, 2260 insertions(+), 86 deletions(-) create mode 100644 CACodeFramework/util/DBPool/__init__.py create mode 100644 CACodeFramework/util/DBPool/persistent_db.py create mode 100644 CACodeFramework/util/DBPool/persistent_pg.py create mode 100644 CACodeFramework/util/DBPool/pooled_db.py create mode 100644 CACodeFramework/util/DBPool/pooled_pg.py create mode 100644 CACodeFramework/util/DBPool/simple_pooled_db.py create mode 100644 CACodeFramework/util/DBPool/simple_pooled_pg.py create mode 100644 CACodeFramework/util/DBPool/steady_db.py create mode 100644 CACodeFramework/util/DBPool/steady_pg.py delete mode 100644 test/modules/BaseData.py create mode 100644 test/modules/DemoTable.py diff --git a/CACodeFramework/MainWork/CACodeRepository.py b/CACodeFramework/MainWork/CACodeRepository.py index e41c932..a0fdc94 100644 --- a/CACodeFramework/MainWork/CACodeRepository.py +++ b/CACodeFramework/MainWork/CACodeRepository.py @@ -2,6 +2,7 @@ import copy from CACodeFramework.cacode.Serialize import QuerySet from CACodeFramework.exception import e_fields +from CACodeFramework.field import MySqlDefault from CACodeFramework.opera import op_db from CACodeFramework.util.Log import CACodeLog @@ -14,19 +15,7 @@ import uuid from CACodeFramework.util.ParseUtil import ParseUtil -class LogObj(CACodeLog): - """ - 继承CACodeLog,配置 - """ - - def __init__(self, **kwargs): - """ - 继承原始父类 - """ - super(LogObj, self).__init__(**kwargs) - - -class Repository(object): +class Repository: """ - POJO类 - 继承该类表名此类为数据库的pojo类 @@ -106,24 +95,26 @@ class Repository(object): # 取得字段的名称 ParseUtil.set_field_compulsory(self, key='fields', data=kwargs, val=list(instance.getFields().keys())) # 获取sql方言配置 - ParseUtil.set_field_compulsory(self, key='sqlFields', data=kwargs, val=sqlFields) + ParseUtil.set_field_compulsory(self, key='sqlFields', data=kwargs, val=MySqlDefault.MySqlFields_Default()) # 数据源配置 ParseUtil.set_field_compulsory(self, key='config_obj', data=kwargs, val=config_obj) # 连接池 if hasattr(self, 'config_obj') and self.config_obj: - self.db_util = DbUtil.Db_opera(host=ParseUtil.fieldExit(self.config_obj, 'host'), - port=ParseUtil.fieldExit(self.config_obj, 'port'), - user=ParseUtil.fieldExit(self.config_obj, 'user'), - password=ParseUtil.fieldExit(self.config_obj, 'password'), - database=ParseUtil.fieldExit(self.config_obj, 'database'), - charset=ParseUtil.fieldExit(self.config_obj, 'charset'), + self.db_util = DbUtil.Db_opera(host=ParseUtil.fieldExist(self.config_obj, 'host'), + port=ParseUtil.fieldExist(self.config_obj, 'port'), + user=ParseUtil.fieldExist(self.config_obj, 'user'), + password=ParseUtil.fieldExist(self.config_obj, 'password'), + database=ParseUtil.fieldExist(self.config_obj, 'database'), + charset=ParseUtil.fieldExist(self.config_obj, 'charset'), + creator=ParseUtil.fieldExist(self.config_obj, 'creator', + raise_exception=True), POOL=None if 'POOL' not in kwargs.keys() else kwargs['POOL']) else: CACodeLog.err(AttributeError, e_fields.Miss_Attr('`config_obj` is missing')) ParseUtil.set_field_compulsory(self, key='result', data=kwargs, val=None) ParseUtil.set_field_compulsory(self, key='log_obj', data=kwargs, - val=LogObj(**log_conf) if log_conf is not None else None) + val=CACodeLog(**log_conf) if log_conf is not None else None) ParseUtil.set_field_compulsory(self, key='serializer', data=kwargs, val=serializer) # 移除name和msg键之后,剩下的就是对应的数据库字段 # 设置表名 @@ -185,7 +176,11 @@ class Repository(object): # 设置名称 name = str(uuid.uuid1()) # 开启任务 - kwargs = {'func': self.operation.__find_all__, '__task_uuid__': name, 't_local': self} + kwargs = { + 'func': self.operation.__find_all__, + '__task_uuid__': name, + 't_local': self + } result = self.operation.start(*self.fields, **kwargs) self.result = self.serializer(instance=self.instance, base_data=result) @@ -344,9 +339,9 @@ class Repository(object): 将当前储存的值存入数据库 """ kwargs['pojo'] = self - return self.insert_one(**kwargs) + return self.create(**kwargs) - def insert_one(self, **kwargs): + def create(self, **kwargs): """ 插入属性: 返回受影响行数 @@ -358,32 +353,12 @@ class Repository(object): # 设置名称 name = str(uuid.uuid1()) # 开启任务 - kwargs['func'] = self.operation.__insert_one__ + kwargs['func'] = self.operation.__insert__ kwargs['__task_uuid__'] = name kwargs['t_local'] = self self.result = self.operation.start(**kwargs) return self.result - def insert_many(self, **kwargs): - """ - 插入多行 - 这个是用insert_one插入多行 - :param kwargs:包含所有参数: - pojo_list:参照对象列表 - last_id:是否需要返回最后一行数据,默认False - sql:处理过并加上%s的sql语句 - params:需要填充的字段 - :return:list[rowcount,last_id if last_id=True] - """ - kwargs['config_obj'] = self.config_obj - kwargs = ParseUtil.print_sql(**kwargs) - kwargs = ParseUtil.last_id(**kwargs) - self.result = [] - for item in kwargs['pojo_list']: - kwargs['pojo'] = item - self.result.append(self.insert_one(**kwargs)) - return self.result - def copy(self): """ 复制对象进行操做 diff --git a/CACodeFramework/exception/e_fields.py b/CACodeFramework/exception/e_fields.py index d31400f..2146090 100644 --- a/CACodeFramework/exception/e_fields.py +++ b/CACodeFramework/exception/e_fields.py @@ -48,3 +48,7 @@ def Database_Operation(): def Parse_Error(msg): return mat('CACode-Parse', msg) + + +class FieldNotExist(AttributeError): + pass diff --git a/CACodeFramework/field/MySqlDefault.py b/CACodeFramework/field/MySqlDefault.py index efd64aa..8c0b19c 100644 --- a/CACodeFramework/field/MySqlDefault.py +++ b/CACodeFramework/field/MySqlDefault.py @@ -1,5 +1,9 @@ # 所有常量以空格开头并且以空格结束 # 空格符 +import threading + +from CACodeFramework.cacode.Modes import Singleton + space = ' ' @@ -12,6 +16,8 @@ class MySqlFields_Default: 默认的数据库方言配置 """ + _instance_lock = threading.RLock() + def __init__(self): # 角标 self.subscript = '`' @@ -64,3 +70,7 @@ class MySqlFields_Default: keys_str += '{}=%s{}'.format(i, self.ander_str) keys_str = keys_str[0:len(keys_str) - len(self.ander_str)] return keys_str + + def __new__(cls, *args, **kwargs): + instance = Singleton.createDbOpera(cls) + return instance diff --git a/CACodeFramework/opera/op_db.py b/CACodeFramework/opera/op_db.py index cd46883..d86e20c 100644 --- a/CACodeFramework/opera/op_db.py +++ b/CACodeFramework/opera/op_db.py @@ -88,27 +88,36 @@ class DbOperation(object): return self.result - def __insert_one__(self, *args, **kwargs): + def __insert__(self, *args, **kwargs): """作者:CACode 最后编辑于2021/4/12 - + :param pojo: pojo对象 任务方法 """ kwargs = ParseUtil.print_sql(**kwargs) kwargs = ParseUtil.last_id(**kwargs) + ParseUtil.fieldExist(kwargs, 'pojo', raise_exception=True) - if 'pojo' not in kwargs.keys(): - raise SyntaxError('the key of `pojo` cannot be found in the parameters') + if 'many' in kwargs and kwargs['many']: + # 多行插入 + filed_list = ParseUtil.parse_insert_pojo(kwargs['pojo'][0], __table_name__=kwargs['__table_name__'], + insert_str=kwargs['sqlFields'].insert_str, + values_str=kwargs['sqlFields'].values_str) - filed_list = ParseUtil.parse_insert_pojo(kwargs['pojo'], __table_name__=kwargs['__table_name__'], - insert_str=kwargs['sqlFields'].insert_str, - values_str=kwargs['sqlFields'].values_str) + kwargs['pojo_data'] = ParseUtil.parse_pojo_many('fields', kwargs['pojo']) + kwargs.update(filed_list) + self.result = kwargs['db_util'].insert(**kwargs) + return self.result + else: + filed_list = ParseUtil.parse_insert_pojo(kwargs['pojo'], __table_name__=kwargs['__table_name__'], + insert_str=kwargs['sqlFields'].insert_str, + values_str=kwargs['sqlFields'].values_str) - kwargs.update(filed_list) + kwargs.update(filed_list) - self.result = kwargs['db_util'].insert(**kwargs) + self.result = kwargs['db_util'].insert(**kwargs) - return self.result + return self.result def get_result(self): """ diff --git a/CACodeFramework/util/Config.py b/CACodeFramework/util/Config.py index 1b35729..b58145c 100644 --- a/CACodeFramework/util/Config.py +++ b/CACodeFramework/util/Config.py @@ -1,4 +1,6 @@ from CACodeFramework.cacode.Serialize import JsonUtil +from CACodeFramework.exception.e_fields import FieldNotExist +from CACodeFramework.util.Log import CACodeLog class config(object): @@ -14,7 +16,7 @@ class config(object): - conf:其他配置 """ - def __init__(self, host, port, database, user, password, charset='utf8'): + def __init__(self, host, port, database, user, password, charset='utf8', creator=None): """ 必须要有的参数 :param host:数据库地址 @@ -23,8 +25,12 @@ class config(object): :param user:用户名 :param password:密码 :param charset:编码默认utf8 + :param creator:创建者 """ + if creator is None: + CACodeLog.log_error(msg="缺少数据库创建者,你是不是要设置`creator=pymysql`?", obj=FieldNotExist, raise_exception=True) + self.creator = creator self.host = host self.port = port self.database = database diff --git a/CACodeFramework/util/DBPool/__init__.py b/CACodeFramework/util/DBPool/__init__.py new file mode 100644 index 0000000..67a5f2f --- /dev/null +++ b/CACodeFramework/util/DBPool/__init__.py @@ -0,0 +1,6 @@ +__all__ = [ + '__version__', + 'simple_pooled_pg', 'steady_pg', 'pooled_pg', 'persistent_pg', + 'simple_pooled_db', 'steady_db', 'pooled_db', 'persistent_db'] + +__version__ = '2.0.1' diff --git a/CACodeFramework/util/DBPool/persistent_db.py b/CACodeFramework/util/DBPool/persistent_db.py new file mode 100644 index 0000000..ba27f42 --- /dev/null +++ b/CACodeFramework/util/DBPool/persistent_db.py @@ -0,0 +1,92 @@ +from . import __version__ +from .steady_db import connect + +try: + # 纯python实现的thread lock + from _threading_local import local +except ImportError: + # 默认版本的thread locak + from threading import local + + +class PersistentDBError(Exception): + """General PersistentDB error.""" + + +class NotSupportedError(PersistentDBError): + """DB-API module not supported by PersistentDB.""" + + +class PersistentDB: + """生成基于db-api2的数据库连接池对象 + """ + + version = __version__ + + def __init__( + self, creator, + maxusage=None, setsession=None, failures=None, ping=1, + closeable=False, threadlocal=None, *args, **kwargs): + """ + 设置持久性DB-API 2连接生成器。 + + 创建者:返回新的DB-API 2的任意函数 + 连接对象或符合DB-API 2的数据库模块 + maxusage:连接池最大数量,0表示无限 + setsession:可用于准备的SQL命令的可选列表 + 会话,例如[“将日期样式设置为...”,“将时区设置为...”] + 失败:可选的异常类或异常类的元组 + 为此,应应用连接故障转移机制, + 如果默认值(OperationalError,InternalError)不足够 + ping:确定何时应使用ping()检查连接 + (0 =无=永不,1 =默认=每当被请求时, + 2 =创建游标时,4 =执行查询时, + 7 =始终,以及这些值的所有其他位组合) + closeable:设置为True将允许被关闭 + threadlocal:线程独立 + """ + try: + threadsafety = creator.threadsafety + except AttributeError: + try: + if not callable(creator.connect): + raise AttributeError + except AttributeError: + threadsafety = 1 + else: + threadsafety = 0 + if not threadsafety: + raise NotSupportedError("数据库模块未分配线程安全") + self._creator = creator + self._maxusage = maxusage + self._setsession = setsession + self._failures = failures + self._ping = ping + self._closeable = closeable + self._args, self._kwargs = args, kwargs + self.thread = (threadlocal or local)() + + def steady_connection(self): + """Get a steady, non-persistent DB-API 2 connection.""" + return connect( + self._creator, self._maxusage, self._setsession, + self._failures, self._ping, self._closeable, + *self._args, **self._kwargs) + + def connection(self, shareable=False): + """共享连接 + 长时间未关闭则不会被共享 + """ + try: + con = self.thread.connection + except AttributeError: + con = self.steady_connection() + if not con.threadsafety(): + raise NotSupportedError("数据库模块未分配线程安全") + self.thread.connection = con + con._ping_check() + return con + + def dedicated_connection(self): + """Alias for connection(shareable=False).""" + return self.connection() diff --git a/CACodeFramework/util/DBPool/persistent_pg.py b/CACodeFramework/util/DBPool/persistent_pg.py new file mode 100644 index 0000000..60c73cc --- /dev/null +++ b/CACodeFramework/util/DBPool/persistent_pg.py @@ -0,0 +1,57 @@ +from . import __version__ +from .steady_pg import SteadyPgConnection + +try: + from _threading_local import local +except ImportError: + from threading import local + + +class PersistentPg: + """Generator for persistent classic PyGreSQL connections. + + After you have created the connection pool, you can use + connection() to get thread-affine, steady PostgreSQL connections. + """ + + version = __version__ + + def __init__( + self, maxusage=None, setsession=None, + closeable=False, threadlocal=None, *args, **kwargs): + """Set up the persistent PostgreSQL connection generator. + + maxusage: maximum number of reuses of a single connection + (0 or None means unlimited reuse) + When this maximum usage number of the connection is reached, + the connection is automatically reset (closed and reopened). + setsession: optional list of SQL commands that may serve to prepare + the session, e.g. ["set datestyle to ...", "set time zone ..."] + closeable: if this is set to true, then closing connections will + be allowed, but by default this will be silently ignored + threadlocal: an optional class for representing thread-local data + that will be used instead of our Python implementation + (threading.local is faster, but cannot be used in all cases) + args, kwargs: the parameters that shall be used to establish + the PostgreSQL connections using class PyGreSQL pg.DB() + """ + self._maxusage = maxusage + self._setsession = setsession + self._closeable = closeable + self._args, self._kwargs = args, kwargs + self.thread = (threadlocal or local)() + + def steady_connection(self): + """Get a steady, non-persistent PyGreSQL connection.""" + return SteadyPgConnection( + self._maxusage, self._setsession, self._closeable, + *self._args, **self._kwargs) + + def connection(self): + """Get a steady, persistent PyGreSQL connection.""" + try: + con = self.thread.connection + except AttributeError: + con = self.steady_connection() + self.thread.connection = con + return con diff --git a/CACodeFramework/util/DBPool/pooled_db.py b/CACodeFramework/util/DBPool/pooled_db.py new file mode 100644 index 0000000..9d458f3 --- /dev/null +++ b/CACodeFramework/util/DBPool/pooled_db.py @@ -0,0 +1,408 @@ +from threading import Condition + +from . import __version__ +from .steady_db import connect + + +class PooledDBError(Exception): + """General PooledDB error.""" + + +class InvalidConnection(PooledDBError): + """Database connection is invalid.""" + + +class NotSupportedError(PooledDBError): + """DB-API module not supported by PooledDB.""" + + +class TooManyConnections(PooledDBError): + """Too many database connections were opened.""" + + +class PooledDB: + """Pool for DB-API 2 connections. + + After you have created the connection pool, you can use + connection() to get pooled, steady DB-API 2 connections. + """ + + version = __version__ + + def __init__( + self, creator, mincached=0, maxcached=0, + maxshared=0, maxconnections=0, blocking=False, + maxusage=None, setsession=None, reset=True, + failures=None, ping=1, + *args, **kwargs): + """ + + 设置DB-API 2连接池。 + + creator:返回新的DB-API 2的任意函数 + 连接对象或符合DB-API 2的数据库模块 + mincached:池中空闲连接的初始数量 + (0表示启动时未建立连接) + maxcached:池中最大空闲连接数 + (0或无表示池大小不受限制) + maxshared:共享连接的最大数量 + (0或无表示所有连接都是专用的) + 当达到此最大数量时,连接为 + 如果被要求共享,则将它们共享。 + maxconnections:通常允许的最大连接数 + (0或无表示任意数量的连接) + blocking:确定超出最大值时的行为 + (如果将其设置为true,请阻止并等待,直到 + 连接减少,否则将报告错误) + maxusage:单个连接的最大重用次数 + (0或无表示无限重用) + 当达到连接的最大使用次数时, + 连接将自动重置(关闭并重新打开)。 + setsession:可用于准备的SQL命令的可选列表 + 会话,例如[“将日期样式设置为...”,“将时区设置为...”] + reset:返回到池后应如何重置连接 + (对于以begin()开始的回滚事务,为False或None, + 出于安全考虑,总是发出回滚是正确的) + failures:可选的异常类或异常类的元组 + 为此,应应用连接故障转移机制, + 如果默认值(OperationalError,InternalError)不足够 + ping:确定何时应使用ping()检查连接 + (0 =无=永不,1 =默认=每当从池中获取时, + 2 =创建游标时,4 =执行查询时, + 7 =始终,以及这些值的所有其他位组合) + args,kwargs:应传递给创建者的参数 + 函数或DB-API 2模块的连接构造函数 + + + 初始化配置 + 以下参数与PooledDB一致 + :param creator:默认即可 + :param maxconnections:默认即可 + :param mincached:默认即可 + :param maxcached:默认即可 + :param maxshared:默认即可 + :param blocking:默认即可 + :param setsession:默认即可 + :param ping:默认即可 + :param host:数据库IP地址 + :param port:端口 + :param user:用户名,如root + :param password:密码 + :param database:数据库名 + :param charset:编码格式 + :param POOL:使用自定义的PooledDB,不建议 + """ + try: + threadsafety = creator.threadsafety + except AttributeError: + try: + if not callable(creator.connect): + raise AttributeError + except AttributeError: + threadsafety = 2 + else: + threadsafety = 0 + if not threadsafety: + raise NotSupportedError("数据库模块未分配线程安全") + self._creator = creator + self._args, self._kwargs = args, kwargs + self._blocking = blocking + self._maxusage = maxusage + self._setsession = setsession + self._reset = reset + self._failures = failures + self._ping = ping + if mincached is None: + mincached = 0 + if maxcached is None: + maxcached = 0 + if maxconnections is None: + maxconnections = 0 + if maxcached: + if maxcached < mincached: + maxcached = mincached + self._maxcached = maxcached + else: + self._maxcached = 0 + if threadsafety > 1 and maxshared: + self._maxshared = maxshared + self._shared_cache = [] # 共享连接的缓存 + else: + self._maxshared = 0 + if maxconnections: + if maxconnections < maxcached: + maxconnections = maxcached + if maxconnections < maxshared: + maxconnections = maxshared + self._maxconnections = maxconnections + else: + self._maxconnections = 0 + self._idle_cache = [] # 空闲连接池 + self._lock = Condition() + self._connections = 0 + # Establish an initial number of idle database connections: + idle = [self.dedicated_connection() for i in range(mincached)] + while idle: + idle.pop().close() + + def steady_connection(self): + """获得稳定的,未池化的DB-API 2连接.""" + return connect( + self._creator, self._maxusage, self._setsession, + self._failures, self._ping, True, *self._args, **self._kwargs) + + def connection(self, shareable=True): + """从连接池中获得稳定的缓存的DB-API 2连接。 + + :param shareable:允许共享连接 + """ + if shareable and self._maxshared: + self._lock.acquire() + try: + while (not self._shared_cache and self._maxconnections + and self._connections >= self._maxconnections): + self._wait_lock() + if len(self._shared_cache) < self._maxshared: + # shared cache is not full, get a dedicated connection + try: # first try to get it from the idle cache + con = self._idle_cache.pop(0) + except IndexError: # else get a fresh connection + con = self.steady_connection() + else: + con._ping_check() # check this connection + con = SharedDBConnection(con) + self._connections += 1 + else: # shared cache full or no more connections allowed + self._shared_cache.sort() # least shared connection first + con = self._shared_cache.pop(0) # get it + while con.con._transaction: + # do not share connections which are in a transaction + self._shared_cache.insert(0, con) + self._wait_lock() + self._shared_cache.sort() + con = self._shared_cache.pop(0) + con.con._ping_check() # check the underlying connection + con.share() # increase share of this connection + # put the connection (back) into the shared cache + self._shared_cache.append(con) + self._lock.notify() + finally: + self._lock.release() + con = PooledSharedDBConnection(self, con) + else: # try to get a dedicated connection + self._lock.acquire() + try: + while (self._maxconnections + and self._connections >= self._maxconnections): + self._wait_lock() + # connection limit not reached, get a dedicated connection + try: # first try to get it from the idle cache + con = self._idle_cache.pop(0) + except IndexError: # else get a fresh connection + con = self.steady_connection() + else: + con._ping_check() # check connection + con = PooledDedicatedDBConnection(self, con) + self._connections += 1 + finally: + self._lock.release() + return con + + def dedicated_connection(self): + """Alias for connection(shareable=False).""" + return self.connection(False) + + def unshare(self, con): + """Decrease the share of a connection in the shared cache.""" + self._lock.acquire() + try: + con.unshare() + shared = con.shared + if not shared: # connection is idle, + try: # so try to remove it + self._shared_cache.remove(con) # from shared cache + except ValueError: + pass # pool has already been closed + finally: + self._lock.release() + if not shared: # connection has become idle, + self.cache(con.con) # so add it to the idle cache + + def cache(self, con): + """Put a dedicated connection back into the idle cache.""" + self._lock.acquire() + try: + if not self._maxcached or len(self._idle_cache) < self._maxcached: + con._reset(force=self._reset) # rollback possible transaction + # the idle cache is not full, so put it there + self._idle_cache.append(con) # append it to the idle cache + else: # if the idle cache is already full, + con.close() # then close the connection + self._connections -= 1 + self._lock.notify() + finally: + self._lock.release() + + def close(self): + """Close all connections in the pool.""" + self._lock.acquire() + try: + while self._idle_cache: # close all idle connections + con = self._idle_cache.pop(0) + try: + con.close() + except Exception: + pass + if self._maxshared: # close all shared connections + while self._shared_cache: + con = self._shared_cache.pop(0).con + try: + con.close() + except Exception: + pass + self._connections -= 1 + self._lock.notifyAll() + finally: + self._lock.release() + + def __del__(self): + """Delete the pool.""" + try: + self.close() + except: # builtin Exceptions might not exist any more + pass + + def _wait_lock(self): + """Wait until notified or report an error.""" + if not self._blocking: + raise TooManyConnections + self._lock.wait() + + +# Auxiliary classes for pooled connections + +class PooledDedicatedDBConnection: + """Auxiliary proxy class for pooled dedicated connections.""" + + def __init__(self, pool, con): + """Create a pooled dedicated connection. + + pool: the corresponding PooledDB instance + con: the underlying SteadyDB connection + """ + # basic initialization to make finalizer work + self._con = None + # proper initialization of the connection + if not con.threadsafety(): + raise NotSupportedError("数据库模块未分配线程安全") + self._pool = pool + self._con = con + + def close(self): + """Close the pooled dedicated connection.""" + # Instead of actually closing the connection, + # return it to the pool for future reuse. + if self._con: + self._pool.cache(self._con) + self._con = None + + def __getattr__(self, name): + """Proxy all members of the class.""" + if self._con: + return getattr(self._con, name) + else: + raise InvalidConnection + + def __del__(self): + """Delete the pooled connection.""" + try: + self.close() + except: # builtin Exceptions might not exist any more + pass + + +class SharedDBConnection: + """Auxiliary class for shared connections.""" + + def __init__(self, con): + """Create a shared connection. + + con: the underlying SteadyDB connection + """ + self.con = con + self.shared = 1 + + def __lt__(self, other): + if self.con._transaction == other.con._transaction: + return self.shared < other.shared + else: + return not self.con._transaction + + def __le__(self, other): + if self.con._transaction == other.con._transaction: + return self.shared <= other.shared + else: + return not self.con._transaction + + def __eq__(self, other): + return (self.con._transaction == other.con._transaction + and self.shared == other.shared) + + def __ne__(self, other): + return not self.__eq__(other) + + def __gt__(self, other): + return other.__lt__(self) + + def __ge__(self, other): + return other.__le__(self) + + def share(self): + """Increase the share of this connection.""" + self.shared += 1 + + def unshare(self): + """Decrease the share of this connection.""" + self.shared -= 1 + + +class PooledSharedDBConnection: + """Auxiliary proxy class for pooled shared connections.""" + + def __init__(self, pool, shared_con): + """Create a pooled shared connection. + + pool: the corresponding PooledDB instance + con: the underlying SharedDBConnection + """ + # basic initialization to make finalizer work + self._con = None + # proper initialization of the connection + con = shared_con.con + if not con.threadsafety() > 1: + raise NotSupportedError("数据库模块未分配线程安全") + self._pool = pool + self._shared_con = shared_con + self._con = con + + def close(self): + """Close the pooled shared connection.""" + # Instead of actually closing the connection, + # unshare it and/or return it to the pool. + if self._con: + self._pool.unshare(self._shared_con) + self._shared_con = self._con = None + + def __getattr__(self, name): + """Proxy all members of the class.""" + if self._con: + return getattr(self._con, name) + else: + raise InvalidConnection + + def __del__(self): + """Delete the pooled connection.""" + try: + self.close() + except: # builtin Exceptions might not exist any more + pass diff --git a/CACodeFramework/util/DBPool/pooled_pg.py b/CACodeFramework/util/DBPool/pooled_pg.py new file mode 100644 index 0000000..6979376 --- /dev/null +++ b/CACodeFramework/util/DBPool/pooled_pg.py @@ -0,0 +1,186 @@ +try: + from Queue import Queue, Empty, Full +except ImportError: # Python 3 + from queue import Queue, Empty, Full + +from . import __version__ +from .steady_pg import SteadyPgConnection + + +class PooledPgError(Exception): + """General PooledPg error.""" + + +class InvalidConnection(PooledPgError): + """Database connection is invalid.""" + + +class TooManyConnections(PooledPgError): + """Too many database connections were opened.""" + + +class PooledPg: + """Pool for classic PyGreSQL connections. + + After you have created the connection pool, you can use + connection() to get pooled, steady PostgreSQL connections. + """ + + version = __version__ + + def __init__( + self, mincached=0, maxcached=0, + maxconnections=0, blocking=False, + maxusage=None, setsession=None, reset=None, + *args, **kwargs): + """Set up the PostgreSQL connection pool. + + mincached: initial number of connections in the pool + (0 means no connections are made at startup) + maxcached: maximum number of connections in the pool + (0 or None means unlimited pool size) + maxconnections: maximum number of connections generally allowed + (0 or None means an arbitrary number of connections) + blocking: determines behavior when exceeding the maximum + (if this is set to true, block and wait until the number of + connections decreases, otherwise an error will be reported) + maxusage: maximum number of reuses of a single connection + (0 or None means unlimited reuse) + When this maximum usage number of the connection is reached, + the connection is automatically reset (closed and reopened). + setsession: optional list of SQL commands that may serve to prepare + the session, e.g. ["set datestyle to ...", "set time zone ..."] + reset: how connections should be reset when returned to the pool + (0 or None to rollback transcations started with begin(), + 1 to always issue a rollback, 2 for a complete reset) + args, kwargs: the parameters that shall be used to establish + the PostgreSQL connections using class PyGreSQL pg.DB() + """ + self._args, self._kwargs = args, kwargs + self._maxusage = maxusage + self._setsession = setsession + self._reset = reset or 0 + if mincached is None: + mincached = 0 + if maxcached is None: + maxcached = 0 + if maxconnections is None: + maxconnections = 0 + if maxcached: + if maxcached < mincached: + maxcached = mincached + if maxconnections: + if maxconnections < maxcached: + maxconnections = maxcached + # Create semaphore for number of allowed connections generally: + from threading import Semaphore + self._connections = Semaphore(maxconnections) + self._blocking = blocking + else: + self._connections = None + self._cache = Queue(maxcached) # the actual connection pool + # Establish an initial number of database connections: + idle = [self.connection() for i in range(mincached)] + while idle: + idle.pop().close() + + def steady_connection(self): + """Get a steady, unpooled PostgreSQL connection.""" + return SteadyPgConnection(self._maxusage, self._setsession, True, + *self._args, **self._kwargs) + + def connection(self): + """Get a steady, cached PostgreSQL connection from the pool.""" + if self._connections: + if not self._connections.acquire(self._blocking): + raise TooManyConnections + try: + con = self._cache.get(0) + except Empty: + con = self.steady_connection() + return PooledPgConnection(self, con) + + def cache(self, con): + """Put a connection back into the pool cache.""" + try: + if self._reset == 2: + con.reset() # reset the connection completely + else: + if self._reset or con._transaction: + try: + con.rollback() # rollback a possible transaction + except Exception: + pass + self._cache.put(con, 0) # and then put it back into the cache + except Full: + con.close() + if self._connections: + self._connections.release() + + def close(self): + """Close all connections in the pool.""" + while 1: + try: + con = self._cache.get(0) + try: + con.close() + except Exception: + pass + if self._connections: + self._connections.release() + except Empty: + break + + def __del__(self): + """Delete the pool.""" + try: + self.close() + except: # builtin Exceptions might not exist any more + pass + + +# Auxiliary class for pooled connections + +class PooledPgConnection: + """Proxy class for pooled PostgreSQL connections.""" + + def __init__(self, pool, con): + """Create a pooled DB-API 2 connection. + + pool: the corresponding PooledPg instance + con: the underlying SteadyPg connection + """ + self._pool = pool + self._con = con + + def close(self): + """Close the pooled connection.""" + # Instead of actually closing the connection, + # return it to the pool so it can be reused. + if self._con: + self._pool.cache(self._con) + self._con = None + + def reopen(self): + """Reopen the pooled connection.""" + # If the connection is already back in the pool, + # get another connection from the pool, + # otherwise reopen the underlying connection. + if self._con: + self._con.reopen() + else: + self._con = self._pool.connection() + + def __getattr__(self, name): + """Proxy all members of the class.""" + if self._con: + return getattr(self._con, name) + else: + raise InvalidConnection + + def __del__(self): + """Delete the pooled connection.""" + try: + self.close() + except: # builtin Exceptions might not exist any more + pass diff --git a/CACodeFramework/util/DBPool/simple_pooled_db.py b/CACodeFramework/util/DBPool/simple_pooled_db.py new file mode 100644 index 0000000..5e31b29 --- /dev/null +++ b/CACodeFramework/util/DBPool/simple_pooled_db.py @@ -0,0 +1,221 @@ +"""SimplePooledDB - a very simple DB-API 2 database connection pool. + +Implements a pool of threadsafe cached DB-API 2 connections +to a database which are transparently reused. + +This should result in a speedup for persistent applications +such as the "Webware for Python" AppServer. + +For more information on the DB-API 2, see: + https://www.python.org/dev/peps/pep-0249/ +For more information on Webware for Python, see: + https://webwareforpython.github.io/w4py/ + +Measures are taken to make the pool of connections threadsafe +regardless of whether the DB-API 2 module used is threadsafe +on the connection level (threadsafety > 1) or not. It must only +be threadsafe on the module level (threadsafety = 1). If the +DB-API 2 module is threadsafe, the connections will be shared +between threads (keep this in mind if you use transactions). + +Usage: + +The idea behind SimplePooledDB is that it's completely transparent. +After you have established your connection pool, stating the +DB-API 2 module to be used, the number of connections +to be cached in the pool and the connection parameters, e.g. + + import pgdb # import used DB-API 2 module + from dbutils.simple_pooled_db import PooledDB + dbpool = PooledDB(pgdb, 5, host=..., database=..., user=..., ...) + +you can demand database connections from that pool, + + db = dbpool.connection() + +and use them just as if they were ordinary DB-API 2 connections. +It's really just a proxy class. + +db.close() will return the connection to the pool, it will not +actually close it. This is so your existing code works nicely. + +Ideas for improvement: + +* Do not create the maximum number of connections on startup +already, but only a certain number and the rest on demand. +* Detect and transparently reset "bad" connections. +* Connections should have some sort of maximum usage limit +after which they should be automatically closed and reopened. +* Prefer or enforce thread-affinity for the connections, +allowing for both shareable and non-shareable connections. + +Please note that these and other ideas have been already +implemented in in PooledDB, a more sophisticated version +of SimplePooledDB. You might also consider using PersistentDB +instead for thread-affine persistent database connections. +SimplePooledDB may still serve as a very simple reference +and example implementation for developers. + + +Copyright, credits and license: + +* Contributed as MiscUtils/DBPool for Webware for Python + by Dan Green, December 2000 +* Thread safety bug found by Tom Schwaller +* Fixes by Geoff Talvola (thread safety in _threadsafe_getConnection()) +* Clean up by Chuck Esterbrook +* Fix unthreadsafe functions which were leaking, Jay Love +* Eli Green's webware-discuss comments were lifted for additional docs +* Clean-up and detailed commenting, rename and move to DBUtils + by Christoph Zwerschke in September 2005 + +Licensed under the MIT license. +""" + +from . import __version__ + + +class PooledDBError(Exception): + """General PooledDB error.""" + + +class NotSupportedError(PooledDBError): + """DB-API module not supported by PooledDB.""" + + +class PooledDBConnection: + """A proxy class for pooled database connections. + + You don't normally deal with this class directly, + but use PooledDB to get new connections. + """ + + def __init__(self, pool, con): + self._con = con + self._pool = pool + + def close(self): + """Close the pooled connection.""" + # Instead of actually closing the connection, + # return it to the pool so it can be reused. + if self._con is not None: + self._pool.returnConnection(self._con) + self._con = None + + def __getattr__(self, name): + # All other members are the same. + return getattr(self._con, name) + + def __del__(self): + self.close() + + +class PooledDB: + """A very simple database connection pool. + + After you have created the connection pool, + you can get connections using getConnection(). + """ + + version = __version__ + + def __init__(self, dbapi, maxconnections, *args, **kwargs): + """Set up the database connection pool. + + dbapi: the DB-API 2 compliant module you want to use + maxconnections: the number of connections cached in the pool + args, kwargs: the parameters that shall be used to establish + the database connections using connect() + """ + try: + threadsafety = dbapi.threadsafety + except Exception: + threadsafety = None + if threadsafety == 0: + raise NotSupportedError( + "Database module does not support any level of threading.") + elif threadsafety == 1: + # If there is no connection level safety, build + # the pool using the synchronized queue class + # that implements all the required locking semantics. + try: + from Queue import Queue + except ImportError: # Python 3 + from queue import Queue + self._queue = Queue(maxconnections) # create the queue + self.connection = self._unthreadsafe_get_connection + self.addConnection = self._unthreadsafe_add_connection + self.returnConnection = self._unthreadsafe_return_connection + elif threadsafety in (2, 3): + # If there is connection level safety, implement the + # pool with an ordinary list used as a circular buffer. + # We only need a minimum of locking in this case. + from threading import Lock + self._lock = Lock() # create a lock object to be used later + self._nextConnection = 0 # index of the next connection to be used + self._connections = [] # the list of connections + self.connection = self._threadsafe_get_connection + self.addConnection = self._threadsafe_add_connection + self.returnConnection = self._threadsafe_return_connection + else: + raise NotSupportedError( + "Database module threading support cannot be determined.") + # Establish all database connections (it would be better to + # only establish a part of them now, and the rest on demand). + for i in range(maxconnections): + self.addConnection(dbapi.connect(*args, **kwargs)) + + # The following functions are used with DB-API 2 modules + # that do not have connection level threadsafety, like PyGreSQL. + # However, the module must be threadsafe at the module level. + # Note: threadsafe/unthreadsafe refers to the DB-API 2 module, + # not to this class which should be threadsafe in any case. + + def _unthreadsafe_get_connection(self): + """Get a connection from the pool.""" + return PooledDBConnection(self, self._queue.get()) + + def _unthreadsafe_add_connection(self, con): + """Add a connection to the pool.""" + self._queue.put(con) + + def _unthreadsafe_return_connection(self, con): + """Return a connection to the pool. + + In this case, the connections need to be put + back into the queue after they have been used. + This is done automatically when the connection is closed + and should never be called explicitly outside of this module. + """ + self._unthreadsafe_add_connection(con) + + # The following functions are used with DB-API 2 modules + # that are threadsafe at the connection level, like psycopg. + # Note: In this case, connections are shared between threads. + # This may lead to problems if you use transactions. + + def _threadsafe_get_connection(self): + """Get a connection from the pool.""" + self._lock.acquire() + try: + next = self._nextConnection + con = PooledDBConnection(self, self._connections[next]) + next += 1 + if next >= len(self._connections): + next = 0 + self._nextConnection = next + return con + finally: + self._lock.release() + + def _threadsafe_add_connection(self, con): + """Add a connection to the pool.""" + self._connections.append(con) + + def _threadsafe_return_connection(self, con): + """Return a connection to the pool. + + In this case, the connections always stay in the pool, + so there is no need to do anything here. + """ + pass diff --git a/CACodeFramework/util/DBPool/simple_pooled_pg.py b/CACodeFramework/util/DBPool/simple_pooled_pg.py new file mode 100644 index 0000000..844fdd0 --- /dev/null +++ b/CACodeFramework/util/DBPool/simple_pooled_pg.py @@ -0,0 +1,137 @@ +"""SimplePooledPg - a very simple classic PyGreSQL connection pool. + +Implements a pool of threadsafe cached connections +to a PostgreSQL database which are transparently reused, +using the classic (not DB-API 2 compliant) PyGreSQL pg API. + +This should result in a speedup for persistent applications +such as the "Webware for Python" AppServer. + +For more information on PostgreSQL, see: + https://www.postgresql.org/ +For more information on PyGreSQL, see: + http://www.pygresql.org +For more information on Webware for Python, see: + https://webwareforpython.github.io/w4py/ + +Measures are taken to make the pool of connections threadsafe +regardless of the fact that the PyGreSQL pg module itself is +not threadsafe at the connection level. Connections will never be +shared between threads, so you can safely use transactions. + +Usage: + +The idea behind SimplePooledPg is that it's completely transparent. +After you have established your connection pool, stating the +number of connections to be cached in the pool and the +connection parameters, e.g. + + from dbutils.simple_pooled_pg import PooledPg + dbpool = PooledPg(5, host=..., database=..., user=..., ...) + +you can demand database connections from that pool, + + db = dbpool.connection() + +and use them just as if they were ordinary PyGreSQL pg API +connections. It's really just a proxy class. + +db.close() will return the connection to the pool, it will not +actually close it. This is so your existing code works nicely. + +Ideas for improvement: + +* Do not create the maximum number of connections on startup +already, but only a certain number and the rest on demand. +* Detect and transparently reset "bad" connections. The PyGreSQL +pg API provides a status attribute and a reset() method for that. +* Connections should have some sort of "maximum usage limit" +after which they should be automatically closed and reopened. +* Prefer or enforce thread affinity for the connections. + +Please note that these and other ideas have been already +implemented in in PooledPg, a more sophisticated version +of SimplePooledPg. You might also consider using PersistentPg +instead for thread-affine persistent PyGreSQL connections. +SimplePooledPg may still serve as a very simple reference +and example implementation for developers. + + +Copyright, credits and license: + +* Contributed as supplement for Webware for Python and PyGreSQL + by Christoph Zwerschke in September 2005 +* Based on the code of DBPool, contributed to Webware for Python + by Dan Green in December 2000 + +Licensed under the MIT license. +""" + +from pg import DB as PgConnection + +from . import __version__ + + +class PooledPgConnection: + """A proxy class for pooled PostgreSQL connections. + + You don't normally deal with this class directly, + but use PooledPg to get new connections. + """ + + def __init__(self, pool, con): + self._con = con + self._pool = pool + + def close(self): + """Close the pooled connection.""" + # Instead of actually closing the connection, + # return it to the pool so it can be reused. + if self._con is not None: + self._pool.cache(self._con) + self._con = None + + def __getattr__(self, name): + # All other members are the same. + return getattr(self._con, name) + + def __del__(self): + self.close() + + +class PooledPg: + """A very simple PostgreSQL connection pool. + + After you have created the connection pool, + you can get connections using getConnection(). + """ + + version = __version__ + + def __init__(self, maxconnections, *args, **kwargs): + """Set up the PostgreSQL connection pool. + + maxconnections: the number of connections cached in the pool + args, kwargs: the parameters that shall be used to establish + the PostgreSQL connections using pg.connect() + """ + # Since there is no connection level safety, we + # build the pool using the synchronized queue class + # that implements all the required locking semantics. + try: + from Queue import Queue + except ImportError: # Python 3 + from queue import Queue + self._queue = Queue(maxconnections) + # Establish all database connections (it would be better to + # only establish a part of them now, and the rest on demand). + for i in range(maxconnections): + self.cache(PgConnection(*args, **kwargs)) + + def cache(self, con): + """Add or return a connection to the pool.""" + self._queue.put(con) + + def connection(self): + """Get a connection from the pool.""" + return PooledPgConnection(self, self._queue.get()) diff --git a/CACodeFramework/util/DBPool/steady_db.py b/CACodeFramework/util/DBPool/steady_db.py new file mode 100644 index 0000000..f4d2fd0 --- /dev/null +++ b/CACodeFramework/util/DBPool/steady_db.py @@ -0,0 +1,609 @@ +import sys + +from . import __version__ + +baseint = int + + +class SteadyDBError(Exception): + """General SteadyDB error.""" + + +class InvalidCursor(SteadyDBError): + """Database cursor is invalid.""" + + +def connect( + creator, maxusage=None, setsession=None, + failures=None, ping=1, closeable=True, *args, **kwargs): + """A tough version of the connection constructor of a DB-API 2 module. + + creator: either an arbitrary function returning new DB-API 2 compliant + connection objects or a DB-API 2 compliant database module + maxusage: maximum usage limit for the underlying DB-API 2 connection + (number of database operations, 0 or None means unlimited usage) + callproc(), execute() and executemany() count as one operation. + When the limit is reached, the connection is automatically reset. + setsession: an optional list of SQL commands that may serve to prepare + the session, e.g. ["set datestyle to german", "set time zone mez"] + failures: an optional exception class or a tuple of exception classes + for which the failover mechanism shall be applied, if the default + (OperationalError, InternalError) is not adequate + ping: determines when the connection should be checked with ping() + (0 = None = never, 1 = default = when _ping_check() is called, + 2 = whenever a cursor is created, 4 = when a query is executed, + 7 = always, and all other bit combinations of these values) + closeable: if this is set to false, then closing the connection will + be silently ignored, but by default the connection can be closed + args, kwargs: the parameters that shall be passed to the creator + function or the connection constructor of the DB-API 2 module + """ + return SteadyDBConnection( + creator, maxusage, setsession, + failures, ping, closeable, *args, **kwargs) + + +class SteadyDBConnection: + """A "tough" version of DB-API 2 connections.""" + + version = __version__ + + def __init__( + self, creator, maxusage=None, setsession=None, + failures=None, ping=1, closeable=True, *args, **kwargs): + """Create a "tough" DB-API 2 connection.""" + # basic initialization to make finalizer work + self._con = None + self._closed = True + # proper initialization of the connection + try: + self._creator = creator.connect + self._dbapi = creator + except AttributeError: + # try finding the DB-API 2 module via the connection creator + self._creator = creator + try: + self._dbapi = creator.dbapi + except AttributeError: + try: + self._dbapi = sys.modules[creator.__module__] + if self._dbapi.connect != creator: + raise AttributeError + except (AttributeError, KeyError): + self._dbapi = None + try: + self._threadsafety = creator.threadsafety + except AttributeError: + try: + self._threadsafety = self._dbapi.threadsafety + except AttributeError: + self._threadsafety = None + if not callable(self._creator): + raise TypeError("%r is not a connection provider." % (creator,)) + if maxusage is None: + maxusage = 0 + if not isinstance(maxusage, baseint): + raise TypeError("'maxusage' must be an integer value.") + self._maxusage = maxusage + self._setsession_sql = setsession + if failures is not None and not isinstance( + failures, tuple) and not issubclass(failures, Exception): + raise TypeError("'failures' must be a tuple of exceptions.") + self._failures = failures + self._ping = ping if isinstance(ping, int) else 0 + self._closeable = closeable + self._args, self._kwargs = args, kwargs + self._store(self._create()) + + def __enter__(self): + """Enter the runtime context for the connection object.""" + return self + + def __exit__(self, *exc): + """Exit the runtime context for the connection object. + + This does not close the connection, but it ends a transaction. + """ + if exc[0] is None and exc[1] is None and exc[2] is None: + self.commit() + else: + self.rollback() + + def _create(self): + """Create a new connection using the creator function.""" + con = self._creator(*self._args, **self._kwargs) + try: + try: + if self._dbapi.connect != self._creator: + raise AttributeError + except AttributeError: + # try finding the DB-API 2 module via the connection itself + try: + mod = con.__module__ + except AttributeError: + mod = None + while mod: + try: + self._dbapi = sys.modules[mod] + if not callable(self._dbapi.connect): + raise AttributeError + except (AttributeError, KeyError): + pass + else: + break + i = mod.rfind('.') + if i < 0: + mod = None + else: + mod = mod[:i] + else: + try: + mod = con.OperationalError.__module__ + except AttributeError: + mod = None + while mod: + try: + self._dbapi = sys.modules[mod] + if not callable(self._dbapi.connect): + raise AttributeError + except (AttributeError, KeyError): + pass + else: + break + i = mod.rfind('.') + if i < 0: + mod = None + else: + mod = mod[:i] + else: + self._dbapi = None + if self._threadsafety is None: + try: + self._threadsafety = self._dbapi.threadsafety + except AttributeError: + try: + self._threadsafety = con.threadsafety + except AttributeError: + pass + if self._failures is None: + try: + self._failures = ( + self._dbapi.OperationalError, + self._dbapi.InternalError) + except AttributeError: + try: + self._failures = ( + self._creator.OperationalError, + self._creator.InternalError) + except AttributeError: + try: + self._failures = ( + con.OperationalError, con.InternalError) + except AttributeError: + raise AttributeError( + "Could not determine failure exceptions" + " (please set failures or creator.dbapi).") + if isinstance(self._failures, tuple): + self._failure = self._failures[0] + else: + self._failure = self._failures + self._setsession(con) + except Exception as error: + # the database module could not be determined + # or the session could not be prepared + try: # close the connection first + con.close() + except Exception: + pass + raise error # re-raise the original error again + return con + + def _setsession(self, con=None): + """Execute the SQL commands for session preparation.""" + if con is None: + con = self._con + if self._setsession_sql: + cursor = con.cursor() + for sql in self._setsession_sql: + cursor.execute(sql) + cursor.close() + + def _store(self, con): + """Store a database connection for subsequent use.""" + self._con = con + self._transaction = False + self._closed = False + self._usage = 0 + + def _close(self): + """Close the tough connection. + + You can always close a tough connection with this method + and it will not complain if you close it more than once. + """ + if not self._closed: + try: + self._con.close() + except Exception: + pass + self._transaction = False + self._closed = True + + def _reset(self, force=False): + """Reset a tough connection. + + Rollback if forced or the connection was in a transaction. + """ + if not self._closed and (force or self._transaction): + try: + self.rollback() + except Exception: + pass + + def _ping_check(self, ping=1, reconnect=True): + """Check whether the connection is still alive using ping(). + + If the the underlying connection is not active and the ping + parameter is set accordingly, the connection will be recreated + unless the connection is currently inside a transaction. + """ + if ping & self._ping: + try: # if possible, ping the connection + try: # pass a reconnect=False flag if this is supported + alive = self._con.ping(False) + except TypeError: # the reconnect flag is not supported + alive = self._con.ping() + except (AttributeError, IndexError, TypeError, ValueError): + self._ping = 0 # ping() is not available + alive = None + reconnect = False + except Exception: + alive = False + else: + if alive is None: + alive = True + if alive: + reconnect = False + if reconnect and not self._transaction: + try: # try to reopen the connection + con = self._create() + except Exception: + pass + else: + self._close() + self._store(con) + alive = True + return alive + + def dbapi(self): + """Return the underlying DB-API 2 module of the connection.""" + if self._dbapi is None: + raise AttributeError( + "Could not determine DB-API 2 module" + " (please set creator.dbapi).") + return self._dbapi + + def threadsafety(self): + """Return the thread safety level of the connection.""" + if self._threadsafety is None: + if self._dbapi is None: + raise AttributeError( + "Could not determine threadsafety" + " (please set creator.dbapi or creator.threadsafety).") + return 0 + return self._threadsafety + + def close(self): + """Close the tough connection. + + You are allowed to close a tough connection by default + and it will not complain if you close it more than once. + + You can disallow closing connections by setting + the closeable parameter to something false. In this case, + closing tough connections will be silently ignored. + """ + if self._closeable: + self._close() + elif self._transaction: + self._reset() + + def begin(self, *args, **kwargs): + """Indicate the beginning of a transaction. + + During a transaction, connections won't be transparently + replaced, and all errors will be raised to the application. + + If the underlying driver supports this method, it will be called + with the given parameters (e.g. for distributed transactions). + """ + self._transaction = True + try: + begin = self._con.begin + except AttributeError: + pass + else: + begin(*args, **kwargs) + + def commit(self): + """Commit any pending transaction.""" + self._transaction = False + try: + self._con.commit() + except self._failures as error: # cannot commit + try: # try to reopen the connection + con = self._create() + except Exception: + pass + else: + self._close() + self._store(con) + raise error # re-raise the original error + + def rollback(self): + """Rollback pending transaction.""" + self._transaction = False + try: + self._con.rollback() + except self._failures as error: # cannot rollback + try: # try to reopen the connection + con = self._create() + except Exception: + pass + else: + self._close() + self._store(con) + raise error # re-raise the original error + + def cancel(self): + """Cancel a long-running transaction. + + If the underlying driver supports this method, it will be called. + """ + self._transaction = False + try: + cancel = self._con.cancel + except AttributeError: + pass + else: + cancel() + + def ping(self, *args, **kwargs): + """Ping connection.""" + return self._con.ping(*args, **kwargs) + + def _cursor(self, *args, **kwargs): + """A "tough" version of the method cursor().""" + # The args and kwargs are not part of the standard, + # but some database modules seem to use these. + transaction = self._transaction + if not transaction: + self._ping_check(2) + try: + # check whether the connection has been used too often + if (self._maxusage and self._usage >= self._maxusage + and not transaction): + raise self._failure + cursor = self._con.cursor(*args, **kwargs) # try to get a cursor + except self._failures as error: # error in getting cursor + try: # try to reopen the connection + con = self._create() + except Exception: + pass + else: + try: # and try one more time to get a cursor + cursor = con.cursor(*args, **kwargs) + except Exception: + pass + else: + self._close() + self._store(con) + if transaction: + raise error # re-raise the original error again + return cursor + try: + con.close() + except Exception: + pass + if transaction: + self._transaction = False + raise error # re-raise the original error again + return cursor + + def cursor(self, *args, **kwargs): + """Return a new Cursor Object using the connection.""" + return SteadyDBCursor(self, *args, **kwargs) + + def __del__(self): + """Delete the steady connection.""" + try: + self._close() # make sure the connection is closed + except: # builtin Exceptions might not exist any more + pass + + +class SteadyDBCursor: + """A "tough" version of DB-API 2 cursors.""" + + def __init__(self, con, *args, **kwargs): + """Create a "tough" DB-API 2 cursor.""" + # basic initialization to make finalizer work + self._cursor = None + self._closed = True + # proper initialization of the cursor + self._con = con + self._args, self._kwargs = args, kwargs + self._clearsizes() + try: + self._cursor = con._cursor(*args, **kwargs) + except AttributeError: + raise TypeError("%r is not a SteadyDBConnection." % (con,)) + self._closed = False + + def __enter__(self): + """Enter the runtime context for the cursor object.""" + return self + + def __exit__(self, *exc): + """Exit the runtime context for the cursor object.""" + self.close() + + def setinputsizes(self, sizes): + """Store input sizes in case cursor needs to be reopened.""" + self._inputsizes = sizes + + def setoutputsize(self, size, column=None): + """Store output sizes in case cursor needs to be reopened.""" + self._outputsizes[column] = size + + def _clearsizes(self): + """Clear stored input and output sizes.""" + self._inputsizes = [] + self._outputsizes = {} + + def _setsizes(self, cursor=None): + """Set stored input and output sizes for cursor execution.""" + if cursor is None: + cursor = self._cursor + if self._inputsizes: + cursor.setinputsizes(self._inputsizes) + for column, size in self._outputsizes.items(): + if column is None: + cursor.setoutputsize(size) + else: + cursor.setoutputsize(size, column) + + def close(self): + """Close the tough cursor. + + It will not complain if you close it more than once. + """ + if not self._closed: + try: + self._cursor.close() + except Exception: + pass + self._closed = True + + def _get_tough_method(self, name): + """Return a "tough" version of the given cursor method.""" + + def tough_method(*args, **kwargs): + execute = name.startswith('execute') + con = self._con + transaction = con._transaction + if not transaction: + con._ping_check(4) + try: + # check whether the connection has been used too often + if (con._maxusage and con._usage >= con._maxusage + and not transaction): + raise con._failure + if execute: + self._setsizes() + method = getattr(self._cursor, name) + result = method(*args, **kwargs) # try to execute + if execute: + self._clearsizes() + except con._failures as error: # execution error + if not transaction: + try: + cursor2 = con._cursor( + *self._args, **self._kwargs) # open new cursor + except Exception: + pass + else: + try: # and try one more time to execute + if execute: + self._setsizes(cursor2) + method = getattr(cursor2, name) + result = method(*args, **kwargs) + if execute: + self._clearsizes() + except Exception: + pass + else: + self.close() + self._cursor = cursor2 + con._usage += 1 + return result + try: + cursor2.close() + except Exception: + pass + try: # try to reopen the connection + con2 = con._create() + except Exception: + pass + else: + try: + cursor2 = con2.cursor( + *self._args, **self._kwargs) # open new cursor + except Exception: + pass + else: + if transaction: + self.close() + con._close() + con._store(con2) + self._cursor = cursor2 + raise error # raise the original error again + error2 = None + try: # try one more time to execute + if execute: + self._setsizes(cursor2) + method2 = getattr(cursor2, name) + result = method2(*args, **kwargs) + if execute: + self._clearsizes() + except error.__class__: # same execution error + use2 = False + error2 = error + except Exception as error: # other execution errors + use2 = True + error2 = error + else: + use2 = True + if use2: + self.close() + con._close() + con._store(con2) + self._cursor = cursor2 + con._usage += 1 + if error2: + raise error2 # raise the other error + return result + try: + cursor2.close() + except Exception: + pass + try: + con2.close() + except Exception: + pass + if transaction: + self._transaction = False + raise error # re-raise the original error again + else: + con._usage += 1 + return result + + return tough_method + + def __getattr__(self, name): + """Inherit methods and attributes of underlying cursor.""" + if self._cursor: + if name.startswith(('execute', 'call')): + # make execution methods "tough" + return self._get_tough_method(name) + else: + return getattr(self._cursor, name) + else: + raise InvalidCursor + + def __del__(self): + """Delete the steady cursor.""" + try: + self.close() # make sure the cursor is closed + except: # builtin Exceptions might not exist any more + pass diff --git a/CACodeFramework/util/DBPool/steady_pg.py b/CACodeFramework/util/DBPool/steady_pg.py new file mode 100644 index 0000000..3a500ee --- /dev/null +++ b/CACodeFramework/util/DBPool/steady_pg.py @@ -0,0 +1,328 @@ +"""SteadyPg - hardened classic PyGreSQL connections. + +Implements steady connections to a PostgreSQL database +using the classic (not DB-API 2 compliant) PyGreSQL API. + +The connections are transparently reopened when they are +closed or the database connection has been lost or when +they are used more often than an optional usage limit. +Only connections which have been marked as being in a database +transaction with a begin() call will not be silently replaced. + +A typical situation where database connections are lost +is when the database server or an intervening firewall is +shutdown and restarted for maintenance reasons. In such a +case, all database connections would become unusable, even +though the database service may be already available again. + +The "hardened" connections provided by this module will +make the database connections immediately available again. + +This results in a steady PostgreSQL connection that can be used +by PooledPg or PersistentPg to create pooled or persistent +connections to a PostgreSQL database in a threaded environment +such as the application server of "Webware for Python." +Note, however, that the connections themselves are not thread-safe. + +For more information on PostgreSQL, see: + https://www.postgresql.org/ +For more information on PyGreSQL, see: + http://www.pygresql.org +For more information on Webware for Python, see: + https://webwareforpython.github.io/w4py/ + + +Usage: + +You can use the class SteadyPgConnection in the same way as you +would use the class DB from the classic PyGreSQL API module db. +The only difference is that you may specify a usage limit as the +first parameter when you open a connection (set it to None +if you prefer unlimited usage), and an optional list of commands +that may serve to prepare the session as the second parameter, +and you can specify whether is is allowed to close the connection +(by default this is true). When the connection to the PostgreSQL +database is lost or has been used too often, it will be automatically +reset, without further notice. + + from dbutils.steady_pg import SteadyPgConnection + db = SteadyPgConnection(10000, ["set datestyle to german"], + host=..., dbname=..., user=..., ...) + ... + result = db.query('...') + ... + db.close() + + +Ideas for improvement: + +* Alternatively to the maximum number of uses, + implement a maximum time to live for connections. +* Optionally log usage and loss of connection. + + +Copyright, credits and license: + +* Contributed as supplement for Webware for Python and PyGreSQL + by Christoph Zwerschke in September 2005 + +Licensed under the MIT license. +""" + +from pg import DB as PgConnection + +from . import __version__ + +try: + baseint = (int, long) +except NameError: # Python 3 + baseint = int + + +class SteadyPgError(Exception): + """General SteadyPg error.""" + + +class InvalidConnection(SteadyPgError): + """Database connection is invalid.""" + + +class SteadyPgConnection: + """Class representing steady connections to a PostgreSQL database. + + Underlying the connection is a classic PyGreSQL pg API database + connection which is reset if the connection is lost or used too often. + Thus the resulting connection is steadier ("tough and self-healing"). + + If you want the connection to be persistent in a threaded environment, + then you should not deal with this class directly, but use either the + PooledPg module or the PersistentPg module to get the connections. + """ + + version = __version__ + + def __init__( + self, maxusage=None, setsession=None, closeable=True, + *args, **kwargs): + """Create a "tough" PostgreSQL connection. + + maxusage: maximum usage limit for the underlying PyGreSQL connection + (number of uses, 0 or None means unlimited usage) + When this limit is reached, the connection is automatically reset. + setsession: optional list of SQL commands that may serve to prepare + the session, e.g. ["set datestyle to ...", "set time zone ..."] + closeable: if this is set to false, then closing the connection will + be silently ignored, but by default the connection can be closed + args, kwargs: the parameters that shall be used to establish + the PostgreSQL connections with PyGreSQL using pg.DB() + """ + # basic initialization to make finalizer work + self._con = None + self._closed = True + # proper initialization of the connection + if maxusage is None: + maxusage = 0 + if not isinstance(maxusage, baseint): + raise TypeError("'maxusage' must be an integer value.") + self._maxusage = maxusage + self._setsession_sql = setsession + self._closeable = closeable + self._con = PgConnection(*args, **kwargs) + self._transaction = False + self._closed = False + self._setsession() + self._usage = 0 + + def __enter__(self): + """Enter the runtime context. This will start a transaction.""" + self.begin() + return self + + def __exit__(self, *exc): + """Exit the runtime context. This will end the transaction.""" + if exc[0] is None and exc[1] is None and exc[2] is None: + self.commit() + else: + self.rollback() + + def _setsession(self): + """Execute the SQL commands for session preparation.""" + if self._setsession_sql: + for sql in self._setsession_sql: + self._con.query(sql) + + def _close(self): + """Close the tough connection. + + You can always close a tough connection with this method + and it will not complain if you close it more than once. + """ + if not self._closed: + try: + self._con.close() + except Exception: + pass + self._transaction = False + self._closed = True + + def close(self): + """Close the tough connection. + + You are allowed to close a tough connection by default + and it will not complain if you close it more than once. + + You can disallow closing connections by setting + the closeable parameter to something false. In this case, + closing tough connections will be silently ignored. + """ + if self._closeable: + self._close() + elif self._transaction: + self.reset() + + def reopen(self): + """Reopen the tough connection. + + It will not complain if the connection cannot be reopened. + """ + try: + self._con.reopen() + except Exception: + if self._transcation: + self._transaction = False + try: + self._con.query('rollback') + except Exception: + pass + else: + self._transaction = False + self._closed = False + self._setsession() + self._usage = 0 + + def reset(self): + """Reset the tough connection. + + If a reset is not possible, tries to reopen the connection. + It will not complain if the connection is already closed. + """ + try: + self._con.reset() + self._transaction = False + self._setsession() + self._usage = 0 + except Exception: + try: + self.reopen() + except Exception: + try: + self.rollback() + except Exception: + pass + + def begin(self, sql=None): + """Begin a transaction.""" + self._transaction = True + try: + begin = self._con.begin + except AttributeError: + return self._con.query(sql or 'begin') + else: + # use existing method if available + if sql: + return begin(sql=sql) + else: + return begin() + + def end(self, sql=None): + """Commit the current transaction.""" + self._transaction = False + try: + end = self._con.end + except AttributeError: + return self._con.query(sql or 'end') + else: + if sql: + return end(sql=sql) + else: + return end() + + def commit(self, sql=None): + """Commit the current transaction.""" + self._transaction = False + try: + commit = self._con.commit + except AttributeError: + return self._con.query(sql or 'commit') + else: + if sql: + return commit(sql=sql) + else: + return commit() + + def rollback(self, sql=None): + """Rollback the current transaction.""" + self._transaction = False + try: + rollback = self._con.rollback + except AttributeError: + return self._con.query(sql or 'rollback') + else: + if sql: + return rollback(sql=sql) + else: + return rollback() + + def _get_tough_method(self, method): + """Return a "tough" version of a connection class method. + + The tough version checks whether the connection is bad (lost) + and automatically and transparently tries to reset the connection + if this is the case (for instance, the database has been restarted). + """ + def tough_method(*args, **kwargs): + transaction = self._transaction + if not transaction: + try: + # check whether connection status is bad + # or the connection has been used too often + if not self._con.db.status or ( + self._maxusage and self._usage >= self._maxusage): + raise AttributeError + except Exception: + self.reset() # then reset the connection + try: + result = method(*args, **kwargs) # try connection method + except Exception: # error in query + if transaction: # inside a transaction + self._transaction = False + raise # propagate the error + elif self._con.db.status: # if it was not a connection problem + raise # then propagate the error + else: # otherwise + self.reset() # reset the connection + result = method(*args, **kwargs) # and try one more time + self._usage += 1 + return result + return tough_method + + def __getattr__(self, name): + """Inherit the members of the standard connection class. + + Some methods are made "tougher" than in the standard version. + """ + if self._con: + attr = getattr(self._con, name) + if (name in ('query', 'get', 'insert', 'update', 'delete') + or name.startswith('get_')): + attr = self._get_tough_method(attr) + return attr + else: + raise InvalidConnection + + def __del__(self): + """Delete the steady connection.""" + try: + self._close() # make sure the connection is closed + except: # builtin Exceptions might not exist any more + pass diff --git a/CACodeFramework/util/DbUtil.py b/CACodeFramework/util/DbUtil.py index ec8cd06..567d8ea 100644 --- a/CACodeFramework/util/DbUtil.py +++ b/CACodeFramework/util/DbUtil.py @@ -2,8 +2,8 @@ import sys import threading from CACodeFramework.cacode.Modes import Singleton +from CACodeFramework.util.DBPool.pooled_db import PooledDB from CACodeFramework.util.Log import CACodeLog -import pymysql def parse_kwa(db, **kwargs): @@ -16,19 +16,25 @@ def parse_kwa(db, **kwargs): sql:处理过并加上%s的sql语句 params:需要填充的字段 print_sql:是否打印sql语句 + many:是否有多个 """ try: cursor = db.cursor() + many_flay = 'many' in kwargs.keys() and kwargs['many'] if 'params' in kwargs.keys(): sql = cursor.mogrify(kwargs['sql'], kwargs['params']) else: sql = kwargs['sql'] if 'print_sql' in kwargs.keys() and kwargs['print_sql'] is True: _l = sys._getframe().f_back.f_lineno - CACodeLog.log(obj=db, line=_l, task_name='Print Sql', msg=sql) + msg = f'{kwargs["sql"]} - many=True' if many_flay else kwargs['sql'] + CACodeLog.log(obj=db, line=_l, task_name='Print Sql', msg=msg) - cursor.execute(sql) + if many_flay: + cursor.executemany(kwargs['sql'], kwargs['pojo_data']) + else: + cursor.execute(sql) return cursor except Exception as e: db.rollback() @@ -43,7 +49,7 @@ class Db_opera(object): _instance_lock = threading.Lock() - def __init__(self, host, port, user, password, database, charset='utf8', creator=pymysql, maxconnections=6, + def __init__(self, host, port, user, password, database, charset='utf8', creator=None, maxconnections=6, mincached=2, maxcached=5, maxshared=3, blocking=True, setsession=[], ping=0, POOL=None): """ @@ -124,7 +130,6 @@ class Db_opera(object): """ 初始化数据库连接池 """ - from dbutils.pooled_db import PooledDB if self.POOL is None: self.POOL = PooledDB(creator=self.creator, maxconnections=self.maxconnections, mincached=self.mincached, maxcached=self.maxcached, @@ -178,17 +183,18 @@ class Db_opera(object): finally: db.close() - def insert(self, **kwargs): + def insert(self, many=False, **kwargs): """ 执行插入语句 :param kwargs:包含所有参数: last_id:是否需要返回最后一行数据,默认False sql:处理过并加上%s的sql语句 params:需要填充的字段 + :param many:是否为多行执行 """ db = self.get_conn() try: - cursor = parse_kwa(db=db, **kwargs) + cursor = parse_kwa(db=db, many=many, **kwargs) db.commit() # 最后一行ID last = cursor.lastrowid @@ -205,6 +211,16 @@ class Db_opera(object): finally: db.close() + def insert_many(self, **kwargs): + """ + 执行插入语句 + :param kwargs:包含所有参数: + last_id:是否需要返回最后一行数据,默认False + sql:处理过并加上%s的sql语句 + params:需要填充的字段 + """ + db = self.get_conn() + def update(self, **kwargs): """ 执行更新语句 diff --git a/CACodeFramework/util/Log.py b/CACodeFramework/util/Log.py index 8738e1f..a5d4c87 100644 --- a/CACodeFramework/util/Log.py +++ b/CACodeFramework/util/Log.py @@ -231,7 +231,17 @@ class CACodeLog(object): fontColor=ConsoleColor.FontColor.WARNING_COLOR) @staticmethod - def log_error(msg, obj=None, line=sys._getframe().f_back.f_lineno, task_name='Task', LogObject=None): + def log_error(msg, obj=None, line=sys._getframe().f_back.f_lineno, task_name='Task', LogObject=None, + raise_exception=False): + """ + :param msg:描述 + :param line:行 + :param task_name:线程唯一名称 + :param LogObject:日志对象 + :param raise_exception:是否抛出异常 + """ + if raise_exception: + raise obj(msg) CACodeLog.log(msg=msg, obj=obj, line=line, task_name=task_name, LogObject=LogObject, field=e_fields.Error(), func=LogObject.warn if LogObject is not None else None, fontColor=ConsoleColor.FontColor.ERROR_COLOR) diff --git a/CACodeFramework/util/ParseUtil.py b/CACodeFramework/util/ParseUtil.py index f8d1008..c40c5a0 100644 --- a/CACodeFramework/util/ParseUtil.py +++ b/CACodeFramework/util/ParseUtil.py @@ -1,6 +1,8 @@ import copy +from CACodeFramework.exception.e_fields import FieldNotExist from CACodeFramework.pojoManager import tag +from CACodeFramework.util.Log import CACodeLog def parse_main(*args, to_str=False, is_field=False): @@ -120,21 +122,34 @@ class ParseUtil(object): :param values_str:values的sql方言 :return: """ - _dict = pojo.fields # 得到所有的键 - keys = pojo.fields + ParseUtil.fieldExist(pojo, 'fields', raise_exception=True) # 在得到值之后解析是否为空并删除为空的值和对应的字段 cp_value = [] # 复制新的一张字段信息 keys_copy = [] + keys_c, cp_v = ParseUtil.parse_pojo(pojo) + keys_copy += keys_c + cp_value += cp_v + + return ParseUtil().parse_insert(keys_copy, cp_value, __table_name__, insert_str=insert_str, + values_str=values_str) + + @staticmethod + def parse_pojo(pojo) -> dict: + keys = pojo.fields + # 在得到值之后解析是否为空并删除为空的值和对应的字段 + cp_value = [] + # 复制新的一张字段信息 + keys_copy = [] values = [getattr(pojo, v) for v in keys] for i, j in enumerate(values): if j is not None and not ParseUtil.is_default(j): keys_copy.append(keys[i]) cp_value.append(j) - return ParseUtil().parse_insert(keys_copy, cp_value, __table_name__, insert_str=insert_str, - values_str=values_str) + + return keys_copy, cp_value @staticmethod def parse_obj(data: dict, instance: object) -> object: @@ -263,11 +278,55 @@ class ParseUtil(object): setattr(obj, key, val) @staticmethod - def fieldExit(obj, field, el=None): + def fieldExist(obj, field, el=None, raise_exception=False): """ 在对象中获取一个字段的值,如果这个字段不存在,则将值设置为`el` """ - if hasattr(obj, field): - return getattr(obj, field) + if isinstance(obj, dict): + if field in obj.keys(): + return obj[field] + else: + if raise_exception: + raise CACodeLog.log_error( + msg=f'the key of `pojo` cannot be found in the `{obj.__class__.__name__}`', + obj=FieldNotExist, + raise_exception=True) + else: + return el else: - return el + if hasattr(obj, field): + return getattr(obj, field) + else: + if raise_exception: + raise CACodeLog.log_error( + msg=f'the key of `pojo` cannot be found in the `{obj.__class__.__name__}`', + obj=FieldNotExist, + raise_exception=True) + else: + return el + + @staticmethod + def parse_pojo_many(keys_name: str, pojo_many: list) -> tuple: + + # 在得到值之后解析是否为空并删除为空的值和对应的字段 + cp_value = [] + for pojo in pojo_many: + keys_c, cp_v = ParseUtil.parse_pojo(pojo) + cp_value.append(tuple(cp_v)) + # 真实值 + return cp_value + # if not isinstance(pojo_many, list): + # CACodeLog.log_error(msg='Pojo definition is incorrect') + # + # result = [] + # for pojo_item in pojo_many: + # ParseUtil.fieldExist(pojo_item, keys_name, raise_exception=True) + # patterns = getattr(pojo_item, keys_name) + # if not isinstance(patterns, list): + # CACodeLog.log_error(msg=f'The type of `{keys_name}` is not a list') + # field_values = [] + # for field_name in patterns: + # field_values.append(getattr(pojo_item, field_name)) + # + # result.append(tuple(field_values)) + # return tuple(result) diff --git a/test/modules/BaseData.py b/test/modules/BaseData.py deleted file mode 100644 index 8f1e339..0000000 --- a/test/modules/BaseData.py +++ /dev/null @@ -1,8 +0,0 @@ -from CACodeFramework.pojoManager import Manage -from CACodeFramework.pojoManager.Manage import Pojo - - -class BaseData(Pojo): - def __init__(self): - self.id = Manage.tag.intField() - super(BaseData, self).__init__() diff --git a/test/modules/DatabaseConf.py b/test/modules/DatabaseConf.py index 951556c..ae975c2 100644 --- a/test/modules/DatabaseConf.py +++ b/test/modules/DatabaseConf.py @@ -1,3 +1,5 @@ +import pymysql + from CACodeFramework.util import Config @@ -18,4 +20,18 @@ class ConF(Config.config): password = "Zyzs1234.." database = "zh" - super(ConF, self).__init__(host, port, database, user, password, charset) + super(ConF, self).__init__(host, port, database, user, password, charset, creator=pymysql) + + +class DemoConF(Config.config): + def __init__(self, + host='localhost', + port=3306, + database='demo', + user='root', + password='123456', + charset='utf8'): + self.set_field('print_sql', True) + self.set_field('last_id', True) + + super(DemoConF, self).__init__(host, port, database, user, password, charset, creator=pymysql) diff --git a/test/modules/DemoTable.py b/test/modules/DemoTable.py new file mode 100644 index 0000000..eac99cf --- /dev/null +++ b/test/modules/DemoTable.py @@ -0,0 +1,19 @@ +from CACodeFramework.anno.annos import Table +from CACodeFramework.pojoManager import Manage +from CACodeFramework.pojoManager.Manage import Pojo +from test.modules.DatabaseConf import DemoConF + + +@Table(name='demo_table', msg='') +class DemoTable(Pojo): + def __init__(self, **kwargs): + self.t_id = Manage.tag.intField() + self.t_name = Manage.tag.intField() + self.t_msg = Manage.tag.intField() + self.t_pwd = Manage.tag.intField() + self.create_time = Manage.tag.datetimeField() + self.update_time = Manage.tag.datetimeField() + super(DemoTable, self).__init__(config_obj=DemoConF(), log_conf={ + 'path': "/log/", + 'save_flag': True + }, **kwargs) diff --git a/test/testFunc.py b/test/testFunc.py index c78e201..43f27cb 100644 --- a/test/testFunc.py +++ b/test/testFunc.py @@ -1,19 +1,33 @@ import time from CACodeFramework.cacode.Factory import Factory +from CACodeFramework.util.Log import CACodeLog +from test.modules.DemoTable import DemoTable class MyFactory(Factory): modules = [ 'test.modules.Demo', - 'test.modules.BaseData', + 'test.modules.DemoTable', ] +def set_many(): + a = [] + for i in range(10): + a.append(DemoTable(t_name='测试name', t_msg='测试msg', t_pwd='测试pwd')) + return a + + +info = CACodeLog.log +warn = CACodeLog.warning + if __name__ == '__main__': t1 = time.time() - DemoTable = MyFactory.createInstance('Demo.DemoTable') + demoTable = MyFactory.createInstance('DemoTable.DemoTable') count = 0 - result = DemoTable.orm.find().limit(10).end() - print(result.to_json(True)) - print(time.time() - t1) + # result = demoTable.find_all() + result = demoTable.create(pojo=set_many(), many=True) + info(f'time:{time.time() - t1}') + info(f'count:{len(result)}') + warn(result) -- Gitee From 184245107498bda3e92d5013d90dbfea98251e5b Mon Sep 17 00:00:00 2001 From: CACode Date: Thu, 6 May 2021 16:45:27 +0800 Subject: [PATCH 2/2] On branch cacode --- test/modules/DatabaseConf.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/test/modules/DatabaseConf.py b/test/modules/DatabaseConf.py index ae975c2..09c8111 100644 --- a/test/modules/DatabaseConf.py +++ b/test/modules/DatabaseConf.py @@ -14,12 +14,6 @@ class ConF(Config.config): self.set_field('print_sql', True) self.set_field('last_id', True) - host = "203.195.161.175" - port = 3306 - user = "root" - password = "Zyzs1234.." - database = "zh" - super(ConF, self).__init__(host, port, database, user, password, charset, creator=pymysql) -- Gitee