lagom中的stream 流数据处理是基于akka stream的,异步的处理流数据的。如下看代码:

流式service好处是:

A: 并行:  hellos.mapAsync(8, name -> helloService.hello(name).invoke())),  八个线程并行处理;

B: 异步: 返回completedFuture, 使用基于Web Socket的方式。

C: 全双工:

package com.example.hello.stream.impl;

import akka.NotUsed;
import akka.stream.javadsl.Source;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.example.hello.hello.api.HelloService;
import com.example.hello.stream.api.StreamService; import javax.inject.Inject; import static java.util.concurrent.CompletableFuture.completedFuture; /**
* Implementation of the HelloString.
*/
public class StreamServiceImpl implements StreamService { private final HelloService helloService;
private final StreamRepository repository; @Inject
public StreamServiceImpl(HelloService helloService, StreamRepository repository) {
this.helloService = helloService;
this.repository = repository;
} @Override
public ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> directStream() {
return hellos -> completedFuture(
hellos.mapAsync(8, name -> helloService.hello(name).invoke()));
} @Override
public ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> autonomousStream() {
return hellos -> completedFuture(
hellos.mapAsync(8, name -> repository.getMessage(name).thenApply( message ->
String.format("%s, %s!", message.orElse("Hello"), name)
))
);
}
}

调用streamed service 接口的方式:

  Source<String, ?> response = await(streamService.directStream().invoke(
Source.from(Arrays.asList("a", "b", "c"))
.concat(Source.maybe())));
List<String> messages = await(response.take(3).runWith(Sink.seq(), mat));
assertEquals(Arrays.asList("Hello, a!", "Hello, b!", "Hello, c!"), messages);
private <T> T await(CompletionStage<T> future) throws Exception {   //等待10秒 拿结果
return future.toCompletableFuture().get(10, TimeUnit.SECONDS);
}

最新文章

  1. C++联合体(union)
  2. HV和VM 内存性能测试对比结果
  3. linux日常小坑
  4. SQL Server 2008 R2导出数据脚本的方法
  5. Xamarin.Android开发实践(十)
  6. 利用procdump+Mimikatz 绕过杀软获取Windows明文密码(转)
  7. 用jquery-easyui中的combotree实现树形结构的选择
  8. 当前位置: 银光首页 &gt; WPF &gt; WPF学习教程 &gt; WPF: ShowDialog() 切换到其他应用窗口后,再切换回来无法让子窗口总在最上方
  9. Cookies和Session理论总结
  10. 【HDOJ】3732 Ahui Writes Word
  11. jQuery autocomplete 使用
  12. MySQL中CASE的使用
  13. PeekMessage与GetMessage的对比
  14. OpenCV探索之路(十八):使用imwrite调整保存的图片质量
  15. centos 使用 beyond compare 对比工具
  16. layui样式修改记录
  17. JS Bootstrap-DateRangePicker 如何设置默认值为空
  18. 【JS】CharToAsciiToBinaryToAsciiToChar
  19. c语言#define用法
  20. RDP服务开启

热门文章

  1. TP 接收post请求使用框架自带函数I()防止注入
  2. 2015最流行的Android组件、工具、框架大全(转)
  3. 短信计时器Utils
  4. rtmp直播拉流客户端EasyRTMPClient设计过程中时间戳问题汇总
  5. 高性能流媒体服务器EasyDSS前端重构(二) webpack + vue + AdminLTE 多页面提取共用文件, 优化编译时间
  6. C#根据Type获取默认值
  7. [转]C#中的结构体与类的区别
  8. ubuntu中设置wireshark抓包
  9. 关于SAP S4 HANA 的13个问题
  10. 《高性能Javascript》 Summary(二)