import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { Observable } from '../Observable'; import { Subject } from '../Subject'; import { Map } from '../util/Map'; import { FastMap } from '../util/FastMap'; /* tslint:enable:max-line-length */ /** * Groups the items emitted by an Observable according to a specified criterion, * and emits these grouped items as `GroupedObservables`, one * {@link GroupedObservable} per group. * * * * @example Group objects by id and return as array * Observable.of({id: 1, name: 'aze1'}, * {id: 2, name: 'sf2'}, * {id: 2, name: 'dg2'}, * {id: 1, name: 'erg1'}, * {id: 1, name: 'df1'}, * {id: 2, name: 'sfqfb2'}, * {id: 3, name: 'qfs3'}, * {id: 2, name: 'qsgqsfg2'} * ) * .groupBy(p => p.id) * .flatMap( (group$) => group$.reduce((acc, cur) => [...acc, cur], [])) * .subscribe(p => console.log(p)); * * // displays: * // [ { id: 1, name: 'aze1' }, * // { id: 1, name: 'erg1' }, * // { id: 1, name: 'df1' } ] * // * // [ { id: 2, name: 'sf2' }, * // { id: 2, name: 'dg2' }, * // { id: 2, name: 'sfqfb2' }, * // { id: 2, name: 'qsgqsfg2' } ] * // * // [ { id: 3, name: 'qfs3' } ] * * @example Pivot data on the id field * Observable.of({id: 1, name: 'aze1'}, * {id: 2, name: 'sf2'}, * {id: 2, name: 'dg2'}, * {id: 1, name: 'erg1'}, * {id: 1, name: 'df1'}, * {id: 2, name: 'sfqfb2'}, * {id: 3, name: 'qfs1'}, * {id: 2, name: 'qsgqsfg2'} * ) * .groupBy(p => p.id, p => p.name) * .flatMap( (group$) => group$.reduce((acc, cur) => [...acc, cur], ["" + group$.key])) * .map(arr => ({'id': parseInt(arr[0]), 'values': arr.slice(1)})) * .subscribe(p => console.log(p)); * * // displays: * // { id: 1, values: [ 'aze1', 'erg1', 'df1' ] } * // { id: 2, values: [ 'sf2', 'dg2', 'sfqfb2', 'qsgqsfg2' ] } * // { id: 3, values: [ 'qfs1' ] } * * @param {function(value: T): K} keySelector A function that extracts the key * for each item. * @param {function(value: T): R} [elementSelector] A function that extracts the * return element for each item. * @param {function(grouped: GroupedObservable): Observable} [durationSelector] * A function that returns an Observable to determine how long each group should * exist. * @return {Observable>} An Observable that emits * GroupedObservables, each of which corresponds to a unique key value and each * of which emits those items from the source Observable that share that key * value. * @method groupBy * @owner Observable */ export function groupBy(keySelector, elementSelector, durationSelector, subjectSelector) { return (source) => source.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector)); } class GroupByOperator { constructor(keySelector, elementSelector, durationSelector, subjectSelector) { this.keySelector = keySelector; this.elementSelector = elementSelector; this.durationSelector = durationSelector; this.subjectSelector = subjectSelector; } call(subscriber, source) { return source.subscribe(new GroupBySubscriber(subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector)); } } /** * We need this JSDoc comment for affecting ESDoc. * @ignore * @extends {Ignored} */ class GroupBySubscriber extends Subscriber { constructor(destination, keySelector, elementSelector, durationSelector, subjectSelector) { super(destination); this.keySelector = keySelector; this.elementSelector = elementSelector; this.durationSelector = durationSelector; this.subjectSelector = subjectSelector; this.groups = null; this.attemptedToUnsubscribe = false; this.count = 0; } _next(value) { let key; try { key = this.keySelector(value); } catch (err) { this.error(err); return; } this._group(value, key); } _group(value, key) { let groups = this.groups; if (!groups) { groups = this.groups = typeof key === 'string' ? new FastMap() : new Map(); } let group = groups.get(key); let element; if (this.elementSelector) { try { element = this.elementSelector(value); } catch (err) { this.error(err); } } else { element = value; } if (!group) { group = this.subjectSelector ? this.subjectSelector() : new Subject(); groups.set(key, group); const groupedObservable = new GroupedObservable(key, group, this); this.destination.next(groupedObservable); if (this.durationSelector) { let duration; try { duration = this.durationSelector(new GroupedObservable(key, group)); } catch (err) { this.error(err); return; } this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this))); } } if (!group.closed) { group.next(element); } } _error(err) { const groups = this.groups; if (groups) { groups.forEach((group, key) => { group.error(err); }); groups.clear(); } this.destination.error(err); } _complete() { const groups = this.groups; if (groups) { groups.forEach((group, key) => { group.complete(); }); groups.clear(); } this.destination.complete(); } removeGroup(key) { this.groups.delete(key); } unsubscribe() { if (!this.closed) { this.attemptedToUnsubscribe = true; if (this.count === 0) { super.unsubscribe(); } } } } /** * We need this JSDoc comment for affecting ESDoc. * @ignore * @extends {Ignored} */ class GroupDurationSubscriber extends Subscriber { constructor(key, group, parent) { super(group); this.key = key; this.group = group; this.parent = parent; } _next(value) { this.complete(); } /** @deprecated internal use only */ _unsubscribe() { const { parent, key } = this; this.key = this.parent = null; if (parent) { parent.removeGroup(key); } } } /** * An Observable representing values belonging to the same group represented by * a common key. The values emitted by a GroupedObservable come from the source * Observable. The common key is available as the field `key` on a * GroupedObservable instance. * * @class GroupedObservable */ export class GroupedObservable extends Observable { constructor(key, groupSubject, refCountSubscription) { super(); this.key = key; this.groupSubject = groupSubject; this.refCountSubscription = refCountSubscription; } /** @deprecated internal use only */ _subscribe(subscriber) { const subscription = new Subscription(); const { refCountSubscription, groupSubject } = this; if (refCountSubscription && !refCountSubscription.closed) { subscription.add(new InnerRefCountSubscription(refCountSubscription)); } subscription.add(groupSubject.subscribe(subscriber)); return subscription; } } /** * We need this JSDoc comment for affecting ESDoc. * @ignore * @extends {Ignored} */ class InnerRefCountSubscription extends Subscription { constructor(parent) { super(); this.parent = parent; parent.count++; } unsubscribe() { const parent = this.parent; if (!parent.closed && !this.closed) { super.unsubscribe(); parent.count -= 1; if (parent.count === 0 && parent.attemptedToUnsubscribe) { parent.unsubscribe(); } } } } //# sourceMappingURL=groupBy.js.map