// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information. ;(function (factory) { var objectTypes = { 'function': true, 'object': true }; function checkGlobal(value) { return (value && value.Object === Object) ? value : null; } var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null; var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null; var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global); var freeSelf = checkGlobal(objectTypes[typeof self] && self); var freeWindow = checkGlobal(objectTypes[typeof window] && window); var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null; var thisGlobal = checkGlobal(objectTypes[typeof this] && this); var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')(); // Because of build optimizers if (typeof define === 'function' && define.amd) { define(['./rx.lite'], function (Rx, exports) { return factory(root, exports, Rx); }); } else if (typeof module === 'object' && module && module.exports === freeExports) { module.exports = factory(root, module.exports, require('rx-lite')); } else { root.Rx = factory(root, {}, root.Rx); } }.call(this, function (root, exp, Rx, undefined) { // References var Observable = Rx.Observable, observableProto = Observable.prototype, observableNever = Observable.never, observableThrow = Observable['throw'], AnonymousObservable = Rx.AnonymousObservable, ObservableBase = Rx.ObservableBase, AnonymousObserver = Rx.AnonymousObserver, notificationCreateOnNext = Rx.Notification.createOnNext, notificationCreateOnError = Rx.Notification.createOnError, notificationCreateOnCompleted = Rx.Notification.createOnCompleted, Observer = Rx.Observer, observerCreate = Observer.create, AbstractObserver = Rx.internals.AbstractObserver, Subject = Rx.Subject, internals = Rx.internals, helpers = Rx.helpers, ScheduledObserver = internals.ScheduledObserver, SerialDisposable = Rx.SerialDisposable, SingleAssignmentDisposable = Rx.SingleAssignmentDisposable, CompositeDisposable = Rx.CompositeDisposable, BinaryDisposable = Rx.BinaryDisposable, RefCountDisposable = Rx.RefCountDisposable, disposableEmpty = Rx.Disposable.empty, immediateScheduler = Rx.Scheduler.immediate, defaultKeySerializer = helpers.defaultKeySerializer, addRef = Rx.internals.addRef, identity = helpers.identity, isPromise = helpers.isPromise, isFunction = helpers.isFunction, inherits = internals.inherits, bindCallback = internals.bindCallback, noop = helpers.noop, isScheduler = Rx.Scheduler.isScheduler, observableFromPromise = Observable.fromPromise, ArgumentOutOfRangeError = Rx.ArgumentOutOfRangeError; var errorObj = {e: {}}; function tryCatcherGen(tryCatchTarget) { return function tryCatcher() { try { return tryCatchTarget.apply(this, arguments); } catch (e) { errorObj.e = e; return errorObj; } }; } var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) { if (!isFunction(fn)) { throw new TypeError('fn must be a function'); } return tryCatcherGen(fn); }; function thrower(e) { throw e; } function ScheduledDisposable(scheduler, disposable) { this.scheduler = scheduler; this.disposable = disposable; this.isDisposed = false; } function scheduleItem(s, self) { if (!self.isDisposed) { self.isDisposed = true; self.disposable.dispose(); } } ScheduledDisposable.prototype.dispose = function () { this.scheduler.schedule(this, scheduleItem); }; var CheckedObserver = (function (__super__) { inherits(CheckedObserver, __super__); function CheckedObserver(observer) { __super__.call(this); this._observer = observer; this._state = 0; // 0 - idle, 1 - busy, 2 - done } var CheckedObserverPrototype = CheckedObserver.prototype; CheckedObserverPrototype.onNext = function (value) { this.checkAccess(); var res = tryCatch(this._observer.onNext).call(this._observer, value); this._state = 0; res === errorObj && thrower(res.e); }; CheckedObserverPrototype.onError = function (err) { this.checkAccess(); var res = tryCatch(this._observer.onError).call(this._observer, err); this._state = 2; res === errorObj && thrower(res.e); }; CheckedObserverPrototype.onCompleted = function () { this.checkAccess(); var res = tryCatch(this._observer.onCompleted).call(this._observer); this._state = 2; res === errorObj && thrower(res.e); }; CheckedObserverPrototype.checkAccess = function () { if (this._state === 1) { throw new Error('Re-entrancy detected'); } if (this._state === 2) { throw new Error('Observer completed'); } if (this._state === 0) { this._state = 1; } }; return CheckedObserver; }(Observer)); var ObserveOnObserver = (function (__super__) { inherits(ObserveOnObserver, __super__); function ObserveOnObserver(scheduler, observer, cancel) { __super__.call(this, scheduler, observer); this._cancel = cancel; } ObserveOnObserver.prototype.next = function (value) { __super__.prototype.next.call(this, value); this.ensureActive(); }; ObserveOnObserver.prototype.error = function (e) { __super__.prototype.error.call(this, e); this.ensureActive(); }; ObserveOnObserver.prototype.completed = function () { __super__.prototype.completed.call(this); this.ensureActive(); }; ObserveOnObserver.prototype.dispose = function () { __super__.prototype.dispose.call(this); this._cancel && this._cancel.dispose(); this._cancel = null; }; return ObserveOnObserver; })(ScheduledObserver); /** * Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods. * If a violation is detected, an Error is thrown from the offending observer method call. * * @returns An observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer. */ Observer.prototype.checked = function () { return new CheckedObserver(this); }; /** * Schedules the invocation of observer methods on the given scheduler. * @param {Scheduler} scheduler Scheduler to schedule observer messages on. * @returns {Observer} Observer whose messages are scheduled on the given scheduler. */ Observer.notifyOn = function (scheduler) { return new ObserveOnObserver(scheduler, this); }; /** * Creates an observer from a notification callback. * @param {Function} handler Action that handles a notification. * @returns The observer object that invokes the specified handler using a notification corresponding to each message it receives. */ Observer.fromNotifier = function (handler, thisArg) { var handlerFunc = bindCallback(handler, thisArg, 1); return new AnonymousObserver(function (x) { return handlerFunc(notificationCreateOnNext(x)); }, function (e) { return handlerFunc(notificationCreateOnError(e)); }, function () { return handlerFunc(notificationCreateOnCompleted()); }); }; /** * Creates a notification callback from an observer. * @returns The action that forwards its input notification to the underlying observer. */ Observer.prototype.toNotifier = function () { var observer = this; return function (n) { return n.accept(observer); }; }; /** * Hides the identity of an observer. * @returns An observer that hides the identity of the specified observer. */ Observer.prototype.asObserver = function () { var source = this; return new AnonymousObserver( function (x) { source.onNext(x); }, function (e) { source.onError(e); }, function () { source.onCompleted(); } ); }; var ObserveOnObservable = (function (__super__) { inherits(ObserveOnObservable, __super__); function ObserveOnObservable(source, s) { this.source = source; this._s = s; __super__.call(this); } ObserveOnObservable.prototype.subscribeCore = function (o) { return this.source.subscribe(new ObserveOnObserver(this._s, o)); }; return ObserveOnObservable; }(ObservableBase)); /** * Wraps the source sequence in order to run its observer callbacks on the specified scheduler. * * This only invokes observer callbacks on a scheduler. In case the subscription and/or unsubscription actions have side-effects * that require to be run on a scheduler, use subscribeOn. * * @param {Scheduler} scheduler Scheduler to notify observers on. * @returns {Observable} The source sequence whose observations happen on the specified scheduler. */ observableProto.observeOn = function (scheduler) { return new ObserveOnObservable(this, scheduler); }; var SubscribeOnObservable = (function (__super__) { inherits(SubscribeOnObservable, __super__); function SubscribeOnObservable(source, s) { this.source = source; this._s = s; __super__.call(this); } function scheduleMethod(scheduler, state) { var source = state[0], d = state[1], o = state[2]; d.setDisposable(new ScheduledDisposable(scheduler, source.subscribe(o))); } SubscribeOnObservable.prototype.subscribeCore = function (o) { var m = new SingleAssignmentDisposable(), d = new SerialDisposable(); d.setDisposable(m); m.setDisposable(this._s.schedule([this.source, d, o], scheduleMethod)); return d; }; return SubscribeOnObservable; }(ObservableBase)); /** * Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. This operation is not commonly used; * see the remarks section for more information on the distinction between subscribeOn and observeOn. * This only performs the side-effects of subscription and unsubscription on the specified scheduler. In order to invoke observer * callbacks on a scheduler, use observeOn. * @param {Scheduler} scheduler Scheduler to perform subscription and unsubscription actions on. * @returns {Observable} The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. */ observableProto.subscribeOn = function (scheduler) { return new SubscribeOnObservable(this, scheduler); }; var GenerateObservable = (function (__super__) { inherits(GenerateObservable, __super__); function GenerateObservable(state, cndFn, itrFn, resFn, s) { this._initialState = state; this._cndFn = cndFn; this._itrFn = itrFn; this._resFn = resFn; this._s = s; __super__.call(this); } function scheduleRecursive(state, recurse) { if (state.first) { state.first = false; } else { state.newState = tryCatch(state.self._itrFn)(state.newState); if (state.newState === errorObj) { return state.o.onError(state.newState.e); } } var hasResult = tryCatch(state.self._cndFn)(state.newState); if (hasResult === errorObj) { return state.o.onError(hasResult.e); } if (hasResult) { var result = tryCatch(state.self._resFn)(state.newState); if (result === errorObj) { return state.o.onError(result.e); } state.o.onNext(result); recurse(state); } else { state.o.onCompleted(); } } GenerateObservable.prototype.subscribeCore = function (o) { var state = { o: o, self: this, first: true, newState: this._initialState }; return this._s.scheduleRecursive(state, scheduleRecursive); }; return GenerateObservable; }(ObservableBase)); /** * Generates an observable sequence by running a state-driven loop producing the sequence's elements, using the specified scheduler to send out observer messages. * * @example * var res = Rx.Observable.generate(0, function (x) { return x < 10; }, function (x) { return x + 1; }, function (x) { return x; }); * var res = Rx.Observable.generate(0, function (x) { return x < 10; }, function (x) { return x + 1; }, function (x) { return x; }, Rx.Scheduler.timeout); * @param {Mixed} initialState Initial state. * @param {Function} condition Condition to terminate generation (upon returning false). * @param {Function} iterate Iteration step function. * @param {Function} resultSelector Selector function for results produced in the sequence. * @param {Scheduler} [scheduler] Scheduler on which to run the generator loop. If not provided, defaults to Scheduler.currentThread. * @returns {Observable} The generated sequence. */ Observable.generate = function (initialState, condition, iterate, resultSelector, scheduler) { isScheduler(scheduler) || (scheduler = currentThreadScheduler); return new GenerateObservable(initialState, condition, iterate, resultSelector, scheduler); }; var UsingObservable = (function (__super__) { inherits(UsingObservable, __super__); function UsingObservable(resFn, obsFn) { this._resFn = resFn; this._obsFn = obsFn; __super__.call(this); } UsingObservable.prototype.subscribeCore = function (o) { var disposable = disposableEmpty; var resource = tryCatch(this._resFn)(); if (resource === errorObj) { return new BinaryDisposable(observableThrow(resource.e).subscribe(o), disposable); } resource && (disposable = resource); var source = tryCatch(this._obsFn)(resource); if (source === errorObj) { return new BinaryDisposable(observableThrow(source.e).subscribe(o), disposable); } return new BinaryDisposable(source.subscribe(o), disposable); }; return UsingObservable; }(ObservableBase)); /** * Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime. * @param {Function} resourceFactory Factory function to obtain a resource object. * @param {Function} observableFactory Factory function to obtain an observable sequence that depends on the obtained resource. * @returns {Observable} An observable sequence whose lifetime controls the lifetime of the dependent resource object. */ Observable.using = function (resourceFactory, observableFactory) { return new UsingObservable(resourceFactory, observableFactory); }; /** * Propagates the observable sequence or Promise that reacts first. * @param {Observable} rightSource Second observable sequence or Promise. * @returns {Observable} {Observable} An observable sequence that surfaces either of the given sequences, whichever reacted first. */ observableProto.amb = function (rightSource) { var leftSource = this; return new AnonymousObservable(function (observer) { var choice, leftChoice = 'L', rightChoice = 'R', leftSubscription = new SingleAssignmentDisposable(), rightSubscription = new SingleAssignmentDisposable(); isPromise(rightSource) && (rightSource = observableFromPromise(rightSource)); function choiceL() { if (!choice) { choice = leftChoice; rightSubscription.dispose(); } } function choiceR() { if (!choice) { choice = rightChoice; leftSubscription.dispose(); } } var leftSubscribe = observerCreate( function (left) { choiceL(); choice === leftChoice && observer.onNext(left); }, function (e) { choiceL(); choice === leftChoice && observer.onError(e); }, function () { choiceL(); choice === leftChoice && observer.onCompleted(); } ); var rightSubscribe = observerCreate( function (right) { choiceR(); choice === rightChoice && observer.onNext(right); }, function (e) { choiceR(); choice === rightChoice && observer.onError(e); }, function () { choiceR(); choice === rightChoice && observer.onCompleted(); } ); leftSubscription.setDisposable(leftSource.subscribe(leftSubscribe)); rightSubscription.setDisposable(rightSource.subscribe(rightSubscribe)); return new BinaryDisposable(leftSubscription, rightSubscription); }); }; function amb(p, c) { return p.amb(c); } /** * Propagates the observable sequence or Promise that reacts first. * @returns {Observable} An observable sequence that surfaces any of the given sequences, whichever reacted first. */ Observable.amb = function () { var acc = observableNever(), items; if (Array.isArray(arguments[0])) { items = arguments[0]; } else { var len = arguments.length; items = new Array(items); for(var i = 0; i < len; i++) { items[i] = arguments[i]; } } for (var i = 0, len = items.length; i < len; i++) { acc = amb(acc, items[i]); } return acc; }; /** * Continues an observable sequence that is terminated normally or by an exception with the next observable sequence. * @param {Observable} second Second observable sequence used to produce results after the first sequence terminates. * @returns {Observable} An observable sequence that concatenates the first and second sequence, even if the first sequence terminates exceptionally. */ observableProto.onErrorResumeNext = function (second) { if (!second) { throw new Error('Second observable is required'); } return onErrorResumeNext([this, second]); }; var OnErrorResumeNextObservable = (function(__super__) { inherits(OnErrorResumeNextObservable, __super__); function OnErrorResumeNextObservable(sources) { this.sources = sources; __super__.call(this); } function scheduleMethod(state, recurse) { if (state.pos < state.sources.length) { var current = state.sources[state.pos++]; isPromise(current) && (current = observableFromPromise(current)); var d = new SingleAssignmentDisposable(); state.subscription.setDisposable(d); d.setDisposable(current.subscribe(new OnErrorResumeNextObserver(state, recurse))); } else { state.o.onCompleted(); } } OnErrorResumeNextObservable.prototype.subscribeCore = function (o) { var subscription = new SerialDisposable(), state = {pos: 0, subscription: subscription, o: o, sources: this.sources }, cancellable = immediateScheduler.scheduleRecursive(state, scheduleMethod); return new BinaryDisposable(subscription, cancellable); }; return OnErrorResumeNextObservable; }(ObservableBase)); var OnErrorResumeNextObserver = (function(__super__) { inherits(OnErrorResumeNextObserver, __super__); function OnErrorResumeNextObserver(state, recurse) { this._state = state; this._recurse = recurse; __super__.call(this); } OnErrorResumeNextObserver.prototype.next = function (x) { this._state.o.onNext(x); }; OnErrorResumeNextObserver.prototype.error = function () { this._recurse(this._state); }; OnErrorResumeNextObserver.prototype.completed = function () { this._recurse(this._state); }; return OnErrorResumeNextObserver; }(AbstractObserver)); /** * Continues an observable sequence that is terminated normally or by an exception with the next observable sequence. * @returns {Observable} An observable sequence that concatenates the source sequences, even if a sequence terminates exceptionally. */ var onErrorResumeNext = Observable.onErrorResumeNext = function () { var sources = []; if (Array.isArray(arguments[0])) { sources = arguments[0]; } else { var len = arguments.length; sources = new Array(len); for(var i = 0; i < len; i++) { sources[i] = arguments[i]; } } return new OnErrorResumeNextObservable(sources); }; function toArray(x) { return x.toArray(); } function notEmpty(x) { return x.length > 0; } /** * Projects each element of an observable sequence into zero or more buffers which are produced based on element count information. * @param {Number} count Length of each buffer. * @param {Number} [skip] Number of elements to skip between creation of consecutive buffers. If not provided, defaults to the count. * @returns {Observable} An observable sequence of buffers. */ observableProto.bufferWithCount = observableProto.bufferCount = function (count, skip) { typeof skip !== 'number' && (skip = count); return this.windowWithCount(count, skip) .flatMap(toArray) .filter(notEmpty); }; /** * Projects each element of an observable sequence into zero or more windows which are produced based on element count information. * @param {Number} count Length of each window. * @param {Number} [skip] Number of elements to skip between creation of consecutive windows. If not specified, defaults to the count. * @returns {Observable} An observable sequence of windows. */ observableProto.windowWithCount = observableProto.windowCount = function (count, skip) { var source = this; +count || (count = 0); Math.abs(count) === Infinity && (count = 0); if (count <= 0) { throw new ArgumentOutOfRangeError(); } skip == null && (skip = count); +skip || (skip = 0); Math.abs(skip) === Infinity && (skip = 0); if (skip <= 0) { throw new ArgumentOutOfRangeError(); } return new AnonymousObservable(function (observer) { var m = new SingleAssignmentDisposable(), refCountDisposable = new RefCountDisposable(m), n = 0, q = []; function createWindow () { var s = new Subject(); q.push(s); observer.onNext(addRef(s, refCountDisposable)); } createWindow(); m.setDisposable(source.subscribe( function (x) { for (var i = 0, len = q.length; i < len; i++) { q[i].onNext(x); } var c = n - count + 1; c >= 0 && c % skip === 0 && q.shift().onCompleted(); ++n % skip === 0 && createWindow(); }, function (e) { while (q.length > 0) { q.shift().onError(e); } observer.onError(e); }, function () { while (q.length > 0) { q.shift().onCompleted(); } observer.onCompleted(); } )); return refCountDisposable; }, source); }; var TakeLastBufferObserver = (function (__super__) { inherits(TakeLastBufferObserver, __super__); function TakeLastBufferObserver(o, c) { this._o = o; this._c = c; this._q = []; __super__.call(this); } TakeLastBufferObserver.prototype.next = function (x) { this._q.push(x); this._q.length > this._c && this._q.shift(); }; TakeLastBufferObserver.prototype.error = function (e) { this._o.onError(e); }; TakeLastBufferObserver.prototype.completed = function () { this._o.onNext(this._q); this._o.onCompleted(); }; return TakeLastBufferObserver; }(AbstractObserver)); /** * Returns an array with the specified number of contiguous elements from the end of an observable sequence. * * @description * This operator accumulates a buffer with a length enough to store count elements. Upon completion of the * source sequence, this buffer is produced on the result sequence. * @param {Number} count Number of elements to take from the end of the source sequence. * @returns {Observable} An observable sequence containing a single array with the specified number of elements from the end of the source sequence. */ observableProto.takeLastBuffer = function (count) { if (count < 0) { throw new ArgumentOutOfRangeError(); } var source = this; return new AnonymousObservable(function (o) { return source.subscribe(new TakeLastBufferObserver(o, count)); }, source); }; var DefaultIfEmptyObserver = (function (__super__) { inherits(DefaultIfEmptyObserver, __super__); function DefaultIfEmptyObserver(o, d) { this._o = o; this._d = d; this._f = false; __super__.call(this); } DefaultIfEmptyObserver.prototype.next = function (x) { this._f = true; this._o.onNext(x); }; DefaultIfEmptyObserver.prototype.error = function (e) { this._o.onError(e); }; DefaultIfEmptyObserver.prototype.completed = function () { !this._f && this._o.onNext(this._d); this._o.onCompleted(); }; return DefaultIfEmptyObserver; }(AbstractObserver)); /** * Returns the elements of the specified sequence or the specified value in a singleton sequence if the sequence is empty. * * var res = obs = xs.defaultIfEmpty(); * 2 - obs = xs.defaultIfEmpty(false); * * @memberOf Observable# * @param defaultValue The value to return if the sequence is empty. If not provided, this defaults to null. * @returns {Observable} An observable sequence that contains the specified default value if the source is empty; otherwise, the elements of the source itself. */ observableProto.defaultIfEmpty = function (defaultValue) { var source = this; defaultValue === undefined && (defaultValue = null); return new AnonymousObservable(function (o) { return source.subscribe(new DefaultIfEmptyObserver(o, defaultValue)); }, source); }; // Swap out for Array.findIndex function arrayIndexOfComparer(array, item, comparer) { for (var i = 0, len = array.length; i < len; i++) { if (comparer(array[i], item)) { return i; } } return -1; } function HashSet(comparer) { this.comparer = comparer; this.set = []; } HashSet.prototype.push = function(value) { var retValue = arrayIndexOfComparer(this.set, value, this.comparer) === -1; retValue && this.set.push(value); return retValue; }; var DistinctObservable = (function (__super__) { inherits(DistinctObservable, __super__); function DistinctObservable(source, keyFn, cmpFn) { this.source = source; this._keyFn = keyFn; this._cmpFn = cmpFn; __super__.call(this); } DistinctObservable.prototype.subscribeCore = function (o) { return this.source.subscribe(new DistinctObserver(o, this._keyFn, this._cmpFn)); }; return DistinctObservable; }(ObservableBase)); var DistinctObserver = (function (__super__) { inherits(DistinctObserver, __super__); function DistinctObserver(o, keyFn, cmpFn) { this._o = o; this._keyFn = keyFn; this._h = new HashSet(cmpFn); __super__.call(this); } DistinctObserver.prototype.next = function (x) { var key = x; if (isFunction(this._keyFn)) { key = tryCatch(this._keyFn)(x); if (key === errorObj) { return this._o.onError(key.e); } } this._h.push(key) && this._o.onNext(x); }; DistinctObserver.prototype.error = function (e) { this._o.onError(e); }; DistinctObserver.prototype.completed = function () { this._o.onCompleted(); }; return DistinctObserver; }(AbstractObserver)); /** * Returns an observable sequence that contains only distinct elements according to the keySelector and the comparer. * Usage of this operator should be considered carefully due to the maintenance of an internal lookup structure which can grow large. * * @example * var res = obs = xs.distinct(); * 2 - obs = xs.distinct(function (x) { return x.id; }); * 2 - obs = xs.distinct(function (x) { return x.id; }, function (a,b) { return a === b; }); * @param {Function} [keySelector] A function to compute the comparison key for each element. * @param {Function} [comparer] Used to compare items in the collection. * @returns {Observable} An observable sequence only containing the distinct elements, based on a computed key value, from the source sequence. */ observableProto.distinct = function (keySelector, comparer) { comparer || (comparer = defaultComparer); return new DistinctObservable(this, keySelector, comparer); }; /** * Returns an observable sequence that shares a single subscription to the underlying sequence. This observable sequence * can be resubscribed to, even if all prior subscriptions have ended. (unlike `.publish().refCount()`) * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source. */ observableProto.singleInstance = function() { var source = this, hasObservable = false, observable; function getObservable() { if (!hasObservable) { hasObservable = true; observable = source['finally'](function() { hasObservable = false; }).publish().refCount(); } return observable; } return new AnonymousObservable(function(o) { return getObservable().subscribe(o); }); }; return Rx; }));