/**
 * Shares one session-level subscription per topic among any number of local
 * handlers and re-establishes those subscriptions whenever the connection
 * reports a new session.
 *
 * Per-topic state (one entry in `this.subscriptions`):
 *   - `handles`      local subscribers, in registration order
 *   - `options`      options given by the first subscriber of the topic
 *   - `data`         positional payload of the most recent event (or null)
 *   - `data_kwargs`  keyword payload of the most recent event (or null)
 *   - `sub`          value resolved by `session.subscribe`, once available
 *   - `tx`           timer id of a pending (delayed) subscribe, or null
 *   - `tx_out`       timer id of a pending (delayed) unsubscribe, or null
 *
 * NOTE(review): the session object is only used through `subscribe`,
 * `unsubscribe` and `isOpen`; it looks like a WAMP (Autobahn-style) session —
 * confirm against the connection implementation.
 */
export class Broker {
  /**
   * @param {object} connection - Must expose `onReady(cb)` and `onLost(cb)`;
   *   the broker registers its connect/disconnect handlers through them.
   * @param {*} packer - Stored as `this.packer`; not used by this class.
   */
  constructor(connection, packer) {
    this.packer = packer;
    this.session = null;
    this.onEvent = this._onEvent.bind(this);
    this.onConnect = this._onConnect.bind(this);
    this.onDisconnect = this._onDisconnect.bind(this);
    // Prototype-less dictionary: topics named like Object.prototype members
    // ("constructor", "toString", ...) must not resolve to inherited values.
    this.subscriptions = Object.create(null);
    connection.onReady(this.onConnect);
    connection.onLost(this.onDisconnect);
  }

  /**
   * Stores the new session and (re)subscribes every known topic on it.
   *
   * @param {Array} props - The session is read from `props[0]`.
   */
  _onConnect(props) {
    this.session = props[0];
    for (const room of Object.keys(this.subscriptions)) {
      const instance = this.subscriptions[room];
      this._subscribe(room, instance, instance.options);
    }
  }

  /**
   * Forgets the current session; topic state is kept so that `_onConnect`
   * can subscribe again later.
   *
   * @param {*} details - Unused.
   */
  _onDisconnect(details) {
    this.session = null;
  }

  /**
   * Delivers an event to every local handler of `details.topic` and then
   * records the payload as the topic's latest data.
   *
   * Each handler is invoked as
   * `callback.call(context, args, kwargs, previousArgs, previousKwargs)`,
   * where the "previous" values are the payload of the preceding event on the
   * same topic (null before the first event).
   *
   * @param {*} args - Positional payload of the event.
   * @param {*} kwargs - Keyword payload of the event.
   * @param {{topic: string}} details - Event details; only `topic` is read.
   */
  _onOther(args, kwargs, details) {
    const instance = this.subscriptions[details.topic];
    if (!instance) return;

    const previousArgs = instance.data;
    const previousKwargs = instance.data_kwargs;

    // Iterate over a snapshot: a handler may call unsubscribe() while being
    // notified, which splices the live array and would otherwise make the
    // loop skip the following handler.
    const handles = [...(instance.handles || [])];
    for (const handle of handles) {
      handle.callback.call(
        handle.context,
        args,
        kwargs,
        previousArgs,
        previousKwargs
      );
    }

    instance.data = args;
    instance.data_kwargs = kwargs;
  }

  /**
   * Event handler passed to `session.subscribe`; forwards to `_onOther`.
   */
  _onEvent(args, kwargs, details) {
    this._onOther(args, kwargs, details);
  }

  /**
   * Schedules the session-level subscribe for a topic after a random delay of
   * 100-499 ms. Does nothing when a subscribe is already pending for the
   * topic or when there is currently no session.
   *
   * @param {string} topic - Topic to subscribe to.
   * @param {object} instance - Per-topic state; receives `sub` on success.
   * @param {object} options - Passed through to `session.subscribe`.
   */
  _subscribe(topic, instance, options) {
    if (instance.tx || !this.session) return;

    const delay = Math.floor(Math.random() * 400) + 100;
    instance.tx = setTimeout(async () => {
      try {
        const sub = await this.session.subscribe(topic, this.onEvent, options);
        instance["sub"] = sub;
        console.log("subscribed successfully", sub);
      } catch (err) {
        // Also reached when the session was lost before the timer fired.
        console.log("failed to subscribed: " + err);
      }

      instance.tx = null;
    }, delay);
  }

  /**
   * Registers a local handler for a topic. The first handler of a topic
   * creates the topic state and triggers the session-level subscribe.
   *
   * When the topic already exists, `options.get_retained` is truthy and an
   * event has been seen before, the handler is called immediately as
   * `callback.call(context, lastArgs, lastKwargs)`.
   *
   * @param {string} topic - Topic name; a falsy value is treated as "".
   * @param {Function} callback - Handler invoked for each event.
   * @param {*} [context] - `this` for the handler; defaults to `callback`.
   * @param {object} [options] - Subscribe options; defaults to `{}`.
   * @returns {{room: string, callback: Function, context: *}} Handle to pass
   *   to `unsubscribe`.
   */
  subscribe(topic, callback, context, options) {
    const room = topic || "";
    const opts = options || {};

    const subscriber = {
      room,
      callback,
      context: context || callback,
    };

    let instance = this.subscriptions[room];
    if (typeof instance === "undefined") {
      instance = {
        data: null,
        data_kwargs: null,
        handles: [],
        options: opts,
      };
      this.subscriptions[room] = instance;
      this._subscribe(room, instance, opts);
    } else if (opts.get_retained && (instance.data || instance.data_kwargs)) {
      subscriber.callback.call(
        subscriber.context,
        instance.data,
        instance.data_kwargs
      );
    }

    instance.handles.push(subscriber);
    return subscriber;
  }

  /**
   * Removes a local handler. When the topic is left without handlers, the
   * session-level unsubscribe is deferred by 3 seconds so that a quick
   * re-subscribe can reuse the existing subscription.
   *
   * @param {{room: string}} subscriber - Handle returned by `subscribe`.
   */
  unsubscribe(subscriber) {
    const room = subscriber.room;
    const instance = this.subscriptions[room];
    if (!instance) return;

    const handles = instance.handles;
    for (let i = handles.length - 1; i >= 0; i -= 1) {
      if (handles[i] === subscriber) handles.splice(i, 1);
    }
    if (handles.length !== 0) return;

    // Restart the linger period if one is already running.
    if (instance.tx_out) {
      clearTimeout(instance.tx_out);
      instance.tx_out = null;
    }

    instance.tx_out = setTimeout(async () => {
      // Clear the id first so a timer armed while we are awaiting below is
      // not overwritten afterwards.
      instance.tx_out = null;
      if (instance.handles.length !== 0) return;

      try {
        const sub = instance["sub"];
        if (this.session && this.session.isOpen && sub) {
          await this.session.unsubscribe(sub);
          instance["sub"] = null;
          console.log("unsubscribe successfully: " + sub.topic);
        }

        if (instance.handles.length !== 0) {
          // A handler was added while the unsubscribe was in flight; keep the
          // topic state and subscribe again instead of orphaning the handler.
          this._subscribe(room, instance, instance.options);
        } else if (this.subscriptions[room] === instance) {
          delete this.subscriptions[room];
        }
      } catch (err) {
        console.log("failed to unsubscribed: " + err);
      }
    }, 3000);
  }
}
