Skip to content
Documentation
API
StreamFunction

API StreamFunction

StreamFunction is a stateful serverless function that handle chunked data from Zipper, and return a chunked data to Zipper.

func yomo.NewStreamFunction

yomo.NewStreamFunction(name, zipperAddr string, opts ...SfnOption) StreamFunction

Create a stream function instance.

  • name: The name of the stream function.
  • zipperAddr: The endpoint of the Zipper to connect to.
  • opts: The SfnOption when create the stream function.

example:

sfn := yomo.NewStreamFunction(
  "stream-llm-inf-response", 
  "localhost:9000",
  yomo.WithCredential("token:123456abcdefg"),
)

type StreamFunction

sfn.SetObserveDataTags(tags ...Tag)

Set the data Tag list that will be observed from Zipper.

  • tags: The data Tag list.

sfn.Init(fn)

ℹ️

This feature is introduced in version 1.14

Set the fn as initializer for this serverless, It's Optional, if setted, the fn will be invoked only once at the first time this serverless start.

  • fn: The initializer handler with func () error signature.
sfn := NewStreamFunction(
	"test-sfn",
	"localhost:9000",
)
defer sfn.Close()
 
// Init fn, load the 7b model to GPU momery when this serverless start.
err := sfn.Init(func() error {
  return LoadModelToMemory('llama-2-7b-chat', 'tokenizer.model')
})
 
// handle every data to be predicted
sfn.SetHandler(...)
 
sfn.Connect()

sfn.SetHandler(fn AsyncHandler) error

Set the handler function in async mode, which accept the raw bytes data from Zipper, and return the raw bytes data to Zipper.

sfn := yomo.NewStreamFunction(
  "my-fn",
  "localhost:9000",
)
defer sfn.Close()
 
sfn.SetObserveDataTags(0x10)
 
sfn.SetHandler(func (ctx serverless.Context) {
  data := ctx.Data()
  log.Printf("✅ [my-fn] received <- %v", string(data))
})
 
err = sfn.Connect()

sfn.SetPipeHandler(fn PipeHandler) error

Set the handler function in blocking mode, which accept the raw bytes data from Zipper, and return the raw bytes data to Zipper.

sfn.SetErrorHandler(fn func(err error))

Set the error handler function when server error occurs.

  • fn: The error handler function.
    • err: The error.

sfn.Connect() error

Create a connection to Zipper, when data is received, the handler function will be called.

sfn.Write(tag Tag, data []byte) error

Write data to Zipper.

  • tag: The data Tag.
  • data: The raw bytes data to be wrote.

sfn.Close() error

Close the connection to Zipper.

sfn.Wait()

Wait until the function fulfilled.

type SfnOption

func WithObserveDataTags(tags ...Tag) SfnOption

Set data tag list which observed by this stream function.

  • tags: The Tag list.

func WithCredential(token string) SfnOption

Set the credential method when this Stream Function instance connect to Zipper.

  • token: The token string.

func WithClientTLSConfig(tc *tls.Config) SfnOption

Set TLS config for this Stream Function instance.

  • tc: The TLS config.

func WithClientQuicConfig(qc *quic.Config) SfnOption

Set QUIC config for this Stream Function instance.

WithLogger(logger *slog.Logger) SfnOption

Set the logger for this Source instance.

  • logger: The logger.

WithSfnTracerProvider(tp *tracesdk.TracerProvider) SfnOption

Set the tracer provider for this Source instance.

  • tp: The tracer provider.
tp, shutdown, err := trace.NewTracerProvider("yomo-sfn")
if err == nil {
	log.Println("[sfn] 🛰 trace enabled")
}
defer shutdown(context.Background())
 
// stateful function
sfn := yomo.NewStreamFunction(
	"sfn",
	"localhost:9000",
	yomo.WithSfnTracerProvider(tp),  // use tracer provider
)

More about YoMo Observability.

type AsyncHandler

type AsyncHandler func(reqData []byte) (respTag Tag, respData []byte)

The request-response mode handler function, async mode.

  • reqData: The raw bytes data received from Zipper.

Returns:

  • respTag: The Tag of the response data.
  • respData: The raw bytes data to be wrote to Zipper.

AsyncHandler is used to handle high concurrent requests, and the response data will be sent to Zipper after the handler function returns.

type PipeHandler

type PipeHandler func(in <-chan []byte, out chan<- *PayloadFrame)

The blocking mode handler function.

  • in: The input channel of the raw bytes data received from Zipper.
  • out: The output channel of the PayloadFrame to be wrote to Zipper.

PipeHandler is used to handle chunked stream data, like video stream, audio stream, behavior sequence data, etc. Ingress data will be guarantee the order, and the egress data will be guarantee the order too. By this, developers can read video stream data continuously, then handle the frames by an AI model, and write the inference result back to user instantly.