Demo演示

初始状态下两个🔔都订阅了notify事件, 区别是右边的🔔在被通知一次后就取消订阅, 试试看。

  1. 点击publish通知, 观察到左边badge部分数字随着点击增加, 而右边数字仅增加一次
  2. 点击unsubscribe再点击publish通知, 观察到左边数字不再增加.
  3. 点击reset重置
0
0
subscribeOnce

相关逻辑

const pubsub = new PubSub()
 
function CountSubscriber({...props}) {
  const [subCount, dispatch] = useReducer(x=> x+1, 0)
  const [subCountOnce, dispatchOnce] = useReducer(x=> x+1, 0)
  const token = useRef(null)
 
  useEffect(()=>{
    token.current = pubsub.subscribe('notify', dispatch)
    //In development mode when reactStrictMode is true, subscribeOnce will be execute twice 
    pubsub.subscribeOnce('notify', dispatchOnce)
    return () =>{
      pubsub.unsubscribe(token.current)
    }
  }, [])
  return (
    <div className="inline-flex justify-around border-blue-200 rounded border-2 outline outline-offset-4 outline-blue-300">
      <div className="inline-flex flex-col items-center py-4 px-8">
        <Badge badgeContent={subCount}>🔔</Badge>
        <button 
          className="block mx-auto leading-[20px] text-center rounded-full bg-blue-300 py-1 px-2 text-white"
          onClick={() => { pubsub.unsubscribe(token.current) }}
        >
          unsubscribe
        </button>
      </div>
      <div className="inline-flex flex-col items-center py-4 px-8">
        <Badge badgeContent={subCountOnce}>🔔</Badge>
        <span 
          className="block mx-auto leading-[20px] text-center rounded-full py-1 px-2"
        >
          subscribeOnce
        </span>
      </div>
    </div>
  )
}

Vue中的发布订阅

  • 简要思路
// Plain source object whose properties the snippets below make reactive.
let data = {
  price: 5,
  quantity: 2,
}
// `target` holds the function currently being registered by watcher();
// Dep.depend() reads it to know what to subscribe.
let target
// `total` is the derived value recomputed by the watcher callback.
let total
// NOTE(review): `salePrice` is declared but not used in the visible snippets.
let salePrice
 
/**
 * A dependency: the list of functions that must be re-run when the value it
 * stands for changes.
 */
class Dep {
  constructor() {
    // Functions registered through depend(), in registration order.
    this.subscribers = []
  }

  /**
   * Registers the function currently stored in the outer `target` variable,
   * unless there is none or it is already registered.
   */
  depend() {
    if (!target) {
      return
    }
    const alreadyTracked = this.subscribers.includes(target)
    if (!alreadyTracked) {
      this.subscribers.push(target)
    }
  }

  /** Calls every registered function, without arguments. */
  notify() {
    this.subscribers.forEach((run) => {
      run()
    })
  }
}
 
// Replace every own enumerable property of `data` with an accessor pair:
// reading records the current `target` as a dependent, writing stores the new
// value and re-runs all dependents.
for (const key of Object.keys(data)) {
  // One Dep per property, captured by the accessors below.
  const dep = new Dep()
  // The real value lives in this closure variable once the accessor replaces the data property.
  let internalVal = data[key]

  const accessors = {
    get() {
      dep.depend()
      return internalVal
    },
    set(newVal) {
      internalVal = newVal
      dep.notify()
    },
  }

  Object.defineProperty(data, key, accessors)
}
 
/**
 * Runs `myFunc` once while exposing it through the outer `target` variable, so
 * that whatever reads `target` during the call (Dep.depend in this file) can
 * register it.
 *
 * @param {Function} myFunc - Function to run; it is called without arguments.
 * @throws Whatever `myFunc` throws; `target` is reset to null first.
 */
function watcher(myFunc) {
  target = myFunc
  try {
    target()
  } finally {
    // Reset even when myFunc throws, otherwise later property reads would
    // keep registering this stale function as a dependent.
    target = null
  }
}
 
