We have seen how Subjects are useful for sharing one execution of an RxJS observable with multiple observers. However, this technique requires some laborious setup. In this lesson we will learn about the multicast() operator, which solves the same problem with less code and a neater API.

Let's go back and remember why we needed subjects in the first place. Originally, we had one typical observable, but we wanted two observers, A and B, to see the same execution of that observable.

Does that mean that every time we want multiple observers, we need to set up a subject, subscribe the subject to the observable, and then subscribe the observers to the subject?

This system is not so ergonomic to set up. That's why there exists an operator that simplifies all of this for us: multicast. multicast is an operator on a normal observable. It takes an argument, which is a subject. The result is a connectable observable: calling connect() on it subscribes the subject to the source observable.

// var source = Rx.Observable
//   .interval(100)
//   .take(5);
// var subject = new Rx.Subject();
// source.subscribe(subject);

var connectableObservable = Rx.Observable
  .interval(100)
  .take(5)
  .multicast(new Rx.Subject());

connectableObservable.connect();

var observerA = {
  next: function (x) { console.log('A next ' + x); },
  error: function (err) { console.log('A error ' + err); },
  complete: function () { console.log('A done'); },
};

connectableObservable.subscribe(observerA);
console.log('observerA subscribed');

var observerB = {
  next: function (x) { console.log('B next ' + x); },
  error: function (err) { console.log('B error ' + err); },
  complete: function () { console.log('B done'); },
};

setTimeout(function () {
  connectableObservable.subscribe(observerB);
  console.log('observerB subscribed');
}, 300);
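To make it easier to see what multicast and connect are doing for us, here is a minimal sketch that runs without the library. SimpleSubject, simpleMulticast, and the toy synchronous source are hypothetical stand-ins written for this illustration; they are not how RxJS implements these pieces, and they leave out error handling and unsubscription.

```javascript
// Hypothetical, simplified stand-ins for illustration only (not the RxJS implementation).

// A bare-bones subject: an observer that forwards every event to its own observers.
function SimpleSubject() {
  this.observers = [];
}
SimpleSubject.prototype.subscribe = function (observer) {
  this.observers.push(observer);
};
SimpleSubject.prototype.next = function (x) {
  this.observers.forEach(function (o) { o.next(x); });
};
SimpleSubject.prototype.complete = function () {
  this.observers.forEach(function (o) { o.complete(); });
};

// Returns an object where subscribe() only registers on the subject,
// and connect() subscribes the subject to the source (the one shared execution).
function simpleMulticast(source, subject) {
  return {
    subscribe: function (observer) { subject.subscribe(observer); },
    connect: function () { source.subscribe(subject); }
  };
}

// A toy synchronous source that counts how many times it is executed.
var executions = 0;
var source = {
  subscribe: function (observer) {
    executions += 1;
    observer.next(0);
    observer.next(1);
    observer.complete();
  }
};

var log = [];
var connectable = simpleMulticast(source, new SimpleSubject());
connectable.subscribe({
  next: function (x) { log.push('A next ' + x); },
  complete: function () { log.push('A done'); }
});
connectable.subscribe({
  next: function (x) { log.push('B next ' + x); },
  complete: function () { log.push('B done'); }
});
connectable.connect(); // the source runs once, and both observers see it

console.log(executions); // 1
console.log(log.join(', '));
// A next 0, B next 0, A next 1, B next 1, A done, B done
```

Because this toy source is synchronous, both observers are subscribed before connect() is called. In the lesson's example the source is an interval, so nothing is emitted until later and connecting first still works.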

Now we pass a ReplaySubject to multicast instead. When we connect this connectableObservable, it will use the ReplaySubject to subscribe to the source observable. That means that when the late observer arrives, it will see the earlier values replayed to it. If we run this, B arrives late, but B still sees the earlier values: zero, one, and two.

// var source = Rx.Observable
//   .interval(100)
//   .take(5);
// var subject = new Rx.Subject();
// source.subscribe(subject);

var connectableObservable = Rx.Observable
  .interval(100)
  .take(5)
  .multicast(new Rx.ReplaySubject(100));

connectableObservable.connect();

var observerA = {
  next: function (x) { console.log('A next ' + x); },
  error: function (err) { console.log('A error ' + err); },
  complete: function () { console.log('A done'); },
};

connectableObservable.subscribe(observerA);
console.log('observerA subscribed');

var observerB = {
  next: function (x) { console.log('B next ' + x); },
  error: function (err) { console.log('B error ' + err); },
  complete: function () { console.log('B done'); },
};

setTimeout(function () {
  console.log('observerB subscribed');
  connectableObservable.subscribe(observerB);
}, 300);
/*"observerA subscribed"
"A next 0"
"A next 1"
"A next 2"
"observerB subscribed"
"B next 0"
"B next 1"
"B next 2"
"A next 3"
"B next 3"
"A next 4"
"B next 4"
"A done"
"B done"*/

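The replay behavior in that output can be sketched without the library too. SimpleReplaySubject below is a hypothetical, simplified stand-in written for this illustration: it assumes a buffer limited only by count and ignores errors, completion, and time windows, so it is not the RxJS implementation.

```javascript
// Hypothetical, simplified replay subject for illustration only (not the RxJS implementation).
function SimpleReplaySubject(bufferSize) {
  this.bufferSize = bufferSize;
  this.buffer = [];
  this.observers = [];
}
SimpleReplaySubject.prototype.subscribe = function (observer) {
  // Replay the buffered values to the newcomer first, then register it for live values.
  this.buffer.forEach(function (x) { observer.next(x); });
  this.observers.push(observer);
};
SimpleReplaySubject.prototype.next = function (x) {
  this.buffer.push(x);
  if (this.buffer.length > this.bufferSize) {
    this.buffer.shift(); // keep only the most recent bufferSize values
  }
  this.observers.forEach(function (o) { o.next(x); });
};

var replayLog = [];
var replaySubject = new SimpleReplaySubject(100);

replaySubject.subscribe({ next: function (x) { replayLog.push('A next ' + x); } });
replaySubject.next(0);
replaySubject.next(1);
replaySubject.next(2);

// B arrives late and immediately receives the buffered 0, 1, 2.
replaySubject.subscribe({ next: function (x) { replayLog.push('B next ' + x); } });
replaySubject.next(3);

console.log(replayLog.join(', '));
// A next 0, A next 1, A next 2, B next 0, B next 1, B next 2, A next 3, B next 3
```

The order matches the lesson's output: B catches up on the buffered values as soon as it subscribes, and from then on A and B receive each new value together.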