淘先锋技术网

首页 1 2 3 4 5 6 7

在这里插入图片描述

一、前言

1.1 概述

  Python 是一种高级、通用的解释型编程语言,以其优雅、准确、 简单的语言特性,在云计算、Web 开发、自动化运维、数据科学以及机器学习等人工智能领域获得了广泛应用。Python 定义了连接和操作数据库的标准接口 Python DB API。不同的数据库在此基础上实现了特定的驱动,这些驱动都实现了标准接口。支持PostgreSQL的库也有不少,从PostgreSQL提供的 WIKI 可以得知,使用得最多的而且最成熟的是Psycopg2。
在这里插入图片描述

  PostgreSQL 数据库是最常用的关系型数据库之一,最吸引人的一点是它作为开源数据库且具有可拓展性,能够提供丰富的应用。运用python可以很简单的建立PostgreSQL 数据库连接,最常见的 Python 驱动程序就是 psycopg,它完全实现了 Python DB-API 2.0 接口规范。接下来我们介绍如何通过 psycopg 连接和操作 PostgreSQL 数据库。

1.2 什么是 Psycopg

  PostgreSQL可以使用 psycopg 模块与Python集成,是用于 Python 编程语言的 PostgreSQL 数据库适配器,其是非常小,快速,稳定的。 不需要单独安装此模块,因为默认情况下它会随着Python一起发布。首先,我们需要安装 Python 。Python 可以通过官方网站下载,安装之后可以通过以下命令查看版本信息:

Python -v

然后通过 pip 安装最新的 psycopg:

# 适用于Python2
pip install --upgrade psycopg2
# 适用于Python3
pip3 install --upgrade psycopg2

也可以在pycharm中查找psycopg2安装包:
在这里插入图片描述

想要使用psycopg2,必须用import语句导入该包:

import psycopg2

二、操作 PostgreSQL

2.1 连接数据库

  为了方便开发,在 Pycharm 中新建一个项目 PythonPostgreSQL,然后创建一个数据库连接的配置文件 dbconfig.ini,添加以下内容:

[postgresql]
host = 127.0.0.1
port = 5432
database = HR
user = navicat
password = testnavicat

  配置文件中存储了数据库的连接信息:主机、端口、数据库、用户以及密码,按照自己的环境进行配置。要使用 psycopg2 模块,必须首先创建一个表示数据库的 Connection 对象,然后可以选择创建可以执行所有SQL语句的游标对象。新建一个测试数据库连接的 Python 文件 postgresql_connection.py,示例代码如下所示:

import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser


def read_db_config(filename='dbconfig.ini', section='postgresql'):
    # 创建解析器,读取配置文件
    parser = ConfigParser()
    parser.read(filename)

    # 获取 postgresql 部分的配置
    db = {}
    if parser.has_section(section):
        items = parser.items(section)
        for item in items:
            db[item[0]] = item[1]
    else:
        raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

    return db


db_config = read_db_config()
connection = None


def main():
    # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
    connection = psycopg2.connect(**db_config)
    print("Opened database successfully")
    pass


if __name__ == '__main__':
    main()

  上述代码展示了如何连接到一个现有的数据库。如果数据库不存在,那么它就会被创建,最终将返回一个数据库对象。connection类表示数据库连接对象,是由psycopg2.connect()方法创建。psycopg2.connect() 函数创建一个新的数据库会话并且返回一个连接对象,以执行 DML、DDL、DQL 等数据库事务。该函数的参数为:

参数说明
database要连接的数据库名称
user连接数据库的用户名
password连接数据库的密码
host数据库地址,一般为 localhost,或者主机的IP地址
port端口,默认为5432

connection提供了如下常用的数据库操作:

数据库操作说明
commit()提交任何未提交的事务到数据库
rollback()回滚
close()关闭数据库。如果关闭数据库时仍有未提交的事务,则执行回滚操作

  为了执行任何 SQL 事务或语句,我们需要实现游标 cursor 类,用来里执行 PostgreSQL 命令,其由 connection.cursor() 方法创建。它们在整个生命周期内都绑定到连接,并且所有命令都在连接包装的数据库会话的上下文中执行,退出程序时也会关闭光标。

有了cursor对象,就可以操作数据库了,cursor 提供了 execute 方法用来执行SQL语句,详细方法继续往下看。

