继承方式

HystrixCommand

public class UserSelectAllCommand extends HystrixCommand<List<User>> {
private RestTemplate restTemplate; /**
* 设置线程组 和命令名用于仪表盘统计信息
* 设置线程组 可以使同一个组使用同一个线程池
* .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey") 可以跟细粒度的线程池划分
* @param restTemplate
*/
public UserSelectAllCommand(RestTemplate restTemplate){
super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("YouGroupName"))
.andCommandKey(HystrixCommandKey.Factory.asKey("YouCommandName"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(10000)//设置超时时间。我这边全局设置无效 就对应设置 ));
this.restTemplate=restTemplate;
}
@Override
protected List<User> run() throws Exception {
return restTemplate.getForObject("http://PROVIDER/user/findAll",List.class);
}
}
@Controller
@RequestMapping("/UserHystrixCommand")
public class UserHystrixCommandContorller {
@Autowired
RestTemplate restTemplate; //同步执行
@RequestMapping("/findAll")
@ResponseBody
public List<User> findAll() {
UserSelectAllCommand userSelectAllCommand = new UserSelectAllCommand(restTemplate);
return userSelectAllCommand.execute();
} //异步
@RequestMapping("/findAllAsyn")
@ResponseBody
public List<User> findAllAsyn() throws ExecutionException, InterruptedException {
UserSelectAllCommand userSelectAllCommand = new UserSelectAllCommand(restTemplate);
return userSelectAllCommand.queue().get();
} }

调用execute同步执行  queue 返回Future 异步执行

还可以通过执行

 Observable observable= userSelectAllCommand.toObservable();//订阅的时候发起请求
Observable observable=userSelectAllCommand.observe();//立即发起请求

通过订阅获得请求结果

