yomo-flow

What is yomo-flow?

yomo-flow is a Streaming Serverless function, the users only need to write the business logic code in this function to process the stream data. For example:

func Handler(rxstream rx.RxStream) rx.RxStream {
    stream := rxstream.
        Subscribe(0x10).
        OnObserve(callback).
        AuditTime(100 * time.Millisecond).
        Map(printer).
        StdOut()

    return stream
}

What can yomo-flow do?

For the real-time processing scenario of continuous high-frequency data, YoMo uses Functional Reactive Programming for programming paradigm to reduce the complexity of streaming computing. YoMo uses QUIC protocol to transfer data, and abstracts QUIC Stream into RxStream in yomo-flow, the users can use the operators in Rx to process the stream data.

Rx

Examples

Use Map to transform data

Map

Use TakeLast to take last n items

TakeLast

How to write yomo-flow?

1. Install CLI

# Make sure to use $GOPATH since golang requires the plugin and the main
# application to be highly coupled
$ echo $GOPATH

If $GOPATH is not set, check here first: Setting $GOPATH and $GOBIN.

$ GO111MODULE=off go get github.com/yomorun/yomo
$ cd $GOPATH/src/github.com/yomorun/yomo
$ make install

YoMo Tutorial 1

2. Create a yomo-flow App

$ mkdir -p $GOPATH/src/github.com/{YOUR_GITHUB_USERNAME} && cd $_
$ yomo init yomo-app-demo
2020/12/29 13:03:57 Initializing the Serverless app...
2020/12/29 13:04:00 ✅ Congratulations! You have initialized the serverless app successfully.
2020/12/29 13:04:00 🎉 You can enjoy the YoMo Serverless via the command: yomo dev
$ cd yomo-app-demo

YoMo Tutorial 2

YoMo CLI will automatically create an app.go with the following content:

package main

import (
    "context"
    "fmt"
    "time"

    y3 "github.com/yomorun/y3-codec-golang"
    "github.com/yomorun/yomo/pkg/rx"
)

type NoiseData struct {
    Noise float32 `y3:"0x11"`
    Time  int64   `y3:"0x12"`
    From  string  `y3:"0x13"`
}

var printer = func(_ context.Context, i interface{}) (interface{}, error) {
    value := i.(NoiseData)
    fmt.Println("serverless get value:", value.Noise)
    return value, nil
}

var callback = func(v []byte) (interface{}, error) {
    var mold NoiseData
    err := y3.ToObject(v, &mold)
    if err != nil {
        return nil, err
    }
    mold.Noise = mold.Noise / 10
    return mold, nil
}

// Handler will handle data in Rx way
func Handler(rxstream rx.RxStream) rx.RxStream {
    stream := rxstream.
        Subscribe(0x10).
        OnObserve(callback).
        AuditTime(100 * time.Millisecond).
        Map(printer).
        StdOut()

    return stream
}

3. Build and Run

Run yomo dev from the terminal. You should see the following message:

YoMo Tutorial 3

Congratulations! You have created your first yomo-flow.

BTW: yomo dev automatically uses the mocking 'noise' data,yomo run uses the real data from yomo-source.

4. Modify the code to your business code

  1. YoMo encodes the data via Y3 Codec, the Handler method in yomo-flow, the first step is decoding the data via Y3, the first parameter of Y3Decoder is the observe key, the second parameter uses to store the decode value.

  2. Use operators to process the stream data.

Optional: Setting $GOPATH and $GOBIN

For the current session only:

export GOPATH=~/.go
export PATH=$GOPATH/bin:$PATH

To permanently set these variables, you need to edit .zshrc or .bashrc:

For zsh users:

echo "export GOPATH=~/.go" >> .zshrc
echo "path+=$GOPATH/bin" >> .zshrc

For bash users:

echo 'export GOPATH=~/.go' >> .bashrc
echo 'export PATH="$GOPATH/bin:$PATH"' >> ~/.bashrc