yomo-flow
What is yomo-flow?
yomo-flow is a Streaming Serverless function. Users only need to write their business logic in this function to process the stream data.
For example:
func Handler(rxstream rx.RxStream) rx.RxStream {
	stream := rxstream.
		Subscribe(0x10).
		OnObserve(callback).
		AuditTime(100 * time.Millisecond).
		Map(printer).
		StdOut()
	return stream
}
What can yomo-flow do?
For real-time processing of continuous, high-frequency data, YoMo adopts the Functional Reactive Programming paradigm to reduce the complexity of stream computing. YoMo uses the QUIC protocol to transfer data and abstracts a QUIC Stream into an RxStream in yomo-flow, so users can apply Rx operators to process the stream data.
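To make the operator idea concrete, here is a minimal sketch of a Map-style operator built on plain Go channels. It is not YoMo's RxStream implementation; the helper names `mapStream` and `fromSlice` are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// fromSlice (hypothetical helper) emits the given items on a channel and
// closes it when done, standing in for an incoming data stream.
func fromSlice(items []float32) <-chan float32 {
	out := make(chan float32)
	go func() {
		defer close(out)
		for _, v := range items {
			out <- v
		}
	}()
	return out
}

// mapStream (hypothetical helper, not part of YoMo) applies fn to every item
// received from in and emits the result on the returned channel.
func mapStream(in <-chan float32, fn func(float32) float32) <-chan float32 {
	out := make(chan float32)
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

func main() {
	// Scale every reading by 1/10 as it flows through the pipeline.
	scaled := mapStream(fromSlice([]float32{10, 20, 30}), func(v float32) float32 {
		return v / 10
	})
	for v := range scaled {
		fmt.Println(v)
	}
}
```

Each operator takes a stream and returns a new stream, which is what allows calls to be chained in the style of the Handler example.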
Examples
- Map to transform data
- TakeLast to take last n items
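As an illustration of what a TakeLast-style operator does, the sketch below keeps only the final n items of a stream. It is written with plain Go channels under the assumption that the source stream eventually completes; `takeLast` is a hypothetical helper, not YoMo's operator.

```go
package main

import "fmt"

// takeLast (hypothetical helper, not YoMo's implementation) drains in and
// then emits only the final n items, in their original order. Nothing can be
// emitted until in is closed, because "last" is unknown before that point.
func takeLast(in <-chan int, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		if n <= 0 {
			for range in {
				// Drain the source so the sender is never blocked.
			}
			return
		}
		buf := make([]int, 0, n)
		for v := range in {
			if len(buf) == n {
				// Drop the oldest item to make room for the newest one.
				copy(buf, buf[1:])
				buf = buf[:n-1]
			}
			buf = append(buf, v)
		}
		for _, v := range buf {
			out <- v
		}
	}()
	return out
}

func main() {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 1; i <= 5; i++ {
			src <- i
		}
	}()
	for v := range takeLast(src, 2) {
		fmt.Println(v)
	}
}
```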
How to write yomo-flow?
1. Install CLI
# Make sure to use $GOPATH since golang requires the plugin and the main
# application to be highly coupled
$ echo $GOPATH
If $GOPATH is not set, check here first: Setting $GOPATH and $GOBIN.
$ GO111MODULE=off go get github.com/yomorun/yomo
$ cd $GOPATH/src/github.com/yomorun/yomo
$ make install
2. Create a yomo-flow App
$ mkdir -p $GOPATH/src/github.com/{YOUR_GITHUB_USERNAME} && cd $_
$ yomo init yomo-app-demo
2020/12/29 13:03:57 Initializing the Serverless app...
2020/12/29 13:04:00 ✅ Congratulations! You have initialized the serverless app successfully.
2020/12/29 13:04:00 🎉 You can enjoy the YoMo Serverless via the command: yomo dev
$ cd yomo-app-demo
YoMo CLI will automatically create an app.go with the following content:
package main

import (
	"context"
	"fmt"
	"time"

	y3 "github.com/yomorun/y3-codec-golang"
	"github.com/yomorun/yomo/pkg/rx"
)

type NoiseData struct {
	Noise float32 `y3:"0x11"`
	Time  int64   `y3:"0x12"`
	From  string  `y3:"0x13"`
}

var printer = func(_ context.Context, i interface{}) (interface{}, error) {
	value := i.(NoiseData)
	fmt.Println("serverless get value:", value.Noise)
	return value, nil
}

var callback = func(v []byte) (interface{}, error) {
	var mold NoiseData
	err := y3.ToObject(v, &mold)
	if err != nil {
		return nil, err
	}
	mold.Noise = mold.Noise / 10
	return mold, nil
}

// Handler will handle data in Rx way
func Handler(rxstream rx.RxStream) rx.RxStream {
	stream := rxstream.
		Subscribe(0x10).
		OnObserve(callback).
		AuditTime(100 * time.Millisecond).
		Map(printer).
		StdOut()
	return stream
}
3. Build and Run
Run yomo dev from the terminal. You should see the following message:
Congratulations! You have created your first yomo-flow.
BTW: yomo dev automatically uses mock 'noise' data, while yomo run uses the real data from yomo-source.
4. Modify the code to fit your business logic
- YoMo encodes data with the Y3 Codec, so the first step in the Handler function of a yomo-flow is to decode the data via Y3. The first parameter of Y3Decoder is the key to observe, and the second parameter is used to store the decoded value.
- Use operators to process the stream data.
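The decode-then-process shape described in this step can be sketched without any YoMo dependency. The example below uses JSON purely as a stand-in for Y3 so that it runs with the standard library alone; in a real yomo-flow the decoding call would be `y3.ToObject`, as in the generated app.go. The name `decodeAndScale` and the sample payload are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// NoiseData mirrors the struct in app.go, with JSON tags replacing the y3
// tags because this sketch assumes JSON as a stand-in codec.
type NoiseData struct {
	Noise float32 `json:"noise"`
	Time  int64   `json:"time"`
	From  string  `json:"from"`
}

// decodeAndScale (hypothetical) has the same shape as callback in app.go:
// raw bytes in, decoded and transformed value out.
func decodeAndScale(v []byte) (interface{}, error) {
	var mold NoiseData
	// Step 1: decode the raw bytes into a typed value.
	if err := json.Unmarshal(v, &mold); err != nil {
		return nil, fmt.Errorf("decode noise data: %w", err)
	}
	// Step 2: apply the business logic; here, the same scaling as app.go.
	mold.Noise = mold.Noise / 10
	return mold, nil
}

func main() {
	payload := []byte(`{"noise": 450, "time": 1609218237, "from": "sensor-1"}`)
	out, err := decodeAndScale(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", out)
}
```

Keeping decoding and business logic in one small function makes it easy to replace the scaling step with your own transformation while leaving the rest of the pipeline untouched.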
Optional: Setting $GOPATH and $GOBIN
For the current session only:
export GOPATH=~/.go
export PATH=$GOPATH/bin:$PATH
To permanently set these variables, you need to edit .zshrc or .bashrc:
For zsh users:
echo 'export GOPATH=~/.go' >> ~/.zshrc
echo 'path+=$GOPATH/bin' >> ~/.zshrc
For bash users:
echo 'export GOPATH=~/.go' >> ~/.bashrc
echo 'export PATH="$GOPATH/bin:$PATH"' >> ~/.bashrc