SQLAlchemy连上了金仓数据库

一、方言包是什么,为什么非要手动装
SQLAlchemy本身只是个壳,它不知道自己该怎么跟数据库说话。每种数据库需要一个“翻译”,这个翻译就叫方言包(dialect)。

MySQL、PostgreSQL这种主流数据库,SQLAlchemy安装的时候就把它们的方言包带上了。但金仓不在这个列表里,所以得自己去官网下,然后手动放到指定位置。

方言包底层依赖ksycopg2驱动,所以这两个都得装。

二、安装步骤(踩坑记录)

2.1 先装SQLAlchemy

pip install sqlalchemy

装完看一眼装在哪了,后面放方言包要用:

pip show sqlalchemy

我的机器输出是这样的:

Name: SQLAlchemy
Version: 1.4.36
Location: /usr/local/lib/python3.8/site-packages

记下这个Location路径。

2.2 装ksycopg2驱动

pip install ksycopg2

这个倒是顺利,没遇到什么问题。Windows用户可能需要装ksycopg2-win64

2.3 放方言包(这里卡了我半天)

找到SQLAlchemy的安装目录,进去找到dialects文件夹:

cd /usr/local/lib/python3.8/site-packages/sqlalchemy/dialects

把从官网下载的kingbase文件夹整个复制到这里。最终目录结构应该是:

sqlalchemy/dialects/
├── kingbase/
│ ├── __init__.py
│ └── base.py
├── mysql/
├── postgresql/
└── ...

我一开始放错了地方,放到了site-packages根目录下,结果死活加载不了。后来仔细看了文档才发现要放到dialects下面。

版本匹配问题:官方提供了1.3、1.4、2.0三个版本的方言包。我用的SQLAlchemy是1.4.36,选了1.4版本的方言包。版本不对会报一些奇怪的错,比如找不到某个模块。

三、连接数据库

3.1 连接字符串怎么写

折腾完安装,终于可以写代码了。连接字符串的格式是:

from sqlalchemy import create_engine

engine = create_engine('kingbase+ksycopg2://SYSTEM:123456@127.0.0.1:54321/TEST')

翻译一下:

kingbase:方言名,告诉SQLAlchemy用金仓的方言包
ksycopg2:驱动名,实际干活的是这个
SYSTEM:数据库用户名
123456:密码
127.0.0.1:IP
54321:端口(金仓默认是这个)
TEST:数据库名
+ksycopg2其实可以省略,写成kingbase://...就行,默认就是用它。

3.2 测试一下能不能连上

from sqlalchemy import create_engine

conn_str = 'kingbase://SYSTEM:123456@127.0.0.1:54321/TEST'
engine = create_engine(conn_str)

conn = engine.connect()
result = conn.execute("SELECT version()")
print(result.fetchone())
conn.close()

如果看到版本信息输出,恭喜,连上了。

3.3 连接池参数(生产环境有用)

如果是写Web服务或者长期运行的脚本,建议配置一下连接池:

engine = create_engine(
'kingbase://SYSTEM:123456@127.0.0.1:54321/TEST',
pool_size=10, # 连接池里放多少个连接
max_overflow=20, # 不够用时最多再创建多少个
pool_recycle=3600, # 连接用多久回收(秒)
pool_pre_ping=True # 用之前先ping一下,确认还活着
)

pool_pre_ping=True这个参数挺实用的,能避免拿到一个已经断开的连接。

四、ORM建模和基本操作

连接搞定了,接下来看看怎么用ORM操作数据库。

4.1 定义模型

先建个基类,然后定义表对应的类

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime
from datetime import datetime

Base = declarative_base()

class User(Base):
__tablename__ = 'test_user' # 表名

id = Column(Integer, primary_key=True)
name = Column(String(50))
email = Column(String(100))
created_at = Column(DateTime, default=datetime.now)

def __repr__(self):
return f"User(id={self.id}, name='{self.name}', email='{self.email}')"

__tablename__是必须的,不写SQLAlchemy不知道表叫什么。

4.2 建表

# 自动创建表(如果不存在的话)
Base.metadata.create_all(engine)

这个操作是幂等的,执行多次也不会重复建表。

4.3 创建Session

from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)
session = Session()

4.4 增删改查

插入数据:

user1 = User(name='张三', email='zhangsan@test.com')
user2 = User(name='李四', email='lisi@test.com')

session.add(user1)
session.add(user2)
# 或者一次加多个:session.add_all([user1, user2])
session.commit()

查询数据:

# 查所有
users = session.query(User).all()
for u in users:
print(u.name, u.email)

# 条件查询
user = session.query(User).filter(User.name == '张三').first()
print(user.email)

# 模糊查询
users = session.query(User).filter(User.name.like('%张%')).all()

更新数据:

# 方式1:查出来改属性
user = session.query(User).filter(User.name == '张三').first()
user.email = 'newemail@test.com'
session.commit()

# 方式2:批量更新(不查直接改)
session.query(User).filter(User.name == '张三').update(
{"email": "batch_update@test.com"}
)
session.commit()

删除数据:

# 查出来删
user = session.query(User).filter(User.name == '张三').first()
session.delete(user)
session.commit()

# 批量删
session.query(User).filter(User.id > 100).delete()
session.commit()

4.5 事务处理

Session默认不会自动提交,调用commit()才真正写入。如果中间出错,可以回滚:

try:
session.add(User(name='王五', email='wangwu@test.com'))
# 这里如果出错...
session.commit()
except Exception as e:
session.rollback()
print(f"出错了: {e}")
finally:
session.close()

4.6 完整跑一遍

把上面的代码串起来,跑一个完整的例子:

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String

# 连数据库
engine = create_engine('kingbase://SYSTEM:123456@127.0.0.1:54321/TEST')

# 定义模型
Base = declarative_base()
class User(Base):
__tablename__ = 'test_user'
id = Column(Integer, primary_key=True)
name = Column(String(50))
email = Column(String(100))
def __repr__(self):
return f"User(id={self.id}, name={self.name})"

# 建表
Base.metadata.create_all(engine)

# 操作
Session = sessionmaker(bind=engine)
session = Session()

# 插入
session.add_all([
User(name='张三', email='zhangsan@test.com'),
User(name='李四', email='lisi@test.com'),
])
session.commit()

# 查询并打印
for user in session.query(User).all():
print(user)

session.close()

五、踩过的几个坑

坑1:方言包放错位置

这是最容易犯的错。方言包必须放在sqlalchemy/dialects/kingbase/,不是随便扔到site-packages就行。

坑2:版本不对应

SQLAlchemy 1.4的版本要用1.4的方言包。拿1.3的方言包去连1.4的SQLAlchemy,会报模块找不到的错误。

坑3:libkci.so找不到

报错信息是libkci.so: cannot open shared object file。设置一下环境变量就能解决:

export LD_LIBRARY_PATH=/home/kingbase/lib:$LD_LIBRARY_PATH

坑4:连接字符串写错

kingbase://不要写成kingbase+psycopg2://,虽然ksycopg2基于psycopg2改的,但名字写错了就是连不上。

上一篇 环路导致S9700 CPU利用率高