/**
* 首次启动加载数据至缓存
*/
public class ApplicationStartTask { private static Logger logger = LoggerFactory.getLogger(ApplicationStartTask.class);
private AtomicInteger loadNum = new AtomicInteger(0); @Autowired
private FaceProducer cacheSynchronizationProducer;
@Autowired
private FaceConsumer cacheSynchronizationConsumer; @Autowired
private FaceInfoService faceInfoService; /**
* 当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,
* <br>等到需要执行的任务数大于线程池基本大小时就不再创建
*/
private static final int POOL_CORE_SIZE = Runtime.getRuntime().availableProcessors(); /**
* 线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务
*/
private static final int POOL_MAX_SIZE = Runtime.getRuntime().availableProcessors(); /**
* 线程池的工作线程空闲后,保持存活的时间 <br>
* 单位:分
*/
private static final int POOL_KEEP_ALIVE_TIME = 2; @Autowired
@Qualifier("sqliteManagerImpl")
private SqlieteManager sqlieteManager; @Autowired
private FaceCompareManager faceCompareManager; @Value("${local.sqlite.sqlname}")
private String sqlName; @Bean
public ApplicationStartTask loadCache() {
logger.info("[启动任务]首次启动载入数据至缓存...");
checkSqLiteFile();
List<String> belongIds = sqlieteManager.getAllBelongIds();
if (ArrayUtil.isNotEmpty(belongIds)) { loadFromSqlite(belongIds); initMqServer(); return new ApplicationStartTask();
}
loadFromDB(); initMqServer();
return new ApplicationStartTask(); } /**
* 开启MQ服务
*/
private void initMqServer() { logger.info("开始开启MQ服务...");
cacheSynchronizationProducer.producerStart();
cacheSynchronizationConsumer.getMessage();
} /**
* 直接从MYSQL载入数据
*/
private void loadFromDB() {
logger.info("当前CPU内核线程数:{}", POOL_CORE_SIZE); long beginTimsamp = System.currentTimeMillis(); List<String> allBelongId = faceCompareManager.getAllBelongId();
int size = 0;
for (String belongId : allBelongId) {
FaceInfoSyncDataQuery query = new FaceInfoSyncDataQuery(belongId);
query.setPageSize(1000);
query.setDeleteStatus(0);
Result<Integer> result = faceInfoService.getTotalPage(query);
if (!result.isSuccess() || result.getModel() == 0) {
continue;
} int totalPage = result.getModel(); size += totalPage; }
logger.info("总任务数:{}", size);
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(POOL_CORE_SIZE, POOL_MAX_SIZE, POOL_KEEP_ALIVE_TIME,
TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(size));
CountDownLatch latch = new CountDownLatch(size); for (String belongId : allBelongId) { FaceInfoSyncDataQuery query = new FaceInfoSyncDataQuery();
query.setBelongId(belongId);
query.setDeleteStatus(0);
query.setPageSize(1000);
Result<FaceInfoSyncDataVO> result = faceInfoService.getFaces(query);
if (!result.isSuccess() || result.getModel() == null || result.getModel().getTotalPages() == 0) {
continue;
}
FaceInfoSyncDataVO faceInfoSyncDataVO = result.getModel(); int totalPage = faceInfoSyncDataVO.getTotalPages();
//第一页 poolExecutor.execute(new CacheLoadTask1(1, belongId, faceInfoSyncDataVO.getFaces(), latch));
if (totalPage == 1) {
continue;
}
for (int i = 2; i <= totalPage; i++) {
query = new FaceInfoSyncDataQuery();
query.setBelongId(belongId);
query.setPageSize(1000);
query.setPageNo(i);
query.setDeleteStatus(0);
result = faceInfoService.getFaces(query);
if (!result.isSuccess() || result.getModel() == null || ArrayUtil.isEmpty(result.getModel().getFaces())) {
continue;
}
faceInfoSyncDataVO = result.getModel(); poolExecutor.execute(new CacheLoadTask1(i, belongId, faceInfoSyncDataVO.getFaces(), latch)); }
}
try { latch.await();// 同步阻塞,直到所有线程工作完成
// long endTime = System.currentTimeMillis(); // 获取结束时间 poolExecutor.shutdown();// 不允许提交新任务,等待当前任务及队列中的任务全部执行完毕后退出
// // 支持等待以前提交的任务停止执行
// // 所有任务关闭请求或线程中断或超时,阻塞取消
// poolExecutor.awaitTermination(20, TimeUnit.MINUTES);
// poolExecutor.shutdownNow();//通过Thread.interrupt试图停止所有正在执行的线程,并不再处理还在队列中等待的任务...
} catch (Exception e) {
logger.error("[启动任务]线程池异常打印:{}", e);
poolExecutor.shutdownNow(); }
// logger.info("[启动任务]从DB中载入数据至缓存完毕!共载入{}条人脸数据", loadNum.get()); logger.info("[启动任务]完毕!全文索引{}条数据,共耗时:{}s", loadNum.get(), (System.currentTimeMillis() - beginTimsamp) / 1000); } class CacheLoadTask1 implements Runnable { private CountDownLatch latch;
List<FaceInfoDO> faceInfoDOs = null;
int putCacheNum = 0;
String belongId = null;
int pageNo = 0; public CacheLoadTask1(int pageNo, String belongId, List<FaceInfoDO> faceInfoDOs, CountDownLatch latch) {
this.latch = latch;
this.faceInfoDOs = faceInfoDOs;
this.belongId = belongId;
this.pageNo = pageNo;
// this.index = index;
} @Override
public void run() {
Long beginTime = System.currentTimeMillis();
try {
for (FaceInfoDO faceInfoDO : faceInfoDOs) {
boolean isPutCache = FaceInfoCacheManage.put(faceInfoDO);
if (isPutCache) {
putCacheNum++;
loadNum.incrementAndGet(); }
}
logger.info("[启动任务]线程{}处理完belongId-{}第{}页数据,共载入{}条人脸数据,耗时:{}ms", Thread.currentThread().getName(),
belongId, pageNo, putCacheNum, System.currentTimeMillis() - beginTime); } catch (Exception e) {
logger.error("[启动任务]载入sqlite任务[CacheLoadTask]出错:{}", e);
} finally {
// TODO: handle finally clause
latch.countDown();
}
} } /**
* 从sqlite数据库中加载数据
*/
private void loadFromSqlite(List<String> belongIds) { logger.info("当前CPU内核线程数:{}", POOL_CORE_SIZE);
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(POOL_CORE_SIZE, POOL_MAX_SIZE, POOL_KEEP_ALIVE_TIME,
TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(belongIds.size()));
CountDownLatch latch = new CountDownLatch(belongIds.size());
for (String belongId : belongIds) {
List<FaceInfoDO> faceInfoDOs = sqlieteManager.selectAllInfoByBelongId(sqlName, belongId); poolExecutor.execute(new CacheLoadTask(belongId, faceInfoDOs, latch)); } try { latch.await();// 同步阻塞,直到所有线程工作完成
// long endTime = System.currentTimeMillis(); // 获取结束时间
logger.info("[启动任务]从sqlite中载入数据至缓存完毕!共载入{}条人脸数据", loadNum.get()); poolExecutor.shutdown();// 不允许提交新任务,等待当前任务及队列中的任务全部执行完毕后退出
// // 支持等待以前提交的任务停止执行
// // 所有任务关闭请求或线程中断或超时,阻塞取消
// poolExecutor.awaitTermination(20, TimeUnit.MINUTES);
// poolExecutor.shutdownNow();//通过Thread.interrupt试图停止所有正在执行的线程,并不再处理还在队列中等待的任务...
} catch (Exception e) {
logger.error("[启动任务]线程池异常打印:{}", e);
poolExecutor.shutdownNow(); } } private void checkSqLiteFile() {
File sqliteFile = new File(sqlName + ".sqlite3");
if (!sqliteFile.exists()) {
logger.info("[启动任务]sqlite数据库未创建,开始创建...");
// 第一次启动创建本地表
try {
sqlieteManager.createTable(sqlName);
} catch (Exception e) {
logger.error(e.getMessage());
} }
} }


