项目用到了python,习惯了TP的链式操作,自己动手写了一个python的Model层

实现了Pymysql的原生连接和dbutils的连接池连接

模型基类:

# coding: utf-8
import pymysql
import threading
from Include.Logger import get_logger
from Include.Setting import app_settings
from dbutils.pooled_db import PooledDB


class MysqlPool:
    config = {
        'creator': pymysql,
        'host': app_settings.get('ManagerHost', '127.0.0.1'),
        'port': int(app_settings.get('DBPort', 3306)),
        'user': app_settings.get('DBUser', 'root'),
        'password': app_settings.get('DBPwd', ''),
        'db': app_settings.get('DBName', ''),
        'charset': app_settings.get('DBCharSet', 'utf8'),
        'maxconnections': 30,  # 连接池最大连接数
        'cursorclass': pymysql.cursors.DictCursor
    }
    pool = PooledDB(**config)

    def __enter__(self):
        self.conn = MysqlPool.pool.connection()
        self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.conn.close()


class Base(object):
    # 单实例锁
    _instance_lock = threading.Lock()
    __tablename__ = ''
    __fields__ = []

    def __init__(self, data=None, *args, **kwargs):
        self.__db_conn = None
        if isinstance(data, dict):
            for key in data:
                setattr(self, key, data[key])
        self.__fields__ = [name for name in dir(self) if isinstance(getattr(self, name), Column)]
        self.__pk__ = [name for name in self.__fields__ if hasattr(getattr(self, name), 'pk')]
        self._logger = get_logger('Mysql')

    @classmethod
    def instance(cls, *args, **kwargs):
        """
        获取一个Model实例
        :return:Model实例
        """
        if not hasattr(cls, '_instance'):
            with cls._instance_lock:
                cls._instance = cls(*args, **kwargs)
        return cls._instance

    def _try_connect(self):
        try:
            if self.__db_conn is None:
                self.__db_conn = pymysql.connect(app_settings.get('ManagerHost', '127.0.0.1'),
                                                 app_settings.get('DBUser', 'root'),
                                                 app_settings.get('DBPwd', ''),
                                                 app_settings.get('DBName', ''),
                                                 int(app_settings.get('DBPort', 3306)), autocommit=1)
            self.__db_conn.ping()
        except:
            self.__db_conn = pymysql.connect(app_settings.get('ManagerHost', '127.0.0.1'),
                                             app_settings.get('DBUser', 'root'),
                                             app_settings.get('DBPwd', ''),
                                             app_settings.get('DBName', ''),
                                             int(app_settings.get('DBPort', 3306)), autocommit=1)

    def close(self):
        if self.__db_conn is not None:
            self.__db_conn.close()
        pass

    def fetchone(self, sql):
        """
        查询单条记录
        :param sql:查询的sql语句
        :return:结果集或 None
        """
        data = None
        with MysqlPool() as db:
            self._logger.info(sql)
            db.cursor.execute(sql)
            data = db.cursor.fetchone()
        return data

    def fetchone_not_pool(self, sql):
        """
        查询单条记录
        :param sql:查询的sql语句
        :return:结果集或 None
        """
        self._try_connect()
        cursor = None
        try:
            cursor = self.__db_conn.cursor(pymysql.cursors.DictCursor)
            self._logger.info(sql)
            cursor.execute(sql)
            data = cursor.fetchone()
        except Exception as e:
            data = None
        if cursor is not None:
            cursor.close()
            self.close()
        return data

    def fetchall(self, sql):
        """
       查询多条记录
       :param sql:查询的sql语句
       :return:结果集或 None
       """
        data = None
        with MysqlPool() as db:
            self._logger.info(sql)
            db.cursor.execute(sql)
            data = db.cursor.fetchall()
        return data

    def fetchall_not_pool(self, sql):
        """
        查询多条记录
        :param sql:查询的sql语句
        :return:结果集或 None
        """
        self._try_connect()
        cursor = None
        try:
            cursor = self.__db_conn.cursor(pymysql.cursors.DictCursor)
            self._logger.info(sql)
            cursor.execute(sql)
            data = cursor.fetchall()
        except Exception as e:
            data = None
        if cursor is not None:
            cursor.close()
            self.close()
        return data

    def execute(self, sql):
        """
        执行sql语句
        :param sql: 执行的sql语句
        :return: True|False
        """
        ret = False
        with MysqlPool() as db:
            self._logger.info(sql)
            db.cursor.execute(sql)
            db.conn.commit()
            ret = True
        return ret

    def execute_not_pool(self, sql):
        """
        执行sql语句
        :param sql: 执行的sql语句
        :return: True|False
        """
        self._try_connect()
        cursor = None
        try:
            cursor = self.__db_conn.cursor(pymysql.cursors.DictCursor)
            self._logger.info(sql)
            cursor.execute(sql)
            self.__db_conn.commit()
            data = True
        except Exception as e:
            self.__db_conn.rollback()
            data = False
        if cursor is not None:
            cursor.close()
            self.close()
        return data

    def _parse_fields(self, _fields):
        if _fields == '*':
            return _fields
        else:
            return ','.join(_fields)

    def _parse_field(self, field):
        if "." not in field:
            return "`{}`".format(field)
        else:
            tmp = field.split('.')
            return "`{}`".format("`.`".join(tmp))

    def _parse_value(self, sv, key=None):
        return "'{}'".format(str(sv)) if isinstance(sv, str) else str(sv)

    def _parse_where(self, where):
        whereStr = ''
        if isinstance(where, str):
            # 直接使用字符串条件
            whereStr = where
        else:
            whereStrItem = [];
            for key in where:
                val = where[key]
                if key.startswith('_'):
                    whereStrItem.append(self._parseThinkWhere(key, val))
                else:
                    if isinstance(val, str):
                        whereStrItem.append("{}={}".format(self._parse_field(key), self._parse_value(val, key)))
                    elif isinstance(val, int):
                        whereStrItem.append("{}={}".format(self._parse_field(key), self._parse_value(val, key)))
                    elif isinstance(val, list):
                        if isinstance(val[0], str):
                            if val[0].lower() == 'exp':
                                whereStrItem.append(' ({} {}) '.format(self._parse_field(key), val[1]))
                            elif val[0].lower() == 'in':
                                val_1 = val[1]
                                if isinstance(val_1, str):
                                    val_1 = val_1.split(',')
                                val_11 = [self._parse_value(xval) for xval in val_1]
                                whereStrItem.append(' {} IN ({}) '.format(self._parse_field(key), ','.join(val_11)))
                            elif val[0].lower() == 'not in':
                                # 跟上边一样换成not in
                                val_1 = val[1]
                                if isinstance(val_1, str):
                                    val_1 = val_1.split(',')
                                val_11 = [self._parse_value(xval) for xval in val_1]
                                whereStrItem.append(' {} NOT IN ({}) '.format(self._parse_field(key), ','.join(val_11)))
                            elif val[0].lower() == 'like':
                                if isinstance(val[1], str):
                                    whereStrItem.append('({} LIKE {})'.format(self._parse_field(key),
                                                                              self._parse_value(val[1])))
                            elif val[0].lower() == 'between':  # BETWEEN运算
                                val_1 = val[1]
                                if isinstance(val_1, str):
                                    val_1 = val_1.split(',')
                                if len(val_1) == 2:
                                    whereStrItem.append(
                                        '({} BETWEEN {} AND {}) '.format(self._parse_field(key),
                                                                         self._parse_value(val_1[0]),
                                                                         self._parse_value(val_1[1])))
            whereStr = " ({}) ".format(" AND ".join(whereStrItem))
        return whereStr

    def _parse_with_pkwhere(self):
        ret = {}
        for field in self.__pk__:
            sv = getattr(self, field)
            if not isinstance(sv, Column):
                ret[field] = sv
        return ret

    def _parseThinkWhere(self, key, val):
        whereStr = '';
        if key == "_string":
            whereStr = val
        return whereStr

    def make_select_sql(self, fields="*", where='', table=None):
        """
        拼接 SELECT 语句
        :param fields: 查询的字段,可以是list 或 ,分割的str
        :param where: 查询字段,字典
        :param table: 表名,默认为Model定义的__tablename__
        :return:
        """
        if isinstance(fields, str):
            _fields = fields.split(",")
        elif isinstance(fields, list):
            _fields = fields
        else:
            _fields = '*'
        if table is None:
            table = "`{}`".format(self.__tablename__)
        sql = 'SELECT {} FROM {}'.format(self._parse_fields(_fields), table);
        _pkwhere = self._parse_with_pkwhere()
        dict(where, **_pkwhere)
        if where is not None:
            sql += ' WHERE ' + self._parse_where(where)
        return sql

    def _parse_set(self, set_dict):
        StrItem = [];
        for key in set_dict:
            # update set 不能修改主键
            if (key in self.__pk__):
                continue
            val = set_dict[key]
            StrItem.append("{}={}".format(self._parse_field(key), self._parse_value(val)))
        return "{}".format(",".join(StrItem))

    def make_updata_sql(self, where, table=None):
        """
        拼接 UPDATE 语句
        :param where:查询字段,字典
        :param table:表名,默认为Model定义的__tablename__
        :return:sql语句
        """
        sql = "UPDATE {} SET {}"
        set_val = {}
        for field in self.__fields__:
            sv = getattr(self, field)
            if not isinstance(sv, Column):
                set_val[field] = sv
        if table is None:
            table = "`{}`".format(self.__tablename__)
        sql = sql.format(table, self._parse_set(set_val))
        _pkwhere = self._parse_with_pkwhere()
        dict(where, **_pkwhere)
        if where is not None:
            sql += ' WHERE ' + self._parse_where(where)
        return sql if self._parse_set(set_val) != "" else None
        pass

    def make_insert_sql(self):
        """
         拼接 INSERT 语句
        :return: sql语句,默认拼接Model字段
        """

        sql = "INSERT INTO `{}` ({}) VALUES ({})"
        fields = [];
        vals = [];
        for field in self.__fields__:
            sv = getattr(self, field)
            if not isinstance(sv, Column):
                fields.append(self._parse_field(field))
                vals.append(self._parse_value(sv))
        fields_str = ','.join(fields)
        vals_str = ','.join(vals)
        return sql.format(self.__tablename__, fields_str, vals_str)

    def make_delete_sql(self, where, table=None):
        """
        拼接 DELETE 语句
        :param where:查询字段,字典
        :param table:表名,默认为Model定义的__tablename__
        :return:sql语句
        """
        if table is None:
            table = "`{}`".format(self.__tablename__)
        sql = "DELETE FROM {}".format(table)
        _pkwhere = self._parse_with_pkwhere()
        dict(where, **_pkwhere)
        if where is not None:
            sql += ' WHERE ' + self._parse_where(where)
        return sql


