public class RtmpSpyingTests extends AbstractTransactionalJUnit4SpringContextTests {
@Autowired
ThreadPoolTaskExecutor rtmpSpyingTaskExecutor; @Autowired
ApplicationContext ctx; @Autowired
RtmpSourceRepository rtmpRep; @Test
public void test() {
RtmpSource rtmpSourceSample = new RtmpSource("test"); rtmpRep.save(rtmpSourceSample);
rtmpRep.flush(); List<RtmpSource> rtmpSourceList = rtmpRep.findAll(); // Here I get a list containing rtmpSourceSample RtmpSpyingTask rtmpSpyingTask = ctx.getBean(RtmpSpyingTask.class,
"arg1","arg2");
rtmpSpyingTaskExecutor.execute(rtmpSpyingTask); }
} public class RtmpSpyingTask implements Runnable { @Autowired
RtmpSourceRepository rtmpRep; String nameIdCh;
String rtmpUrl; public RtmpSpyingTask(String nameIdCh, String rtmpUrl) {
this.nameIdCh = nameIdCh;
this.rtmpUrl = rtmpUrl;
} public void run() {
// Here I should get a list containing rtmpSourceSample, but instead of that
// I get an empty list
List<RtmpSource> rtmpSource = rtmpRep.findAll();
}
} 应该用
/**
 * Service that exposes {@link RtmpSourceRepository} reads and writes as separate
 * transactional methods.
 */
@Service
public class AsyncTransactionService {

    @Autowired
    RtmpSourceRepository rtmpRep;

    /**
     * Loads every {@code RtmpSource} inside a read-only transaction.
     *
     * @return whatever {@code rtmpRep.findAll()} returns
     */
    @Transactional(readOnly = true)
    public List<RtmpSource> getRtmpSources() {
        List<RtmpSource> allSources = rtmpRep.findAll();
        return allSources;
    }

    /**
     * Saves the given source; the method is declared with {@code REQUIRES_NEW} propagation.
     *
     * @param rtmpSource the entity handed to {@code rtmpRep.save}
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void insertRtmpSource(RtmpSource rtmpSource) {
        rtmpRep.save(rtmpSource);
    }
}

或者

用内部类。

package com.italktv.platform.audioDist.service;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.TimeUnit; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import com.italktv.platform.audioDist.mongo.CustomerRepository;
import com.italktv.platform.audioDist.mongo.PlayUrl;
import com.italktv.platform.audioDist.mongo.PlayUrl.MyUrl;
import com.italktv.platform.audioDist.mongo.PlayUrlRepository;
import com.italktv.platform.audioDist.mysql.SubSet;
import com.italktv.platform.audioDist.mysql.UserRepository;
import com.italktv.platform.audioDist.task.MyTask;
import com.italktv.platform.audioDist.task.TaskManager; @Component
/**
 * Scheduled job that reads the upload-file map from {@code userRepository} and submits one
 * {@link MessagePublish} task per {@code SubSet} entry to the {@link TaskManager}.
 */
public class ScheduleJobs {

    private static final Logger log = LoggerFactory.getLogger(ScheduleJobs.class);

    /** One second expressed in milliseconds. */
    public final static long SECOND = 1 * 1000;

    /** Timestamp refreshed at the start of every {@link #fixedRateJob()} run. */
    LocalDateTime nowDate = LocalDateTime.now();

    @Autowired
    // This means to get the bean called userRepository
    // Which is auto-generated by Spring, we will use it to handle the data
    private UserRepository userRepository;

    @Autowired
    private PlayUrlRepository repository;

    @Autowired
    private CustomerRepository cc;

    @Autowired
    private UserRepository user;

    @Autowired
    TaskManager taskManager;

    /** Entry point triggered at a fixed rate of {@code SECOND * 400} milliseconds. */
    @Scheduled(fixedRate = SECOND * 400)
    public void fixedRateJob() {
        nowDate = LocalDateTime.now();
        System.out.println("=== start distribution: " + nowDate);
        dotask();
    }

    // @PostConstruct
    // public void init() {
    //
    // taskManager = new TaskManager();
    // taskManager.init();
    // }
    //
    // @PreDestroy
    // void destroy() {
    // taskManager.destroy();
    // }

    /**
     * Walks the map returned by {@code userRepository.getUploadFileMap()} and queues a
     * {@link MessagePublish} task for every {@code SubSet} of every subject.
     */
    void dotask() {
        Map<Integer, List<SubSet>> map = userRepository.getUploadFileMap();
        for (Entry<Integer, List<SubSet>> subject : map.entrySet()) {
            int subjectId = subject.getKey();
            log.info(" subject id:" + subjectId);
            List<SubSet> allsub = subject.getValue();
            for (SubSet item : allsub) {
                log.info(" sub:" + item.toString());
                taskManager.add(new MessagePublish(item.id, item.path));
            }

            // wait them finished
            // TODO:

            // update subject status
            // TODO
        }
    }

    //////////////////////// inner class ////////////////////////

    /**
     * Task that simulates an upload and then records a {@code PlayUrl} document through the
     * enclosing instance's {@code repository}. Being a non-static inner class is what gives
     * it access to that injected field.
     */
    public class MessagePublish extends MyTask implements Serializable {

        public MessagePublish() {
            super();
        }

        /**
         * @param id   stored in {@code partId}
         * @param name stored in {@code srcFile}
         */
        public MessagePublish(int id, String name) {
            this.srcFile = name;
            this.partId = id;
        }

        // NOTE(review): instances are created with `new` in dotask(), so Spring presumably
        // never populates this @Value field (it would stay null) unless some form of
        // @Configurable weaving is in place — confirm.
        @Value("${platform.audio.dist.domain}")
        private String domain;

