// Usage example traced by the rest of this walkthrough: build a sequence of
// 1, 2, 3, square each element with `map`, print every value delivered to
// `onNext`, and hand the resulting disposable to `disposeBag`.
Observable.of(1, 2, 3)

.map {

$0 * $0

}

.subscribe(onNext: { print($0) })

.disposed(by: disposeBag)

Map.swift

extension ObservableType

/// Maps every element of the sequence through `transform`.
///
/// The receiver is first converted with `asObservable()`; building the
/// actual operator is delegated to `composeMap(_:)`.
///
/// - Parameter transform: Closure applied to each element; it may throw.
/// - Returns: The `Observable<R>` produced by `composeMap(_:)`.
public func map<R>(_ transform: @escaping (E) throws -> R) -> Observable<R> {
    let source = asObservable()
    return source.composeMap(transform)
}

Observable.swift

public class Observable<Element> : ObservableType

/// Returns the receiver itself; an `Observable` needs no conversion.
public func asObservable() -> Observable<E> {

return self

}

Observable.swift

public class Observable<Element> : ObservableType

/// Builds the mapped sequence for this observable by forwarding `self` and
/// `transform` to `_map(source:transform:)`.
///
/// - Parameter transform: Closure applied to each element; it may throw.
/// - Returns: The `Observable<R>` returned by `_map(source:transform:)`.
internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {

return _map(source: self, transform: transform)

}

// Pure function: wraps `source` and `transform` in a `Map` operator and
// returns it typed as `Observable<R>`.

internal func _map<Element, R>(source: Observable<Element>, transform: @escaping (Element) throws -> R) -> Observable<R> {

return Map(source: source, transform: transform)

}

final fileprivate class Map<SourceType, ResultType>: Producer<ResultType>

/// Stores the upstream sequence and the transform closure.
///
/// When `TRACE_RESOURCES` is defined, `_numberOfMapOperators` is also
/// incremented through `AtomicIncrement`.
///
/// - Parameters:
///   - source: Sequence whose elements will be transformed.
///   - transform: Closure kept in `_transform` for later use.
init(source: Observable<SourceType>, transform: @escaping Transform) {
    self._source = source
    self._transform = transform

#if TRACE_RESOURCES
    _ = AtomicIncrement(&_numberOfMapOperators)
#endif
}

----------------------

// Call-site excerpt: the closure passed as `onNext` prints each element.
// The closing parenthesis of the `subscribe(onNext:)` call is included so the
// expression is balanced.
.subscribe(onNext: {
    print($0)
})

extension ObservableType

public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)

class Producer<Element> : Observable<Element>

// Excerpt of `Producer.subscribe(_:)`; the method's opening brace and any
// surrounding branches are not part of this excerpt.
// This path hands the work to `CurrentThreadScheduler.instance`. Inside the
// scheduled closure a `SinkDisposer` is created, `run(_:cancel:)` produces the
// sink/subscription pair, both are stored in the disposer, and the disposer
// is returned.
// NOTE(review): presumably the branch taken when
// `CurrentThreadScheduler.isScheduleRequired` is true — confirm against the
// full source.
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element

return CurrentThreadScheduler.instance.schedule(()) { _ in

let disposer = SinkDisposer()

let sinkAndSubscription = self.run(observer, cancel: disposer)

disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

return disposer

}

final fileprivate class Map<SourceType, ResultType>: Producer<ResultType>

/// Wires a `MapSink` between the upstream source and `observer`.
///
/// - Parameters:
///   - observer: Downstream observer handed to the sink.
///   - cancel: Cancelable handed to the sink.
/// - Returns: The created sink together with the disposable obtained by
///   subscribing that sink to `_source`.
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == ResultType {
    let mapSink = MapSink(transform: _transform, observer: observer, cancel: cancel)
    let upstreamSubscription = _source.subscribe(mapSink)
    return (sink: mapSink, subscription: upstreamSubscription)
}

class Producer<Element> : Observable<Element>

