最近项目组分配到研究storm-jdbc用法

发现网上关于insert和query方法挺多的,但是自定义方法很少。而且用法上也挺多缺陷。在此自己总结记录一下

JdbcInsertBolt 的核心代码

/*
*
* JdbcInsertBolt org.apache.storm 默认包下 此类不会进行emit。所以插入数据后不能接下层bolt
* 如果需要在插入数据后继续往下游流数据必须在源码中进行添加 在github可以获得类(JdbcLookupBolt也是类似,响应还有hbase和redis)
* */
public static JdbcInsertBolt getJdbcInsertBolt() {
System.out.println("had run.........==============");
//Fields outputFields = new Fields("session","time","count");//输出给下层bolt
//get session time
List<Column> schemaColumns = Lists.newArrayList(new Column("session", Types.VARCHAR),//上游过来的字段
new Column("time", Types.INTEGER));
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(schemaColumns);
//SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, schemaColumns);
JdbcInsertBolt jdbcInsertBolt = new JdbcInsertBolt(connectionProvider,simpleJdbcMapper)
.withInsertQuery("insert into webinfo(session,time) values(?,?)")
.withQueryTimeoutSecs(50);
return jdbcInsertBolt;
}

//query的核心代码

public static JdbcLookupBolt getJdbcLookupBlot() {
Fields outputFields = new Fields("session","time","count");//输出给下层bolt
List<Column> queryParamColumns = Lists.newArrayList(new Column("session", Types.VARCHAR),
new Column("time", Types.INTEGER)//上层bolt流入
);
//String insertSQL = "insert into student_infos(name,age) values(?,?)";
String selectSql = "select count(*) count from student_infos where session=? and time=?";
SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
.withQueryTimeoutSecs(30);

//return userNameLookupBolt;
// JdbcState.Options options = new JdbcState.Options()
// .withConnectionProvider(connectionProvider)
// .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
// .withSelectQuery("select user_name from user_details where user_id = ?")
// .withQueryTimeoutSecs(30);
// JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
return userNameLookupBolt;
}

//自定义查询方法的核心代码

private JdbcClient client;
public void setup() {
// Map map = Maps.newHashMap();
// map.put("dataSourceClassName","org.hsqldb.jdbc.JDBCDataSource");//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
// map.put("dataSource.url", "jdbc:hsqldb:mem:test");//jdbc:mysql://localhost/test
// map.put("dataSource.user","SA");//root
// map.put("dataSource.password","");//password
// ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
connectionProvider.prepare();

int queryTimeoutSecs = 60;
this.client = new JdbcClient(connectionProvider, queryTimeoutSecs);
client.executeSql("UPDATE webinfo s SET s.session='haitao' where s.time = '4' ");
}

关于用stormJDBC 效率提升上还是挺大的,项目组兄弟自己写插入一条在50MS左右,大数据下这种情况是不允许的,太慢,然后通过storm开源工具插入,大概在初始化50MS,后面每条是5MS。当然想更快应该也可以做事务JDBC。暂时还没研究

更多参考资料:

https://insight.io/github.com/apache/storm/tree/HEAD/external/storm-jdbc/

最新文章

  1. Web前端工程师成长之路——知识汇总
  2. ajax批删
  3. 一致性 hash 算法( consistent hashing )a
  4. 练习一:SQLite基本操作
  5. cri-o 创建非infra容器
  6. Nginx 笔记与总结(1)编译安装
  7. TFS使用中的问题
  8. Web前端学习笔记1
  9. Poj 3061 Subsequence(二分+前缀和)
  10. 把本地git仓库中的项目引入到码云上
  11. JavaScript设计模式(9)-享元模式
  12. 【NOI2001】炮兵阵地(状态压缩,动态规划)
  13. 用CSS画一个带阴影的三角形的示例代码
  14. WEB通知和React Native之即时通讯(iOS Android)
  15. 截取所有的winform runtime error
  16. 3451: Tyvj1953 Normal 点分治 FFT
  17. Could not determine own NN ID in namespace &#39;mycluster&#39;
  18. [UE4][Canvas]用C++代码绘制血条(HealthBar)
  19. 当堆遇到STL 代码焕发光芒
  20. python易错盲点排查之+=与+的区别分析以及一些赋值运算踩过的坑

热门文章

  1. 微信小程序(mpvue框架) 购物车
  2. php实现在不同国家显示网站的不同语言版本
  3. python中遇到的问题:IndentationError: unexpected indent
  4. Ubuntu Service说明与使用方法
  5. 07.27NOIP模拟赛
  6. Solrj demo
  7. Cesium官方教程12--材质(Fabric)
  8. Cesium官方教程8-- 几何体和外观效果
  9. Spring cloud config client获取不到配置中心的配置
  10. wpf中数据绑定和INotifyPeropertyChanged的理解