2.2 异常处理

  使用psycopg2的 Error 进行异常捕获,能捕获到sql执行时期的所有异常。处理此异常的最佳方法是将 conn 对象和 cursor 的闭包放在 finally 块中,以防我们的连接出现错误。另外,如果数据库没有连接,并且对连接对象应用了 close 方法,我们的 finally 块中就会出错。下面代码中表test是库中不存的表,执行sql后会报异常,经过异常捕获后非常美观,不影响程序运行;

import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser


# 读取数据库配置文件,返回一个字典对象
def read_db_config(filename='dbconfig.ini', section='postgresql'):
    # 创建解析器,读取配置文件
    parser = ConfigParser()
    parser.read(filename)

    # 获取 postgresql 部分的配置
    db = {}
    if parser.has_section(section):
        items = parser.items(section)
        for item in items:
            db[item[0]] = item[1]
    else:
        raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

    return db


db_config = read_db_config()
connection = None


def main():
    global connection
    try:
        # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
        connection = psycopg2.connect(**db_config)

        print("连接成功,PostgreSQL 服务器版本:")

    except (Exception, DatabaseError) as e:
        print("连接 PostgreSQL 失败:", e)
    finally:
        # 释放数据库连接
        if connection is not None:
            connection.close()
            print("PostgreSQL 数据库连接已关闭。")
    pass


if __name__ == '__main__':
    main()

2.3 创建表

  当数据库建立连接成功后,可以通过执行 CREATE TABLE 和 DROP TABLE 语句执行DDL 事务来创建和删除数据表。我们创建一个新的 Python 文件 postgresql_create.py 在数据库中创建一个表,示例代码如下所示:

import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser


# 读取数据库配置文件,返回一个字典对象
def read_db_config(filename='dbconfig.ini', section='postgresql'):
    # 创建解析器,读取配置文件
    parser = ConfigParser()
    parser.read(filename)

    # 获取 postgresql 部分的配置
    db = {}
    if parser.has_section(section):
        items = parser.items(section)
        for item in items:
            db[item[0]] = item[1]
    else:
        raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

    return db


db_config = read_db_config()
connection = None


def main():
    try:
        # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
        connection = psycopg2.connect(**db_config)

        # 创建一个游标
        cur = connection.cursor()

        # 定义 SQL 语句
        sql = """ create table createtabletest (
                  id serial primary key,
                  name character varying(10) not null unique,
                  created_at timestamp not null 
                ) """

        # 执行 SQL 命令
        cur.execute(sql)

        # 关闭游标
        cur.close()

        # 提交事务
        connection.commit()
        print("操作成功!")
    except (Exception, DatabaseError) as e:
        print("操作失败:", e)
    finally:
        # 释放数据库连接
        if connection is not None:
            connection.close()
            print("PostgreSQL 数据库连接已关闭。")
    pass


if __name__ == '__main__':
    main()

  同样是先连接数据库,然后利用游标对象的 execute() 方法执行 SQL 命令创建表。commit 方法用于提交事务修改,如果不执行该操作不会真正创建表,因为 psycopg2 连接 PostgreSQL 默认不会自动提交(autocommit)。执行该脚本的结果如下:

操作成功!
PostgreSQL 数据库连接已关闭。

如果 createtabletest 表已经存在,将会返回以下错误:

操作失败: relation "createtabletest" already exists

2.4 INSERT 操作

  我们将使用 INSERT 命令执行 DML 事务,可以将一些数据添加到表中。Python 中游标对象的 execute() 方法用于执行 SQL 语句,该方法可以接收参数实现预编译语句。以下显示了我们如何在已创建的表中插入记录,示例代码如下所示:

import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser


# 读取数据库配置文件,返回一个字典对象
def read_db_config(filename='dbconfig.ini', section='postgresql'):
    # 创建解析器,读取配置文件
    parser = ConfigParser()
    parser.read(filename)

    # 获取 postgresql 部分的配置
    db = {}
    if parser.has_section(section):
        items = parser.items(section)
        for item in items:
            db[item[0]] = item[1]
    else:
        raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

    return db


db_config = read_db_config()
connection = None


