Python 操作金仓数据库的完全指南(下篇):SQL执行、批量操作与扩展功能

一、执行 SQL 语句

1.1 基础查询

先看一个完整的查询流程:

import ksycopg2

conn = ksycopg2.connect(
database='TEST',
user='SYSTEM',
password='123456',
host='127.0.0.1',
port='54321'
)

cur = conn.cursor()

# 执行查询
cur.execute("SELECT id, name FROM test_user WHERE age > %s", (18,))

# 获取结果
rows = cur.fetchall()
for row in rows:
print(f"id: {row[0]}, name: {row[1]}")

cur.close()
conn.close()

几个要点:

  • 先用 cursor() 创建游标
  • execute() 执行 SQL
  • fetchall() 拿全部结果
  • 用完记得关闭游标和连接

1.2 获取结果集的几种方式

fetchone():一条一条拿

cur.execute("SELECT id, name FROM test_user")
while True:
row = cur.fetchone()
if row is None:
break
print(row)

适合处理大结果集,不会一次性把所有数据加载到内存。

fetchmany():分批拿

cur.execute("SELECT id, name FROM test_user")
while True:
rows = cur.fetchmany(100) # 一次拿100条
if not rows:
break
for row in rows:
process_row(row)

比 fetchone 快,又不会像 fetchall 那样吃内存。

fetchall():一次性全拿

cur.execute("SELECT id, name FROM test_user")
rows = cur.fetchall() # 数据量小的时候用
for row in rows:
print(row)

数据量小的时候方便,几万条以上就别用了。

1.3 获取列信息

有时候需要知道查询结果有哪些列、什么类型,可以用 cursor.description

cur.execute("SELECT id, name, created_at FROM test_user")
for col in cur.description:
print(f"列名: {col.name}, 类型码: {col.type_code}, 长度: {col.internal_size}")

输出示例:

列名: id, 类型码: 23, 长度: 4
列名: name, 类型码: 1043, 长度: -1
列名: created_at, 类型码: 1114, 长度: 8

类型码是 Kingbase 内部的数据类型 OID,一般用不到,但调试时有用。

1.4 执行非查询 SQL

INSERT、UPDATE、DELETE 这些不返回结果集的 SQL,直接用 execute() 执行就行:

cur = conn.cursor()

# 插入
cur.execute("INSERT INTO test_user (id, name) VALUES (%s, %s)", (1, '张三'))

# 更新
cur.execute("UPDATE test_user SET name = %s WHERE id = %s", ('李四', 1))

# 删除
cur.execute("DELETE FROM test_user WHERE id = %s", (1,))

# 注意:需要提交事务
conn.commit()
cur.close()

千万别忘了 commit(),否则数据不会真正写入。

二、参数传递与防 SQL 注入

ksycopg2 用占位符 %s 传递参数,会自动处理转义,不用自己拼接字符串。

# 正确写法:参数单独传递
cur.execute(
"INSERT INTO test_user (id, name) VALUES (%s, %s)",
(1, "张三")
)

# 错误写法:自己拼接字符串,有 SQL 注入风险
cur.execute(f"INSERT INTO test_user (id, name) VALUES ({id}, '{name}')")

2.1 位置占位符

# 用元组传参
cur.execute(
"SELECT * FROM test_user WHERE age > %s AND name LIKE %s",
(18, '%张%')
)

2.2 命名占位符

# 用字典传参,参数多的时候更清晰
cur.execute(
"SELECT * FROM test_user WHERE age > %(min_age)s AND name LIKE %(name_pattern)s",
{'min_age': 18, 'name_pattern': '%张%'}
)

三、批量操作

3.1 executemany() 的问题

ksycopg2 提供了 executemany(),但它的实现方式是循环调用 execute(),一次发一条 SQL,性能提升不大。

data = [(1, '张三'), (2, '李四'), (3, '王五')]
cur.executemany("INSERT INTO test_user (id, name) VALUES (%s, %s)", data)
conn.commit()

数据量小的时候用用还行,大批量插入不建议。

3.2 execute_batch() 批量执行

ksycopg2.extras.execute_batch() 把多条 SQL 分成若干组,每组一次发给数据库,减少网络往返次数。

from ksycopg2 import extras

data = [(1, '张三'), (2, '李四'), (3, '王五'), ...] # 几百条数据

extras.execute_batch(
cur,
"INSERT INTO test_user (id, name) VALUES (%s, %s)",
data,
page_size=100 # 每100条发一次
)
conn.commit()

