var Rx = require('./dist/rx.all');
require('./dist/rx.sorting');
require('./dist/rx.testing');

// Add specific Node functions
var EventEmitter = require('events').EventEmitter,
  Observable = Rx.Observable;

Rx.Node = {
  /**
   * @deprecated Use Rx.Observable.fromCallback from rx.async.js instead.
   *
   * Converts a callback function to an observable sequence.
   *
   * @param {Function} func Function to convert to an asynchronous function.
   * @param {Mixed} [context] The context for the func parameter to be executed.  If not specified, defaults to undefined.
   * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next.
   * @returns {Function} Asynchronous function.
   */
  fromCallback: function (func, context, selector) {
    return Observable.fromCallback(func, context, selector);
  },

  /**
   * @deprecated Use Rx.Observable.fromNodeCallback from rx.async.js instead.
   *
   * Converts a Node.js callback style function to an observable sequence.  This must be in function (err, ...) format.
   *
   * @param {Function} func The function to call
   * @param {Mixed} [context] The context for the func parameter to be executed.  If not specified, defaults to undefined.
   * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next.
   * @returns {Function} An async function which when applied, returns an observable sequence with the callback arguments as an array.
   */
  fromNodeCallback: function (func, context, selector) {
    return Observable.fromNodeCallback(func, context, selector);
  },

  /**
   * @deprecated Use Rx.Observable.fromNodeCallback from rx.async.js instead.
   *
   * Handles an event from the given EventEmitter as an observable sequence.
   *
   * @param {EventEmitter} eventEmitter The EventEmitter to subscribe to the given event.
   * @param {String} eventName The event name to subscribe
   * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next.
   * @returns {Observable} An observable sequence generated from the named event from the given EventEmitter.  The data will be returned as an array of arguments to the handler.
   */
  fromEvent: function (eventEmitter, eventName, selector) {
    return Observable.fromEvent(eventEmitter, eventName, selector);
  },

  /**
   * Converts the given observable sequence to an event emitter with the given event name.
   * The errors are handled on the 'error' event and completion on the 'end' event.
   * @param {Observable} observable The observable sequence to convert to an EventEmitter.
   * @param {String} eventName The event name to emit onNext calls.
   * @returns {EventEmitter} An EventEmitter which emits the given eventName for each onNext call in addition to 'error' and 'end' events.
   *   You must call publish in order to invoke the subscription on the Observable sequuence.
   */
  toEventEmitter: function (observable, eventName, selector) {
    var e = new EventEmitter();

    // Used to publish the events from the observable
    e.publish = function () {
      e.subscription = observable.subscribe(
        function (x) {
          var result = x;
          if (selector) {
            try {
              result = selector(x);
            } catch (e) {
              e.emit('error', e);
              return;
            }
          }

          e.emit(eventName, result);
        },
        function (err) {
          e.emit('error', err);
        },
        function () {
          e.emit('end');
        });
    };

    return e;
  },

  /**
   * Converts a flowing stream to an Observable sequence.
   * @param {Stream} stream A stream to convert to a observable sequence.
   * @param {String} [finishEventName] Event that notifies about closed stream. ("end" by default)
   * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and finish events like `end` or `finish`.
   */
  fromStream: function (stream, finishEventName) {
    stream.pause();

    finishEventName || (finishEventName = 'end');

    return Observable.create(function (observer) {
      function dataHandler (data) {
        observer.onNext(data);
      }

      function errorHandler (err) {
        observer.onError(err);
      }

      function endHandler () {
        observer.onCompleted();
      }

      stream.addListener('data', dataHandler);
      stream.addListener('error', errorHandler);
      stream.addListener(finishEventName, endHandler);

      stream.resume();

      return function () {
        stream.removeListener('data', dataHandler);
        stream.removeListener('error', errorHandler);
        stream.removeListener(finishEventName, endHandler);
      };
    }).publish().refCount();
  },

  /**
   * Converts a flowing readable stream to an Observable sequence.
   * @param {Stream} stream A stream to convert to a observable sequence.
   * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'end' events.
   */
  fromReadableStream: function (stream) {
    return this.fromStream(stream, 'end');
  },

  /**
   * Converts a flowing writeable stream to an Observable sequence.
   * @param {Stream} stream A stream to convert to a observable sequence.
   * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events.
   */
  fromWritableStream: function (stream) {
    return this.fromStream(stream, 'finish');
  },

  /**
   * Converts a flowing transform stream to an Observable sequence.
   * @param {Stream} stream A stream to convert to a observable sequence.
   * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events.
   */
  fromTransformStream: function (stream) {
    return this.fromStream(stream, 'finish');
  },

  /**
   * Writes an observable sequence to a stream
   * @param {Observable} observable Observable sequence to write to a stream.
   * @param {Stream} stream The stream to write to.
   * @param {String} [encoding] The encoding of the item to write.
   * @returns {Disposable} The subscription handle.
   */
  writeToStream: function (observable, stream, encoding) {
    var source = observable.pausableBuffered();

    function onDrain() {
      source.resume();
    }

    stream.addListener('drain', onDrain);

    return source.subscribe(
      function (x) {
        !stream.write(String(x), encoding) && source.pause();
      },
      function (err) {
        stream.emit('error', err);
      },
      function () {
        // Hack check because STDIO is not closable
        !stream._isStdio && stream.end();
        stream.removeListener('drain', onDrain);
      });

    source.resume();
  }
};

/**
 * Pipes the existing Observable sequence into a Node.js Stream.
 * @param {Stream} dest The destination Node.js stream.
 * @returns {Stream} The destination stream.
 */
Rx.Observable.prototype.pipe = function (dest) {
  var source = this.pausableBuffered();

  function onDrain() {
    source.resume();
  }

  dest.addListener('drain', onDrain);

  source.subscribe(
    function (x) {
      !dest.write(String(x)) && source.pause();
    },
    function (err) {
      dest.emit('error', err);
    },
    function () {
      // Hack check because STDIO is not closable
      !dest._isStdio && dest.end();
      dest.removeListener('drain', onDrain);
    });

  source.resume();

  return dest;
};

module.exports = Rx;
