Flux转Mono next()
2024-09-01 12:16:19
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Demonstrates collapsing a {@link Flux} into a {@link Mono} with {@link Flux#next()}: a query is
 * applied to the template values and only the first emitted result is kept.
 */
@Slf4j
public class TestFindResult {

  /** Template values keyed by database name; a LinkedHashMap, so iteration follows insertion order. */
  private static final Map<String, String> TEMPLATES;

  /** Simulated duration of one database query, in milliseconds. */
  private static final int SLEEP_MS = 1000;

  static {
    TEMPLATES = new LinkedHashMap<>();
    TEMPLATES.put("aDB", "a");
    TEMPLATES.put("bDB", "b");
    TEMPLATES.put("cDB", "c");
  }

  /**
   * Applies {@code query} to the template values and returns the first result that is emitted.
   *
   * <p>NOTE(review): {@code next()} completes empty on an empty source and ignores additional
   * elements, so the two error handlers below only take effect if {@code query} itself signals
   * those exception types. They look like leftovers from a {@code single()}-based variant and are
   * kept so the behavior seen by callers does not change.
   *
   * @param query maps a template value to the asynchronous lookup result
   * @return the first emitted result, or an empty Mono when nothing is emitted
   */
  public Mono<String> findResult(Function<String, Mono<String>> query) {
    return Flux.fromIterable(TEMPLATES.values())
        .flatMap(query)
        .next()
        .onErrorResume(NoSuchElementException.class, e -> Mono.empty())
        .onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new);
  }

  /**
   * Runs the demo with a mock query that sleeps, logs the calling thread, and echoes its input.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    TestFindResult test = new TestFindResult();
    Function<String, Mono<String>> query =
        value -> {
          try {
            Thread.sleep(SLEEP_MS); // mock DB query
          } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while simulating the DB query", e);
          }
          log.info(
              "Thread id:{}, Thread name:{}, value:{}, used ms:{}",
              Thread.currentThread().getId(),
              Thread.currentThread().getName(),
              value,
              SLEEP_MS);
          return Mono.just(value);
        };
    // subscribe() only returns a Disposable handle; block() yields the value (null if empty).
    String result = test.findResult(query).block();
    System.out.println(result);
  }
}
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Demonstrates {@link Flux#filterWhen} followed by {@link Flux#next()}: each template entry is
 * checked by a mock asynchronous predicate and the value of the first matching entry is returned.
 */
@Slf4j
public class TestFindMongo {

  /** Template values keyed by database name; a LinkedHashMap, so iteration follows insertion order. */
  private static final Map<String, String> TEMPLATES;

  /** Simulated duration of one database query, in milliseconds. */
  private static final int SLEEP_MS = 1000;

  static {
    TEMPLATES = new LinkedHashMap<>();
    TEMPLATES.put("aDB", "a");
    TEMPLATES.put("bDB", "b");
    TEMPLATES.put("cDB", "c");
  }

  /**
   * Returns the value of the first template entry accepted by the mock query.
   *
   * <p>The pipeline is wrapped in {@link Mono#defer} so the stopwatch starts when the Mono is
   * subscribed to, not when this method is called; each subscription gets its own timing.
   *
   * <p>NOTE(review): {@code next()} completes empty on an empty source and ignores additional
   * elements, so the two error handlers only take effect if an upstream stage signals those
   * exception types. They are kept so the behavior seen by callers does not change.
   *
   * @return the matching template value, or an empty Mono when no entry matches
   */
  public Mono<String> findMongo() {
    return Mono.defer(
        () -> {
          StopWatch stopWatch = StopWatch.createStarted();
          return Flux.fromIterable(TEMPLATES.entrySet())
              .filterWhen(TestFindMongo::queryMatches)
              .next()
              // doOnNext fires only when a value exists; doOnSuccess would be handed null on an
              // empty completion and the getKey() call would throw a NullPointerException.
              .doOnNext(matched -> log.info("Match {} ", matched.getKey()))
              .map(Entry::getValue)
              .onErrorResume(NoSuchElementException.class, e -> Mono.empty())
              .onErrorMap(IndexOutOfBoundsException.class, MultipleUpstreamException::new)
              .doOnTerminate(() -> log.info("Database recon took {} ms", stopWatch.getTime()));
        });
  }

  /** Mock database lookup: sleeps, logs the calling thread, and accepts only the value "b". */
  private static Mono<Boolean> queryMatches(Entry<String, String> template) {
    String key = template.getKey();
    String value = template.getValue();
    try {
      Thread.sleep(SLEEP_MS); // mock DB query
    } catch (InterruptedException e) {
      // Restore the interrupt flag so code further up the stack can still observe it.
      Thread.currentThread().interrupt();
      log.warn("Interrupted while simulating the DB query", e);
    }
    log.info(
        "Thread id:{}, Thread name:{}, query:{}, value:{} , used ms:{}",
        Thread.currentThread().getId(),
        Thread.currentThread().getName(),
        key,
        value,
        SLEEP_MS);
    return Mono.just(value.equals("b"));
  }

  /**
   * Runs the demo and prints the matched value.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    TestFindMongo test = new TestFindMongo();
    // subscribe() only returns a Disposable handle; block() yields the value (null if empty).
    String result = test.findMongo().block();
    System.out.println(result);
  }
}
import static org.springframework.http.HttpStatus.*;

import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Flux;

/**
 * Signals that a query matched more than one upstream; carries HTTP status {@code BAD_REQUEST}
 * and a fixed explanatory reason.
 */
public class MultipleUpstreamException extends ResponseStatusException {

  private static final String MULTIPLE_UPSTREAM_MATCH_ERR =
      "Your query contains properties matching multiple upstreams. "
          + "Data for multiple upstreams can't be returned in one query. "
          + "Please either specify upstream by providing publisherSystem "
          + "(GSM,MUNI_ITICKET,MUNI_OASYS,TPSDERIV,EDLR) "
          + "and region or request deal properties matching only one upstream";

  /** Creates the exception with the fixed reason and no cause. */
  MultipleUpstreamException() {
    super(BAD_REQUEST, MULTIPLE_UPSTREAM_MATCH_ERR);
  }

  /**
   * This constructor has syntax adapted to the Mono API, so it can be passed as a method
   * reference to {@code onErrorMap}. The triggering exception is kept as the cause so the
   * original stack trace is not lost.
   *
   * @param indexOutOfBoundsException emitted on {@link Flux#single()} when the Flux has more than
   *     one element
   * @see reactor.core.publisher.Mono#onErrorMap(Class, java.util.function.Function)
   */
  MultipleUpstreamException(IndexOutOfBoundsException indexOutOfBoundsException) {
    super(BAD_REQUEST, MULTIPLE_UPSTREAM_MATCH_ERR, indexOutOfBoundsException);
  }
}
最新文章
- PHP文件上传至另一台服务器
- DBUTIL 调用存储过程例子
- 继续说一下2016里面的json功能(1)
- C程序中对时间的处理——time库函数详解
- oracle:jdbcTest
- bnuoj 4187 GCC (数论)
- Hibernate注解学习1
- win2008阿里一键环境包mysql老是1067报错
- Android 调出和隐藏软键盘
- [SimplePlayer] 2. 在屏幕上显示视频图像
- OnCheckedChangeListener和setChecked之间冲突问题解决
- [原创]Cadence Allegro小技巧之解决Out of date shapes问题
- 实现Python与STM32通信
- error: invalid use of void expression
- Python基础学习(六)
- A Complete ActiveX Web Control Tutorial
- vim复制,粘贴,删除,撤销,替换,光标移动等用法
- Hive配置文件中配置项的含义详解(收藏版)
- 对 CasperJS 进行远程调试
- 结合Python代码介绍音符起始点检测 (onset detection)