Skip to content

Observable

题目描述

实现一个基本的 Observable 类,支持观察者模式的核心功能:

  • 支持订阅者订阅数据流
  • 支持 nexterrorcomplete 三种事件
  • 支持取消订阅功能
  • 支持多个订阅者
  • errorcomplete 只能触发一次,之后的调用被忽略

Observable 决定了 values 如何传递给 Observer,Observer 本质上就是一些 callback 的集合。

js
const observer = {
  next: (value) => {
    console.log('we got a value', value)
  },
  error: (error) => {
    console.log('we got an error', error)
  },
  complete: () => {
    console.log('ok, no more values')
  },
}

本题考查 观察者模式发布订阅模式响应式编程 的核心概念。

核心知识点

本节不是介绍“理想的 RxJS 风格 Observable”,而是精确描述当前这份实现的特性与缺失点,方便定位可以改进的地方。

观察者模式最小实现特征

当前类只做一件事:当你 subscribe 时运行构造时提供的 setup 函数,并把一个“包装的 subscriberWrapper”交给它,后者提供 next / error / complete / unsubscribe 四个方法与一个状态标记 unsubscribed

订阅对象结构

subscriberWrapper 字段:

字段作用备注
unsubscribed是否已终止error/complete/unsubscribe 设置为 true
next(value)向下游发送值若最初订阅参数是函数,直接调用函数;否则调用 subscriber.next
error(err)触发错误终止先检查状态→调用 unsubscribe()→再调用 subscriber.error(若存在)
complete()触发完成终止同上逻辑,调用 subscriber.complete(若存在)
unsubscribe()手动终止仅仅把 unsubscribed 设置为 true(无资源清理)

订阅生命周期

  1. subscribe() → 创建包装对象 → 调用 this._setup(subscriberWrapper) → 返回包装对象。
  2. next() 在未终止时透传;终止后直接忽略。
  3. error() / complete():若未终止 → 标记终止 → 调用相应回调;之后所有事件丢弃。
  4. unsubscribe():仅标记;不调用 error/complete,也不做清理逻辑。

并发与时序

事件派发完全同步(没有调度微任务/宏任务);如果 setup 中同步调用多次 next(),订阅方会同步收到多个值;这与常见库(常常也是同步,只是有时支持 scheduler)一致,但无调度扩展点。

代码实现(当前版本)

javascript
class Observable {
  constructor(setup) {
    this._setup = setup
  }

  subscribe(subscriber) {
    const subscriberWrapper = {
      unsubscribed: false,
      next(value) {
        if (this.unsubscribed)
          return
        if (typeof subscriber === 'function')
          return subscriber(value)
        return subscriber.next ? subscriber.next(value) : null
      },
      error(value) {
        if (this.unsubscribed)
          return
        this.unsubscribe()
        return subscriber.error ? subscriber.error(value) : null
      },
      complete() {
        if (this.unsubscribed)
          return
        this.unsubscribe()
        return subscriber.complete ? subscriber.complete() : null
      },
      unsubscribe() {
        this.unsubscribed = true
      },
    }
    if (!subscriber)
      return

    this._setup(subscriberWrapper)
    return subscriberWrapper
  }
}

export default Observable

/**
 * 真实行为说明:
 * - 不捕获 setup 异常
 * - 不消费 setup 返回的清理函数
 * - unsubscribe 仅设置 flag
 */

关键技术点

订阅者参数多态

javascript
// 支持三种订阅方式
// 1. 函数形式
observable.subscribe(value => console.log(value))

// 2. 对象形式 - 完整
observable.subscribe({
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('done'),
})

// 3. 对象形式 - 部分
observable.subscribe({
  next: value => console.log(value),
})

状态管理(终止判定)

javascript
// 防止重复触发的设计
if (this.unsubscribed) return

// error 和 complete 的互斥性
error(value) {
  if (this.unsubscribed) return
  this.unsubscribe() // 立即标记为已取消
  return subscriber.error ? subscriber.error(value) : null
}

内存与资源风险

当前 unsubscribe() 仅做:this.unsubscribed = true,无法:

  • 清除 setInterval / setTimeout
  • 移除 DOM 事件监听
  • 关闭 WebSocket / Abort fetch

所以如果 setup 中创建了持续资源,要么:

  1. 在内部监听某个条件并调用 complete() 时手工清理;
  2. 改造 Observable 支持“setup 返回 teardown”。

使用示例

javascript
// 基本用法
const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
});

// 订阅方式1:函数
const sub1 = observable.subscribe((value) => {
  console.log("Got value:", value);
});

// 订阅方式2:对象
const sub2 = observable.subscribe({
  next: (value) => console.log("Next:", value),
  error: (err) => console.log("Error:", err),
  complete: () => console.log("Complete!"),
});

// 取消订阅(注意:multiSub 的计时器仍继续运行直到内部 complete)
setTimeout(() => {
  sub1.unsubscribe()
  sub2.unsubscribe()
}, 500)