page_size 控制每组多少条。太小网络交互多,太大 SQL 语句太长,100-500 之间比较合适。

3.3 execute_values() 一条 SQL 插多行

execute_values() 把所有数据拼成一条 INSERT 语句,效率最高:

from ksycopg2 import extras

data = [(1, '张三'), (2, '李四'), (3, '王五')]

extras.execute_values(
cur,
"INSERT INTO test_user (id, name) VALUES %s",
data,
page_size=100
)
conn.commit()

生成的 SQL 类似:

INSERT INTO test_user (id, name) VALUES (1, '张三'), (2, '李四'), (3, '王五')

一次性插入几千条数据时,execute_values 比 execute_batch 快不少。

三种批量插入方式对比(插入 1 万条数据测试):

Python 操作金仓数据库的完全指南(下篇):SQL执行、批量操作与扩展功能

四、调用存储过程

4.1 调用函数

# 先创建函数
cur.execute("""
CREATE OR REPLACE FUNCTION add_user(p_id INTEGER, p_name TEXT)
RETURNS TEXT AS $$
BEGIN
INSERT INTO test_user (id, name) VALUES (p_id, p_name);
RETURN 'success';
END;
$$ LANGUAGE plpgsql;
""")

# 调用函数
cur.callproc('add_user', (10, 'test_user'))
result = cur.fetchone()
print(result) # ('success',)
conn.commit()

4.2 调用存储过程

调用存储过程需要用 execute() 配合 CALL 语句:

-- 创建存储过程
CREATE OR REPLACE PROCEDURE update_user_name(
p_id INTEGER,
p_new_name TEXT
)
LANGUAGE plpgsql
AS $$
BEGIN
UPDATE test_user SET name = p_new_name WHERE id = p_id;
END;
$$;

# 调用存储过程
cur.execute("CALL update_user_name(%s, %s)", (1, '新名字'))
conn.commit()

五、COPY 命令:高效数据导入导出

COPY 是 Kingbase 提供的快速数据导入导出方式,比 INSERT 快很多。

5.1 copy_from():从文件导入

cur = conn.cursor()

with open('data.csv', 'r') as f:
cur.copy_from(
file=f,
table='test_user',
columns=('id', 'name'),
sep=',' # 列分隔符
)
conn.commit()

默认分隔符是制表符 \t,CSV 文件需要指定 sep=','NULL 值默认用 \N 表示,也可以改:

cur.copy_from(f, 'test_user', columns=('id', 'name'), sep=',', null='NULL')

5.2 copy_to():导出到文件

with open('export.csv', 'w') as f:
cur.copy_to(
file=f,
table='test_user',
columns=('id', 'name'),
sep=','
)
conn.commit()

5.3 copy_expert():自定义 COPY

copy_expert 最灵活,可以写完整的 COPY 语句:

copy_sql = """
COPY test_user(id, name)
TO STDOUT
WITH CSV HEADER DELIMITER AS ','
"""

with open('export_with_header.csv', 'w') as f:
cur.copy_expert(sql=copy_sql, file=f)

用在数据迁移、日志导出、报表生成这些场景,效率比 SELECT 一行行写高得多。

六、大对象处理

Kingbase 支持 BLOB(二进制大对象)和 CLOB(字符大对象)。ksycopg2 可以处理这些类型。

import ksycopg2

conn = ksycopg2.connect(
database='TEST', user='SYSTEM',
password='123456', host='127.0.0.1', port='54321'
)
cur = conn.cursor()

# 建表
cur.execute('DROP TABLE IF EXISTS test_lob')
cur.execute('''
CREATE TABLE test_lob (
id INTEGER,
b BLOB,
c CLOB,
ba BYTEA
)
''')

# 准备测试数据
ba_data = bytearray("中文测试bytearray", "UTF8")
b_data = bytes('中文测试bytes' * 2, "UTF8")
str_data = '中文str' * 4

# 插入
cur.execute(
"INSERT INTO test_lob VALUES (%s, %s, %s, %s)",
(1, ba_data, ba_data, ba_data)
)
cur.execute(
"INSERT INTO test_lob VALUES (%s, %s, %s, %s)",
(2, b_data, b_data, b_data)
)
cur.execute(
"INSERT INTO test_lob VALUES (%s, %s, %s, %s)",
(3, str_data, str_data, str_data)
)

conn.commit()

# 读取
cur.execute("SELECT id, b, c, ba FROM test_lob")
rows = cur.fetchall()