        /**
         * Sleeps for a random 1-10 seconds to simulate the upload, then saves a
         * {@code PlayUrl} record.
         *
         * @return the literal string {@code "ok"}
         */
        @Override
        public String call() {
            System.out.println(srcFile + " is uploading...");
            try {
                // Get the region where the message is published
                TimeUnit.SECONDS.sleep(new Random().nextInt(10) + 1);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(srcFile + " uploaded.");

            // 2.RECORD TO MONGO DB
            PlayUrl play = new PlayUrl();
            play.programid = "programid fake" + "";
            play.domain = domain;
            play.protocol = "HTTP";
            MyUrl myurl = new MyUrl();
            myurl.high = "http://xxx.xxx/xi//";
            play.url = myurl;
            repository.save(play);

            // TODO:
            // IF FAILED, RETRY, RECORD RETRY TIMES.
            // TODO:
            return "ok";
        }
    }
}
package com.italktv.platform.audioDist.task;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; @Component
/**
 * Submits {@link MyTask} instances to a fixed pool of three threads and keeps the resulting
 * futures in a map keyed by the task's {@code srcFile}.
 */
public class TaskManager {

    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(TaskManager.class);

    // @Resource(lookup = "java:comp/DefaultManagedScheduledExecutorService")
    // ManagedScheduledExecutorService executor;

    /** Futures of submitted tasks, keyed by {@code srcFile}; created in {@link #init()}. */
    Map<String, Future<String>> tasks;

    /** Worker pool; created in {@link #init()} and shut down in {@link #destroy()}. */
    ExecutorService executor;

    /** Creates the task map and the three-thread pool. */
    @PostConstruct
    public void init() {
        logger.info(" === init TaskManager===");
        tasks = new HashMap<>();
        executor = Executors.newFixedThreadPool(3);
    }

    /**
     * Submits the task and remembers its future under {@code task.srcFile}. A later task
     * with the same {@code srcFile} replaces the earlier map entry.
     *
     * @param task the task to run
     */
    public void add(MyTask task) {
        logger.info("add delay:" + task.partId + task.srcFile);
        Future<String> future = executor.submit(task);
        tasks.put(task.srcFile, future);
    }

    /**
     * Cancels the task registered under the given name and forgets it.
     *
     * @param name the {@code srcFile} key the task was added under
     * @return the result of {@code Future.cancel(true)}, or {@code false} when no task is
     *         registered under that name
     */
    public boolean cancel(String name) {
        logger.info("cancel " + name);
        boolean ret = false;
        Future<String> future = tasks.get(name);
        if (future == null) {
            logger.info("Not found name:" + name);
        } else {
            ret = future.cancel(true);
            logger.info("cancel " + name + ":" + ret);
            tasks.remove(name);
        }
        return ret;
    }

    /** Blocks on every tracked future in turn and prints its completion state and result. */
    public void waitTaskDone() {
        Collection<Future<String>> futuretasks = tasks.values();
        for (Future<String> future : futuretasks) {
            System.out.println("future done? " + future.isDone());

            String result = "";
            try {
                result = future.get();
            } catch (InterruptedException | ExecutionException e) {
                // Hand the exception to the logger so the cause lands in the log output.
                logger.error("future exec failed.", e);
            }

            System.out.println("future done? " + future.isDone());
            System.out.print("result: " + result);
        }
    }

    /**
     * Shuts the pool down: waits up to five seconds for running tasks, then forces
     * termination with {@code shutdownNow()}.
     */
    @PreDestroy
    public void destroy() {
        try {
            System.out.println("attempt to shutdown executor");
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            System.err.println("tasks interrupted");
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
        } finally {
            if (!executor.isTerminated()) {
                System.err.println("cancel non-finished tasks");
            }
            executor.shutdownNow();
            System.out.println("shutdown finished");
        }
    }
}
package com.italktv.platform.audioDist.task;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; public abstract class MyTask implements Callable<String> {
protected String srcFile;
protected int partId;
String programId; protected MyTask() { } }

最新文章

  1. 「理解HTTP」之常见的状态码segmentfault
  2. ASP.NET MVC 5 -从控制器访问数据模型
  3. C#之Windows消息处理
  4. 谈谈“色彩空间表示方法”——RGB、YUY2、YUYV、YVYU、UYVY、AYUV
  5. Dinic
  6. 20145120 《Java程序设计》实验二实验报告
  7. java Spring 生命周期
  8. Apache 一台主机绑定多个域名及虚拟主机
  9. SG函数学(hua)习(shui)记录
  10. Intellj Idea使用tomcat部署不成功,死活也找不到解决办法的看这里
  11. Mybatis 系列1
  12. 关于vue的增删改查操作
  13. Python—包介绍
  14. consul介绍
  15. ES6快速入门(一)函数与作用域
  16. csrf 攻击和防御
  17. zabbix学习小结
  18. win8 下面 listen 的队列长度貌似无效了 上c/s 代码 并附截图,有图有真相
  19. RedHat6.5上安装Hadoop单机
  20. [Vuejs] 组件 v-if 和 v-show 切换时生命周期钩子的执行

热门文章

  1. [转帖]SAP一句话入门:Financial & Controlling Accounting
  2. [转帖]Docker save and load镜像保存
  3. [转帖]CentOS 6 服务器安全配置指南(通用)
  4. select2 简单解析
  5. linux安装php7.2.7
  6. 【数学建模】day10-主成分分析
  7. ng-click 发两次ajax请求的原因及解决方法
  8. TP5.x——update更新成功但是返回是0
  9. Codeforces 714A 朋友聚会
  10. HDU3507-Print Article-斜率dp入门题