def main():
    try:
        # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
        connection = psycopg2.connect(**db_config)

        # 创建一个游标
        cur = connection.cursor()

        # 定义 SQL 语句
        sql = "insert into createtabletest(name, created_at) values (%s, %s) RETURNING id"

        # 执行 SQL 命令
        cur.execute(sql, ('tony', '2023-04-15 00:56:00'))
        cur.execute(sql, ('zhangSan', '2023-04-15 00:56:00'))
        cur.execute(sql, ('liSi', '2023-04-15 00:56:00'))
        cur.execute(sql, ('wangWu', '2023-04-15 00:56:00'))

        # 获取 id
        id = cur.fetchone()[0]

        # 提交事务
        connection.commit()
        print("操作成功! 用户 id:", id)

        # 关闭游标
        cur.close()
    except (Exception, DatabaseError) as e:
        print("操作失败:", e)
    finally:
        # 释放数据库连接
        if connection is not None:
            connection.close()
            print("PostgreSQL 数据库连接已关闭。")
    pass


if __name__ == '__main__':
    main()

  上面示例中,sql 变量中的百分号(%)是占位符,这些占位符的值在 execute() 方法中进行替换;游标对象的 fetchone 方法用于返回一行结果,这里用于获取 RETURNING id 返回的用户 id。

2.5 SELECT 操作

  当我们创建表完成并插入数据后,可以执行一个 SELECT 事务来查看插入的数据。游标对象提供了三种获取返回结果的方法。如下表所示:

方法说明
fetchone()获取下一行数据。当没有更多的数据是可用时,返回null
fetchmany([size=cursor.arraysize])获取下一组数据行。当没有找到记录,返回空列表。
fetchall()获取所有查询结果,返回值为tuple列表。空行时则返回空列表

我们创建一个新的文件来展示如何获取并显示在上面的例子中创建的记录,示例代码如下所示:

import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser


# 读取数据库配置文件,返回一个字典对象
def read_db_config(filename='dbconfig.ini', section='postgresql'):
    # 创建解析器,读取配置文件
    parser = ConfigParser()
    parser.read(filename)

    # 获取 postgresql 部分的配置
    db = {}
    if parser.has_section(section):
        items = parser.items(section)
        for item in items:
            db[item[0]] = item[1]
    else:
        raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

    return db


db_config = read_db_config()
connection = None


def main():
    try:
        # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
        connection = psycopg2.connect(**db_config)

        # 创建一个游标
        cur = connection.cursor()

        # 定义 SQL 语句
        sql = "select id, name, created_at from createtabletest"

        # 执行 SQL 命令
        cur.execute(sql)
        print("用户数量:", cur.rowcount)

        # 获取结果
        user = cur.fetchone()
        while user is not None:
            print(user)
            user = cur.fetchone()

        # 关闭游标
        cur.close()
    except (Exception, DatabaseError) as e:
        print("操作失败:", e)
    finally:
        # 释放数据库连接
        if connection is not None:
            connection.close()
    pass


if __name__ == '__main__':
    main()

在上面示例中,游标对象的 rowcount 属性代表了返回的数据行数,fetchone() 方法返回一行数据或者 None,while 循环用于遍历和打印查询结果。

2.6 UPDATE 操作

  更新数据的流程跟之前的查询、插入类似,可以使用UPDATE语句来获取并显示更新的记录,psycopg2支持单条数据更新及批量更新,不过需要注意的是,PostgreSQL的数据被修改后,直接SELECT的话,数据并不会根据主键自动排序。示例代码如下所示:

import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser


# 读取数据库配置文件,返回一个字典对象
def read_db_config(filename='dbconfig.ini', section='postgresql'):
    # 创建解析器,读取配置文件
    parser = ConfigParser()
    parser.read(filename)

    # 获取 postgresql 部分的配置
    db = {}
    if parser.has_section(section):
        items = parser.items(section)
        for item in items:
            db[item[0]] = item[1]
    else:
        raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

    return db


db_config = read_db_config()
connection = None


def main():
    try:
        # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
        connection = psycopg2.connect(**db_config)

        # 创建一个游标
        cur = connection.cursor()

        # 定义 SQL 语句
        sql = "update created_at set name = %s  where id = %s"

        # 执行 SQL 命令
        cur.execute(sql, ('tom', 1))

        # 获取 id
        rows = cur.rowcount

        # 提交事务
        connection.commit()
        print("操作成功! 更新行数:", rows)

        # 再次查询数据
        sql = "select id, name, created_at from createtabletest where id = 1"
        cur.execute(sql)
        user = cur.fetchone()
        print(user)

        # 关闭游标
        cur.close()
    except (Exception, DatabaseError) as e:
        print("操作失败:", e)
    finally:
        # 释放数据库连接
        if connection is not None:
            connection.close()
    pass


if __name__ == '__main__':
    main()

