Python实现TP的Model层
项目用到了 Python,因为习惯了 ThinkPHP(TP)的链式操作,所以自己动手写了一个 Python 的 Model 层,实现了 PyMySQL 的原生连接和 DBUtils 的连接池连接。
模型基类:
# coding: utf-8
import pymysql
import threading
from Include.Logger import get_logger
from Include.Setting import app_settings
from dbutils.pooled_db import PooledDB
class MysqlPool:
    """Context manager that borrows a connection and a cursor from a shared pool.

    Usage::

        with MysqlPool() as db:
            db.cursor.execute(sql)
            row = db.cursor.fetchone()

    The pool itself is created once, when this class body is executed.
    """

    # Settings handed to PooledDB. Values come from app_settings with local
    # defaults. NOTE(review): keys other than 'creator' and 'maxconnections'
    # are presumably forwarded to pymysql.connect() -- confirm against DBUtils.
    config = {
        'creator': pymysql,
        'host': app_settings.get('ManagerHost', '127.0.0.1'),
        'port': int(app_settings.get('DBPort', 3306)),
        'user': app_settings.get('DBUser', 'root'),
        'password': app_settings.get('DBPwd', ''),
        'db': app_settings.get('DBName', ''),
        'charset': app_settings.get('DBCharSet', 'utf8'),
        'maxconnections': 30,  # maximum number of connections in the pool
        'cursorclass': pymysql.cursors.DictCursor
    }
    pool = PooledDB(**config)

    def __enter__(self):
        """Take a connection from the pool, open a dict cursor and return self."""
        self.conn = MysqlPool.pool.connection()
        try:
            self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
        except Exception:
            # __exit__ is not called when __enter__ raises, so hand the
            # connection back here instead of leaking it.
            self.conn.close()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the cursor and the connection; exceptions are not suppressed."""
        try:
            self.cursor.close()
        finally:
            # Always release the connection, even if closing the cursor failed.
            self.conn.close()
class Base(object):
    """Base class for table models: builds SQL strings and executes them.

    Subclasses set ``__tablename__`` and declare one ``Column`` class attribute
    per table field. Assigning a plain value to such an attribute on an
    instance marks the field as "set"; only set fields are used when building
    INSERT/UPDATE statements and primary-key conditions.

    Each query method exists in two flavours: the plain one uses the shared
    ``MysqlPool``; the ``*_not_pool`` one uses a private PyMySQL connection.
    """

    # Serialises creation of the per-class singleton in instance().
    _instance_lock = threading.Lock()
    __tablename__ = ''
    __fields__ = []

    # Characters that are backslash-escaped inside quoted string literals.
    # NOTE(review): backslash escaping is ineffective if the server runs with
    # NO_BACKSLASH_ESCAPES -- confirm the SQL mode in use.
    _SQL_ESCAPES = str.maketrans({
        '\0': '\\0',
        '\n': '\\n',
        '\r': '\\r',
        '\x1a': '\\Z',
        '"': '\\"',
        "'": "\\'",
        '\\': '\\\\',
    })

    def __init__(self, data=None, *args, **kwargs):
        """Create a model, optionally pre-filled from a dict.

        :param data: optional dict; every key is set as an instance attribute
        """
        self.__db_conn = None
        if isinstance(data, dict):
            for key in data:
                setattr(self, key, data[key])
        # Look at the class, not the instance: a field filled from ``data``
        # no longer resolves to its Column on the instance, and would
        # otherwise be dropped from the field and primary-key lists.
        cls = type(self)
        self.__fields__ = [name for name in dir(cls)
                           if isinstance(getattr(cls, name, None), Column)]
        self.__pk__ = [name for name in self.__fields__
                       if hasattr(getattr(cls, name), 'pk')]
        self._logger = get_logger('Mysql')

    @classmethod
    def instance(cls, *args, **kwargs):
        """Return the shared instance of this exact class, creating it on first use.

        :return: the singleton Model instance for ``cls``
        """
        # Check cls.__dict__ rather than hasattr() so that a subclass never
        # picks up the singleton of one of its parents.
        if '_instance' not in cls.__dict__:
            with cls._instance_lock:
                # Re-check under the lock: another thread may have created it
                # between the first check and acquiring the lock.
                if '_instance' not in cls.__dict__:
                    cls._instance = cls(*args, **kwargs)
        return cls._instance

    def _connect(self):
        """Open a new autocommit PyMySQL connection from app_settings."""
        # Keyword arguments: PyMySQL 1.x no longer accepts positional ones.
        # The charset mirrors the pooled configuration so both paths agree.
        return pymysql.connect(
            host=app_settings.get('ManagerHost', '127.0.0.1'),
            user=app_settings.get('DBUser', 'root'),
            password=app_settings.get('DBPwd', ''),
            database=app_settings.get('DBName', ''),
            port=int(app_settings.get('DBPort', 3306)),
            charset=app_settings.get('DBCharSet', 'utf8'),
            autocommit=True)

    def _try_connect(self):
        """Make sure the private connection exists and answers a ping.

        A connection whose ping fails is replaced by a fresh one. Errors from
        opening the new connection propagate to the caller.
        """
        if self.__db_conn is not None:
            try:
                self.__db_conn.ping()
                return
            except Exception:
                self._logger.exception('MySQL ping failed, reconnecting')
        self.__db_conn = self._connect()

    def close(self):
        """Close the private connection, if any, and forget it."""
        conn, self.__db_conn = self.__db_conn, None
        if conn is not None:
            conn.close()

    def fetchone(self, sql):
        """Run a query on a pooled connection and return the first row.

        :param sql: SQL statement to run
        :return: the row, or None when there is no row
        """
        with MysqlPool() as db:
            self._logger.info(sql)
            db.cursor.execute(sql)
            return db.cursor.fetchone()

    def fetchall(self, sql):
        """Run a query on a pooled connection and return all rows.

        :param sql: SQL statement to run
        :return: the fetched rows
        """
        with MysqlPool() as db:
            self._logger.info(sql)
            db.cursor.execute(sql)
            return db.cursor.fetchall()

    def execute(self, sql):
        """Run a statement on a pooled connection and commit it.

        :param sql: SQL statement to run
        :return: True (failures raise instead of returning False)
        """
        with MysqlPool() as db:
            self._logger.info(sql)
            db.cursor.execute(sql)
            db.conn.commit()
        return True

    def _run_not_pool(self, sql, consume, failed, rollback=False):
        """Run ``sql`` on the private connection and always clean up.

        :param sql: SQL statement to run
        :param consume: callable(conn, cursor) producing the return value
        :param failed: value returned when running or consuming fails
        :param rollback: roll the connection back on failure
        :return: ``consume(...)`` on success, ``failed`` otherwise
        """
        self._try_connect()
        conn = self.__db_conn
        cursor = None
        try:
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            self._logger.info(sql)
            cursor.execute(sql)
            return consume(conn, cursor)
        except Exception:
            # Callers rely on the None/False result, so log instead of raising.
            self._logger.exception('SQL failed: {}'.format(sql))
            if rollback:
                try:
                    conn.rollback()
                except Exception:
                    self._logger.exception('rollback failed')
            return failed
        finally:
            try:
                if cursor is not None:
                    cursor.close()
            finally:
                self.close()

    def fetchone_not_pool(self, sql):
        """Run a query on the private connection and return the first row.

        :param sql: SQL statement to run
        :return: the row, or None when there is no row or the query failed
        """
        return self._run_not_pool(sql, lambda conn, cur: cur.fetchone(), None)

    def fetchall_not_pool(self, sql):
        """Run a query on the private connection and return all rows.

        :param sql: SQL statement to run
        :return: the fetched rows, or None when the query failed
        """
        return self._run_not_pool(sql, lambda conn, cur: cur.fetchall(), None)

    def execute_not_pool(self, sql):
        """Run a statement on the private connection and commit it.

        :param sql: SQL statement to run
        :return: True on success, False on failure (after a rollback attempt)
        """
        def commit(conn, cur):
            conn.commit()
            return True

        return self._run_not_pool(sql, commit, False, rollback=True)

    def _parse_fields(self, _fields):
        """Render the column list of a SELECT: '*' as is, otherwise comma-joined."""
        return _fields if _fields == '*' else ','.join(_fields)

    def _parse_field(self, field):
        """Backtick-quote a field name; ``a.b`` becomes ```a`.`b```."""
        return "`{}`".format("`.`".join(field.split('.')))

    def _parse_value(self, sv, key=None):
        """Render a Python value as an SQL literal.

        Strings are escaped and single-quoted, None becomes NULL, and
        everything else is rendered with ``str()``.
        """
        if sv is None:
            return 'NULL'
        if isinstance(sv, str):
            return "'{}'".format(sv.translate(self._SQL_ESCAPES))
        return str(sv)

    def _parse_operator(self, key, val):
        """Render one ``[operator, operand]`` condition for field ``key``.

        Supported operators: exp, in, not in, like, between.

        :raises ValueError: for a malformed or unsupported condition, because
            silently dropping it would widen the statement's effect
        """
        if len(val) < 2 or not isinstance(val[0], str):
            raise ValueError('unsupported condition for {!r}: {!r}'.format(key, val))
        op = val[0].lower()
        operand = val[1]
        field = self._parse_field(key)
        if op == 'exp':
            # Raw SQL fragment supplied by the caller; inserted unescaped.
            return ' ({} {}) '.format(field, operand)
        if op in ('in', 'not in'):
            if isinstance(operand, str):
                operand = operand.split(',')
            quoted = ','.join(self._parse_value(item) for item in operand)
            return ' {} {} ({}) '.format(field, op.upper(), quoted)
        if op == 'like' and isinstance(operand, str):
            return '({} LIKE {})'.format(field, self._parse_value(operand))
        if op == 'between':
            if isinstance(operand, str):
                operand = operand.split(',')
            if len(operand) == 2:
                return '({} BETWEEN {} AND {}) '.format(
                    field,
                    self._parse_value(operand[0]),
                    self._parse_value(operand[1]))
        raise ValueError('unsupported condition for {!r}: {!r}'.format(key, val))

    def _where_items(self, where):
        """Turn a condition mapping into a list of SQL condition strings."""
        items = []
        for key in where:
            val = where[key]
            if key.startswith('_'):
                item = self._parseThinkWhere(key, val)
            elif isinstance(val, (list, tuple)):
                item = self._parse_operator(key, val)
            elif val is None:
                # "= NULL" never matches in SQL.
                item = "{} IS NULL".format(self._parse_field(key))
            else:
                item = "{}={}".format(self._parse_field(key),
                                      self._parse_value(val, key))
            if item:
                items.append(item)
        return items

    @staticmethod
    def _wrap_conditions(items):
        """AND-join condition strings inside one pair of parentheses."""
        return " ({}) ".format(" AND ".join(items))

    def _parse_where(self, where):
        """Render a WHERE body (without the WHERE keyword).

        :param where: raw SQL string (returned unchanged) or a mapping of
            field -> value / ``[operator, operand]``; keys starting with ``_``
            are handled by ``_parseThinkWhere``
        :return: the condition string
        """
        if isinstance(where, str):
            # A string condition is used verbatim.
            return where
        return self._wrap_conditions(self._where_items(where))

    def _parse_with_pkwhere(self):
        """Return ``{pk_field: value}`` for every primary-key field that is set."""
        pk_where = {}
        for field in self.__pk__:
            value = getattr(self, field)
            if not isinstance(value, Column):
                pk_where[field] = value
        return pk_where

    def _parseThinkWhere(self, key, val):
        """Render a special ``_``-prefixed condition; only ``_string`` is known."""
        return val if key == "_string" else ''

    def _where_clause(self, where, allow_unfiltered=True):
        """Build ``' WHERE ...'`` from ``where`` plus the set primary keys.

        :param where: raw SQL string, condition mapping, or None
        :param allow_unfiltered: when False, an empty filter that is not an
            explicit None raises instead of producing an unfiltered statement
        :return: the clause including its leading ' WHERE ', or ''
        :raises ValueError: empty filter while ``allow_unfiltered`` is False
        """
        pk_where = self._parse_with_pkwhere()
        if isinstance(where, str):
            if not where.strip():
                condition = self._parse_where(pk_where) if pk_where else ''
            elif pk_where:
                # Parenthesise the raw text so an OR inside it cannot escape
                # the primary-key restriction.
                condition = "({}) AND{}".format(where, self._parse_where(pk_where))
            else:
                condition = where
        else:
            # Primary-key values override same-named entries in ``where``.
            items = self._where_items(dict(where or {}, **pk_where))
            condition = self._wrap_conditions(items) if items else ''
        if condition:
            return ' WHERE ' + condition
        if where is None or allow_unfiltered:
            return ''
        raise ValueError('refusing to build an UPDATE/DELETE without conditions; '
                         'pass where=None to affect every row')

    def make_select_sql(self, fields="*", where='', table=None):
        """Build a SELECT statement.

        :param fields: columns to select, a list or a comma-separated str
        :param where: condition mapping or raw SQL string; the set primary
            keys of this model are ANDed in
        :param table: table expression, defaults to the model's __tablename__
        :return: the SQL string
        """
        if isinstance(fields, str):
            _fields = fields.split(",")
        elif isinstance(fields, (list, tuple)):
            _fields = fields
        else:
            _fields = '*'
        if table is None:
            table = "`{}`".format(self.__tablename__)
        sql = 'SELECT {} FROM {}'.format(self._parse_fields(_fields), table)
        return sql + self._where_clause(where)

    def _parse_set(self, set_dict):
        """Render the SET list of an UPDATE, skipping primary-key fields."""
        # An UPDATE must not modify the primary key.
        return ",".join(
            "{}={}".format(self._parse_field(key), self._parse_value(set_dict[key]))
            for key in set_dict if key not in self.__pk__)

    def make_updata_sql(self, where, table=None):
        """Build an UPDATE statement from the fields set on this model.

        :param where: condition mapping or raw SQL string; the set primary
            keys are ANDed in. None means "no extra condition"
        :param table: table expression, defaults to the model's __tablename__
        :return: the SQL string, or None when there is nothing to update
        :raises ValueError: when the filter is empty but not None
        """
        set_val = {}
        for field in self.__fields__:
            value = getattr(self, field)
            if not isinstance(value, Column):
                set_val[field] = value
        set_sql = self._parse_set(set_val)
        if set_sql == "":
            return None
        if table is None:
            table = "`{}`".format(self.__tablename__)
        sql = "UPDATE {} SET {}".format(table, set_sql)
        return sql + self._where_clause(where, allow_unfiltered=False)

    def make_insert_sql(self):
        """Build an INSERT statement from the fields set on this model.

        :return: the SQL string
        """
        fields = []
        vals = []
        for field in self.__fields__:
            value = getattr(self, field)
            if not isinstance(value, Column):
                fields.append(self._parse_field(field))
                vals.append(self._parse_value(value))
        return "INSERT INTO `{}` ({}) VALUES ({})".format(
            self.__tablename__, ','.join(fields), ','.join(vals))

    def make_delete_sql(self, where, table=None):
        """Build a DELETE statement.

        :param where: condition mapping or raw SQL string; the set primary
            keys are ANDed in. None means "no extra condition"
        :param table: table expression, defaults to the model's __tablename__
        :return: the SQL string
        :raises ValueError: when the filter is empty but not None
        """
        if table is None:
            table = "`{}`".format(self.__tablename__)
        sql = "DELETE FROM {}".format(table)
        return sql + self._where_clause(where, allow_unfiltered=False)
class Column(object):
    """Placeholder for a table field declared on a model class.

    An instance carries a ``pk`` attribute (set to True) only when it was
    created with a truthy ``pk`` keyword. Code that detects primary keys
    tests for the presence of that attribute, so it must stay absent on
    ordinary columns.
    """

    def __init__(self, *args, **kwargs):
        """Accept any arguments; only a truthy ``pk`` keyword has an effect."""
        # Test the value, not just the presence of the keyword: Column(pk=False)
        # must not be flagged as a primary key.
        if kwargs.get('pk'):
            self.pk = True
# Fleshing out the typed columns below any further would turn this into a real
# ORM, which is not needed for now, so they are left empty.
# Unverified.
class StrColumn(Column):
    """Column subclass for string fields; adds nothing to Column."""
class IntColumn(Column):
    """Column subclass for integer fields; adds nothing to Column."""
class DateColumn(Column):
    """Column subclass for date fields; adds nothing to Column."""
使用实例:
# coding: utf-8
import json
import uuid
from Include.Setting import app_settings
from Model.Base import Base, Column
class TaskResult(Base):
    """Model for the ``taskresult`` table, keyed by ``mkey``."""

    __tablename__ = "taskresult"
    mkey = Column(pk=True)
    taskID = Column()
    node = Column()
    taskFinished = Column()
    subID = Column()
    elements = Column()
    # NOTE(review): looks like a typo for "result"; left as is because the
    # attribute name is used as the column name -- confirm against the schema.
    reuslt = Column()
    description = Column()

    def __str__(self):
        """Return a JSON object with mkey, taskID, node and subID."""
        summary = {}
        for name in ('mkey', 'taskID', 'node', 'subID'):
            value = getattr(self, name)
            # A field that was never assigned still resolves to its class-level
            # Column placeholder, which json cannot serialise; report it as null.
            summary[name] = None if isinstance(value, Column) else value
        return json.dumps(summary)

    def make_new_mkey(self):
        """Assign a fresh random UUID string to ``mkey`` and return it."""
        self.mkey = str(uuid.uuid4())
        return self.mkey

    def get_result_by_id(self, taskID):
        """Load the results of one task on the configured node, ordered by subID.

        :param taskID: value matched against the ``taskID`` column
        :return: list of TaskResult objects, one per fetched row
        """
        # SELECT * FROM taskresult WHERE taskID='{}' AND node='{}' ORDER BY subID ASC
        conditions = {'node': app_settings.get('NodeName', 'main'), 'taskID': taskID}
        sql = self.make_select_sql("*", conditions) + ' ORDER BY subID ASC'
        return [TaskResult(row) for row in self.fetchall(sql)]
- 作者:xmlwch
- 原文链接:https://m730.xmlwch.cn/2021/08/30/python/model.html
- 版权声明:本作品采用 知识共享 署名-相同方式共享 4.0 国际(CC BY-SA 4.0)许可协议 进行许可,转载无需与我联系,但请注明出处。