博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
修改torndb库为依赖pymysql,使其适应python3,一个更简单的操作数据库的类。
阅读量:6893 次
发布时间:2019-06-27

本文共 10804 字,大约阅读时间需要 36 分钟。

1、python的MySQLdb和pymysql是两个基本数据库操作包,MySQLdb安装很麻烦,要有c++相关环境,python3也安装不了。

python3一般安装pymysql,此包与MySQLdb包具有高度的可替换性,只要学习一种库的api,另一种库的操作方法完全一模一样,不需要新学api。

但这两个都是偏底层,有个硬伤是公有查询方法不好用,因为执行是cursor.excute,取结果却要二次在游标上进行操作,所以一般情况下大部分人都会进行二次封装。

 

2、torndb就是一个这样的库,不需要调用者去关心数据库游标,每个公有方法都是直接返回结果,这也是一般情况下对数据库二次封装想达到的目标,有了torndb就不需要苦逼的自己去封装数据库操作了,就像有了好用的requests,70%场景下是不需要再去封装urllib了,也有特殊情况,我基于requests的Session类进行了二次封装,使其更容易使用,封装后在类外部只需要很少的外置代码,通过调用封装的方法就能得到结果了。

 

3、痛点就是torndb使依赖的是MySQLdb,安装难,而且里面有几个python语法在python2有,pyhton3去掉了,最后一次官方更新是2014年时候,估计是官方不打算更新来兼容py3了。

我把它修改成了依赖pymysql,兼容python3,并发布到pypi官网了。可以使用pip install torndb_for_python3来安装这个包,也可以自己复制以下代码。

torndb_for_python3完全兼容torndb包,api一模一样,torndb_for_python3包的使用方法,去百度搜索torndb的方法就可以了。

 

以下代码中有#TODO注释的行是在官方代码上修改了的行。

torndb_for_python3.py的代码。