for row in rows:
for cell in row:
# bytea 类型返回 memoryview,需要转换
if isinstance(cell, memoryview):
print(cell[:].tobytes().decode('UTF8'), end=" ")
else:
print(cell, end=" ")
print()

cur.close()
conn.close()

三个注意事项:

  • bytea 类型在 Python 3 里返回 memoryview,用 tobytes() 转换
  • 大对象不建议频繁读写,性能不好
  • 超大文件(几十 MB 以上)建议存文件系统,数据库只存路径

七、动态 SQL 生成

ksycopg2.sql 模块解决了一个头疼的问题:表名、字段名这类标识符不能直接用参数传递。

from ksycopg2 import sql

# 错误写法:标识符不能参数化
cur.execute("SELECT * FROM %s WHERE id = %s", ('test_user', 1))

# 正确写法:用 sql 模块
cur.execute(
sql.SQL("SELECT * FROM {} WHERE id = %s").format(sql.Identifier('test_user')),
(1,)
)

7.1 拼接带标识符的 SQL

table_name = 'test_user'
id_column = 'user_id'
name_column = 'user_name'

query = sql.SQL("SELECT {id}, {name} FROM {table} WHERE {id} = %s").format(
id=sql.Identifier(id_column),
name=sql.Identifier(name_column),
table=sql.Identifier(table_name)
)

cur.execute(query, (100,))

7.2 使用字面值

from ksycopg2 import sql

# 把 Python 值直接转成 SQL 字面量
value = sql.Literal("It's a test")
cur.execute(
sql.SQL("SELECT {}").format(value)
)
# 生成: SELECT 'It''s a test'

7.3 动态 IN 查询

ids = [1, 2, 3, 4, 5]
placeholders = ','.join(['%s'] * len(ids))
cur.execute(
f"SELECT * FROM test_user WHERE id IN ({placeholders})",
ids
)

八、错误处理

import ksycopg2
from ksycopg2 import OperationalError, IntegrityError

try:
conn = ksycopg2.connect(database='TEST', user='SYSTEM', password='123456')
cur = conn.cursor()
cur.execute("INSERT INTO test_user (id, name) VALUES (%s, %s)", (1, '张三'))
conn.commit()
except IntegrityError as e:
print(f"违反唯一约束: {e}")
conn.rollback()
except OperationalError as e:
print(f"连接或操作错误: {e}")
except Exception as e:
print(f"其他错误: {e}")
conn.rollback()
finally:
cur.close()
conn.close()

常见异常类型:

  • IntegrityError:违反约束(主键重复、外键不存在等)
  • OperationalError:连接断开、语法错误等
  • ProgrammingError:表不存在、列不存在等

九、完整示例:从连接到批量插入

import ksycopg2
from ksycopg2 import extras

def main():
conn = None
try:
conn = ksycopg2.connect(
database='TEST',
user='SYSTEM',
password='123456',
host='127.0.0.1',
port='54321'
)
conn.autocommit = False

cur = conn.cursor()

# 建表
cur.execute('DROP TABLE IF EXISTS test_user')
cur.execute('''
CREATE TABLE test_user (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')

# 批量插入
data = [(i, f'user_{i}') for i in range(1, 1001)]
extras.execute_values(
cur,
"INSERT INTO test_user (id, name) VALUES %s",
data,
page_size=200
)

# 查询
cur.execute("SELECT COUNT(*) FROM test_user")
count = cur.fetchone()[0]
print(f"插入了 {count} 条数据")

conn.commit()
cur.close()

except Exception as e:
print(f"操作失败: {e}")
if conn:
conn.rollback()
finally:
if conn:
conn.close()

if __name__ == "__main__":
main()

十、总结

下篇主要讲了这几个方面:

SQL 执行:execute() 执行查询和非查询语句,fetchone/fetchmany/fetchall 获取结果
参数传递:用 %s 占位符,不要拼接字符串
批量操作:execute_values 性能最佳,大批量插入首选
COPY 命令:数据导入导出最快的方式
大对象:BLOB/CLOB/BYTEA 的处理方式
动态 SQL:表名、字段名用 sql 模块处理
错误处理:区分异常类型,正确处理事务回滚
两篇合在一起,覆盖了 Python 操作金仓数据库的常用场景。从连接到高可用,从 SQL 执行到批量操作,基本够用了。如果遇到官方文档没覆盖的场景,建议去金仓官网看最新的驱动包和示例。

上一篇 【转载】HCIE R&S 备考笔记 MPLS virtual private network 地址重叠实验(华为设备)
下一篇 同一交换机上,设备ping不通?