Observable
题目描述
实现一个基本的 Observable 类,支持观察者模式的核心功能:
- 支持订阅者订阅数据流
- 支持
next、error、complete三种事件 - 支持取消订阅功能
- 支持多个订阅者
error和complete只能触发一次,之后的调用被忽略
Observable 决定了 values 如何传递给 Observer,Observer 本质上就是一些 callback 的集合。
js
const observer = {
next: (value) => {
console.log('we got a value', value)
},
error: (error) => {
console.log('we got an error', error)
},
complete: () => {
console.log('ok, no more values')
},
}本题考查 观察者模式、发布订阅模式 和 响应式编程 的核心概念。
核心知识点
本节不是介绍“理想的 RxJS 风格 Observable”,而是精确描述当前这份实现的特性与缺失点,方便定位可以改进的地方。
观察者模式最小实现特征
当前类只做一件事:当你 subscribe 时运行构造时提供的 setup 函数,并把一个“包装的 subscriberWrapper”交给它,后者提供 next / error / complete / unsubscribe 四个方法与一个状态标记 unsubscribed。
订阅对象结构
subscriberWrapper 字段:
| 字段 | 作用 | 备注 |
|---|---|---|
unsubscribed | 是否已终止 | error/complete/unsubscribe 设置为 true |
next(value) | 向下游发送值 | 若最初订阅参数是函数,直接调用函数;否则调用 subscriber.next |
error(err) | 触发错误终止 | 先检查状态→调用 unsubscribe()→再调用 subscriber.error(若存在) |
complete() | 触发完成终止 | 同上逻辑,调用 subscriber.complete(若存在) |
unsubscribe() | 手动终止 | 仅仅把 unsubscribed 设置为 true(无资源清理) |
订阅生命周期
subscribe()→ 创建包装对象 → 调用this._setup(subscriberWrapper)→ 返回包装对象。next()在未终止时透传;终止后直接忽略。error()/complete():若未终止 → 标记终止 → 调用相应回调;之后所有事件丢弃。unsubscribe():仅标记;不调用error/complete,也不做清理逻辑。
并发与时序
事件派发完全同步(没有调度微任务/宏任务);如果 setup 中同步调用多次 next(),订阅方会同步收到多个值;这与常见库(常常也是同步,只是有时支持 scheduler)一致,但无调度扩展点。
代码实现(当前版本)
javascript
class Observable {
constructor(setup) {
this._setup = setup
}
subscribe(subscriber) {
const subscriberWrapper = {
unsubscribed: false,
next(value) {
if (this.unsubscribed)
return
if (typeof subscriber === 'function')
return subscriber(value)
return subscriber.next ? subscriber.next(value) : null
},
error(value) {
if (this.unsubscribed)
return
this.unsubscribe()
return subscriber.error ? subscriber.error(value) : null
},
complete() {
if (this.unsubscribed)
return
this.unsubscribe()
return subscriber.complete ? subscriber.complete() : null
},
unsubscribe() {
this.unsubscribed = true
},
}
if (!subscriber)
return
this._setup(subscriberWrapper)
return subscriberWrapper
}
}
export default Observable
/**
* 真实行为说明:
* - 不捕获 setup 异常
* - 不消费 setup 返回的清理函数
* - unsubscribe 仅设置 flag
*/关键技术点
订阅者参数多态
javascript
// 支持三种订阅方式
// 1. 函数形式
observable.subscribe(value => console.log(value))
// 2. 对象形式 - 完整
observable.subscribe({
next: value => console.log(value),
error: err => console.error(err),
complete: () => console.log('done'),
})
// 3. 对象形式 - 部分
observable.subscribe({
next: value => console.log(value),
})状态管理(终止判定)
javascript
// 防止重复触发的设计
if (this.unsubscribed) return
// error 和 complete 的互斥性
error(value) {
if (this.unsubscribed) return
this.unsubscribe() // 立即标记为已取消
return subscriber.error ? subscriber.error(value) : null
}内存与资源风险
当前 unsubscribe() 仅做:this.unsubscribed = true,无法:
- 清除
setInterval / setTimeout - 移除 DOM 事件监听
- 关闭 WebSocket / Abort fetch
所以如果 setup 中创建了持续资源,要么:
- 在内部监听某个条件并调用
complete()时手工清理; - 改造 Observable 支持“setup 返回 teardown”。
使用示例
javascript
// 基本用法
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
setTimeout(() => {
subscriber.next(3);
subscriber.complete();
}, 1000);
});
// 订阅方式1:函数
const sub1 = observable.subscribe((value) => {
console.log("Got value:", value);
});
// 订阅方式2:对象
const sub2 = observable.subscribe({
next: (value) => console.log("Next:", value),
error: (err) => console.log("Error:", err),
complete: () => console.log("Complete!"),
});
// 取消订阅(注意:multiSub 的计时器仍继续运行直到内部 complete)
setTimeout(() => {
sub1.unsubscribe()
sub2.unsubscribe()
}, 500)
// 错误处理示例
const errorObservable = new Observable((subscriber) => {
subscriber.next("start");
subscriber.error("something wrong");
subscriber.next("this will not be called"); // 被忽略
});
// 多订阅者(每个订阅独立执行 setup,不共享)
const multiSub = new Observable((subscriber) => {
let count = 0
const timer = setInterval(() => {
subscriber.next(++count)
if (count === 5) {
subscriber.complete()
clearInterval(timer) // 仅在正常完成时清理
}
}, 100)
// 返回的清理函数不会被使用(当前实现忽略)
return () => clearInterval(timer)
})
const subA = multiSub.subscribe(v => console.log('SubA:', v))
const subB = multiSub.subscribe(v => console.log('SubB:', v))扩展思考与改进
增强版 Observable(支持 teardown + 操作符)
javascript
class BetterObservable {
constructor(setup) { this._setup = setup }
subscribe(observer) {
if (!observer) return { unsubscribe() {} }
const wrapper = (typeof observer === 'function')
? { next: observer }
: observer
let closed = false
let teardown
const safe = {
next: v => { if (!closed && wrapper.next) wrapper.next(v) },
error: e => { if (!closed) { closed = true; wrapper.error && wrapper.error(e); teardown && teardown() } },
complete: () => { if (!closed) { closed = true; wrapper.complete && wrapper.complete(); teardown && teardown() } },
}
try { teardown = this._setup(safe) }
catch (e) { safe.error(e) }
return {
unsubscribe() { if (!closed) { closed = true; teardown && teardown() } },
get closed() { return closed },
}
}
map(fn) {
return new BetterObservable(sub => this.subscribe({
next: v => { try { sub.next(fn(v)) } catch (e) { sub.error(e) } },
error: e => sub.error(e),
complete: () => sub.complete(),
}))
}
filter(pred) {
return new BetterObservable(sub => this.subscribe({
next: v => { try { if (pred(v)) sub.next(v) } catch (e) { sub.error(e) } },
error: e => sub.error(e),
complete: () => sub.complete(),
}))
}
pipe(...ops) { return ops.reduce((acc, op) => op(acc), this) }
}增加错误捕获
在 next/error/complete 回调调用用户代码前包裹 try/catch 并转化为 error 事件。
添加 Subject
Subject 可作为桥接:一个源 → 多个订阅共享同一生产者,避免多次执行 setup。
Scheduler / 调度
引入一个调度函数参数(如 schedule(cb)),实现:同步、宏任务、微任务、animation frame、priority 等策略统一封装。
Backpressure / 节流
对于高频源(如事件、WebSocket)可在 next 前做缓冲/节流/丢弃策略。
高级特性实现(现有代码需先升级才能正确运作)
javascript
// Subject - 既是 Observable 又是 Observer
class Subject extends Observable {
constructor() {
super()
this.observers = []
this.closed = false
}
subscribe(observer) {
if (this.closed)
return
this.observers.push(observer)
return {
unsubscribe: () => {
const index = this.observers.indexOf(observer)
if (index > -1)
this.observers.splice(index, 1)
},
}
}
next(value) {
if (this.closed)
return
this.observers.forEach((observer) => {
if (typeof observer === 'function') {
observer(value)
}
else if (observer.next) {
observer.next(value)
}
})
}
error(err) {
if (this.closed)
return
this.closed = true
this.observers.forEach((observer) => {
if (observer.error)
observer.error(err)
})
this.observers = []
}
complete() {
if (this.closed)
return
this.closed = true
this.observers.forEach((observer) => {
if (observer.complete)
observer.complete()
})
this.observers = []
}
}实际应用场景(需 teardown 支持)
javascript
// 用户输入处理
function createInputObservable(element) {
return new Observable((subscriber) => {
const handler = event => subscriber.next(event.target.value)
element.addEventListener('input', handler)
return () => {
element.removeEventListener('input', handler)
}
})
}
// HTTP 请求封装
function createHttpObservable(url) {
return new Observable((subscriber) => {
fetch(url)
.then(response => response.json())
.then((data) => {
subscriber.next(data)
subscriber.complete()
})
.catch(err => subscriber.error(err))
})
}
// WebSocket 连接
function createWebSocketObservable(url) {
return new Observable((subscriber) => {
const ws = new WebSocket(url)
ws.onmessage = event => subscriber.next(JSON.parse(event.data))
ws.onerror = error => subscriber.error(error)
ws.onclose = () => subscriber.complete()
return () => ws.close()
})
}