目 录CONTENT

文章目录
Go

GRPC&Protobuf流模式定义

Hello!你好!我是村望~!
2023-05-10 / 0 评论 / 3 点赞 / 213 阅读 / 3,189 字
温馨提示:
我不想探寻任何东西的意义,我只享受当下思考的快乐~

GRPC&Protobuf流模式定义

参考链接:

grpc 中的 streamsrteam 顾名思义 就是 一种 ,可以源源不断的 推送 数据,很适合 传输一些大数据,或者 服务端客户端 长时间 数据交互

RPC数据交互模式

  1. 简单模式

这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,这就是我们上面写的那种简单的RPC模式

  1. 服务端数据流

这种模式是客户端发起一次请求服务端返回一段连续的数据流

典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端

  1. 客户端数据流

与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应

典型的例子是物联网终端向服务器报送数据。

  1. 双向数据流

顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送也就是可以实现实时交互

典型的例子是聊天机器人。

流模式 protobuf 协议定义&编译

其实和之前写的普通模式差不多~

  • 如果客户端传入参数为流数据则需要加stream 关键词修饰
  • 服务端响应为流数据,则需要加 stream关键词修饰

stream.proto

syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc

//声明 包名
option go_package="./steam";

//声明grpc服务
service Greeter {
  rpc GetStream (StreamReqData) returns (stream StreamResData){} //服务端 推送流
  rpc PutStream (stream StreamReqData) returns (StreamResData){} // 客户端 推送流
  rpc AllStream (stream StreamReqData) returns (stream StreamResData){} //双向流。
}


//stream请求结构
message StreamReqData {
  string data = 1;
}
//stream返回结构
message StreamResData {
  string data = 1;
}

执行编译

protoc -I=. --go_out=./proto --go-grpc_out=./proto ./proto/stream.proto

服务端推流模式

定义服务

方法都是和之前一样的

syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc

//声明 包名
option go_package = "./steam";

//声明grpc服务
service Greeter {
  // GetStream 服务端接收来自客户端发送的参数StreamReqData,服务端处理后,返回给客户端 流数据类型stream StreamResData
  rpc GetStream (StreamReqData) returns (stream StreamResData){} //服务端 推送流
}


//GetStreamReqData GetStreamResData 客户端发送消息给服务端,服务端返回流数据!
message StreamReqData {
  string data = 1;
}
message StreamResData {
  bytes data = 1;
}

Client.GetStream Client 调用 GetStream 获取服务器推送的流!

这里我们定义的业务是:

客户端传递一个文件名称发送给服务端!服务端根据文件名在自己服务器目录查找!

这里我们创建了2个测试文件,里面分别有一下内容!

└── server
    ├── 1.txt
    ├── 2.txt
❯ cat 1.txt 2.txt
1
大本钟早已响起
午夜的消息
高跟鞋配上暴雨
难过又美丽
灯光在扑朔迷离
危险的气息
沉睡在纸醉金迷
消失的彻底
1
2
诶呀诶呀 怎么忘了我是其中的一个
在每一天都扮演上天安排的角色
诶呀诶呀 怎么忘了付出爱情的代价
你别再问了想要答案就去看看吧
那些孤独的人啊
夜晚是否还回家
那些灵魂早已 无处安放
流浪到天涯
那些无声的回答
2

然后读取文件内容(循环全部文件内容长度,循环每 1 / 500 s读一次文件)返回给客户端 ,然后客户端将流数据转换成本地文件!

// GetStream 客户端调用 GetStream 从服务端获取流数据
func (s SteamServer) GetStream(data *steam.StreamReqData, server steam.Greeter_GetStreamServer) error {
	var wg sync.WaitGroup
	order := data.Data
	fileByteChan := make(chan []byte)

	// 一个 GO routine 用于读取服务端文件!
	wg.Add(1)
	go func() {
		// 客户端传递参数为文件名称!然后服务端读取文件内容,然后通过流的方式慢慢的发回给客户端!
		filename := fmt.Sprintf("/Users/hope/sdy/microway/server/%v.txt", order)
		file, err := os.ReadFile(filename)
		if err != nil {
			log.Println("ReadFile", err)
		}
		fileByteChan <- file
		wg.Done()
	}()

	// 一个 GO routine 阻塞在 fileByteChan 等读取完后发给客户端!
	wg.Add(1)
	go func() {
		file := <-fileByteChan
		// 每 0.5 秒循环一 byte 发送给客户端!
		for i := 0; i < len(file); i++ {
			b := []byte{file[i]}
			err := server.Send(&steam.StreamResData{Data: b})
			if err != nil {
				log.Println("StreamResData", err)
			}
		}
		wg.Done()
	}()
	wg.Wait()
	return nil
}

客户端CLIENT端调用代码!客户端接受到的 数据存入 []byte, 数据传输结束后,写入本地!

