Rx

What is Rx?

Rx is the abbreviation for ReactiveX. ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

Rx supports almost all the popular programming languages, YoMo uses RxGo and also adds some new operators. YoMo built atop Functional Reactive Programming to reduce the complexity of streaming computing.

Rx

Why use Rx?

For the senario of continuous high-frequency data, for example, IoT devices generate data in 24 hours. The data needs to be processed in real-time through streaming computing, and the learning cost of streaming computing is very high.

The ReactiveX (Rx) Observable model allows you to treat streams of asynchronous events with the same sort of simple, composable operations that you use for collections of data items like arrays. It frees you from tangled webs of callbacks, and thereby makes your code more readable and less prone to bugs.

getDataFromNetwork()
  .skip(10)
  .take(5)
  .map((s) => s + ' transformed')
  .subscribe((it) => {
    console.log('onNext => ' + it)
  })

YoMo uses QUIC protocol to transfer data, and abstracts QUIC Stream into RxStream in yomo-flow. It supports all operators provided by RxGo library, and adds several new operators for YoMo scenarios.

Supported operators in RxStream

New Operators in YoMo

  • AuditTime - ignore for given time then emit most recent item
  • StdOut - print the item in standard output
  • Subscribe - observe the key of streaming data via Y3 Codec
  • Y3Decoder - trigger the callback function and decode the data while the key is observed by Y3 Codec

Transforming Observables

  • Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
  • FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
  • GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
  • GroupByDynamic — divide an Observable into a dynamic set of Observables that each emit GroupedObservables from the original Observable, organized by key
  • Map — transform the items emitted by an Observable by applying a function to each item
  • Marshal — transform the items emitted by an Observable by applying a marshalling function to each item
  • Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
  • Unmarshal — transform the items emitted by an Observable by applying an unmarshalling function to each item
  • Window — apply a function to each item emitted by an Observable, sequentially, and emit each successive value

Filtering Observables

  • Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
  • Distinct/DistinctUntilChanged — suppress duplicate items emitted by an Observable
  • ElementAt — emit only item n emitted by an Observable
  • Filter — emit only those items from an Observable that pass a predicate test
  • Find — emit the first item passing a predicate then complete
  • First/FirstOrDefault — emit only the first item or the first item that meets a condition, from an Observable
  • IgnoreElements — do not emit any items from an Observable but mirror its termination notification
  • Last/LastOrDefault — emit only the last item emitted by an Observable
  • Sample — emit the most recent item emitted by an Observable within periodic time intervals
  • Skip — suppress the first n items emitted by an Observable
  • SkipLast — suppress the last n items emitted by an Observable
  • Take — emit only the first n items emitted by an Observable
  • TakeLast — emit only the last n items emitted by an Observable

Combining Observables

  • CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
  • Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
  • Merge — combine multiple Observables into one by merging their emissions
  • StartWithIterable — emit a specified sequence of items before beginning to emit the items from the source Iterable
  • ZipFromIterable — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

Error Handling Operators

  • Catch — recover from an onError notification by continuing the sequence without error
  • Retry/BackOffRetry — if a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error

Observable Utility Operators

  • Do - register an action to take upon a variety of Observable lifecycle events
  • Run — create an Observer without consuming the emitted items
  • Send — send the Observable items in a specific channel
  • Serialize — force an Observable to make serialized calls and to be well-behaved
  • TimeInterval — convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions
  • Timestamp — attach a timestamp to each item emitted by an Observable

Conditional and Boolean Operators

  • All — determine whether all items emitted by an Observable meet some criteria
  • Amb — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
  • Contains — determine whether an Observable emits a particular item or not
  • DefaultIfEmpty — emit items from the source Observable, or a default item if the source Observable emits nothing
  • SequenceEqual — determine whether two Observables emit the same sequence of items
  • SkipWhile — discard items emitted by an Observable until a specified condition becomes false
  • TakeUntil — discard items emitted by an Observable after a second Observable emits an item or terminates
  • TakeWhile — discard items emitted by an Observable after a specified condition becomes false

Mathematical and Aggregate Operators

  • Average — calculates the average of numbers emitted by an Observable and emits this average
  • Concat — emit the emissions from two or more Observables without interleaving them
  • Count — count the number of items emitted by the source Observable and emit only this value
  • Max — determine, and emit, the maximum-valued item emitted by an Observable
  • Min — determine, and emit, the minimum-valued item emitted by an Observable
  • Reduce — apply a function to each item emitted by an Observable, sequentially, and emit the final value
  • Sum — calculate the sum of numbers emitted by an Observable and emit this sum

Operators to Convert Observables