617 lines
19 KiB
JavaScript
617 lines
19 KiB
JavaScript
// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
|
|
|
|
;(function (factory) {
|
|
// `typeof` results that can denote a usable host object.
var objectTypes = { 'function': true, 'object': true };

/**
 * Returns `value` when it looks like a global object, i.e. it is truthy and
 * its `Object` property is this realm's `Object` constructor.
 * @param {*} value Candidate global object.
 * @returns {*} `value` itself, or `null` when the check fails.
 */
function checkGlobal(value) {
  if (value && value.Object === Object) {
    return value;
  }
  return null;
}
|
|
|
|
// CommonJS-style `exports`, accepted only when it is an object/function
// without a truthy `nodeType`.
var freeExports = null;
if (objectTypes[typeof exports] && exports && !exports.nodeType) {
  freeExports = exports;
}

// CommonJS-style `module`, accepted under the same rules.
var freeModule = null;
if (objectTypes[typeof module] && module && !module.nodeType) {
  freeModule = module;
}

// `global` is only considered when both `exports` and `module` were detected.
var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);

var freeSelf = checkGlobal(objectTypes[typeof self] && self);

var freeWindow = checkGlobal(objectTypes[typeof window] && window);

// `exports`, but only when it is the very object `module.exports` refers to.
var moduleExports = null;
if (freeModule && freeModule.exports === freeExports) {
  moduleExports = freeExports;
}

var thisGlobal = checkGlobal(objectTypes[typeof this] && this);

// Pick the root object: `global` first, then `window` (unless it is the same
// object as `this.window`), then `self`, then `this`, and finally whatever a
// freshly constructed function sees as `this`.
var root = freeGlobal;
if (!root && freeWindow !== (thisGlobal && thisGlobal.window)) {
  root = freeWindow;
}
if (!root) {
  root = freeSelf || thisGlobal || Function('return this')();
}
|
|
|
|
// Because of build optimizers
// (the loader checks below are deliberately left inline in the conditions)
if (typeof define === 'function' && define.amd) {
  // AMD: register a module that depends on './rx'.
  define(['./rx'], function (Rx, exports) {
    return factory(root, exports, Rx);
  });
} else if (typeof module === 'object' && module && module.exports === freeExports) {
  // CommonJS: extend the Rx returned by require and export the result.
  var rx = require('./rx');
  module.exports = factory(root, module.exports, rx);
} else {
  // No module loader: extend the Rx found on the root object.
  var exportsShim = {};
  root.Rx = factory(root, exportsShim, root.Rx);
}
|
|
}.call(this, function (root, exp, Rx, undefined) {
|
|
|
|
// References
// Local aliases for the pieces of Rx used by this module.
var Observable = Rx.Observable;
var observableProto = Observable.prototype;
var AnonymousObservable = Rx.AnonymousObservable;
var AbstractObserver = Rx.internals.AbstractObserver;

// Disposables
var CompositeDisposable = Rx.CompositeDisposable;
var BinaryDisposable = Rx.BinaryDisposable;
var NAryDisposable = Rx.NAryDisposable;

var Notification = Rx.Notification;
var Subject = Rx.Subject;
var Observer = Rx.Observer;

var disposableEmpty = Rx.Disposable.empty;
var disposableCreate = Rx.Disposable.create;

// Internal helpers
var inherits = Rx.internals.inherits;
var addProperties = Rx.internals.addProperties;

// Schedulers
var defaultScheduler = Rx.Scheduler['default'];
var currentThreadScheduler = Rx.Scheduler.currentThread;

var identity = Rx.helpers.identity;
var isScheduler = Rx.Scheduler.isScheduler;
var isFunction = Rx.helpers.isFunction;
var checkDisposed = Rx.Disposable.checkDisposed;
|
|
|
|
// Shared sentinel: a wrapped call that throws returns this very object, with
// the thrown value stored in its `e` property.
var errorObj = { e: {} };

/**
 * Wraps `tryCatchTarget` in a function that never throws.
 * @param {Function} tryCatchTarget Function to invoke with the wrapper's own
 *   `this` and arguments.
 * @returns {Function} Wrapper returning the target's result, or `errorObj`
 *   (with `errorObj.e` set to the thrown value) when the target throws.
 */
function tryCatcherGen(tryCatchTarget) {
  function tryCatcher() {
    var outcome;
    try {
      outcome = tryCatchTarget.apply(this, arguments);
    } catch (caught) {
      errorObj.e = caught;
      outcome = errorObj;
    }
    return outcome;
  }

  return tryCatcher;
}
|
|
|
|
/**
 * Produces a non-throwing wrapper around `fn` (see `tryCatcherGen`) and is
 * also published as `Rx.internals.tryCatch`.
 * @param {Function} fn Function to wrap.
 * @returns {Function} The wrapped function.
 * @throws {TypeError} When `fn` is not a function.
 */
var tryCatch = function tryCatch(fn) {
  if (isFunction(fn)) {
    return tryCatcherGen(fn);
  }
  throw new TypeError('fn must be a function');
};
Rx.internals.tryCatch = tryCatch;
|
|
|
|
/**
 * Throws the value it is given, unchanged.
 * @param {*} e Value to throw.
 */
function thrower(e) {
  throw e;
}
|
|
|
|
/**
 * Used to pause and resume streams.
 * A Subject subclass whose `pause`/`resume` methods push `false`/`true`
 * through its own `onNext`.
 */
Rx.Pauser = (function (__super__) {
  function Pauser() {
    __super__.call(this);
  }

  inherits(Pauser, __super__);

  // Captured after `inherits`, so it is the prototype instances will use.
  var proto = Pauser.prototype;

  /**
   * Pauses the underlying sequence.
   */
  proto.pause = function () {
    this.onNext(false);
  };

  /**
   * Resumes the underlying sequence.
   */
  proto.resume = function () {
    this.onNext(true);
  };

  return Pauser;
}(Subject));
|
|
|
|
/**
 * Observable that only connects to its source while "resumed".
 *
 * The pause state comes from an internal controller Subject (driven by
 * `pause()` / `resume()`), merged with the optional external `pauser`
 * sequence of true/false values. Instances start out paused.
 */
var PausableObservable = (function (__super__) {
  inherits(PausableObservable, __super__);

  /**
   * @param {Observable} source The sequence to pause and resume.
   * @param {Observable} [pauser] Optional sequence of booleans; it is merged
   *   with the internal controller when it has a `subscribe` member.
   */
  function PausableObservable(source, pauser) {
    this.source = source;
    this.controller = new Subject();
    this.paused = true;

    if (pauser && pauser.subscribe) {
      this.pauser = this.controller.merge(pauser);
    } else {
      this.pauser = this.controller;
    }

    __super__.call(this);
  }

  /**
   * Publishes the source, subscribes `o` to the published sequence, and
   * connects / disconnects it as the pauser emits true / false.
   * @param {Observer} o The subscribing observer.
   * @returns {Disposable} Disposes the observer subscription, the connection
   *   that is live at disposal time, and the pauser subscription.
   */
  PausableObservable.prototype._subscribe = function (o) {
    var conn = this.source.publish(),
      subscription = conn.subscribe(o),
      connection = disposableEmpty;

    var pausable = this.pauser.startWith(!this.paused).distinctUntilChanged().subscribe(function (b) {
      if (b) {
        connection = conn.connect();
      } else {
        connection.dispose();
        connection = disposableEmpty;
      }
    });

    // `connection` is reassigned every time the pauser toggles, so handing its
    // current value to the composite would only ever dispose the connection
    // that existed at subscribe time. Dispose through the variable instead so
    // a connection opened by a later resume is released as well.
    var liveConnection = disposableCreate(function () {
      connection.dispose();
      connection = disposableEmpty;
    });

    return new NAryDisposable([subscription, liveConnection, pausable]);
  };

  /**
   * Marks the sequence as paused and pushes `false` through the controller.
   */
  PausableObservable.prototype.pause = function () {
    this.paused = true;
    this.controller.onNext(false);
  };

  /**
   * Marks the sequence as running and pushes `true` through the controller.
   */
  PausableObservable.prototype.resume = function () {
    this.paused = false;
    this.controller.onNext(true);
  };

  return PausableObservable;

}(Observable));
|
|
|
|
/**
 * Wraps this sequence so that it can be paused and resumed, driven by an
 * observable sequence which yields true/false.
 * @example
 * var pauser = new Rx.Subject();
 * var source = Rx.Observable.interval(100).pausable(pauser);
 * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
 * @returns {Observable} The observable sequence which is paused based upon the pauser.
 */
observableProto.pausable = function (pauser) {
  var underlying = this;
  return new PausableObservable(underlying, pauser);
};
|
|
|
|
/**
 * Combines the latest value of `source` (slot 0) with the latest value of
 * `subject` (slot 1) through `resultSelector`.
 *
 * A result is only produced once both slots have received a value. An error
 * from `source` is forwarded immediately when slot 1 currently holds a truthy
 * value; otherwise it is stored and forwarded the next time a result would be
 * produced. Completion is signalled once either side has completed and slot 1
 * holds a truthy value.
 *
 * @param {Observable} source The data sequence.
 * @param {Observable} subject The second sequence whose latest value is paired with the data.
 * @param {Function} resultSelector Called with (sourceValue, subjectValue).
 * @returns {Observable} The combined sequence.
 */
function combineLatestSource(source, subject, resultSelector) {
  return new AnonymousObservable(function (o) {
    var seen = [false, false];
    var allSeen = false;
    var finished = false;
    var latest = new Array(2);
    var pendingError;

    // Records value `x` for slot `i` and emits when both slots are filled.
    function next(x, i) {
      latest[i] = x;
      seen[i] = true;

      if (!allSeen) {
        allSeen = seen.every(identity);
      }

      if (allSeen) {
        if (pendingError) {
          return o.onError(pendingError);
        }
        var selected = tryCatch(resultSelector).apply(null, latest);
        if (selected === errorObj) {
          return o.onError(selected.e);
        }
        o.onNext(selected);
      }

      if (finished && latest[1]) {
        o.onCompleted();
      }
    }

    function onSourceNext(x) {
      next(x, 0);
    }

    function onSourceError(e) {
      if (latest[1]) {
        o.onError(e);
      } else {
        // Hold the error back until a result would next be produced.
        pendingError = e;
      }
    }

    function onSourceCompleted() {
      finished = true;
      if (latest[1]) {
        o.onCompleted();
      }
    }

    function onSubjectNext(x) {
      next(x, 1);
    }

    function onSubjectError(e) {
      o.onError(e);
    }

    function onSubjectCompleted() {
      finished = true;
      // Completion of the subject is fed through as a `true` in slot 1.
      next(true, 1);
    }

    var sourceSubscription = source.subscribe(onSourceNext, onSourceError, onSourceCompleted);
    var subjectSubscription = subject.subscribe(onSubjectNext, onSubjectError, onSubjectCompleted);

    return new BinaryDisposable(sourceSubscription, subjectSubscription);
  }, source);
}
|
|
|
|
/**
 * Observable that queues source values while paused and replays the queue
 * when resumed.
 *
 * The pause state comes from an internal controller Subject (driven by
 * `pause()` / `resume()`), merged with the optional external `pauser`
 * sequence of true/false values. Instances start out paused.
 */
var PausableBufferedObservable = (function (__super__) {
  /**
   * @param {Observable} source The sequence to buffer while paused.
   * @param {Observable} [pauser] Optional sequence of booleans; it is merged
   *   with the internal controller when it has a `subscribe` member.
   */
  function PausableBufferedObservable(source, pauser) {
    this.source = source;
    this.controller = new Subject();
    this.paused = true;

    var hasExternalPauser = pauser && pauser.subscribe;
    this.pauser = hasExternalPauser ? this.controller.merge(pauser) : this.controller;

    __super__.call(this);
  }

  inherits(PausableBufferedObservable, __super__);

  var proto = PausableBufferedObservable.prototype;

  /**
   * Pairs every source value with the latest pause flag and either forwards
   * it, queues it, or flushes the queue when the flag flips to true.
   * @param {Observer} o The subscribing observer.
   * @returns {Disposable} The subscription to the combined sequence.
   */
  proto._subscribe = function (o) {
    var buffered = [];
    var lastShouldFire;

    // Delivers everything queued so far, oldest first.
    function flush() {
      while (buffered.length > 0) {
        o.onNext(buffered.shift());
      }
    }

    function pair(data, shouldFire) {
      return { data: data, shouldFire: shouldFire };
    }

    function onResult(results) {
      var shouldFire = results.shouldFire;
      var flagChanged = lastShouldFire !== undefined && shouldFire !== lastShouldFire;
      lastShouldFire = shouldFire;

      if (flagChanged) {
        // change in shouldFire
        if (shouldFire) {
          flush();
        }
        return;
      }

      // new data
      if (shouldFire) {
        o.onNext(results.data);
      } else {
        buffered.push(results.data);
      }
    }

    // Queued values are delivered before either terminal notification.
    function onFailure(err) {
      flush();
      o.onError(err);
    }

    function onDone() {
      flush();
      o.onCompleted();
    }

    var upstream = this.source;
    var gate = this.pauser.startWith(!this.paused).distinctUntilChanged();

    return combineLatestSource(upstream, gate, pair).subscribe(onResult, onFailure, onDone);
  };

  /**
   * Marks the sequence as paused and pushes `false` through the controller.
   */
  proto.pause = function () {
    this.paused = true;
    this.controller.onNext(false);
  };

  /**
   * Marks the sequence as running and pushes `true` through the controller.
   */
  proto.resume = function () {
    this.paused = false;
    this.controller.onNext(true);
  };

  return PausableBufferedObservable;

}(Observable));
|
|
|
|
/**
 * Wraps this sequence so that it can be paused and resumed, driven by an
 * observable sequence which yields true/false; values that arrive while
 * paused are buffered and yielded on resume.
 * @example
 * var pauser = new Rx.Subject();
 * var source = Rx.Observable.interval(100).pausableBuffered(pauser);
 * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
 * @returns {Observable} The observable sequence which is paused based upon the pauser.
 */
observableProto.pausableBuffered = function (pauser) {
  var underlying = this;
  return new PausableBufferedObservable(underlying, pauser);
};
|
|
|
|
/**
 * Observable whose values are only propagated on request.
 * The source is multicast through a ControlledSubject (with `refCount`), and
 * `request` is delegated to that subject.
 */
var ControlledObservable = (function (__super__) {
  /**
   * @param {Observable} source The sequence to control.
   * @param {Boolean} enableQueue Passed through to the ControlledSubject.
   * @param {Scheduler} scheduler Passed through to the ControlledSubject.
   */
  function ControlledObservable (source, enableQueue, scheduler) {
    __super__.call(this);
    var gate = new ControlledSubject(enableQueue, scheduler);
    this.subject = gate;
    this.source = source.multicast(gate).refCount();
  }

  inherits(ControlledObservable, __super__);

  var proto = ControlledObservable.prototype;

  /**
   * Subscribes the observer to the shared, controlled source.
   */
  proto._subscribe = function (o) {
    return this.source.subscribe(o);
  };

  /**
   * Requests items from the controlled subject.
   * @param {Number} [numberOfItems] Number of items to request; -1 is passed
   *   on when the argument is null or undefined.
   * @returns {Disposable} Whatever the subject's `request` returns.
   */
  proto.request = function (numberOfItems) {
    var count = numberOfItems;
    if (count == null) {
      count = -1;
    }
    return this.subject.request(count);
  };

  return ControlledObservable;

}(Observable));
|
|
|
|
/**
 * Observer/Observable pair that sits between a source and its observers and
 * forwards values only while there is outstanding demand registered through
 * `request`. Values that arrive without demand are queued when `enableQueue`
 * is truthy and are otherwise not forwarded.
 */
var ControlledSubject = (function (__super__) {
  inherits(ControlledSubject, __super__);

  /**
   * @param {Boolean} [enableQueue=true] Whether values without demand are queued.
   * @param {Scheduler} [scheduler] Scheduler used to run requests; the
   *   current-thread scheduler is used when omitted.
   */
  function ControlledSubject(enableQueue, scheduler) {
    enableQueue == null && (enableQueue = true);

    __super__.call(this);
    this.subject = new Subject();
    this.enableQueue = enableQueue;
    this.queue = enableQueue ? [] : null;
    this.requestedCount = 0;
    this.requestedDisposable = null;
    this.error = null;
    this.hasFailed = false;
    this.hasCompleted = false;
    this.scheduler = scheduler || currentThreadScheduler;
  }

  addProperties(ControlledSubject.prototype, Observer, {
    /**
     * Subscribes the observer to the inner subject.
     */
    _subscribe: function (o) {
      return this.subject.subscribe(o);
    },

    /**
     * Forwards completion right away when nothing is queued; otherwise the
     * completion is queued behind the pending values.
     */
    onCompleted: function () {
      this.hasCompleted = true;
      if (!this.enableQueue || this.queue.length === 0) {
        this.subject.onCompleted();
        this.disposeCurrentRequest();
      } else {
        this.queue.push(Notification.createOnCompleted());
      }
    },

    /**
     * Forwards the error right away when nothing is queued; otherwise the
     * error is queued behind the pending values.
     */
    onError: function (error) {
      this.hasFailed = true;
      this.error = error;
      if (!this.enableQueue || this.queue.length === 0) {
        this.subject.onError(error);
        this.disposeCurrentRequest();
      } else {
        this.queue.push(Notification.createOnError(error));
      }
    },

    /**
     * Forwards the value when there is outstanding demand; otherwise queues
     * it (queue enabled) or lets it go (queue disabled).
     */
    onNext: function (value) {
      if (this.requestedCount <= 0) {
        this.enableQueue && this.queue.push(Notification.createOnNext(value));
      } else {
        // Pre-decrement: the check must see the demand left AFTER this value
        // is consumed. A post-decrement can never equal 0 in this branch, so
        // the finished request would never be released here.
        (--this.requestedCount === 0) && this.disposeCurrentRequest();
        this.subject.onNext(value);
      }
    },

    /**
     * Replays queued notifications into the inner subject. Value
     * notifications are replayed while `numberOfItems` is positive; a
     * terminal notification at the head of the queue is always delivered and
     * then the current request is disposed and the queue reset.
     * @param {Number} numberOfItems Demand to satisfy from the queue.
     * @returns {Number} The demand left over after draining.
     */
    _processRequest: function (numberOfItems) {
      if (this.enableQueue) {
        while (this.queue.length > 0 && (numberOfItems > 0 || this.queue[0].kind !== 'N')) {
          var first = this.queue.shift();
          first.accept(this.subject);
          if (first.kind === 'N') {
            numberOfItems--;
          } else {
            this.disposeCurrentRequest();
            this.queue = [];
          }
        }
      }

      return numberOfItems;
    },

    /**
     * Cancels the request in flight (if any) and schedules a new one.
     * @param {Number} number Number of items requested.
     * @returns {Disposable} The disposable of the scheduled request.
     */
    request: function (number) {
      this.disposeCurrentRequest();
      var self = this;

      this.requestedDisposable = this.scheduler.schedule(number,
      function(s, i) {
        var remaining = self._processRequest(i);
        var stopped = self.hasCompleted || self.hasFailed;
        if (!stopped && remaining > 0) {
          self.requestedCount = remaining;

          // Scheduled item is still in progress. Return a new
          // disposable to allow the request to be interrupted
          // via dispose.
          return disposableCreate(function () {
            self.requestedCount = 0;
          });
        }
      });

      return this.requestedDisposable;
    },

    /**
     * Disposes and forgets the disposable of the current request, if any.
     */
    disposeCurrentRequest: function () {
      if (this.requestedDisposable) {
        this.requestedDisposable.dispose();
        this.requestedDisposable = null;
      }
    }
  });

  return ControlledSubject;
}(Observable));
|
|
|
|
/**
 * Attaches a controller to the observable sequence with the ability to queue.
 * The scheduler may also be passed as the first argument, in which case
 * queueing is enabled.
 * @example
 * var source = Rx.Observable.interval(100).controlled();
 * source.request(3); // Reads 3 values
 * @param {bool} enableQueue truthy value to determine if values should be queued pending the next request
 * @param {Scheduler} scheduler determines how the requests will be scheduled
 * @returns {Observable} The observable sequence which only propagates values on request.
 */
observableProto.controlled = function (enableQueue, scheduler) {
  var queueing = enableQueue;
  var requestScheduler = scheduler;

  if (queueing && isScheduler(queueing)) {
    // controlled(scheduler) form
    requestScheduler = queueing;
    queueing = true;
  } else if (queueing == null) {
    queueing = true;
  }

  return new ControlledObservable(this, queueing, requestScheduler);
};
|
|
|
|
/**
 * Observable over a controlled source that requests one item on subscription
 * and one more item each time a value has been delivered.
 */
var StopAndWaitObservable = (function (__super__) {
  inherits(StopAndWaitObservable, __super__);

  /**
   * @param {Observable} source Controlled source; must expose `request`.
   */
  function StopAndWaitObservable (source) {
    __super__.call(this);
    this.source = source;
  }

  // Scheduler action: asks the controlled source for a single item.
  function scheduleMethod(s, self) {
    return self.source.request(1);
  }

  /**
   * Subscribes a StopAndWaitObserver to the source and schedules the first
   * request on the default scheduler.
   * @param {Observer} o The subscribing observer.
   * @returns {Disposable} Disposes both the subscription and the scheduled request.
   */
  StopAndWaitObservable.prototype._subscribe = function (o) {
    this.subscription = this.source.subscribe(new StopAndWaitObserver(o, this, this.subscription));
    return new BinaryDisposable(
      this.subscription,
      defaultScheduler.schedule(this, scheduleMethod)
    );
  };

  var StopAndWaitObserver = (function (__sub__) {
    inherits(StopAndWaitObserver, __sub__);

    /**
     * @param {Observer} observer Downstream observer.
     * @param {StopAndWaitObservable} observable Owner; supplies the controlled source.
     * @param {Disposable} [cancel] Disposable released together with this observer.
     */
    function StopAndWaitObserver (observer, observable, cancel) {
      __sub__.call(this);
      this.observer = observer;
      this.observable = observable;
      this.cancel = cancel;
      this.scheduleDisposable = null;
    }

    StopAndWaitObserver.prototype.completed = function () {
      this.observer.onCompleted();
      this.dispose();
    };

    StopAndWaitObserver.prototype.error = function (error) {
      this.observer.onError(error);
      this.dispose();
    };

    // Scheduler action: asks the controlled source for the next single item.
    function innerScheduleMethod(s, self) {
      return self.observable.source.request(1);
    }

    StopAndWaitObserver.prototype.next = function (value) {
      this.observer.onNext(value);
      this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod);
    };

    // Defined on the prototype (not on the constructor) so that the
    // `this.dispose()` calls in `completed` and `error` actually release
    // `cancel` and the pending scheduled request, as WindowedObserver does.
    StopAndWaitObserver.prototype.dispose = function () {
      this.observer = null;
      if (this.cancel) {
        this.cancel.dispose();
        this.cancel = null;
      }
      if (this.scheduleDisposable) {
        this.scheduleDisposable.dispose();
        this.scheduleDisposable = null;
      }
      __sub__.prototype.dispose.call(this);
    };

    return StopAndWaitObserver;
  }(AbstractObserver));

  return StopAndWaitObservable;
}(Observable));
|
|
|
|
|
|
/**
 * Attaches a stop and wait observable to the current (controlled) observable.
 * @returns {Observable} A stop and wait observable.
 */
ControlledObservable.prototype.stopAndWait = function () {
  var controlled = this;
  return new StopAndWaitObservable(controlled);
};
|
|
|
|
/**
 * Observable over a controlled source that requests `windowSize` items on
 * subscription and another `windowSize` items each time that many values have
 * been delivered.
 */
var WindowedObservable = (function (__super__) {
  /**
   * @param {Observable} source Controlled source; must expose `request`.
   * @param {Number} windowSize Number of items requested per window.
   */
  function WindowedObservable(source, windowSize) {
    __super__.call(this);
    this.source = source;
    this.windowSize = windowSize;
  }

  inherits(WindowedObservable, __super__);

  // Scheduler action: asks the controlled source for one full window.
  function scheduleMethod(s, self) {
    return self.source.request(self.windowSize);
  }

  /**
   * Subscribes a WindowedObserver to the source and schedules the first
   * window request on the default scheduler.
   * @param {Observer} o The subscribing observer.
   * @returns {Disposable} Disposes both the subscription and the scheduled request.
   */
  WindowedObservable.prototype._subscribe = function (o) {
    var windowedObserver = new WindowedObserver(o, this, this.subscription);
    var subscription = this.source.subscribe(windowedObserver);
    this.subscription = subscription;

    var firstRequest = defaultScheduler.schedule(this, scheduleMethod);
    return new BinaryDisposable(subscription, firstRequest);
  };

  var WindowedObserver = (function (__sub__) {
    /**
     * @param {Observer} observer Downstream observer.
     * @param {WindowedObservable} observable Owner; supplies source and window size.
     * @param {Disposable} [cancel] Disposable released together with this observer.
     */
    function WindowedObserver(observer, observable, cancel) {
      this.observer = observer;
      this.observable = observable;
      this.cancel = cancel;
      this.received = 0;
      this.scheduleDisposable = null;
      __sub__.call(this);
    }

    inherits(WindowedObserver, __sub__);

    var proto = WindowedObserver.prototype;

    // Disposes the disposable stored under `field`, if any, and clears it.
    function release(holder, field) {
      if (holder[field]) {
        holder[field].dispose();
        holder[field] = null;
      }
    }

    proto.completed = function () {
      this.observer.onCompleted();
      this.dispose();
    };

    proto.error = function (error) {
      this.observer.onError(error);
      this.dispose();
    };

    // Scheduler action: asks the controlled source for the next window.
    function innerScheduleMethod(s, self) {
      return self.observable.source.request(self.observable.windowSize);
    }

    proto.next = function (value) {
      this.observer.onNext(value);

      // Count values modulo the window size; a wrap to 0 means the window
      // is used up and the next one has to be requested.
      var position = (this.received + 1) % this.observable.windowSize;
      this.received = position;
      if (position === 0) {
        this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod);
      }
    };

    proto.dispose = function () {
      this.observer = null;
      release(this, 'cancel');
      release(this, 'scheduleDisposable');
      __sub__.prototype.dispose.call(this);
    };

    return WindowedObserver;
  }(AbstractObserver));

  return WindowedObservable;
}(Observable));
|
|
|
|
/**
 * Creates a sliding windowed observable based upon the window size.
 * @param {Number} windowSize The number of items in the window
 * @returns {Observable} A windowed observable based upon the window size.
 */