    observable.subscribe(new Subscriber() {
@Override
public void onCompleted() { } @Override
public void onError(Throwable throwable) { } @Override
public void onNext(Object o) { }
});

HystrixObservableCommand

与HystrixCommand是可以发射多次结果

public class UserSelectObservableCommand extends HystrixObservableCommand<User> {
/**
* 设置线程组 和命令名用于仪表盘统计信息
* 设置线程组 可以使同一个组使用同一个线程池
* .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey") 可以跟细粒度的线程池划分
* @param restTemplate
*/
@Autowired
RestTemplate restTemplate;
private List<Integer> ids;
public UserSelectObservableCommand(List<Integer> ids, RestTemplate restTemplate) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("YouGroupName"))
.andCommandKey(HystrixCommandKey.Factory.asKey("YouCommandName"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(10000)//设置超时时间。我这边全局设置无效 就对应设置 ));
this.restTemplate=restTemplate;
this.ids=ids;
} @Override
protected Observable<User> construct() {
return Observable.create(new Observable.OnSubscribe<User>() {
@Override
public void call(Subscriber<? super User> subscriber) {
try{
if(!subscriber.isUnsubscribed()){
for (Integer id:
ids) { MultiValueMap<String, String> map= new LinkedMultiValueMap<String, String>();
map.add("id",id.toString());
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<MultiValueMap<String, String>>(map, headers);
//调用多次服务
User user=restTemplate.postForEntity("http://PROVIDER/user/findById", request,User.class).getBody();
subscriber.onNext(user);
}
}
subscriber.onCompleted();
}catch (Exception e){
e.printStackTrace();
subscriber.onError(e);
} }
}).subscribeOn(Schedulers.io());
} /**
* 服务降级执行逻辑
* 错误 超时 线程池拒绝 断路器熔断 执行
* @return
*/
@Override
protected Observable<User> resumeWithFallback() {
return Observable.create(new Observable.OnSubscribe<User>() {
@Override
public void call(Subscriber<? super User> observer) {
try {
if (!observer.isUnsubscribed()) {
User u = new User();
observer.onNext(u);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
}
 //异步
@RequestMapping("/process")
@ResponseBody
public void process() throws ExecutionException, InterruptedException {
UserSelectObservableCommand userSelectObservableCommand=new UserSelectObservableCommand(Arrays.asList(new Integer[]{1,2,3,4,6}),restTemplate);
Observable<User> observable= userSelectObservableCommand.observe();
observable.subscribe(new Subscriber<User>(){
List<User> users=new ArrayList<User>(); @Override
public void onCompleted() {
users.stream().forEach(c->{
System.out.println(c.getName());
}); } @Override
public void onError(Throwable throwable) { } @Override
public void onNext(User user) {
users.add(user);
}
}); }

注解方式

@Component
public class UserService {
@Autowired
RestTemplate restTemplate; @HystrixCommand(groupKey = "userService",commandKey = "findAll")
public List<User> findAll(){
return restTemplate.getForObject("http://PROVIDER/user/findAll",List.class);
} @HystrixCommand
public Future<List<User>> findAllAsyn(){
return new AsyncResult<List<User>>() {
@Override
public List<User> invoke() {
return findAll();
}
};
} /**
* ObservableExecutionMode.EAGER observe ()
* ObservableExecutionMode.LAZY toObservable ()
* ignoreExceptions 排除异常
* @param id
* @return
*/
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER,ignoreExceptions = BusinessException.class)
public Observable<User> findUserByIdA(Integer id){ return Observable.create(new Observable.OnSubscribe<User>(){ @Override
public void call(Subscriber<? super User> subscriber) {
//判断是否取消订阅
if(subscriber.isUnsubscribed()){
User user=restTemplate.getForObject("http://PROVIDER/user/findById",User.class);
subscriber.onNext(user);
subscriber.onCompleted();
}
}
});
} }
@Controller
@RequestMapping("/UserHystrixCommandAnotation")
public class UserHystrixCommandAnotationContorller {
@Autowired
UserService userService;
//同步执行
@RequestMapping("/findAll")
@ResponseBody
public List<User> findAll(){
return userService.findAll();
} //异步
@RequestMapping("/findAllAsyn")
@ResponseBody
public List<User> findAllAsyn() throws ExecutionException, InterruptedException {
return userService.findAllAsyn().get();
} }

请求缓存

继承方式

public class UserSelectAllCommand extends HystrixCommand<List<User>> {
private RestTemplate restTemplate;
public static final HystrixCommandKey hystrixCommandKey=HystrixCommandKey.Factory.asKey("findAll");; /**
* 设置线程组 和命令名用于仪表盘统计信息
* 设置线程组 可以使同一个组使用同一个线程池
* .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey") 可以跟细粒度的线程池划分
* @param restTemplate
*/
public UserSelectAllCommand(RestTemplate restTemplate){ super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("UserService"))
.andCommandKey(hystrixCommandKey)
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(10000)//设置超时时间。我这边全局设置无效 就对应设置 ));
this.restTemplate=restTemplate;
}
@Override
protected List<User> run() throws Exception {
System.out.println("执行了....");
return restTemplate.getForObject("http://PROVIDER/user/findAll",List.class);
} /**
* 只需要重写这个方法 将开启缓存
* @return
*/
@Override
protected String getCacheKey() {
return "UserSelectAllCommand";//因为没有参数所以key是用类名
} /**
* 清除缓存
*/
public static void clearKey(){
HystrixRequestCache.getInstance(UserSelectAllCommand.hystrixCommandKey,HystrixConcurrencyStrategyDefault.getInstance() ).clear("UserSelectAllCommand");
} }
@Controller
@RequestMapping("/UserHystrixCommand")
public class UserHystrixCommandContorller {
@Autowired
RestTemplate restTemplate; //同步执行
@RequestMapping("/findAll")
@ResponseBody
public List<User> findAll() {
//开启缓存后 必须初始化一个context
HystrixRequestContext.initializeContext();
UserSelectAllCommand userSelectAllCommand = new UserSelectAllCommand(restTemplate);
userSelectAllCommand.execute();
userSelectAllCommand = new UserSelectAllCommand(restTemplate);
userSelectAllCommand.execute();
userSelectAllCommand = new UserSelectAllCommand(restTemplate);
//清空缓存
UserSelectAllCommand.clearKey();
return userSelectAllCommand.execute();
} }

注意每个HystrixCommand命令只能调用一次 多次调用会报错

注解方式

@CacheResult 标识结果被缓存 必须配合@HystrixCommand使用 可以使用cacheKeyMethod或者CacheKey设置缓存key
cacheKeyMethod 标识获得缓存key的方法名 参数形式必须与目标方法一致
@CacheRemove 标识将清除指定key的缓存 commandKey 必须指定 用于定位到清除指定命令的缓存cacheKeyMethod 指定清除缓存key或者使用CacheKey指定

cacheKeyMethod 获得清除缓存key的方法名 参数形式必须与目标方法一致

commandKey 指定需要清除指定命令的缓存

@Cachekey 标识指定目标为缓存的key优先级比cacheKeyMethod低  

指定缓存key的几种方式

@Component
public class UserService {
@Autowired
RestTemplate restTemplate; /**
* 如果没有指定cacheKey 则默认是参数
*
* @param id
* @return
*/
@CacheResult
@HystrixCommand(ignoreExceptions = BusinessException.class)
public User findUserById(Integer id) {
System.out.println("执行了。。。。");
User user = restTemplate.getForObject("http://PROVIDER/user/findById?id={id}", User.class, id);
return user;
} }
 @RequestMapping("/findById")
@ResponseBody
public User findById(Integer id){
//开启缓存后 必须初始化一个context 可以在过滤器统一实现
HystrixRequestContext.initializeContext();
userService.findUserById(id);
userService.findUserById(id);
userService.findUserById(id);
return userService.findUserById(id);
}

使用cacheMethod定义缓存key

    /**
* 如果没有指定cacheKey 则默认是参数
* @param id
* @return
*/
@CacheResult(cacheKeyMethod= "getFindUserByIdKey")
@HystrixCommand(ignoreExceptions = BusinessException.class)
public User findUserById(Integer id) {
System.out.println("执行了。。。。");
User user = restTemplate.getForObject("http://PROVIDER/user/findById?id={id}", User.class, id);
return user;
} /**
* 参数要与上面指定方法的一致
* @param id
* @return
*/
public String getFindUserByIdKey(Integer id){
return String.valueOf(id);
}

使用cacheKey定义缓存的key

   /**
* 如果没有指定cacheKey 则默认是参数
* @param id
* @return
*/
@CacheResult
@HystrixCommand(ignoreExceptions = BusinessException.class)
public User findUserById(@CacheKey Integer id) {
System.out.println("执行了。。。。");
User user = restTemplate.getForObject("http://PROVIDER/user/findById?id={id}", User.class, id);
return user;
}

如果是对象可以使用

 @CacheResult
@HystrixCommand(ignoreExceptions = BusinessException.class)
public User findUserById(@CacheKey("{属性名字}") User user) {
System.out.println("执行了。。。。");
User user = restTemplate.getForObject("http://PROVIDER/user/findById?id={id}", User.class, id);
return user;
}

清除缓存

   /**
* CacheRemove.commandKey必须指定 通过他能够找到缓存的位置然后通过key删除
*
* @param user
* @return
*/
@CacheRemove(commandKey = "findUserById")
@HystrixCommand(ignoreExceptions = BusinessException.class)
public boolean saveEdit(@CacheKey("id") User user) {
return true;
}

commandKey必须指定 用于定位某个命令的key  没显式指定命令 则为方法名    几种指定的key的方式和缓存一致

请求合并

继承的方式

1.准备一个批量查询的Service

@Service
public class UserService {
@Autowired
private RestTemplate restTemplate; public List<User> findAll(List<Long> ids){
List<User> users=restTemplate.getForObject("http://PROVIDER/user?ids={1}", List.class, StringUtils.join(ids,","));
return users;
}
}

2.准备一个批处理Command

public class UserBatchCommand extends HystrixCommand<List<User>> {
UserService userService;
List<Long> ids;
public UserBatchCommand(UserService userService,List<Long> userIds){
super(Setter.withGroupKey(asKey("userServiceCommand")).andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(10000)));//设置超时时间。我这边全局设置无效 就对应设置
this.userService=userService;
this.ids=userIds;
} @Override
protected List<User> run() throws Exception {
return userService.findAll(ids);
}
}

3.定义请求合并器

/**
* 第一个泛型参数 为批量处理的请求的返回类型
* 第二个泛型参数 为单个请求的返回类型
* 第三个泛型参数 为参数类型
*/
public class UserCollapseCommand extends HystrixCollapser<List<User>,User ,Long> {
UserService userService;
Long userId; /**
* 用于获取请求参数
* @return
*/
@Override
public Long getRequestArgument() {
return userId;
}
public UserCollapseCommand(UserService userService,Long userId){
super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("userCollapsercommand"))
.andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));
this.userService=userService;
this.userId=userId;
} /**
* 合并请求产生批量处理的方法
* @param collection
* @return
*/
@Override
protected HystrixCommand<List<User>> createCommand(Collection<CollapsedRequest<User, Long>> collection) {
List<Long> userIds=new ArrayList<Long>(collection.size());
userIds.addAll(collection.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()));
return new UserBatchCommand(userService,userIds);
} /**
* 批量请求获得结果后 将结果拆分 返回给各个原子请求
* @param users
* @param collection
*/
@Override
protected void mapResponseToRequests(List<User> users, Collection<CollapsedRequest<User, Long>> collection) {
int count=0;
ObjectMapper objectMapper=new ObjectMapper();
for(CollapsedRequest<User,Long> collapsedRequest:collection) {
User user =objectMapper.convertValue(users.get(count++),User.class);
collapsedRequest.setResponse(user);
}
}
}

