上一篇解读了ReactiveCocoa的三个重要的类的底层实现,本篇继续。

一、RACMulticastConnection

1.应用

RACMulticastConnection: 用于当一个信号被多次订阅时,为了保证创建信号时,避免多次调用创建信号的block造成副作用,可以使用该类处理,保证创建信号的block执行一次。

// 创建信号
RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id subscriber) {
NSLog(@"发送请求");
[subscriber sendNext:@1];
return nil;
}]; // 创建连接
RACMulticastConnection *connect = [signal publish]; // 订阅连接的信号
[connect.signal subscribeNext:^(id x) {
NSLog(@"connect 第一次订阅信号: %@", x);
}]; [connect.signal subscribeNext:^(id x) {
NSLog(@"connect 第二次订阅信号: %@", x);
}]; // 连接
[connect connect];

2.源码实现

  • 底层原理
1.创建connect,connect.sourceSignal -> RACSignal(原始信号) connect.signal -> RACSubject
2.订阅connect.signal,会调用RACSubject的subscribeNext,创建订阅者,而且把订阅者保存起来,不会执行block。
3.[connect connect]内部会订阅RACSignal(原始信号),并且订阅者是RACSubject
3.1.订阅原始信号,就会调用原始信号中的didSubscribe
3.2 didSubscribe,拿到订阅者调用sendNext,其实是调用RACSubject的sendNext
4.RACSubject的sendNext,会遍历RACSubject所有订阅者发送信号。
4.1 因为刚刚第二步,都是在订阅RACSubject,因此会拿到第二步所有的订阅者,调用他们的nextBlock
  • 创建信号

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe

// RACDynamicSignal.m
+ (RACSignal *)createSignal:(RACDisposable * (^)(id subscriber))didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
//将代码块保存到信号里面(但此时仅仅是保存,没有调用),所以信号还是冷信号
signal->_didSubscribe = [didSubscribe copy];
return [signal setNameWithFormat:@"+createSignal:"];
}
  • 创建连接

[signal publish]

// RACSignal+Operations.m
- (RACMulticastConnection *)publish {
// 创建订阅者
RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
// 创建connection,参数是刚才创建的订阅者
RACMulticastConnection *connection = [self multicast:subject];
return connection;
} - (RACMulticastConnection *)multicast:(RACSubject *)subject {
[subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
return connection;
} // RACMulticastConnection.m
- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
NSCParameterAssert(source != nil);
NSCParameterAssert(subject != nil); self = [super init];
if (self == nil) return nil;
// 保存原始信号
_sourceSignal = source;
_serialDisposable = [[RACSerialDisposable alloc] init];
// 保存订阅者,即_signal是RACSubject对象
_signal = subject; return self;
}
  • 订阅信号

(RACDisposable *)subscribeNext:(void (^ )(id x))nextBlock;

// RACSignal.m
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL); RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
} // RACSubscriber.m
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init]; subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy]; return subscriber;
} // RACSubject.m
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil); RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable]; NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
} return [RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
// Since newer subscribers are generally shorter-lived, search
// starting from the end of the list.
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}]; if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}];
}
  • 连接信号

[connect connect];

// RACMulticastConnection.m
- (RACDisposable *)connect {
BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected); if (shouldConnect) {
// 订阅原生信号
self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
} return self.serialDisposable;
} // RACDynamicSignal.m
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil); RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable]; if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}]; [disposable addDisposable:schedulingDisposable];
} return disposable;
} // RACSubject.m
- (void)sendNext:(id)value {
// 遍历_subscribers数组,执行nextBlock
[self enumerateSubscribersUsingBlock:^(id subscriber) {
[subscriber sendNext:value];
}];
}

3.流程图

4.总结

RACMulticastConnection利用RACSubject实现了创建信号的block只执行一次的功能。对于需要对此订阅信号,但是不希望多次创建信号的应用场合,可以RACMulticastConnection解决。

二、RACCommand

1.应用

RACCommand类用来表示动作的执行, 是对动作触发后的连锁事件的封装。常用在封装网络请求,按钮点击等等场合。

RACCommand *command = [[RACCommand alloc] initWithSignalBlock:^RACSignal *(id input) {
return [RACSignal createSignal:^RACDisposable *(id subscriber) { if (/* DISABLES CODE */ (YES)) {
// 正常发送数据,必须发送完成信号
[subscriber sendNext:@"Smile"];
[subscriber sendCompleted];
} else {
// 发送错误信号
[subscriber sendError:[NSError errorWithDomain:@"Network failed" code:0005 userInfo:nil]];
} // 信号被销毁前,做一些清理的工作;如果不需要,可以 return nil
return [RACDisposable disposableWithBlock:^{
NSLog(@"信号被销毁了");
}];
}];
}]; // 执行信号并订阅
[[command execute:nil] subscribeNext:^(id x) {
NSLog(@"receive data: %@", x);
}];

