1.1需求

数据库300 万条用户数据 ,遍历获取所有用户, 各种组合关联, 获取到一个新的json ,存到redis 上。

1.2 难点

数据库比较多, 不可能单线程查询所有的数据到内存。

1.3解决办法

多线程读取, 生产者 每次获取200 条数据, 消费者去消费。(这里 主要是根据MySQL分页去获取下一个200 条数据)

1.4 代码

1.4.1 调用方法

    /**
* 线程启动
*/
public void update() {
//redis操作类
HashRedisUtil redisUtil= HashRedisUtil.getInstance();
//生产者消费者
ProducerConsumer pc = new ProducerConsumer();
//数据仓库
Storage s = pc.new Storage(); ExecutorService service = Executors.newCachedThreadPool();
//一个线程进行查询
Producer p = pc.new Producer(s,userMapper);
service.submit(p);
System.err.println("生产线程正在生产中。。。。。。。。。");
//是个线程进行修改
for(int i=0;i<10;i++){
System.err.println("消费线程"+i+"正在消费中。。。。。。。。。。");
service.submit(pc.new Consumer( redisUtil,userMapper,s));
} }

1.4.2 主要核心类

package com.ypp.thread;

import java.math.BigDecimal;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.joda.time.LocalDateTime; import com.alibaba.fastjson.JSONObject;
import com.ypp.constants.Constants;
import com.ypp.mapper.UserMapper;
import com.ypp.model.User;
import com.ypp.model.UserAlis;
import com.ypp.model.UserBaseModel;
import com.ypp.model.UserVip;
import com.ypp.util.HashRedisUtil;
import com.ypp.util.JsonUtils;
import com.ypp.util.PHPSerializer; public class ProducerConsumer {
private static Logger logger = Logger.getLogger(ProducerConsumer.class);
//这个page 是核心, 全局变量, 当生产者生产一次 ,获取200 个用户, 会把这个page++, 下次获取就是后一个200 条用户了
private static Integer page = 0; //消费者

public class Consumer implements Runnable { private HashRedisUtil redisUtil;
private UserMapper userMapper;
private Storage s = null; public Consumer(HashRedisUtil redisUtil, UserMapper userMapper, Storage s) {
super();
this.redisUtil = redisUtil;
this.userMapper = userMapper;
this.s = s;
} public void run() {
try {
while (true) {
User users = s.pop(); long bbb = System.currentTimeMillis();
// 获取一个用户的粉丝列表 并存到redis
try {
fansUpdate(users.getToken(), users.getUserId(), redisUtil);
} catch (Exception e1) {
e1.printStackTrace();
}
// 获取一个用户的关注列表, 并存到redis
try {
followUpdate(users.getToken(), users.getUserId(), redisUtil);
} catch (Exception e) {
e.printStackTrace();
}
// 获取一个用户的黑名单, 并存到redis
try {
blackUpdate(users.getToken(), users.getUserId(), redisUtil);
} catch (Exception e) {
e.printStackTrace();
}
// 用户基本信息
try {
userbaseUpdate(users.getToken(), users.getUserId(), redisUtil);
} catch (Exception e) {
e.printStackTrace();
}
long ccc = System.currentTimeMillis();
System.out.println("用户:" + users.getToken() + " 全部总共耗时:" + (ccc - bbb) + "毫秒"); Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
} } public List<User> getUserInfo(Integer iThread) {
return userMapper.findUserInfo((iThread - 1) * 200 + 1);
} /**
* 用户基本信息修改
*
* @param token
* @param myuserId
* @param redisUtil
* @throws Exception
*/
private void userbaseUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } /**
* 更新一个用户的黑名单(原来的token改成userID)
*
* @param token
* @param string
* @param redisUtil
* @throws Exception
*/
private void blackUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } /**
* 获取一个用户的关注
*
* @param token
* @param string
* @param redisUtil
* @throws Exception
*/
private void followUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } /**
* 获取一个用户的粉丝列表
*
* @param token
* @param userId
* @param redisUtil
* @throws Exception
*/
private void fansUpdate(String token, String myUserId, HashRedisUtil redisUtil) throws Exception { } //生产者
public class Producer implements Runnable {
private Storage s = null;
private UserMapper mapper ;
public Producer( Storage s, UserMapper mapper) { this.s = s;
this.mapper = mapper;
}
public void run() {
try { while (true) { System.err.println("当前分页是:"+page+"****************************************");
List<User> list= mapper.findUserInfo(page);
s.push(list);
page++;
}
} catch (InterruptedException e1) {
e1.printStackTrace();
} }
}

//数据仓库
public class Storage {
BlockingQueue<User> queues = new LinkedBlockingQueue<User>(200); /**
* 生产
*
* @param p
* 产品
* @throws InterruptedException
*/
public void push(List<User> p) throws InterruptedException {
for(User user:p){
queues.put(user);
}
} /**
* 消费
*
* @return 产品
* @throws InterruptedException
*/
public User pop() throws InterruptedException {
return queues.take();
}
} }

最新文章

  1. Lesson 18 He often does this!
  2. PHP面试题集之基础题
  3. kuangbin_ShortPath P (HDU 4725)
  4. 使用 OWIN 作为 ASP.NET Web API 的宿主
  5. 使用jstack分析java程序cpu占用率过高
  6. javascript学习笔记3
  7. LightOj_1321 Sending Packets
  8. ASP.NET-FineUI开发实践-9(四)
  9. 机器学习数学|Taylor展开式与拟牛顿
  10. visual studio Web发布至 IIS WebDeploy出错(未能创建SSL/TLS安全通道)Could not create SSL/TLS secure channel
  11. LeetCode 973 K Closest Points to Origin 解题报告
  12. 基于MySQL提供的Yum repository安装MySQL5.6
  13. 常用工具类系列之DateUtil
  14. Java查找替换文本文件内容
  15. k8s1.4.3安装实践记录(1)-etcd、docker、flannel安装配置
  16. 集合之Stack
  17. 仿LOL项目开发第七天
  18. Python学习笔记四:列表,购物车程序实例
  19. JAVA连接各种数据库详解
  20. mysql中int(3)与int(11)有什么区别吗?

热门文章

  1. idea for mac 项目打开的情况下import project
  2. MailHelper
  3. 第三篇:异步请求遇上for循环怎么做
  4. Loadrunner 性能测试工具笔记
  5. UMP系统功能 资源隔离
  6. ListControl 设置表格行高与字体
  7. vue倒计时:天时分秒
  8. 扩展kmp板子
  9. 2016.9.17初中部下午NOIP普及组比赛总结
  10. String相加解析