ControlledObservable.prototype.windowed = function (windowSize) {
  var controlled = this;
  return new WindowedObservable(controlled, windowSize);
};
|
|
|
|
/**
 * Pipes the existing Observable sequence into a Node.js Stream.
 *
 * The sequence is wrapped with `pausableBuffered()`: it is paused whenever
 * `dest.write` returns a falsy value and resumed on the stream's 'drain'
 * event. Errors are re-emitted on `dest` as 'error' events; on completion
 * `dest.end()` is called unless `dest._isStdio` is truthy. The 'drain'
 * listener is removed on both completion and error.
 *
 * @param {Stream} dest The destination Node.js stream.
 * @returns {Stream} The destination stream.
 */
observableProto.pipe = function (dest) {
  var source = this.pausableBuffered();

  function onDrain() {
    source.resume();
  }

  function detachDrain() {
    dest.removeListener('drain', onDrain);
  }

  dest.addListener('drain', onDrain);

  source.subscribe(
    function (x) {
      !dest.write(x) && source.pause();
    },
    function (err) {
      // The sequence has terminated, so the listener must not outlive it.
      // Detach first: emitting 'error' may throw when nobody listens for it.
      detachDrain();
      dest.emit('error', err);
    },
    function () {
      // Hack check because STDIO is not closable
      !dest._isStdio && dest.end();
      detachDrain();
    });

  source.resume();

  return dest;
};
|
|
|
|
return Rx;
|
|
}));
|