// Excerpt of `Producer.subscribe(_:)`: the path taken when
// `CurrentThreadScheduler.isScheduleRequired` is false. The work is done
// inline and the disposer is returned directly. The excerpt stops before the
// remainder of the method.
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {

if !CurrentThreadScheduler.isScheduleRequired {

// The returned disposable needs to release all references once it was disposed.

let disposer = SinkDisposer()

// `run` returns the sink/subscription pair for this observer.
let sinkAndSubscription = run(observer, cancel: disposer)

// Store both halves in the disposer that is handed back to the caller.
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

return disposer

}

final fileprivate class MapSink<SourceType, O : ObserverType> : Sink<O>, ObserverType

/// Stores `transform` and passes `observer` and `cancel` on to the `Sink`
/// superclass initializer.
///
/// - Parameters:
///   - transform: Closure kept in `_transform`; it is applied in `on(_:)`.
///   - observer: Downstream observer forwarded to `super.init`.
///   - cancel: Cancelable forwarded to `super.init`.
init(transform: @escaping Transform, observer: O, cancel: Cancelable) {

_transform = transform

super.init(observer: observer, cancel: cancel)

}

-----------------------------

map {

$0 * $0

}

final fileprivate class ObservableSequenceSink<S: Sequence, O: ObserverType> : Sink<O> where S.Iterator.Element == O.E

/// Emits the parent's elements one by one through the parent's scheduler.
///
/// Recursive scheduled work is started with the state
/// `(iterator, elements)`; each step pulls a single element from the iterator.
///
/// - Returns: The disposable returned by `scheduleRecursive`.
func run() -> Disposable {

return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in

// The closure parameter is immutable, so take a mutable copy to advance it.
var mutableIterator = iterator

if let next = mutableIterator.0.next() {

// Emit the element, then request another step with the advanced state.
self.forwardOn(.next(next))

recurse(mutableIterator)

}

else {

// Iterator exhausted: signal completion and dispose this sink.
self.forwardOn(.completed)

self.dispose()

}

}

}

class Sink<O : ObserverType> : Disposable

/// Delivers `event` to the wrapped observer unless this sink has already
/// been disposed.
///
/// In `DEBUG` builds the call is bracketed by `_synchronizationTracker`
/// register/unregister.
///
/// - Parameter event: Event to pass on to `_observer`.
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
    _synchronizationTracker.register(synchronizationErrorMessage: .default)
    defer { _synchronizationTracker.unregister() }
#endif

    // A disposed sink drops every further event.
    guard !_disposed else { return }
    _observer.on(event)
}

final fileprivate class MapSink<SourceType, O : ObserverType> : Sink<O>, ObserverType

/// Handles an upstream event.
///
/// - `.next`: runs `_transform` on the element and forwards the result; if the
///   transform throws, the thrown error is forwarded and the sink is disposed.
/// - `.error` / `.completed`: forwarded as-is, then the sink is disposed.
///
/// - Parameter event: Event received from the source sequence.
func on(_ event: Event<SourceType>) {
    switch event {
    case let .next(sourceElement):
        do {
            let transformed = try _transform(sourceElement)
            forwardOn(.next(transformed))
        } catch {
            // A failing transform terminates the mapped sequence.
            forwardOn(.error(error))
            dispose()
        }

    case let .error(upstreamError):
        forwardOn(.error(upstreamError))
        dispose()

    case .completed:
        forwardOn(.completed)
        dispose()
    }
}

$0 * $0

---------------------------

print($0)

final fileprivate class MapSink<SourceType, O : ObserverType> : Sink<O>, ObserverType

/// Handles an upstream event.
///
/// - `.next`: runs `_transform` on the element and forwards the result; if the
///   transform throws, the thrown error is forwarded and the sink is disposed.
/// - `.error` / `.completed`: forwarded as-is, then the sink is disposed.
///
/// - Parameter event: Event received from the source sequence.
func on(_ event: Event<SourceType>) {
    switch event {
    case let .next(sourceElement):
        do {
            let transformed = try _transform(sourceElement)
            forwardOn(.next(transformed))
        } catch {
            // A failing transform terminates the mapped sequence.
            forwardOn(.error(error))
            dispose()
        }

    case let .error(upstreamError):
        forwardOn(.error(upstreamError))
        dispose()

    case .completed:
        forwardOn(.completed)
        dispose()
    }
}

class Sink<O : ObserverType> : Disposable