最新文章

  1. MongoDB基础
  2. sass安装记录
  3. linux 录制并回放终端会话
  4. 李洪强漫谈iOS开发[C语言-048]-打印平方表
  5. 蓝牙技术BlueTooth
  6. Unity 处理策划的 Excel
  7. Eclipse插件开发
  8. leetcode@ [124] Binary Tree Maximum Path Sum (DFS)
  9. Book for Opencv
  10. nlpir分词器过期处理
  11. 创建 OVS 外部网络 ext_net - 每天5分钟玩转 OpenStack(144)
  12. php简单分页
  13. 使用solr6.0搭建solrCloud
  14. python 保存文本txt格式之总结篇,ANSI,unicode,UTF-8
  15. Java IO流 思维导图
  16. 基于jQuery实现简单的js模块化
  17. GC Tools
  18. Python随笔--爬虫(下载妹子图片)
  19. 我发起了一个 ILBC 的 子项目 EScript
  20. 华为机试001:字符串最后一个单词的长度(华为OJ001)

热门文章

  1. maven依赖问题的出现原因与解决方式
  2. moviepy1.03音视频剪辑:使用manual_tracking和headblur实现追踪人脸打马赛克
  3. PyQt(Python+Qt)学习随笔:QDial刻度盘部件功能简介
  4. 第10.4节 Python模块的弱封装机制
  5. Scrum 冲刺 第四篇
  6. Scrum 冲刺第一天
  7. 原创:DynamicDataDisplay波形显示自定义格式
  8. 将ACCESS 的数据库中的表的文件 导出了EXCEL格式
  9. redis学习之——CentOS 6 下载安装redis
  10. I/O方式(本章最重要)