#!/usr/bin/env python## Copyright 2009 Facebook## Licensed under the Apache License, Version 2.0 (the "License"); you may# not use this file except in compliance with the License. You may obtain# a copy of the License at##     http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the# License for the specific language governing permissions and limitations# under the License."""A lightweight wrapper around MySQLdb.Originally part of the Tornado framework.  The tornado.database moduleis slated for removal in Tornado 3.0, and it is now available separatelyas torndb.官方torndb需要搭配MySQLdb包,主要是修改成搭配pymysql,有TODO注释的行都是在官方基础上修改了的,是为了兼容pymsql的参数和兼容python3"""from __future__ import absolute_import, division, with_statementimport copyimport itertoolsimport loggingimport osimport timeimport pymysql  # TODO  需要安装pymysqlpymysql.install_as_MySQLdb()  # TODO  使用monkey技术将MySQLdb包替换为了pymsql包,所以不需要在代码里面大量替换MySQLdb为pymsql字眼,pymsql和MySQLdb的主要类是鸭子类的关系try:    import MySQLdb.constants    import MySQLdb.converters    import MySQLdb.cursorsexcept ImportError:    # If MySQLdb isn't available this module won't actually be useable,    # but we want it to at least be importable on readthedocs.org,    # which has limitations on third-party modules.    if 'READTHEDOCS' in os.environ:        MySQLdb = None    else:        raiseversion = "0.3"version_info = (0, 3, 0, 0)class Connection(object):    """A lightweight wrapper around MySQLdb DB-API connections.    The main value we provide is wrapping rows in a dict/object so that    columns can be accessed by name. Typical usage::        db = torndb.Connection("localhost", "mydatabase")        for article in db.query("SELECT * FROM articles"):            print article.title    Cursors are hidden by the implementation, but other than that, the methods    are very similar to the DB-API.    We explicitly set the timezone to UTC and assume the character encoding to    UTF-8 (can be changed) on all connections to avoid time zone and encoding errors.    The sql_mode parameter is set by default to "traditional", which "gives an error instead of a warning"    (http://dev.mysql.com/doc/refman/5.0/en/server-sql-mode.html). However, it can be set to    any other mode including blank (None) thereby explicitly clearing the SQL mode.    """    def __init__(self, host, database, user=None, password=None,                 max_idle_time=7 * 3600, connect_timeout=30,  # TODO  由0修改成了30,因为pymsql必须大于0                 time_zone="+0:00", charset="utf8", sql_mode="TRADITIONAL"):        self.host = host        self.database = database        self.max_idle_time = float(max_idle_time)        args = dict(conv=CONVERSIONS, use_unicode=True, charset=charset,                    db=database, init_command=('SET time_zone = "%s"' % time_zone),                    connect_timeout=connect_timeout, sql_mode=sql_mode)        if user is not None:            args["user"] = user        if password is not None:            args["passwd"] = password        # We accept a path to a MySQL socket file or a host(:port) string        if "/" in host:            args["unix_socket"] = host        else:            self.socket = None            pair = host.split(":")            if len(pair) == 2:                args["host"] = pair[0]                args["port"] = int(pair[1])            else:                args["host"] = host                args["port"] = 3306        self._db = None        self._db_args = args        self._last_use_time = time.time()        try:            self.reconnect()        except Exception:            logging.error("Cannot connect to MySQL on %s", self.host,                          exc_info=True)    def __del__(self):        self.close()    def close(self):        """Closes this database connection."""        if getattr(self, "_db", None) is not None:            self._db.close()            self._db = None    def reconnect(self):        """Closes the existing database connection and re-opens it."""        self.close()        self._db = MySQLdb.connect(**self._db_args)        self._db.autocommit(True)    def iter(self, query, *parameters, **kwparameters):        """Returns an iterator for the given query and parameters."""        self._ensure_connected()        cursor = MySQLdb.cursors.SSCursor(self._db)        try:            self._execute(cursor, query, parameters, kwparameters)            column_names = [d[0] for d in cursor.description]            for row in cursor:                yield Row(zip(column_names, row))        finally:            cursor.close()    def query(self, query, *parameters, **kwparameters):        """Returns a row list for the given query and parameters."""        cursor = self._cursor()        try:            self._execute(cursor, query, parameters, kwparameters)            column_names = [d[0] for d in cursor.description]            # return [Row(itertools.izip(column_names, row)) for row in cursor]   # TODO 修改了此行            return [Row(itertools.zip_longest(column_names, row)) for row in cursor]        finally:            cursor.close()    def get(self, query, *parameters, **kwparameters):        """Returns the (singular) row returned by the given query.        If the query has no results, returns None.  If it has        more than one result, raises an exception.        """        rows = self.query(query, *parameters, **kwparameters)        if not rows:            return None        elif len(rows) > 1:            raise Exception("Multiple rows returned for Database.get() query")        else:            return rows[0]    # rowcount is a more reasonable default return value than lastrowid,    # but for historical compatibility execute() must return lastrowid.    def execute(self, query, *parameters, **kwparameters):        """Executes the given query, returning the lastrowid from the query."""        return self.execute_lastrowid(query, *parameters, **kwparameters)    def execute_lastrowid(self, query, *parameters, **kwparameters):        """Executes the given query, returning the lastrowid from the query."""        cursor = self._cursor()        try:            self._execute(cursor, query, parameters, kwparameters)            return cursor.lastrowid        finally:            cursor.close()    def execute_rowcount(self, query, *parameters, **kwparameters):        """Executes the given query, returning the rowcount from the query."""        cursor = self._cursor()        try:            self._execute(cursor, query, parameters, kwparameters)            return cursor.rowcount        finally:            cursor.close()    def executemany(self, query, parameters):        """Executes the given query against all the given param sequences.        We return the lastrowid from the query.        """        return self.executemany_lastrowid(query, parameters)    def executemany_lastrowid(self, query, parameters):        """Executes the given query against all the given param sequences.        We return the lastrowid from the query.        """        cursor = self._cursor()        try:            cursor.executemany(query, parameters)            return cursor.lastrowid        finally:            cursor.close()    def executemany_rowcount(self, query, parameters):        """Executes the given query against all the given param sequences.        We return the rowcount from the query.        """        cursor = self._cursor()        try:            cursor.executemany(query, parameters)            return cursor.rowcount        finally:            cursor.close()    update = execute_rowcount    updatemany = executemany_rowcount    insert = execute_lastrowid    insertmany = executemany_lastrowid    def _ensure_connected(self):        # Mysql by default closes client connections that are idle for        # 8 hours, but the client library does not report this fact until        # you try to perform a query and it fails.  Protect against this        # case by preemptively closing and reopening the connection        # if it has been idle for too long (7 hours by default).        if (self._db is None or                (time.time() - self._last_use_time > self.max_idle_time)):            self.reconnect()        self._last_use_time = time.time()    def _cursor(self):        self._ensure_connected()        return self._db.cursor()    def _execute(self, cursor, query, parameters, kwparameters):        try:            return cursor.execute(query, kwparameters or parameters)        except OperationalError:            logging.error("Error connecting to MySQL on %s", self.host)            self.close()            raise    def __str__(self):        return f'{type(self)} -> [{self.host}机器的{ self.database}库]'  # TODOclass Row(dict):    """A dict that allows for object-like property access syntax."""    def __getattr__(self, name):        try:            return self[name]        except KeyError:            raise AttributeError(name)if MySQLdb is not None:    # Fix the access conversions to properly recognize unicode/binary    FIELD_TYPE = MySQLdb.constants.FIELD_TYPE    FLAG = MySQLdb.constants.FLAG    CONVERSIONS = copy.copy(MySQLdb.converters.conversions)    field_types = [FIELD_TYPE.BLOB, FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING]    if 'VARCHAR' in vars(FIELD_TYPE):        field_types.append(FIELD_TYPE.VARCHAR)    for field_type in field_types:        # CONVERSIONS[field_type] = [(FLAG.BINARY, str)] + CONVERSIONS[field_type]  # TODO 修改了此行        CONVERSIONS[field_type] = [(FLAG.BINARY, str)].append(CONVERSIONS[field_type])    # Alias some common MySQL exceptions    IntegrityError = MySQLdb.IntegrityError    OperationalError = MySQLdb.OperationalError

 

