implementation of RAC(OC) on JavaScript and replacement of RxJS
包含一定数量的可订阅数据(值,完成,错误)的信号对象,订阅者再接受到error或者complete之后就意味着订阅已经结束,即使信号还有没发送的值.
//创建一个signal,传入一个基于subscriber(订阅者对象)参数的block,block的返回值是一个Disposbale(FP编程的side effects)
let signal = new Signal(subscriber => {
subscriber.sendNext(2)
subscriber.sendNext(3)
subscriber.sendComplete() //subscriber.sendError(new Error())
subscriber.sendNext(4) // 4 不会被订阅者接收
return null
})
维护当前订阅者的接收操作,以及该订阅者订阅信号产生的Disposable(额外消耗)的对象(从设计的角度,Subscriber更像是一个定义一些操作的接口),一般不自己创建,对使用方透明。
//具体的订阅
let next = v => console.log(v)
let complete = () => console.log('completed')
let error = err => console.log('error')
let disposable = signal.subscribe(next, complete, error)
//如果想取消这次订阅可以调用 disposable.dispose
//订阅的方法有4个,subscribe是最全的
-subscribe(next, complete, error)
-subscribeNext(next)
-subscribeComplete(complete)
-subscribeError(error)
这个是RAC里面的概念,RxJs没有,作用同于类似于RxJs的subscription对象的unsubscribe。Disposable是管理当前订阅操作以及Signal各种链式,组合过程中出现的side effects。 Disposable中有Disposable和CompoundDisposable两种,采用的是组合嵌套的设计模式。每一个对应的disposable只负责当前订阅者。几乎对使用者透明
概括的描述: 1)继承于Signal,同时又实现了Subscriber接口的对象 2)OOP进入到FRP的桥梁 3)特别灵活
let subject = new Subject()
subject.subscribe(next, complete, error)
subject.sendNext(2)
subject.sendComplete()
subject.sendError()
Subject有Subject,CurrentSubject,ReplaySubject。其中Subject就是普通的;CurrentSubject在被订阅的时候如果当前有值了就会发送最新值;ReplaySubject在被订阅会完美的复现所有的值,包括complete和error信息
基于Signal和Subject的一种1对N的实现。之前说过Subject既是一个Signal,又是一个Subscriber。这样就让Subject订阅Signal对signal的值做透传,然后将subject暴露给使用方订阅。这么做的好处就是为了让signal创建时传入的block只执行一次,减少额外的side effects。
基于Subject和Subject实现N对N的实现。RxJs好像没有
借鉴RAC,RxJs好像没有。但是和RAC有区别
//第一个参数block,是根据输入生成一个signal
/*
* 第二个参数,传入一个<=0的值,代表当前的Command不支持concurrent,也不支持等待,也就是说如果当前command在被执行,在执行完成前其他输入会被忽略
* 如果 == 1,串型,支持等待。后续的输入会在前一个执行完成后执行
* 如果 > 1, 并行,支持等待,当当前执行的signal大于该数量后,会进入缓冲区等待,每当有空闲就会从缓冲区取出进行执行
*/
let command = new Command(v => Signal.of(3).delay(1000), 0)
command.execute(2).subscribe(next, complete)
...
有些很复杂的操作在这边不做解释,想了解的可以看代码,主要是为了其他方法的封装操作
快速创建一个只有一个数据Value的signal
快速创建一个空的signal
快速创建错误的signal
快速创建一个不会自己结束且空的signal
快速创建一个包含数组元素的signal
快速创建一个signal,signal的数据依次从start开始,每次增加distance,叠加count次
快速创建一个signal,promise then作为signal的数据+complete,catch作为signal的error
快速创建一个signal,从start开始,每time时间发送start+1
在订阅开始前注入操作
在订阅过程中对数据注入操作
在订阅结束后注入操作
Signal.of(4)
.do((v) => console.log('do next + ' + v), () => console.log('do complete'))
.initially(() => console.log('before subscribe'))
.finally(() => console.log('after complete'))
.subscribeNext()
/**
*before subscribe
*do next + 4
*do complete
*after error or complete
*/
当前的signal在结束之后,紧接着订阅otherSignal
concat的类方法
Signal.of(value)放在当前流的前面
Signal.of(value)放在当前流的后面
当前的signal在结束之后(忽略当前signal的next发送的值),紧接着订阅otherSignal
订阅当前Signal,直到otherSignal有值或者是complete消息过来
订阅当前的signal,直到otherSignal有任意的数据(包括error/complete)过来,并且紧接着订阅otherSignal
订阅otherSignal,直到当前signal有任意的数据过来,并且紧接着订阅当前的signal
如果当前signal发生错误,就根据错误生成新的signal,并且紧接着订阅这signal
如果当前signal发生错误,并且紧接着订阅新的otherSignal
//依次打印-2,-1,-0,1,2,3,4;如果最后的4000ms变为比3000ms小的数值打印就会变成-2,-1,0,4
Signal.of(2)
// .do((v) => console.log('do next + ' + v), () => console.log('do complete'))
// .initially(() => console.log('before subscribe'))
// .finally(() => console.log('after complete'))
.startWith(1)
.endWith(3)
.delay(3000)
.replace(Signal.fromArray([-2,-1,0]))
.replaced(Signal.of(4).delay(4000))
.subscribeNext(v => console.log('real v ' + v))
将数据流里面所有的数据想象成一个数组处理
将signal里面的数据都做了一个映射
将signal里面的数据都映射成一个值
将signal中不符合条件的都过滤掉
过滤所有的值,只关心完成和错误
判断signal的所有数据是不是满足条件
判断signal有没有数据满足条件
找出signal中第一个满足条件的值
找出signal中第一个满足条件的值的index
找出signal里面的第index的值,如果不存在就发送defaultValue
usefirst ? 使用signal的第一个值作为sourceValue :使用seed作为sourceValue signal每次发出的数据nowValue,都会经过第一个函数的处理变成新的数据发送出去,同时将最新的值作为下一次的sourceValue
取scan中的最后一个值
判断当前signal是不是没有数据
如果signal为空,是发送defaultValue,还是发送一个空的错误
取前count的数据
取最后几个数据
一直取到signal的数据中第一个不满足条件为止
取第一个
取最后一个
从第count开始取
最后几个不取
从signal的数据中第一个满足条件开始取
获取当前signal包含的数据总长度
将源signal的数据每几个一组发送出去
如果当前的值经过转换为key,发现key已经存在,该值就会被过滤掉
如果当前的值经过转换为key,和上一次值的key一样,该值就会被过滤掉
如果当前的值和上一次一样,该值就会被过滤掉
把signal的数据,完成,错误都包装成Notification对象发送出去
将包装成的Notification恢复成值,完成,错误发送出去
如果成功了,重复订阅当前的signal几次
如果错误,重复订阅当前的signal几次
将当前流和传入的流数组合并成一个新的流,所有流的数据都会被当成新流的数据。只有当所有流都完成了,新流才会完成;只要有一个流发生错误,心新流就会发生错误
merge的类方法
将当前流和传入的流数组进行打包形成一个新的流,每当打包内的所有流都有了新的数据,新的流就会将这些流的数据打包成一个数组发送,如果有一个流发生(错误)了,那么新的流也就完成了。任意一个流既没有新的数据且完成了,新的流就会发送完成
zip的类方法
将当前流和传入的流数组进行联系起来,每当有一个流发送了一个新的数据且所有的流都有数据的时,新的流就会将这边流的最新的数组打包成一个数组发送。所有流完成,新流才会完成;任意一个流错误,新流就会错误
combine的类方法
如果当前流发送的值都是signal类型,每发送新的值就会订阅该值,同时取消前一次的订阅
根据当前的值从signalMap中找到对应的signal,并订阅,同时取消上一次的订阅
根据当前的值是否,订阅对应的signal
将当前流的所有数据和完成错误信息都延迟发送
防抖,一定时间内只接收第一次的数据
防抖,在接收到最新的数据后且一定时间内没有数据过来后,发送这个最新值(先等待再发送)
防抖,在一定时间内没有数据过来后,发送下一次接收到的值(先发送再等待)
每次有数据后,都会在固定的时间读取缓冲池中的数据,缓冲池的大小由count决定,0代表无限大
在一定的时间内如果没有完成,就会发送超时错误
每当otherSignal有新的值/完成,就会发送当前signal的缓冲池
每当otherSignal有新的值/完成,就会发送当前signal的最新值
在RacJs中,弱化了RxJs里面的schedule概念,只提供了异步订阅/发送的两个操作
sync/async/asap,决定当前的订阅操作在哪个线程执行
sync/async/asap,决定当前的发送数据操作在哪个线程执行