4.测试

@RunWith(SpringJUnit4ClassRunner .class)
@SpringBootTest(classes={SpringcloudConsumerApplication.class, hystrixCollapse.hystrixCollapserTest.class})
public class hystrixCollapserTest { @Autowired
UserService userService;
@Test public void simpleTest() throws ExecutionException, InterruptedException {
HystrixRequestContext context = HystrixRequestContext.initializeContext(); List<Future<User>> user=new ArrayList<Future<User>>();
for(long id=0;id<10;id++){
UserCollapseCommand userCollapseCommand=new UserCollapseCommand(userService,id);
User user1=userCollapseCommand.queue().get();
System.out.print(user1.getId());
}
Thread.sleep(4000); } }

当在一定窗口期内 的请求 会合并成一个请求   通过HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100) 默认100毫秒

注解方式

@Service
public class UserSerivice {
@Autowired
private RestTemplate restTemplate; @HystrixCollapser(batchMethod = "findAll",collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds",value="100")})
public User find(Long id){
return restTemplate.getForObject("http://PROVIDER/user/{1}", User.class, id);
} /**
* 直接返回list会转为linkendHashMap 所以这里手动转了一次 正式环境 都用包装类封装一次 ProcessResult7yt g
* @param ids
* @return
*/
@HystrixCommand(commandProperties={
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "10000")})
public List<User> findAll(List<Long> ids){
List<User> users=restTemplate.getForObject("http://PROVIDER/user?ids={1}", List.class, StringUtils.join(ids,","));
ObjectMapper objectMapper=new ObjectMapper();
List<User> userList=new ArrayList<User>();
for (Object obj:users
) {
userList.add(objectMapper.convertValue(obj,User.class));
}
return userList;
}
}