2.7 DELETE 操作

  删除操作很简单,与更新数据的操作基本相同,只需要将 UPDATE 语句替换成 DELETE 语句,就可以删除表中的数据。示例代码如下所示:

import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser


# 读取数据库配置文件,返回一个字典对象
def read_db_config(filename='dbconfig.ini', section='postgresql'):
    # 创建解析器,读取配置文件
    parser = ConfigParser()
    parser.read(filename)

    # 获取 postgresql 部分的配置
    db = {}
    if parser.has_section(section):
        items = parser.items(section)
        for item in items:
            db[item[0]] = item[1]
    else:
        raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

    return db


db_config = read_db_config()
connection = None


def main():
    try:
        # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
        connection = psycopg2.connect(**db_config)

        # 创建一个游标
        cur = connection.cursor()

        # 定义 SQL 语句
        sql = "delete from createtabletest where id = %s"

        # 执行 SQL 命令
        cur.execute(sql, (1,))
        rows = cur.rowcount

        # 提交事务
        connection.commit()
        print("操作成功! 删除行数:", rows)

        # 关闭游标
        cur.close()
    except (Exception, DatabaseError) as e:
        print("操作失败:", e)
    finally:
        # 释放数据库连接
        if connection is not None:
            connection.close()
    pass


if __name__ == '__main__':
    main()

在 MySQ 中,TRUNCATE TABLE 语句会将表的数据清空,而且会重置自增主键的计数。但是在 PostgreSQL 中,必须要在这个语句后面添加 RESTART IDENTITY,才能将主键重置,不然就只会清空表。

2.8 事务管理

  在前面的示例中,需要使用 connection.commit() 提交对 PostgreSQL 数据库执行的修改,这是因为 psycopg2 默认没有打开自动提交功能,我们也可以利用连接对象的 autocommit 属性设置是否自动提交。

import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser


# 读取数据库配置文件,返回一个字典对象
def read_db_config(filename='dbconfig.ini', section='postgresql'):
    # 创建解析器,读取配置文件
    parser = ConfigParser()
    parser.read(filename)

    # 获取 postgresql 部分的配置
    db = {}
    if parser.has_section(section):
        items = parser.items(section)
        for item in items:
            db[item[0]] = item[1]
    else:
        raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

    return db


db_config = read_db_config()
connection = None


def main():
    try:
        # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
        connection = psycopg2.connect(**db_config)

        # 打印和设置自动提交
        print('默认 autocommit:', connection.autocommit)
        connection.autocommit = True
        print('新的 autocommit:', connection.autocommit)

        # 创建一个游标
        cur = connection.cursor()

        # 定义 SQL 语句
        sql = " insert into createtabletest(name, created_at) values (%s, %s) RETURNING id "

        # 执行 SQL 命令
        cur.execute(sql, ('tony', '2020-06-08 11:00:00'))

        # 获取 id
        id = cur.fetchone()[0]

        print("操作成功! 用户 id:", id)

        # 关闭游标
        cur.close()
    except (Exception, DatabaseError) as e:
        print("操作失败:", e)
    finally:
        # 释放数据库连接
        if connection is not None:
            connection.close()
            print("PostgreSQL 数据库连接已关闭。")

    pass


if __name__ == '__main__':
    main()

  通过 connection.autocommit 设置了自动提交,所以 INSERT 语句插入数据之后不需要再执行 commit 操作。如果一个事务中包含多个数据库操作,还是应该在事务的最后统一执行提交,并且在异常处理部分通过连接对象的 rollback() 方法回滚部分完成的事务。
  另一种管理事务的方法是使用 with 语句,这样可以避免手动的资源管理和事务操作。

import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser


# 读取数据库配置文件,返回一个字典对象
def read_db_config(filename='dbconfig.ini', section='postgresql'):
    # 创建解析器,读取配置文件
    parser = ConfigParser()
    parser.read(filename)

    # 获取 postgresql 部分的配置
    db = {}
    if parser.has_section(section):
        items = parser.items(section)
        for item in items:
            db[item[0]] = item[1]
    else:
        raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

    return db


db_config = read_db_config()
connection = None