func GetStreamCall(client *steam.GreeterClient) {
	var wg sync.WaitGroup
	c := *client
	// 调用服务端的方法!
	stream, err := c.GetStream(context.Background(), &steam.StreamReqData{Data: "1"})
	if err != nil {
		fmt.Println(err.Error())
		return
	}

	// 开一个GO ROUNTINE 去获取服务端流 + 写文件
	wg.Add(1)
	go func() {
		var res []byte
		// 调用后就等着流返回!
		for {
			r, err := stream.Recv()          // 持续获取服务端的响应
			if err != nil || err == io.EOF { //如果有错误 或者 流传输完成 就结束了
				break
			}
			// 将服务端传递过来的
			res = append(res, r.Data...)
		}
		stream.CloseSend()
		//可以使用CloseSend()关闭stream,这样服务端就不会继续产生流消息
		//调用CloseSend()后,若继续调用Recv(),会重新激活stream,接着之前结果获取消息
		f, err := os.Create("/Users/hope/sdy/microway/client/xx.txt")
		if err != nil {
			log.Println("Create", err.Error())
		}
		_, err = f.Write(res)
		if err != nil {
			log.Println("Write", err.Error())

		}
		err = f.Sync()
		if err != nil {
			log.Println("Sync", err.Error())
		}
		wg.Done()
	}()
	wg.Wait()
}

May-09-2023 15-07-11

客户端推流模式

这个例子比较完整的地方在于:客户端和服务端双方,每个阶段完成任务的时候,都会通知对方!

定义 protobuf

syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc

//声明 包名
option go_package = "./steam";

//声明grpc服务
service Greeter {
  rpc PutStream (stream StreamReqData) returns (StreamResData){} //客户端 推送流
}

message StreamReqData {
  bytes data = 1;
}
message StreamResData {
  string data = 1;
}

执行 protoc,生成GO&GRPC

protoc -I=. --go_out=./proto --go-grpc_out=./proto ./proto/stream.proto

这里我们先对客户端进行业务实现!

这里我们写的业务是,拿到一个远程的 音频链接!请求到的数据转换为 []byte ,然后每次发送 100 个 byte 给服务端!

func PutStreamCall(client *steam.GreeterClient) {
	c := *client
	var wg sync.WaitGroup
	var audioDataChan = make(chan []byte)
	stream, err := c.PutStream(context.Background())
	if err != nil {
		log.Println("PutStream", err.Error())
	}
	// 开一个 GO routine 远程音频数据 放到channel !
	wg.Add(1)
	go func() {
		url := "/Users/hope/sdy/microway/client/Die For You - The Weeknd.mp3"
		file, err := os.ReadFile(url)
		if err != nil {
			log.Println("ReadFile", err)
		}
		audioDataChan <- file
		wg.Done()
	}()
	wg.Add(1)
	// 这里的GO ROUTINE 就是阻塞在音频chan 当有数据过来的时候,就执行发送给服务端
	go func() {
		all := <-audioDataChan
		fmt.Println(len(all))

		step := 1
		for i := 0; i < len(all); i += step {
			err := stream.Send(&steam.StreamReqData{
				Data: all[i : i+step],
			})
			if err != nil {
				log.Println("Send", err.Error())
				break
			}
		}
		// 告知服务端发送结束!服务端Recv 的 err  ==  EOF
		err = stream.CloseSend()
		if err != nil {
			log.Println("CloseSend", err.Error())
		}
		wg.Done()
	}()

	// 需要一个GO ROUTINE 来知道服务端处理完!客户端才能退出
	// 存在网络延迟 客户端发送完成!已经退出了,此时客户端断开连接
	// 这种情况会导致服务端 RPC 异常! rpc error: code = Canceled desc = context canceled
	wg.Add(1)
	go func() {
		for {
			var str steam.StreamResData
			err := stream.RecvMsg(&str)
			// 客户端知道服务端处理完成后,就可以断开了
			if err != nil || str.Data == "ok" {
				break
			}
		}
		wg.Done()
	}()
	wg.Wait()
}

服务端接收完成后!保存为本地文件!

// PutStream 客户端调用推流 服务端接收
func (s SteamServer) PutStream(server steam.Greeter_PutStreamServer) error {
	var wg sync.WaitGroup
	byChain := make(chan []byte)
	wg.Add(1)
	go func() {
		data := make([]byte, 0)
		for {
			// 持续接收 客户端发来的流 放入 byChain
			rev, err := server.Recv()
			if err != nil || err == io.EOF {
				log.Println("Recv", err)
				break
			}
			data = append(data, rev.Data...)
		}
		byChain <- data
		wg.Done()
	}()
	wg.Add(1)
	go func() {
		data := <-byChain
		create, err := os.Create("/Users/hope/sdy/microway/server/xx.mp3")
		if err != nil {
			log.Println("Create", err)
		}
		_, err = create.Write(data)
		if err != nil {
			log.Println("Write", err)
		}
		err = create.Sync()
		if err != nil {
			log.Println("Sync", err)
		}

		// 发送给客户端 告知全部处理完成了!让客户端可以安全断开了
		err = server.SendAndClose(&steam.StreamResData{Data: "ok"})
		if err != nil {
			log.Println("SendAndClose", err)
		}
		wg.Done()
	}()
	wg.Wait()
	return nil
}