// Compute `total` once through watcher(); the callback reads data.price and
// then data.quantity while it is the active target.
watcher(() => {
  const { price, quantity } = data
  total = price * quantity
})
  • 使用proxy
// Proxy-based variant: one Dep per property key, stored in a Map instead of
// in per-property closures.
let deps = new Map()
 
// Create a Dep for every own enumerable key that `data` has right now.
Object.keys(data).forEach(key => {
  deps.set(key, new Dep())
})
 
// Keep a reference to the raw object, then swap `data` for a Proxy around it.
let data_without_proxy = data
data = new Proxy(data_without_proxy, {
  get(obj, key) {
    // Keys without a Dep (symbols, inherited members such as toString,
    // properties added after the Map was filled) are read through untracked
    // instead of throwing on `undefined.depend()`.
    deps.get(key)?.depend();
    return obj[key]
  },
  set(obj, key, newVal) {
    obj[key] = newVal;
    // Same guard as in get(): writing an untracked key must not throw.
    deps.get(key)?.notify();
    return true
  }
})

具体实现思路

三个部分, 订阅(subscribe)、通知(publish)、消息中心(messages)

先贴一个openAI回答的🤔

// Subscribe: append `callback` to the listener list kept under
// `this._topics[topic]`, creating the list on first use.
function subscribe (topic, callback) {
    const registry = this._topics;
    if (!registry[topic]) {
        registry[topic] = [];
    }
    registry[topic].push(callback);
}
 
// Publish: call every listener registered under `topic`, with the broker as
// `this` and with all arguments that follow `topic`.
// Returns false when the topic has no listener list, true otherwise.
function publish (topic, ...payload) {
    if (!this._topics[topic]) {
        return false;
    }
    for (const listener of this._topics[topic]) {
        listener.apply(this, payload);
    }
    return true;
}
 
// Example broker: a plain object that owns the topic table and borrows the
// two functions above as methods.
var pubSubBroker = {
    _topics: {},
    subscribe,
    publish
};
 
// Example calls (callbackFunction, arg1 and arg2 are placeholders that this
// snippet does not define).
pubSubBroker.subscribe('event', callbackFunction);
pubSubBroker.publish('event', arg1, arg2);

个人实现

/**
 * Minimal topic-based publish/subscribe broker.
 *
 * Subscribers are stored per topic under generated tokens (`uid_0`, `uid_1`,
 * ...). A subscriber is called as `subscriber(msg, data)`. An exception thrown
 * by a subscriber does not stop delivery to the remaining subscribers; it is
 * rethrown from a zero-delay timer instead.
 */
export default class PubSub {
  /** Topic name -> { token: subscriber function }. */
  messages;
  /**
   * Number used for the most recently issued token; -1 until the first one.
   * @type {number}
   */
  lastUid;

  constructor() {
    this.messages = {};
    this.lastUid = -1;
  }