def main():
    try:
        # 使用 with 语句管理事务
        with psycopg2.connect(**db_config) as connection:

            # 创建一个游标
            with connection.cursor() as cur:
                # 插入数据
                sql = " insert into createtabletest(name, created_at) values (%s, %s) "
                cur.execute(sql, ('Jason', '2023-04-15 10:30:00'))

                # 更新数据
                sql = "update createtabletest set created_at = %s where name = %s"
                cur.execute(sql, ('2023-04-15 11:00:00', 'tony'))

                sql = " select id, name, created_at from createtabletest "

                # 查询数据
                cur.execute(sql)

                # 获取结果
                user = cur.fetchone()
                while user is not None:
                    print(user)
                    user = cur.fetchone()
    except (Exception, DatabaseError) as e:
        print("操作失败:", e)
    finally:
        # 释放数据库连接
        if connection is not None:
            connection.close()
    pass


if __name__ == '__main__':
    main()

在上面示例中,整个事务包含插入数据、更新数据以及查询数据三个操作。

2.9 调用存储函数

游标对象的 callproc() 方法可以用于执行存储函数。我们先创建一个返回用户数量的函数 get_user_count:

CREATE OR REPLACE FUNCTION get_user_count()
returns int
AS $$
DECLARE
ln_count int;
BEGIN
  select count(*) into ln_count
  from users;

  return ln_count;
END; $$
LANGUAGE plpgsql;

接下来创建一个新的文件来展示如何调用存储函数:

import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser


# 读取数据库配置文件,返回一个字典对象
def read_db_config(filename='dbconfig.ini', section='postgresql'):
    # 创建解析器,读取配置文件
    parser = ConfigParser()
    parser.read(filename)

    # 获取 postgresql 部分的配置
    db = {}
    if parser.has_section(section):
        items = parser.items(section)
        for item in items:
            db[item[0]] = item[1]
    else:
        raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

    return db


db_config = read_db_config()
connection = None


def main():
    try:
        # 使用 with 语句管理事务
        with psycopg2.connect(**db_config) as connection:

            # 创建一个游标
            with connection.cursor() as cur:
                # 调用存储函数
                cur.callproc('get_user_count')

                row = cur.fetchone()[0]
                print('用户总数:', row)
    except (Exception, DatabaseError) as e:
        print("操作失败:", e)
    finally:
        # 释放数据库连接
        if connection is not None:
            connection.close()
    pass


if __name__ == '__main__':
    main()

callproc() 方法调用存储函数也可以写成以下等价的形式:

2.10 批量操作

  上面示例中提到 python操作 PostgreSql 的操作数据基本用法,但这并不能满足我们操作大量数据的美好心愿,在实际使用中,更多的时候需要进行批量的增删改查操作。批量操作数据库有多种方法,根据官网的叙述,数据批量操作有以下几种方法:

方法说明
executemany最简单的一种方法,属于游标的实例方法
execute_batch性能好,速度快的方法,属于类方法
execute_values官方推荐的方法,但是条件是要批量操作的字段的值必须相同
import psycopg2
from psycopg2 import DatabaseError
from configparser import ConfigParser
from psycopg2.extras import execute_batch


# 读取数据库配置文件,返回一个字典对象
def read_db_config(filename='dbconfig.ini', section='postgresql'):
    # 创建解析器,读取配置文件
    parser = ConfigParser()
    parser.read(filename)

    # 获取 postgresql 部分的配置
    db = {}
    if parser.has_section(section):
        items = parser.items(section)
        for item in items:
            db[item[0]] = item[1]
    else:
        raise Exception('文件 {1} 中未找到 {0} 配置信息!'.format(section, filename))

    return db


db_config = read_db_config()
connection = None


def main():
    try:
        # 使用 psycopg2.connect 方法连接 PostgreSQL 数据库
        connection = psycopg2.connect(**db_config)

        # 打印和设置自动提交
        print('默认 autocommit:', connection.autocommit)
        connection.autocommit = True
        print('新的 autocommit:', connection.autocommit)

        # 创建一个游标
        cursor = connection.cursor()

        # 定义 SQL 语句
        sql = " insert into createtabletest(name, created_at) values (%s, %s) RETURNING id "

        data = [['member%d' % x, '2023-04-16 11:%d:00' % x] for x in range(0, 60)]
        # 执行 SQL 命令
        execute_batch(cursor, sql, data, page_size=len(data))
        # 获取 id
        id = cursor.fetchone()[0]

        print("操作成功! 用户 id:", id)

        # 关闭游标
        cursor.close()
    except (Exception, DatabaseError) as e:
        print("操作失败:", e)
    finally:
        # 释放数据库连接
        if connection is not None:
            connection.close()
            print("PostgreSQL 数据库连接已关闭。")

    pass


if __name__ == '__main__':
    main()