class Column(object):
    def __init__(self, *args, **kwargs):
        if 'pk' in kwargs.keys():
            self.pk = True


# 下边再写下去就真的是orm了暂时不需要,先不写吧
# 未验证
class StrColumn(Column):
    pass


class IntColumn(Column):
    pass


class DateColumn(Column):
    pass

使用实例:

# coding: utf-8
import json
import uuid

from Include.Setting import app_settings
from Model.Base import Base, Column


class TaskResult(Base):
    __tablename__ = "taskresult"
    mkey = Column(pk=True)
    taskID = Column()
    node = Column()
    taskFinished = Column()
    subID = Column()
    elements = Column()
    reuslt = Column()
    description = Column()

    def __str__(self):
        desc = {
            'mkey': self.mkey,
            'taskID': self.taskID,
            'node': self.node,
            'subID': self.subID
        }
        return json.dumps(desc)

    def make_new_mkey(self):
        self.mkey = str(uuid.uuid4())
        return self.mkey

    def get_result_by_id(self, taskID):
        # SELECT * FROM taskresult WHERE taskID='{}' AND node='{}' ORDER BY subID ASC
        sql = self.make_select_sql("*", {'node': app_settings.get('NodeName', 'main'), 'taskID': taskID})
        sql += ' ORDER BY subID ASC'
        data = [];
        for item in self.fetchall(sql):
            data.append(TaskResult(item))
        return data