import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; @Slf4j
public class TestFindResult {
private static final Map<String, String> templates;
private static final int sleep = 1000; static {
templates = new LinkedHashMap<>();
templates.put("aDB", "a");
templates.put("bDB", "b");
templates.put("cDB", "c");
} public Mono<String> findResult(Function<String, Mono<String>> query) {
return Flux.fromIterable(templates.values())
.flatMap(query)
.next()
.onErrorResume(NoSuchElementException.class, e -> Mono.empty())
.onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new);
} public static void main(String[] args) {
TestFindResult test = new TestFindResult();
Function<String, Mono<String>> query = (value) -> {
try {
Thread.sleep(sleep); // mock DB query
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(
"Thread id:{}, Thread name:{}, value:{}, used ms:{}",
Thread.currentThread().getId(),
Thread.currentThread().getName(),
value,
sleep);
return Mono.just(value);
};
System.out.println(test.findResult(query).subscribe());
}
}
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; @Slf4j
public class TestFindMongo {
private static final Map<String, String> templates;
private static final int sleep = 1000; static {
templates = new LinkedHashMap<>();
templates.put("aDB", "a");
templates.put("bDB", "b");
templates.put("cDB", "c");
} public Mono<String> findMongo() {
StopWatch stopWatch = StopWatch.createStarted();
return Flux.fromIterable(templates.entrySet())
.filterWhen(
template -> {
String key = template.getKey();
String value = template.getValue();
try {
Thread.sleep(sleep); // mock DB query
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(
"Thread id:{}, Thread name:{}, query:{}, value:{} , used ms:{}",
Thread.currentThread().getId(),
Thread.currentThread().getName(),
key,
value,
sleep);
return Mono.just(value.equals("b"));
})
.next()
.doOnSuccess(templateEntry -> log.info("Match {} ", templateEntry.getKey()))
.map(Entry::getValue)
.onErrorResume(NoSuchElementException.class, e -> Mono.empty())
.onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new)
.doOnTerminate(() -> log.info("Database recon took {} ms", stopWatch.getTime()));
} public static void main(String[] args) {
TestFindMongo test = new TestFindMongo();
System.out.println(test.findMongo().subscribe());
}
}
import static org.springframework.http.HttpStatus.*;

import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux; public class MultipleUpstreamException extends ResponseStatusException { private static final String MULTILPLE_UPSTREAM_MATCH_ERR =
"Your query contains properties matching multiple upstreams. "
+ "Data for multiple upstreams can't be returned in one query. "
+ "Please either specify upstream by providing publisherSystem "
+ "(GSM,MUNI_ITICKET,MUNI_OASYS,TPSDERIV,EDLR) "
+ "and region or request deal properties matching only one upstream"; MultipleUpstreamException() {
super(BAD_REQUEST, MULTILPLE_UPSTREAM_MATCH_ERR);
} /**
* This constructor has syntax adapted to Mono API
*
* @param indexOutOfBoundsException emitted on {@link Flux#single()} when Flux has more than one
* elements
* @see Mono#onErrorMap(Class, java.util.function.Function))
*/
MultipleUpstreamException(IndexOutOfBoundsException indexOutOfBoundsException) {
this();
}
}

最新文章

  1. PHP文件上传至另一台服务器
  2. DBUTIL 调用存储过程例子
  3. 继续说一下2016里面的json功能(1)
  4. C程序中对时间的处理——time库函数详解
  5. oracle:jdbcTest
  6. bnuoj 4187 GCC (数论)
  7. Hibernate注解学习1
  8. win2008阿里一键环境包mysql老是1067报错
  9. Android 调出和隐藏软键盘
  10. [SimplePlayer] 2. 在屏幕上显示视频图像
  11. OnCheckedChangeListener和setChecked之间冲突问题解决
  12. [原创]Cadence Allegro小技巧之解决Out of date shapes问题
  13. 实现Python与STM32通信
  14. error: invalid use of void expression
  15. Python基础学习(六)
  16. A Complete ActiveX Web Control Tutorial
  17. vim复制,粘贴,删除,撤销,替换,光标移动等用法
  18. Hive配置文件中配置项的含义详解(收藏版)
  19. 对 CasperJS 进行远程调试
  20. 结合Python代码介绍音符起始点检测 (onset detection)

热门文章

  1. Spring boot application.properties和 application.yml 初学者的学习
  2. POJ 2287 田忌赛马
  3. 中兴获25个5G商用合同
  4. 获得spring
  5. 一百一十二、SAP的OO-ALV之六,复制一个工程的工具栏到另外一个工程的工具栏
  6. 145-PHP 使用&lt;&lt;&lt;和HTML混编(一)
  7. Docker 和虚拟机的区别
  8. springmvc无法访问JS,CSS等文件
  9. C++ 内存映射
  10. js interval ,timeout