合并器原理图

未使用合并器

使用合并器后

请求合并器虽然能节省线程池的开销  但是因为有窗口期  如果窗口10m  一个请求耗时需要5m  也会等到窗口期过才发起请求

窗口期内有3个以上请求 才推荐使用请求合并

最新文章

  1. The Solution of UESTC 2016 Summer Training #1 Div.2 Problem B
  2. 服务器.htaccess 详解以及 .htaccess 参数说明(转载)
  3. css之z-index
  4. 在Eclipse 中安装插件 Activiti
  5. Linux 命令 - curl: transfer a URL
  6. Javabean的理解
  7. tf–idf算法解释及其python代码实现(下)
  8. 常见浏览器的宽高代码写法!有原生JavaScript和jquery两种写法-------------------------------以及我的个人网站
  9. 01_学习java WEB涉及到的相关技术
  10. MongoDB 中的【加减乘除】运算
  11. enquire.js-响应css媒体查询的轻量级javascript库
  12. linux ubuntu生成pac文件,实现代理
  13. 深入理解python中的yield关键字
  14. Exception in thread &quot;main&quot; java.lang.ClassCastException: $Proxy13
  15. .Net频繁访问数据库的优化探究(一)
  16. css预处理scss环境配置
  17. Java+selenium+Fitnesse
  18. ORB-SLAM(十)LoopClosing Sim3求解
  19. PAT 乙级 1017
  20. 一个小笔记(2):Socket网络编程

热门文章

  1. 动手分析安卓仿QQ联系人列表TreeView控件
  2. lucene 范围过滤
  3. 【BZOJ 1230】 开关灯
  4. 画板(适用于手机、PC端)
  5. 构造函数中this,return的详解
  6. Spring学习笔记之aop动态代理(3)
  7. 安卓学习之学生签到APP(一)
  8. 【SQL】含有NULL值的排序
  9. ROS:Nvidia Jetson TK1平台安装使用ROS
  10. 实验0 安装GLUT包及工程的创建与运行