/// Delivers `event` to the wrapped observer unless this sink has already
/// been disposed.
///
/// In `DEBUG` builds the call is bracketed by `_synchronizationTracker`
/// register/unregister.
///
/// - Parameter event: Event to pass on to `_observer`.
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
    _synchronizationTracker.register(synchronizationErrorMessage: .default)
    defer { _synchronizationTracker.unregister() }
#endif

    // A disposed sink drops every further event.
    guard !_disposed else { return }
    _observer.on(event)
}

class ObserverBase<ElementType> : Disposable, ObserverType

/// Gates incoming events on the `_isStopped` flag before passing them to
/// `onCore(_:)`.
///
/// - Parameter event: Event to deliver.
func on(_ event: Event<E>) {

switch event {

case .next:

// Elements are delivered only while the stop flag is still 0.
if _isStopped == 0 {

onCore(event)

}

case .error, .completed:

// A terminal event is delivered only when the 0 -> 1 swap of `_isStopped`
// succeeds.
// NOTE(review): presumably this makes the first terminal event win and
// suppresses later ones — confirm against `AtomicCompareAndSwap` semantics.
if AtomicCompareAndSwap(0, 1, &_isStopped) {

onCore(event)

}

}

}

final class AnonymousObserver<ElementType> : ObserverBase<ElementType>

/// Passes `event` to the stored `_eventHandler` closure.
///
/// - Parameter event: Event to hand to the handler.
override func onCore(_ event: Event<Element>) {

return _eventHandler(event)

}

extension ObservableType

public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)

// Excerpt from the body of `subscribe(onNext:onError:onCompleted:onDisposed:)`.
// It builds the `AnonymousObserver` whose handler dispatches each event to the
// matching optional callback.
// NOTE(review): `synchronizationTracker`, `disposable` and `callStack` are
// declared outside this excerpt — check the full source for their definitions.
let observer = AnonymousObserver<E> { event in

synchronizationTracker.register(synchronizationErrorMessage: .default)

defer { synchronizationTracker.unregister() }

switch event {

case .next(let value):

// Element: call `onNext` if the caller supplied one.
onNext?(value)

case .error(let error):

// Error: prefer the caller's handler, otherwise fall back to
// `Hooks.defaultErrorHandler`; then dispose.
if let onError = onError {

onError(error)

}

else {

Hooks.defaultErrorHandler(callStack, error)

}

disposable.dispose()

case .completed:

// Completion: notify the caller if requested, then dispose.
onCompleted?()

disposable.dispose()

}

}

print($0)

最新文章

  1. java @ResponseBody返回值中去掉NULL字段
  2. QT 对话框二
  3. html中的src与href的区别
  4. eclipse常用的字体
  5. 二分法-C++
  6. vue+vuex初入门
  7. mongodb 压缩——3.0+支持zlib和snappy
  8. 怎样将Android手机弄死机?
  9. Netflix网关zuul(1.x和2.x)全解析
  10. Java中的线程协作之Condition
  11. CPU利用率和CPU负荷(CPU usage vs CPU load)
  12. java钉钉通讯录同步
  13. [Swift]JSON字符串与字典(Dictionary)、数组(Array)之间的相互转换
  14. javascript 自动填充功能
  15. form提交xml文件
  16. IBM推出新一代云计算技术来解决多云管理
  17. 扇入Fan-in和扇出Fan-out
  18. C++进阶--拥有资源句柄的类(浅拷贝,深拷贝,虚构造函数)
  19. 一步步实现windows版ijkplayer系列文章之一——Windows10平台编译ffmpeg 4.0.2,生成ffplay
  20. 【loj6145】「2017 山东三轮集训 Day7」Easy 动态点分治+线段树

热门文章

  1. html第四节课
  2. 解决HTML select控件 设置属性 disabled 后无法向后台传值的方法
  3. echarts常用实例
  4. 那么再会吧!OI!(HNOI2019退役记)
  5. Sperner定理及其证明
  6. 【hihoCoder挑战赛28 A】异或排序
  7. PHP5.5下安装配置EcShop
  8. hdu 1245 Saving James Bond 策画几何+最短路 最短路求步数最少的路径
  9. redis开发为什么选用skiplist?
  10. gcc 4.8安装