2.源码实现

RACCommand底层实现

1. 创建命令,保存signalBlock
2. 执行命令
* 2.1 调用signalBlock
* 2.2 创建connect,传入RACReplaySubject对象,然后连接信号
3. 订阅信号
* 3.1 创建订阅者,保存到RACReplaySubject对象的_subscribers数组中
* 3.2 遍历valuesReceived数组,调用订阅者发送数据
  • 创建command

- (id)initWithSignalBlock:(RACSignal * (^)(id input))signalBlock

// RACCommand.m
- (id)initWithSignalBlock:(RACSignal * (^)(id input))signalBlock {
return [self initWithEnabled:nil signalBlock:signalBlock];
} - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock {
NSCParameterAssert(signalBlock != nil); self = [super init];
if (self == nil) return nil; _activeExecutionSignals = [[NSMutableArray alloc] init];
// 保存创建信号的block
_signalBlock = [signalBlock copy];
......
}
  • 执行command

- (RACSignal *)execute:(id)input

// RACCommand.m
- (RACSignal *)execute:(id)input {
// `immediateEnabled` is guaranteed to send a value upon subscription, so
// -first is acceptable here.
BOOL enabled = [[self.immediateEnabled first] boolValue];
if (!enabled) {
NSError *error = [NSError errorWithDomain:RACCommandErrorDomain code:RACCommandErrorNotEnabled userInfo:@{
NSLocalizedDescriptionKey: NSLocalizedString(@"The command is disabled and cannot be executed", nil),
RACUnderlyingCommandErrorKey: self
}]; return [RACSignal error:error];
} RACSignal *signal = self.signalBlock(input);
......
// 创建连接,用RACReplaySubject作为订阅者
RACMulticastConnection *connection = [[signal
subscribeOn:RACScheduler.mainThreadScheduler]
multicast:[RACReplaySubject subject]]; ......
// 连接信号
[connection connect];
return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, [input rac_description]];
} // RACMulticastConnection.m
- (RACDisposable *)connect {
BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &amp;_hasConnected); if (shouldConnect) {
// 执行创建信号的block
self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
} return self.serialDisposable;
}
  • 订阅command

- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock

// RACSignal.m
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL); RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
} // RACReplaySubject.m
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable]; RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;
// 调用订阅者,发送数据 "Smile"
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
} if (compoundDisposable.disposed) return; if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
// 调用父类方法,保存订阅者到_subscribers数组
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}]; [compoundDisposable addDisposable:schedulingDisposable]; return compoundDisposable;
}

3.流程图

4.总结

RACCommand用来封装事件时,还可以订阅信号(executionSignals)、订阅最新信号(switchToLatest)、跳过几次信号(skip)或信号是否正在执行(executing),在执行信号时,还可以监听错误信号和完成信号,请参考demo例子。


ReactiveCocoa框架的源码分析暂告一段落,如有分析不足之处,欢迎互相交流。

Demo地址:

RACDemo

最新文章

  1. BigZhuGod的粉丝
  2. 让hammer完美支持Pixi.js - 2D webG库
  3. charCode与keyCode的区别
  4. 关于动态URL地址设置静态形式
  5. atitit.web ui 结构建模工具总结
  6. Unity数据存储路径总结
  7. DButils实现查询和新增
  8. 菜鸟学习Struts——国际化
  9. 改变VC生成exe图标
  10. 如何在jenkins上新建一个项目及其简单配置
  11. CodeForces - 867E Buy Low Sell High (贪心 +小顶堆)
  12. C++ 模态与非模态对话框
  13. ssh远程登录不上的处理
  14. datatables插件提示Cannot reinitialise DataTable的解决办法
  15. Django-website 程序案例系列-3 URL详解
  16. maven(一)
  17. 深入理解 Java中的 流 (Stream)
  18. python接口自动化测试框架实现之操作oracle数据库
  19. Spring Cloud实战
  20. xdebug 一直报错 upstream timed out (110: Connection timed out) while reading response header from upstream

热门文章

  1. Redis配置文件redis.conf参数配置详解
  2. JSP include HTML出现乱码 问题解决
  3. (转)基于 Token 的身份验证
  4. html之结构化标记
  5. Java版简易画图板的实现
  6. 浅谈viewport
  7. 标准IO:常用函数集合
  8. Java8学习(3)- Lambda 表达式
  9. java.util.Arrays类
  10. Python基础之常用模块(三)