使用用法,举一个例子:

# pip install torndb_for_python3

mysql_conn = torndb_for_python3.Connection(host='localhost', database='test', user='root', password='123456', charset='utf8') print(mysql_conn.query('SELECT * FROM test.tablexx'))

转载地址:http://ilzdl.baihongyu.com/

你可能感兴趣的文章
【WP8.1开发】基于应用的联系人存储
查看>>
AI新时代-教你使用python+Opencv完成人脸解锁(附源码)
查看>>
MongoDB ( 三 )高级_状态返回和安全
查看>>
基于 Netty 的可插拔业务通信协议的实现「1」协议描述及基本消息对象设计
查看>>
NodeJS介绍以及开发微信公众号Example
查看>>
新时代前端的自我修养—2017 D2主题分享记录及我的思考
查看>>
java并发编程学习14--CompletableFuture(一)
查看>>
ES6语法之Symbol
查看>>
取周期性字符串中的其中一个
查看>>
d3.js ----面积图表
查看>>
Zepto这样操作元素属性
查看>>
30-seconds-code——Object
查看>>
pyspark底层浅析
查看>>
【设计模式】组合模式之神经网络应用
查看>>
Jenkins系统搭建及常见操作
查看>>
SQL Server 2012自动异地备份
查看>>
Ubuntu 下 SVN 多版本库的搭建
查看>>
CSS选择器
查看>>
一款简单到极致的 React 数据流框架——Refast
查看>>
ribbon的ServerListRefreshInterval
查看>>