日常数据管理工作中,需要处理存储在不同类型数据库系统的数据。对这些数据的管理,常见的是使用Navicat,DBeaver等管理工具。在对大量数据分析时,需要提取到Python/ target=_blank class=infotextkey>Python/R中进行处理。下面探索Python调用MySQL,MongoDB,InfluxDB等多种类型数据库通用连接方法。实现方式是在Python中封装各类数据库接口包。
实现后的效果:1.安全。接口信息封装便于保密管理;2.复用。一次封装,永久复用;3.上手快。方便不熟悉python和数据调用的同学,只会简单的sql即可使用,省时省力。
下面以MySQL,MongoDB,InfluxDB为例定义接口方法,然后把它们封装成1个通用方法。
mysql_get(sql,db):
# 导入包
import pandas as pd
from sqlalchemy import create_engine
# 定义取数函数
def mysql_get(sql,db):
conn = create_engine('mysql+pymysql://root:123456@localhost:3306/'+db)
df_read = pd.read_sql_query(sql, conn)
conn.dispose()
return df_read
mongo_get(sql,db):
# 导入包
import pymongo
from bson.objectid import ObjectId
# 处理sql语句in的内容,准备工作
list_x = ['c', 'd', 'e', 'f'] # 非_id字段
list_id = ['df343dr34rwfd3', 'ji80jju8jeoe9'] # _id字段
obj_x = ObjectId(list_id) # _id字段
# 定义取数函数
# eg, sql1:"select a,b from tb where a >= 1 order by b limit 5 "
# eg, sql2:"select a,b from tb where a in {list_x} limit 5"
# eg, sql3:"select a,b from tb where _id in {obj_x} limit 5"
def mongo_get(sql, db):
# 第一步,解析sql,得到projectionFields,order by顺序,head(n)等,过程略
# 第二步,执行查询,获得结果
mycl.NET = pymongo.MongoClient("mongodb://127.0.0.1:27017")
collection = myclinet[db]
serchRes = collection.find(queryArgs, projection=projectionFields)
# 第三步,返回dict
return serchRes
influx_get(sql,db):
import pandas as pd
from influxdb import InfluxDBClient
def influx_get(sql,db):
client = InfluxDBClient('127.0.0.1','8086','root','root',db)
# eg,sql:"select * from SYS_DISK where time >= '2022-06-01 00:00:00' and time <= '2022-06-30 23:55:00';"
tab_res = client.query(sql)
query_res = list(tab_res)
df = pd.DataFrame(query_res[0])
return df
可以看到,以上函数共同调用的参数为sql和db。我们再增加一个参数db_type,将构造一个通用的方法对以上数据库调用。
def get_data(db_type,sql,db):
if db_type == 'mysql':
return mysql_get(sql,db)
elif db_type == 'mongo':
return mongo_get(sql,db)
else:
return influx_get(sql,db)
同理,其他类型的数据库也可以加入到这个通用框架中,包括但不限于各类关系型,键值型,时序型数据库。