windowTime and windowToggle Opertor in Angular

In Angular, specifically when using RxJS for reactive programming, windowTime and windowToggle are operators that allow you to divide an observable into windows of emissions. These windows are themselves observables that emit the values from the source observable.

windowTime

The windowTime operator divides the source observable into windows based on time. Each window is an observable that emits items from the source observable during a specified time span. When the time span elapses, a new window starts.

Usage

import { of } from 'rxjs';
import { windowTime, mergeAll, map, take } from 'rxjs/operators';

// Create an observable that emits a value every 100ms
const source$ = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).pipe(
  map((val, index) => index),
  take(10)
);

// Create a window that collects values for 300ms
const example$ = source$.pipe(
  windowTime(300),
  mergeAll() // Flatten the observable of windows into a single observable
);

example$.subscribe(value => console.log(value));

Parameters

  • windowTimeSpan: The time span (in milliseconds) of each window.
  • windowCreationInterval (optional): The interval (in milliseconds) at which to start new windows.
  • maxWindowSize (optional): The maximum number of values each window can hold before being closed.

windowToggle

The windowToggle operator opens and closes windows based on emissions from two other observables: one that signals when to open a window, and another that signals when to close the window.

Usage

import { interval, Subject } from 'rxjs';
import { windowToggle, mergeAll, take } from 'rxjs/operators';

// Create an observable that emits a value every second
const source$ = interval(1000).pipe(take(10));

// Create a notifier that emits every 3 seconds to open a window
const open$ = interval(3000);

// Create a notifier that emits after 2 seconds to close a window
const close$ = interval(2000);

// Use windowToggle to create windows based on the open$ and close$ observables
const example$ = source$.pipe(
  windowToggle(open$, () => close$),
  mergeAll() // Flatten the observable of windows into a single observable
);

example$.subscribe(value => console.log(value));

Parameters

  • openings: An observable that signals when to open a new window.
  • closingSelector: A function that returns an observable signaling when to close the window opened by the corresponding emissions from the openings observable.

Practical Example

import { of, timer } from 'rxjs';
import { windowTime, windowToggle, mergeAll, map } from 'rxjs/operators';

// Example using windowTime
const source1$ = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
const example1$ = source1$.pipe(
  windowTime(2000), // window every 2 seconds
  mergeAll()
);

example1$.subscribe(val => console.log('windowTime:', val));

// Example using windowToggle
const source2$ = timer(0, 1000); // emit value every second
const openNotifier$ = timer(0, 5000); // open window every 5 seconds
const closeNotifier$ = timer(3000); // close window 3 seconds after it opens

const example2$ = source2$.pipe(
  windowToggle(openNotifier$, () => closeNotifier$),
  mergeAll()
);

example2$.subscribe(val => console.log('windowToggle:', val));

In these examples

  • The windowTime operator creates windows of emissions every 2 seconds, printing each value within those time windows.
  • The windowToggle operator opens a new window every 5 seconds and closes each window 3 seconds after it is opened, printing the values emitted during those windows.

These operators are powerful for batching, grouping, or buffering data streams in a time-based or event-driven manner.


Similar Articles