// 错误处理示例
const errorObservable = new Observable((subscriber) => {
  subscriber.next("start");
  subscriber.error("something wrong");
  subscriber.next("this will not be called"); // 被忽略
});

// 多订阅者(每个订阅独立执行 setup,不共享)
const multiSub = new Observable((subscriber) => {
  let count = 0
  const timer = setInterval(() => {
    subscriber.next(++count)
    if (count === 5) {
      subscriber.complete()
      clearInterval(timer) // 仅在正常完成时清理
    }
  }, 100)
  // 返回的清理函数不会被使用(当前实现忽略)
  return () => clearInterval(timer)
})

const subA = multiSub.subscribe(v => console.log('SubA:', v))
const subB = multiSub.subscribe(v => console.log('SubB:', v))

扩展思考与改进

增强版 Observable(支持 teardown + 操作符)

javascript
class BetterObservable {
  constructor(setup) { this._setup = setup }
  subscribe(observer) {
    if (!observer) return { unsubscribe() {} }
    const wrapper = (typeof observer === 'function')
      ? { next: observer }
      : observer
    let closed = false
    let teardown
    const safe = {
      next: v => { if (!closed && wrapper.next) wrapper.next(v) },
      error: e => { if (!closed) { closed = true; wrapper.error && wrapper.error(e); teardown && teardown() } },
      complete: () => { if (!closed) { closed = true; wrapper.complete && wrapper.complete(); teardown && teardown() } },
    }
    try { teardown = this._setup(safe) }
    catch (e) { safe.error(e) }
    return {
      unsubscribe() { if (!closed) { closed = true; teardown && teardown() } },
      get closed() { return closed },
    }
  }
  map(fn) {
    return new BetterObservable(sub => this.subscribe({
      next: v => { try { sub.next(fn(v)) } catch (e) { sub.error(e) } },
      error: e => sub.error(e),
      complete: () => sub.complete(),
    }))
  }
  filter(pred) {
    return new BetterObservable(sub => this.subscribe({
      next: v => { try { if (pred(v)) sub.next(v) } catch (e) { sub.error(e) } },
      error: e => sub.error(e),
      complete: () => sub.complete(),
    }))
  }
  pipe(...ops) { return ops.reduce((acc, op) => op(acc), this) }
}

增加错误捕获

next/error/complete 回调调用用户代码前包裹 try/catch 并转化为 error 事件。

添加 Subject

Subject 可作为桥接:一个源 → 多个订阅共享同一生产者,避免多次执行 setup

Scheduler / 调度

引入一个调度函数参数(如 schedule(cb)),实现:同步、宏任务、微任务、animation frame、priority 等策略统一封装。

Backpressure / 节流

对于高频源(如事件、WebSocket)可在 next 前做缓冲/节流/丢弃策略。

高级特性实现(现有代码需先升级才能正确运作)

javascript
// Subject - 既是 Observable 又是 Observer
class Subject extends Observable {
  constructor() {
    super()
    this.observers = []
    this.closed = false
  }

  subscribe(observer) {
    if (this.closed)
      return
    this.observers.push(observer)
    return {
      unsubscribe: () => {
        const index = this.observers.indexOf(observer)
        if (index > -1)
          this.observers.splice(index, 1)
      },
    }
  }

  next(value) {
    if (this.closed)
      return
    this.observers.forEach((observer) => {
      if (typeof observer === 'function') {
        observer(value)
      }
      else if (observer.next) {
        observer.next(value)
      }
    })
  }

  error(err) {
    if (this.closed)
      return
    this.closed = true
    this.observers.forEach((observer) => {
      if (observer.error)
        observer.error(err)
    })
    this.observers = []
  }

  complete() {
    if (this.closed)
      return
    this.closed = true
    this.observers.forEach((observer) => {
      if (observer.complete)
        observer.complete()
    })
    this.observers = []
  }
}

实际应用场景(需 teardown 支持)

javascript
// 用户输入处理
function createInputObservable(element) {
  return new Observable((subscriber) => {
    const handler = event => subscriber.next(event.target.value)
    element.addEventListener('input', handler)

    return () => {
      element.removeEventListener('input', handler)
    }
  })
}

// HTTP 请求封装
function createHttpObservable(url) {
  return new Observable((subscriber) => {
    fetch(url)
      .then(response => response.json())
      .then((data) => {
        subscriber.next(data)
        subscriber.complete()
      })
      .catch(err => subscriber.error(err))
  })
}

// WebSocket 连接
function createWebSocketObservable(url) {
  return new Observable((subscriber) => {
    const ws = new WebSocket(url)

    ws.onmessage = event => subscriber.next(JSON.parse(event.data))
    ws.onerror = error => subscriber.error(error)
    ws.onclose = () => subscriber.complete()

    return () => ws.close()
  })
}

内容基于 MIT 许可 | 保持节奏 · 持续积累