数据库的连接池建议放在类似settings.py的配置模块中,因为基本都是配置项,方便统一管理。

1) 连接池类#settings.py

import os
from DBUtils.PooledDB import PooledDB
from elasticsearch import Elasticsearch
import pymysql
class Config(object):
POOL = PooledDB(
creator = pymysql,
maxconnections = 20,
mincached = 3,
maxcached = 3,
host = '10.208.116.41',
port = 3306,
user = 'hadoop',
password= 'XXXXX',
database = 'hadoop_tools'
)
#elasticsearch connect
es_conn = Elasticsearch(
['10.208.116.33', '10.208.116.34', '10.208.116.35'],
sniff_timeout=60,
sniff_on_start=True
)
es_mapping = {
'properties': {
'title': {
'type': 'text',
'analyzer': 'ik_max_word',
'search_analyzer': 'ik_max_word'
}
}
} #POOL test
if __name__ == '__main__':
conn = Config.POOL.connection()
#cur = conn.cursor(pymysql.cursors.DictCursor)
cur = conn.cursor()
cur.execute("select * from hs2_status")
ret = cur.fetchall()
for i in ret:
print(i)

2)封装MySQL的操作

utils/helper.py

import pymysql
from ops.settings import Config
def connect():
conn = Config.POOL.connection()
cur = conn.cursor(pymysql.cursors.DictCursor)
return conn,cur
def connect_close(conn,cur):
conn.close()
cur.close() def fetch_all(sql,args):
conn,cur = connect()
cur.execute(sql,args)
result = cur.fetchall()
connect_close(conn,cur)
return result def fetch_one(sql,args):
conn,cur = connect()
cur.execute(sql,args)
result = cur.fetchone()
connect_close(conn,cur)
return result def insert(sql,args):
conn, cur = connect()
row = cur.execute(sql,args)
conn.commit()
connect_close(conn,cur)
return row def update(sql,args):
conn, cur = connect()
row = cur.execute(sql,args)
conn.commit()
connect_close(conn,cur)
return row def delete(sql,args):
conn, cur = connect()
row = cur.execute(sql,args)
conn.commit()
connect_close(conn,cur)
return row

##下面这段调试用,可以去掉
if __name__ == '__main__':
in_zk = 'YES'
ip = '10.208.106.159'
# row = update("update hs2_status set in_zk=%s where host_ip = %s",(in_zk,ip))
# print("row is %s" %(row)) # 获得表头
# sql = "SHOW FIELDS FROM hs2_status"
# tb_h = fetch_one(sql,()) # 执行sql
# #hs2_header = [l[0] for l in tb_h]
# # for i in tb_h:
# print("this is %s" %(tb_h))
#获取内容
# hs2_content = fetch_all(sql, ())
sql = "select * from hs2_status"
hs2_content = fetch_all(sql,())
for i in hs2_content:
print(i)

3)ES连接引用示例:

#!/usr/bin/env python
# encoding: UTF-8
from ops.settings import Config #create index,change mappings
Config.es_conn.indices.create(index='news', ignore=400)
Config.es_conn.indices.put_mapping(index='news', doc_type='politics', body=Config.es_mapping) datas = [
{
'title': '美国留给伊拉克的是个烂摊子吗',
'url': 'http://view.news.qq.com/zt2011/usa_iraq/index.htm',
'date': '2011-12-16'
},
{
'title': '公安部:各地校车将享最高路权',
'url': 'http://www.chinanews.com/gn/2011/12-16/3536077.shtml',
'date': '2011-12-16'
},
{
'title': '中韩渔警冲突调查:韩警平均每天扣1艘中国渔船',
'url': 'https://news.qq.com/a/20111216/001044.htm',
'date': '2011-12-17'
},
{
'title': '中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首',
'url': 'http://news.ifeng.com/world/detail_2011_12/16/11372558_0.shtml',
'date': '2011-12-18'
}
] #insert data
for data in datas:
Config.es_conn.index(index='news', doc_type='politics', body=data) #query data
result = Config.es_conn.search(index='news')
print(result)

  

最新文章

  1. 用apt-file解决找不到头文件的问题
  2. mysql忘记root密码解决办法
  3. d3 API scale
  4. 移动互联网实战--资源类APP的数据存储处理和优化
  5. 1.3 Content Provider
  6. dos界面下执行java文件将错误输出到一个文本小技巧
  7. crystal report format number
  8. 移动App測试实战:顶级互联网企业软件測试和质量提升最佳实践
  9. 最小生成树—prim算法
  10. 最快的3x3中值模糊
  11. wing带你玩转自定义view系列(2) 简单模仿qq未读消息去除效果
  12. P1090 合并果子 题解
  13. JAVA之锁-volatile
  14. 用SQL表达连接与外连接
  15. 2、每日复习点--ConcurrentHashMap vs HashMap vs LinkedHashMap vs HashTable
  16. Linux技术图谱
  17. ORACLE 存储函数
  18. PHP中通过bypass disable functions执行系统命令的几种方式
  19. Vue学习系列---安装
  20. 【Spark】SparkStreaming-输出到Kafka

热门文章

  1. F与Q查询 事务 choices参数
  2. vue3 自己做一个轻量级状态管理,带跟踪功能,知道是谁改的,还能定位代码。
  3. docker运行gerrit(代码审查工具)
  4. 带你读AI论文丨LaneNet基于实体分割的端到端车道线检测
  5. 离线安装VS Code Server
  6. wpf 中的 自定义控件的 binding
  7. node后台生成echarts图表
  8. ffmpeg细节整理记录
  9. (原创)[C#] DataTable排序扩展方法
  10. VSCode中相对路径设置问题