GRPC&Protobuf流模式定义
参考链接:
- https://www.cnblogs.com/FireworksEasyCool/p/12693749.html
- https://naotu.baidu.com/file/f54bd5e5649e7c56b1ecd8128fee315b?token=06237fec57a11eea
grpc
中的stream
,srteam
顾名思义 就是 一种 流,可以源源不断的 推送 数据,很适合 传输一些大数据,或者 服务端 和 客户端 长时间 数据交互
RPC数据交互模式
- 简单模式
这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,这就是我们上面写的那种简单的
RPC
模式
- 服务端数据流
这种模式是客户端发起一次请求,服务端返回一段连续的数据流。
典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端。
- 客户端数据流
与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。
典型的例子是物联网终端向服务器报送数据。
- 双向数据流
顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。
典型的例子是聊天机器人。
流模式 protobuf 协议定义&编译
其实和之前写的普通模式差不多~
- 如果客户端传入参数为流数据则需要加
stream
关键词修饰 - 服务端响应为流数据,则需要加
stream
关键词修饰
stream.proto
syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc
//声明 包名
option go_package="./steam";
//声明grpc服务
service Greeter {
rpc GetStream (StreamReqData) returns (stream StreamResData){} //服务端 推送流
rpc PutStream (stream StreamReqData) returns (StreamResData){} // 客户端 推送流
rpc AllStream (stream StreamReqData) returns (stream StreamResData){} //双向流。
}
//stream请求结构
message StreamReqData {
string data = 1;
}
//stream返回结构
message StreamResData {
string data = 1;
}
执行编译
protoc -I=. --go_out=./proto --go-grpc_out=./proto ./proto/stream.proto
服务端推流模式
定义服务
方法都是和之前一样的
syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc
//声明 包名
option go_package = "./steam";
//声明grpc服务
service Greeter {
// GetStream 服务端接收来自客户端发送的参数StreamReqData,服务端处理后,返回给客户端 流数据类型stream StreamResData
rpc GetStream (StreamReqData) returns (stream StreamResData){} //服务端 推送流
}
//GetStreamReqData GetStreamResData 客户端发送消息给服务端,服务端返回流数据!
message StreamReqData {
string data = 1;
}
message StreamResData {
bytes data = 1;
}
Client.GetStream
Client 调用 GetStream 获取服务器推送的流!
这里我们定义的业务是:
客户端传递一个文件名称发送给服务端!服务端根据文件名在自己服务器目录查找!
这里我们创建了2个测试文件,里面分别有一下内容!
└── server
├── 1.txt
├── 2.txt
❯ cat 1.txt 2.txt
1
大本钟早已响起
午夜的消息
高跟鞋配上暴雨
难过又美丽
灯光在扑朔迷离
危险的气息
沉睡在纸醉金迷
消失的彻底
1
2
诶呀诶呀 怎么忘了我是其中的一个
在每一天都扮演上天安排的角色
诶呀诶呀 怎么忘了付出爱情的代价
你别再问了想要答案就去看看吧
那些孤独的人啊
夜晚是否还回家
那些灵魂早已 无处安放
流浪到天涯
那些无声的回答
2
然后读取文件内容(循环全部文件内容长度,循环每 1 / 500 s读一次文件)返回给客户端 ,然后客户端将流数据转换成本地文件!
// GetStream 客户端调用 GetStream 从服务端获取流数据
func (s SteamServer) GetStream(data *steam.StreamReqData, server steam.Greeter_GetStreamServer) error {
var wg sync.WaitGroup
order := data.Data
fileByteChan := make(chan []byte)
// 一个 GO routine 用于读取服务端文件!
wg.Add(1)
go func() {
// 客户端传递参数为文件名称!然后服务端读取文件内容,然后通过流的方式慢慢的发回给客户端!
filename := fmt.Sprintf("/Users/hope/sdy/microway/server/%v.txt", order)
file, err := os.ReadFile(filename)
if err != nil {
log.Println("ReadFile", err)
}
fileByteChan <- file
wg.Done()
}()
// 一个 GO routine 阻塞在 fileByteChan 等读取完后发给客户端!
wg.Add(1)
go func() {
file := <-fileByteChan
// 每 0.5 秒循环一 byte 发送给客户端!
for i := 0; i < len(file); i++ {
b := []byte{file[i]}
err := server.Send(&steam.StreamResData{Data: b})
if err != nil {
log.Println("StreamResData", err)
}
}
wg.Done()
}()
wg.Wait()
return nil
}
客户端CLIENT端调用代码!客户端接受到的 数据存入 []byte
, 数据传输结束后,写入本地!
func GetStreamCall(client *steam.GreeterClient) {
var wg sync.WaitGroup
c := *client
// 调用服务端的方法!
stream, err := c.GetStream(context.Background(), &steam.StreamReqData{Data: "1"})
if err != nil {
fmt.Println(err.Error())
return
}
// 开一个GO ROUNTINE 去获取服务端流 + 写文件
wg.Add(1)
go func() {
var res []byte
// 调用后就等着流返回!
for {
r, err := stream.Recv() // 持续获取服务端的响应
if err != nil || err == io.EOF { //如果有错误 或者 流传输完成 就结束了
break
}
// 将服务端传递过来的
res = append(res, r.Data...)
}
stream.CloseSend()
//可以使用CloseSend()关闭stream,这样服务端就不会继续产生流消息
//调用CloseSend()后,若继续调用Recv(),会重新激活stream,接着之前结果获取消息
f, err := os.Create("/Users/hope/sdy/microway/client/xx.txt")
if err != nil {
log.Println("Create", err.Error())
}
_, err = f.Write(res)
if err != nil {
log.Println("Write", err.Error())
}
err = f.Sync()
if err != nil {
log.Println("Sync", err.Error())
}
wg.Done()
}()
wg.Wait()
}
客户端推流模式
这个例子比较完整的地方在于:客户端和服务端双方,每个阶段完成任务的时候,都会通知对方!
定义 protobuf
syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc
//声明 包名
option go_package = "./steam";
//声明grpc服务
service Greeter {
rpc PutStream (stream StreamReqData) returns (StreamResData){} //客户端 推送流
}
message StreamReqData {
bytes data = 1;
}
message StreamResData {
string data = 1;
}
执行 protoc,生成GO&GRPC
protoc -I=. --go_out=./proto --go-grpc_out=./proto ./proto/stream.proto
这里我们先对客户端进行业务实现!
这里我们写的业务是,拿到一个远程的 音频链接!请求到的数据转换为 []byte
,然后每次发送 100 个 byte
给服务端!
func PutStreamCall(client *steam.GreeterClient) {
c := *client
var wg sync.WaitGroup
var audioDataChan = make(chan []byte)
stream, err := c.PutStream(context.Background())
if err != nil {
log.Println("PutStream", err.Error())
}
// 开一个 GO routine 远程音频数据 放到channel !
wg.Add(1)
go func() {
url := "/Users/hope/sdy/microway/client/Die For You - The Weeknd.mp3"
file, err := os.ReadFile(url)
if err != nil {
log.Println("ReadFile", err)
}
audioDataChan <- file
wg.Done()
}()
wg.Add(1)
// 这里的GO ROUTINE 就是阻塞在音频chan 当有数据过来的时候,就执行发送给服务端
go func() {
all := <-audioDataChan
fmt.Println(len(all))
step := 1
for i := 0; i < len(all); i += step {
err := stream.Send(&steam.StreamReqData{
Data: all[i : i+step],
})
if err != nil {
log.Println("Send", err.Error())
break
}
}
// 告知服务端发送结束!服务端Recv 的 err == EOF
err = stream.CloseSend()
if err != nil {
log.Println("CloseSend", err.Error())
}
wg.Done()
}()
// 需要一个GO ROUTINE 来知道服务端处理完!客户端才能退出
// 存在网络延迟 客户端发送完成!已经退出了,此时客户端断开连接
// 这种情况会导致服务端 RPC 异常! rpc error: code = Canceled desc = context canceled
wg.Add(1)
go func() {
for {
var str steam.StreamResData
err := stream.RecvMsg(&str)
// 客户端知道服务端处理完成后,就可以断开了
if err != nil || str.Data == "ok" {
break
}
}
wg.Done()
}()
wg.Wait()
}
服务端接收完成后!保存为本地文件!
// PutStream 客户端调用推流 服务端接收
func (s SteamServer) PutStream(server steam.Greeter_PutStreamServer) error {
var wg sync.WaitGroup
byChain := make(chan []byte)
wg.Add(1)
go func() {
data := make([]byte, 0)
for {
// 持续接收 客户端发来的流 放入 byChain
rev, err := server.Recv()
if err != nil || err == io.EOF {
log.Println("Recv", err)
break
}
data = append(data, rev.Data...)
}
byChain <- data
wg.Done()
}()
wg.Add(1)
go func() {
data := <-byChain
create, err := os.Create("/Users/hope/sdy/microway/server/xx.mp3")
if err != nil {
log.Println("Create", err)
}
_, err = create.Write(data)
if err != nil {
log.Println("Write", err)
}
err = create.Sync()
if err != nil {
log.Println("Sync", err)
}
// 发送给客户端 告知全部处理完成了!让客户端可以安全断开了
err = server.SendAndClose(&steam.StreamResData{Data: "ok"})
if err != nil {
log.Println("SendAndClose", err)
}
wg.Done()
}()
wg.Wait()
return nil
}
服务端客户端双向推流模式
定义 protobuf !
syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc
//声明 包名
option go_package = "./steam";
//声明grpc服务
service Greeter {
// 服务端和客户端都是推流!
rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
}
message StreamReqData {
bytes data = 1;
}
message StreamResData {
bytes data = 1;
}
业务需求!
客户端向服务端推送一个文件的流!
[文件内容]
原因就在于声明式的代码更容易维护!我不用去维护DOM的创建过程,插入过程!
我只在乎,我要的结果!在代码上表现的越直观!
那么就越容易维护!其他的操作,我们就封装在内部!
服务器端接受后,从服务端获取读取授权文件
[文件内容]
========================================================================
== MIT ==
== CH ==
== MIT ==
========================================================================
读取授权信息合并到客户端传来的文件流中
然后再把文件流返回给客户端
[合成后的文件内容]
========================================================================
== MIT ==
== CH ==
== MIT ==
========================================================================
原因就在于声明式的代码更容易维护!我不用去维护DOM的创建过程,插入过程!
我只在乎,我要的结果!在代码上表现的越直观!
那么就越容易维护!其他的操作,我们就封装在内部!
客户端接口到服务器端的流后!保存为文件
SERVER 端实现
func (s SteamServer) AllStream(server steam.Greeter_AllStreamServer) error {
var wg sync.WaitGroup
byteChain := make(chan []byte)
wg.Add(1)
done := false
go func() {
// 用户接受客户端的goroutine!
for {
recv, err := server.Recv()
if err != nil || err == io.EOF {
fmt.Print("Recv", err)
done = true
break
}
byteChain <- recv.Data
}
wg.Done()
}()
wg.Add(1)
go func() {
// 读取Server端的本地授权文件
fileByte, err := os.ReadFile("/Users/hope/sdy/microway/server/auth.txt")
if err != nil {
log.Println(err)
}
//用户合并文件的goroutine
for {
byteData := <-byteChain
fileByte = append(fileByte, byteData...)
fmt.Println(len(fileByte))
if done {
break
}
}
fmt.Println("合并完成!", len(fileByte))
create, _ := os.Create("/Users/hope/sdy/microway/server/auth_sample.txt")
create.Write(fileByte)
create.Sync()
// 然后把弄好的流发回给客户端
err = server.Send(&steam.StreamResData{Data: fileByte})
if err != nil {
log.Println(err)
}
fmt.Println("发送完成!")
wg.Done()
}()
wg.Wait()
return nil
}
这里用到了两 goroutine 一个 g1 用来获取 客户端的流 server.Recv()
, 循环的时候都会将数据放入 byteChain
go func() {
// 用户接受客户端的goroutine!
for {
recv, err := server.Recv()
if err != nil || err == io.EOF {
fmt.Print("Recv", err)
done = true
break
}
byteChain <- recv.Data
}
wg.Done()
}()
当客户端发送完成后!客户端会调用 CloseSend
这样 服务端 Recv()
的 err
就会抛出EOF
服务端就知道已经接受完成!
第二个gorountine读取本地的授权文件,读取结束后!阻塞在 byteChain
,当读取结束后,进入后面的文件合并环节!合并后就发回给客户端!
CLIENT端实现
func AllStreamCall(client *steam.GreeterClient) {
var wg sync.WaitGroup
c := *client
g, err := c.AllStream(context.Background())
if err != nil {
log.Fatalln(err)
}
wg.Add(1)
byteChain := make(chan []byte)
// 1. 首先读取本地的文件 发送给服务端!
go func() {
b, err := os.ReadFile("/Users/hope/sdy/microway/client/sample.txt")
if err != nil {
log.Fatalln("ReadFile", err)
}
step := 1
// 发送数据!
for i := 0; i < len(b); i += step {
err = g.SendMsg(&steam.StreamReqData{
Data: b[i : i+step],
})
}
log.Println("发送完成!")
err = g.CloseSend() // 这个调用了 关闭流 服务端才知道发送结束了!
if err != nil {
log.Fatalln("ReadFile", err)
}
wg.Done()
}()
wg.Add(1)
go func() {
//等待服务端合并文件后的返回响应
for {
rec, err := g.Recv()
if err != nil || err == io.EOF {
fmt.Println(err)
break
}
byteChain <- rec.Data
wg.Done()
}
}()
wg.Add(1)
go func() {
select {
case data := <-byteChain:
fmt.Println(len(data))
create, _ := os.Create("/Users/hope/sdy/microway/client/authed_sample.txt")
create.Write(data)
create.Sync()
}
wg.Done()
}()
wg.Wait()
}
评论区