// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information. ;(function (factory) { var objectTypes = { 'function': true, 'object': true }; function checkGlobal(value) { return (value && value.Object === Object) ? value : null; } var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null; var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null; var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global); var freeSelf = checkGlobal(objectTypes[typeof self] && self); var freeWindow = checkGlobal(objectTypes[typeof window] && window); var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null; var thisGlobal = checkGlobal(objectTypes[typeof this] && this); var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')(); // Because of build optimizers if (typeof define === 'function' && define.amd) { define(['./rx'], function (Rx, exports) { return factory(root, exports, Rx); }); } else if (typeof module === 'object' && module && module.exports === freeExports) { module.exports = factory(root, module.exports, require('./rx')); } else { root.Rx = factory(root, {}, root.Rx); } }.call(this, function (root, exp, Rx, undefined) { var Observable = Rx.Observable, observableProto = Observable.prototype, AnonymousObservable = Rx.AnonymousObservable, ObservableBase = Rx.ObservableBase, Subject = Rx.Subject, AsyncSubject = Rx.AsyncSubject, Observer = Rx.Observer, ScheduledObserver = Rx.internals.ScheduledObserver, disposableCreate = Rx.Disposable.create, disposableEmpty = Rx.Disposable.empty, BinaryDisposable = Rx.BinaryDisposable, currentThreadScheduler = Rx.Scheduler.currentThread, isFunction = Rx.helpers.isFunction, inherits = Rx.internals.inherits, addProperties = Rx.internals.addProperties, checkDisposed = Rx.Disposable.checkDisposed; // Utilities function cloneArray(arr) { var len = arr.length, a = new Array(len); for(var i = 0; i < len; i++) { a[i] = arr[i]; } return a; } var MulticastObservable = (function (__super__) { inherits(MulticastObservable, __super__); function MulticastObservable(source, fn1, fn2) { this.source = source; this._fn1 = fn1; this._fn2 = fn2; __super__.call(this); } MulticastObservable.prototype.subscribeCore = function (o) { var connectable = this.source.multicast(this._fn1()); return new BinaryDisposable(this._fn2(connectable).subscribe(o), connectable.connect()); }; return MulticastObservable; }(ObservableBase)); /** * Multicasts the source sequence notifications through an instantiated subject into all uses of the sequence within a selector function. Each * subscription to the resulting sequence causes a separate multicast invocation, exposing the sequence resulting from the selector function's * invocation. For specializations with fixed subject types, see Publish, PublishLast, and Replay. * * @example * 1 - res = source.multicast(observable); * 2 - res = source.multicast(function () { return new Subject(); }, function (x) { return x; }); * * @param {Function|Subject} subjectOrSubjectSelector * Factory function to create an intermediate subject through which the source sequence's elements will be multicast to the selector function. * Or: * Subject to push source elements into. * * @param {Function} [selector] Optional selector function which can use the multicasted source sequence subject to the policies enforced by the created subject. Specified only if 0; }, /** * Notifies all subscribed observers about the end of the sequence. */ onCompleted: function () { checkDisposed(this); if (this.isStopped) { return; } this.isStopped = true; for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) { os[i].onCompleted(); } this.observers.length = 0; }, /** * Notifies all subscribed observers about the exception. * @param {Mixed} error The exception to send to all observers. */ onError: function (error) { checkDisposed(this); if (this.isStopped) { return; } this.isStopped = true; this.hasError = true; this.error = error; for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) { os[i].onError(error); } this.observers.length = 0; }, /** * Notifies all subscribed observers about the arrival of the specified element in the sequence. * @param {Mixed} value The value to send to all observers. */ onNext: function (value) { checkDisposed(this); if (this.isStopped) { return; } this.value = value; for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) { os[i].onNext(value); } }, /** * Unsubscribe all observers and release resources. */ dispose: function () { this.isDisposed = true; this.observers = null; this.value = null; this.error = null; } }); return BehaviorSubject; }(Observable)); /** * Represents an object that is both an observable sequence as well as an observer. * Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies. */ var ReplaySubject = Rx.ReplaySubject = (function (__super__) { var maxSafeInteger = Math.pow(2, 53) - 1; function createRemovableDisposable(subject, observer) { return disposableCreate(function () { observer.dispose(); !subject.isDisposed && subject.observers.splice(subject.observers.indexOf(observer), 1); }); } inherits(ReplaySubject, __super__); /** * Initializes a new instance of the ReplaySubject class with the specified buffer size, window size and scheduler. * @param {Number} [bufferSize] Maximum element count of the replay buffer. * @param {Number} [windowSize] Maximum time length of the replay buffer. * @param {Scheduler} [scheduler] Scheduler the observers are invoked on. */ function ReplaySubject(bufferSize, windowSize, scheduler) { this.bufferSize = bufferSize == null ? maxSafeInteger : bufferSize; this.windowSize = windowSize == null ? maxSafeInteger : windowSize; this.scheduler = scheduler || currentThreadScheduler; this.q = []; this.observers = []; this.isStopped = false; this.isDisposed = false; this.hasError = false; this.error = null; __super__.call(this); } addProperties(ReplaySubject.prototype, Observer.prototype, { _subscribe: function (o) { checkDisposed(this); var so = new ScheduledObserver(this.scheduler, o), subscription = createRemovableDisposable(this, so); this._trim(this.scheduler.now()); this.observers.push(so); for (var i = 0, len = this.q.length; i < len; i++) { so.onNext(this.q[i].value); } if (this.hasError) { so.onError(this.error); } else if (this.isStopped) { so.onCompleted(); } so.ensureActive(); return subscription; }, /** * Indicates whether the subject has observers subscribed to it. * @returns {Boolean} Indicates whether the subject has observers subscribed to it. */ hasObservers: function () { checkDisposed(this); return this.observers.length > 0; }, _trim: function (now) { while (this.q.length > this.bufferSize) { this.q.shift(); } while (this.q.length > 0 && (now - this.q[0].interval) > this.windowSize) { this.q.shift(); } }, /** * Notifies all subscribed observers about the arrival of the specified element in the sequence. * @param {Mixed} value The value to send to all observers. */ onNext: function (value) { checkDisposed(this); if (this.isStopped) { return; } var now = this.scheduler.now(); this.q.push({ interval: now, value: value }); this._trim(now); for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) { var observer = os[i]; observer.onNext(value); observer.ensureActive(); } }, /** * Notifies all subscribed observers about the exception. * @param {Mixed} error The exception to send to all observers. */ onError: function (error) { checkDisposed(this); if (this.isStopped) { return; } this.isStopped = true; this.error = error; this.hasError = true; var now = this.scheduler.now(); this._trim(now); for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) { var observer = os[i]; observer.onError(error); observer.ensureActive(); } this.observers.length = 0; }, /** * Notifies all subscribed observers about the end of the sequence. */ onCompleted: function () { checkDisposed(this); if (this.isStopped) { return; } this.isStopped = true; var now = this.scheduler.now(); this._trim(now); for (var i = 0, os = cloneArray(this.observers), len = os.length; i < len; i++) { var observer = os[i]; observer.onCompleted(); observer.ensureActive(); } this.observers.length = 0; }, /** * Unsubscribe all observers and release resources. */ dispose: function () { this.isDisposed = true; this.observers = null; } }); return ReplaySubject; }(Observable)); var RefCountObservable = (function (__super__) { inherits(RefCountObservable, __super__); function RefCountObservable(source) { this.source = source; this._count = 0; this._connectableSubscription = null; __super__.call(this); } RefCountObservable.prototype.subscribeCore = function (o) { var subscription = this.source.subscribe(o); ++this._count === 1 && (this._connectableSubscription = this.source.connect()); return new RefCountDisposable(this, subscription); }; function RefCountDisposable(p, s) { this._p = p; this._s = s; this.isDisposed = false; } RefCountDisposable.prototype.dispose = function () { if (!this.isDisposed) { this.isDisposed = true; this._s.dispose(); --this._p._count === 0 && this._p._connectableSubscription.dispose(); } }; return RefCountObservable; }(ObservableBase)); var ConnectableObservable = Rx.ConnectableObservable = (function (__super__) { inherits(ConnectableObservable, __super__); function ConnectableObservable(source, subject) { this.source = source; this._connection = null; this._source = source.asObservable(); this._subject = subject; __super__.call(this); } function ConnectDisposable(parent, subscription) { this._p = parent; this._s = subscription; } ConnectDisposable.prototype.dispose = function () { if (this._s) { this._s.dispose(); this._s = null; this._p._connection = null; } }; ConnectableObservable.prototype.connect = function () { if (!this._connection) { if (this._subject.isStopped) { return disposableEmpty; } var subscription = this._source.subscribe(this._subject); this._connection = new ConnectDisposable(this, subscription); } return this._connection; }; ConnectableObservable.prototype._subscribe = function (o) { return this._subject.subscribe(o); }; ConnectableObservable.prototype.refCount = function () { return new RefCountObservable(this); }; return ConnectableObservable; }(Observable)); /** * Returns an observable sequence that shares a single subscription to the underlying sequence. This observable sequence * can be resubscribed to, even if all prior subscriptions have ended. (unlike `.publish().refCount()`) * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source. */ observableProto.singleInstance = function() { var source = this, hasObservable = false, observable; function getObservable() { if (!hasObservable) { hasObservable = true; observable = source['finally'](function() { hasObservable = false; }).publish().refCount(); } return observable; } return new AnonymousObservable(function(o) { return getObservable().subscribe(o); }); }; return Rx; }));