服务端客户端双向推流模式

定义 protobuf !

syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc

//声明 包名
option go_package = "./steam";

//声明grpc服务
service Greeter {
  // 服务端和客户端都是推流!
  rpc AllStream (stream StreamReqData) returns (stream StreamResData){} 
}

message StreamReqData {
  bytes data = 1;
}
message StreamResData {
  bytes data = 1;
}

业务需求!

客户端向服务端推送一个文件的流!

[文件内容]

原因就在于声明式的代码更容易维护!我不用去维护DOM的创建过程,插入过程!
我只在乎,我要的结果!在代码上表现的越直观!
那么就越容易维护!其他的操作,我们就封装在内部!

服务器端接受后,从服务端获取读取授权文件

[文件内容]

========================================================================
==    MIT    ==
==    CH    ==
==    MIT    ==
========================================================================

读取授权信息合并到客户端传来的文件流中

然后再把文件流返回给客户端

[合成后的文件内容]

========================================================================
==    MIT    ==
==    CH    ==
==    MIT    ==
========================================================================


原因就在于声明式的代码更容易维护!我不用去维护DOM的创建过程,插入过程!
我只在乎,我要的结果!在代码上表现的越直观!
那么就越容易维护!其他的操作,我们就封装在内部!

客户端接口到服务器端的流后!保存为文件

SERVER 端实现

func (s SteamServer) AllStream(server steam.Greeter_AllStreamServer) error {
	var wg sync.WaitGroup
	byteChain := make(chan []byte)
	wg.Add(1)
	done := false
	go func() {
		// 用户接受客户端的goroutine!
		for {
			recv, err := server.Recv()
			if err != nil || err == io.EOF {
				fmt.Print("Recv", err)
				done = true
				break
			}
			byteChain <- recv.Data
		}
		wg.Done()
	}()
	wg.Add(1)
  
	go func() {
    // 读取Server端的本地授权文件
		fileByte, err := os.ReadFile("/Users/hope/sdy/microway/server/auth.txt")
		if err != nil {
			log.Println(err)
		}
		//用户合并文件的goroutine
		for {
			byteData := <-byteChain
			fileByte = append(fileByte, byteData...)
			fmt.Println(len(fileByte))
			if done {
				break
			}
		}
		fmt.Println("合并完成!", len(fileByte))
		create, _ := os.Create("/Users/hope/sdy/microway/server/auth_sample.txt")
		create.Write(fileByte)
		create.Sync()
		// 然后把弄好的流发回给客户端
		err = server.Send(&steam.StreamResData{Data: fileByte})
		if err != nil {
			log.Println(err)
		}
		fmt.Println("发送完成!")
		wg.Done()
	}()
	wg.Wait()
	return nil
}

这里用到了两 goroutine 一个 g1 用来获取 客户端的流 server.Recv() , 循环的时候都会将数据放入 byteChain

	go func() {
		// 用户接受客户端的goroutine!
		for {
			recv, err := server.Recv()
			if err != nil || err == io.EOF {
				fmt.Print("Recv", err)
				done = true
				break
			}
			byteChain <- recv.Data
		}
		wg.Done()
	}()

当客户端发送完成后!客户端会调用 CloseSend 这样 服务端 Recv()err 就会抛出EOF

服务端就知道已经接受完成!

第二个gorountine读取本地的授权文件,读取结束后!阻塞在 byteChain ,当读取结束后,进入后面的文件合并环节!合并后就发回给客户端!

CLIENT端实现

func AllStreamCall(client *steam.GreeterClient) {
	var wg sync.WaitGroup
	c := *client
	g, err := c.AllStream(context.Background())
	if err != nil {
		log.Fatalln(err)
	}
	wg.Add(1)
	byteChain := make(chan []byte)
	// 1. 首先读取本地的文件 发送给服务端!
	go func() {
		b, err := os.ReadFile("/Users/hope/sdy/microway/client/sample.txt")
		if err != nil {
			log.Fatalln("ReadFile", err)
		}
		step := 1
		// 发送数据!
		for i := 0; i < len(b); i += step {
			err = g.SendMsg(&steam.StreamReqData{
				Data: b[i : i+step],
			})
		}
		log.Println("发送完成!")
		err = g.CloseSend() // 这个调用了 关闭流 服务端才知道发送结束了!
		if err != nil {
			log.Fatalln("ReadFile", err)
		}
		wg.Done()
	}()
	wg.Add(1)
	go func() {
		//等待服务端合并文件后的返回响应
		for {
			rec, err := g.Recv()
			if err != nil || err == io.EOF {
				fmt.Println(err)
				break
			}
			byteChain <- rec.Data
			wg.Done()
		}
	}()
	wg.Add(1)
	go func() {
		select {
		case data := <-byteChain:
			fmt.Println(len(data))
			create, _ := os.Create("/Users/hope/sdy/microway/client/authed_sample.txt")
			create.Write(data)
			create.Sync()
		}
		wg.Done()
	}()
	wg.Wait()
}

May-10-2023 17-05-43

3

评论区