  /** Calls one subscriber; an exception is rethrown from a timer so delivery continues. */
  #callSubscriber(subscriber, msg, data) {
    try {
      subscriber(msg, data);
    } catch (err) {
      setTimeout(() => {
        throw err;
      }, 0);
    }
  }

  /** Builds the function that delivers `data` to the subscribers of `msg`. */
  #deliverMessage(msg, data) {
    return () => {
      // Look the table up at delivery time: with publishAsync the topic may
      // have been cleared between publishing and delivery.
      const subscribers = this.messages[msg];
      if (!subscribers) {
        return;
      }
      // Iterate a snapshot of the tokens and re-check each one, so that a
      // subscriber removed during delivery (e.g. by subscribeOnce) is skipped.
      for (const token of Object.keys(subscribers)) {
        if (Object.hasOwn(subscribers, token)) {
          this.#callSubscriber(subscribers[token], msg, data);
        }
      }
    };
  }

  /** True when `msg` is a known topic with at least one subscriber left. */
  #messageHasSubscribers(msg) {
    return Object.hasOwn(this.messages, msg)
      && Object.keys(this.messages[msg]).length > 0;
  }

  /** Shared implementation of publish / publishAsync. */
  #publishMsg(msg, data, async) {
    if (!this.#messageHasSubscribers(msg)) {
      return false;
    }
    const deliver = this.#deliverMessage(msg, data);
    if (async === true) {
      setTimeout(deliver, 0);
    } else {
      deliver();
    }
    return true;
  }

  /** Removes every topic whose name starts with `topic` (including `topic` itself). */
  #clearSubscriptions(topic) {
    for (const name of Object.keys(this.messages)) {
      if (name.startsWith(topic)) {
        delete this.messages[name];
      }
    }
  }

  /**
   * Delivers `data` to the subscribers of `msg` before returning.
   *
   * @param {string} msg - Topic name.
   * @param {*} [data] - Payload handed to each subscriber as second argument.
   * @returns {boolean} false when the topic has no subscribers, true otherwise.
   */
  publish(msg, data) {
    return this.#publishMsg(msg, data, false);
  }

  /**
   * Like publish(), but delivery happens from a zero-delay timer.
   *
   * @param {string} msg - Topic name.
   * @param {*} [data] - Payload handed to each subscriber as second argument.
   * @returns {boolean} false when the topic has no subscribers, true otherwise.
   */
  publishAsync(msg, data) {
    return this.#publishMsg(msg, data, true);
  }

  /**
   * Registers `func` for the topic `msg`.
   *
   * @param {string} msg - Topic name.
   * @param {Function} func - Called as `func(msg, data)` on every publish.
   * @returns {string|false} Token to pass to unsubscribe(), or false when
   *   `func` is not a function.
   */
  subscribe(msg, func) {
    if (typeof func !== 'function') {
      return false;
    }

    // First subscription for this topic: create its subscriber table.
    if (!Object.hasOwn(this.messages, msg)) {
      this.messages[msg] = {};
    }
    // The same function may be subscribed several times, so each
    // subscription is identified by its own token rather than by the function.
    const token = `uid_${++this.lastUid}`;
    this.messages[msg][token] = func;
    return token;
  }

  /**
   * Removes subscriptions.
   *
   * @param {string|Function} value - A topic name (clears that topic and every
   *   topic whose name starts with it), a token returned by subscribe()
   *   (removes that single subscription), or a function (removes every
   *   subscription of that function, in all topics).
   * @returns {string|boolean} true when a topic was cleared or a function was
   *   removed at least once, the token when a token was removed, false when
   *   nothing matched.
   */
  unsubscribe(value) {
    if (typeof value === 'string') {
      // A string naming an existing topic is treated as a topic, not a token.
      if (Object.hasOwn(this.messages, value)) {
        this.#clearSubscriptions(value);
        return true;
      }
      for (const subscribers of Object.values(this.messages)) {
        if (Object.hasOwn(subscribers, value)) {
          delete subscribers[value];
          return value;
        }
      }
      return false;
    }

    if (typeof value === 'function') {
      let removed = false;
      for (const subscribers of Object.values(this.messages)) {
        for (const token of Object.keys(subscribers)) {
          if (subscribers[token] === value) {
            delete subscribers[token];
            removed = true;
          }
        }
      }
      return removed;
    }

    return false;
  }

  /**
   * Registers `func` for the next publish of `msg` only: the subscription is
   * removed right before `func` is called.
   *
   * @param {string} msg - Topic name.
   * @param {Function} func - Called with the same arguments a regular
   *   subscriber receives; anything that is not a function is ignored.
   * @returns {PubSub} This broker, for chaining. No token is returned.
   */
  subscribeOnce(msg, func) {
    if (typeof func !== 'function') {
      return this;
    }
    const token = this.subscribe(msg, (...args) => {
      this.unsubscribe(token);
      func.apply(null, args);
    });
    return this;
  }
}