rpc调用的实现,从本服的角度来说既可是服务器,又可以是客户端,调用都是作为客户端发起的,回复则一般是从服务端来回复的。
RPCStream是本服到其他服的客户端,RPCStreamMap用来维护这些客户端,而RPCStreamMap又会被session类引用,这样设计的目的就是为了从网关服过来的消息跳转到其他服的时候是带客户端标记的。这里要求所有的RPC客户端只能被Session来持有,也就说所有的客户端都是有归属的,要么归属一个本服的一个默认session,要么归属一个从别的服来的session(一般归属客户端session)
例如客户端x和agenta服一个sessionXA,客户端y有个sessionYA,这两个session是被agent服持有的,此时如果客户端要发消息给unique服,会产生两个不同的session,分布是sessionXAU和sessionYAU,这两个服务是被unique持有的,从unique看来,sessionXAU和sessionYAU虽然底层是从agaent服来的,但是从逻辑上来讲,这两个session就是直接从客户端直连过来的,这样的好处就是,逻辑直连,底层中转
这里面有个两个核心的方法就是handle和rout, 如果在game和unique是作为服务端收到包那么就先处理,那么可以视为这个包是从别的服中转过来的,这个服就是终点了,处理完就原通道返回。如果是在agent中则还是中转,本身agent就是中转服 如果是作为客户端收到消息,那么基本上是从game或者unique收到消息就需要按包中转了
rpc调用帧头
Message:bytes
rpc服务
rpc Stream(stream pb.Frame) retuns (stream pb.Frame)
pb.Service的具体实现要重新Stream方法
Stream(stream:pb.Service_streamServer)
rpc流调用需要实现的方法,一次调用就会在真实的tcp通道上一个虚拟的流,也就是产生一个会话session,读取会话开始的头,会知道这个会话的UserID,如果有表示这个会话是从客户端发起并且中转过来的,读取clientServerKey来获取这个会话是从哪个服务器过来的,比如/backends/agent/agent2或/backends/unique/unique。读取clientServerType上面来源服务器的类型,比如agent或者unique,会话产生后放到map中保存
会话是核心概念,会话都是站在服务端的角度来说的,一个会话代表一个客户端的虚拟连接,所以会话的引用都是被服务端持有的
Flag:int32
info
UserID:string
客户端ID,比如134,代表用户的唯一id是app的id,如果id为空则表示这个session为内部连接,否则表示这个session表示逻辑上客户端连接过来的,也就是通过agent中转过来的
serverType:string
本服务器的类型,例如unique agent
serverKey:string
本服务的字符串,比如/backends/agent/agent2或/backends/unique/unique
clientServerKey:string
是通过哪个服务器作为客户端中转过来的,比如/backends/agent/agent2或/backends/unique/unique,这两个代表是从网关服和唯一服中转过来的
clientServerType:string
这个session对应的中转客户端的类型,比如agent或者unique
ServerStream:pb.Server_StreamServer
grpc客户端连接过来的时候调用的方法,这个是实现pb中的方法,这个方法中会建立新的session,同时会读取新建session的时候,客户端发过来的信息,设置userid、clientServerKey、clientServerType,同时开启新建session的recv和handle监听
outSideDie:chan struct{}
关闭session的标记
inFrame:chan *pb.Frame
协程通道,作为服务端接受客户端过来的通道,把通过recv方法接受过来的消息写入这个通道,与之对应的是MQ通道
closeCallBackList:*methodList
当本session关闭的回调函数列表
rpcClients:*rpcclients.PRCStreamMap
存储本session作为客户端到其他服务器的连接,这些客户端收到的消息都会写入关键的MQ通道
MQ:chan pb.Frame
协程通道,作为客户端从别的服务器接受到的通道消息,因为一个session还有持有多个去往别的服务器的rpc客户端,这个mq就是接受客户端的消息,例如agent服的mq就会是game服回给客户端的消息
SessionClose()
关闭这个session
DelRPCClientWithServerType(serverType:string)
删除本session作为客户端通往其他服务端类型的rpc连接
RegisterSessionDead(callback func(string)error)
删除本session作为客户端通往其他服务端ID的rpc连接
SendToStreamWithServerKey(key:string,m:proto.Message)
使用服务器id发送到服务器上,比如在agent上的session需要往unique1发送一条消息,那么就用这个方法key就是目的地的服务器id
SendToStreamServer(serverType:string,m:proto.Message)
使用服务器类型发送到服务上,比如unique上的session需要往game服发送一条消息,如果这个session已经有到game服的通道就使用这个通道,如果没有通道系统会从game类型的一组服务器中挑选一个,然后把这个消息发送到这个服务上面
SendToRandomStreamServer(serverType:string,m:proto.Message)
和SendToStreamServer类似,SendToStreamServer是如果有的话就不随机了,这个是强制随机
SendToClientAsServer(m proto.Message)
作为服务端把消息回给客户端
GetRPCStream(serverType:string)*rpcclient.RPCStream
获取一个到serveType类型(例如game unique)服务器的rpc客户端
GetRandomRPCStream(serverType:string)*rpcclient.RPCStream
类似GetRPCStream,只不过这个是随机
GetRPCStreamWithID(id:string)*rpcclient.RPCStream
获取一个到服务器ID(例如/backends/unique/unique)的rpc客户端
GetServerKey()string
返回serverType
recv(stream:pb.Service_StreamServer)
[go]接受流数据,读取数据,如果有数据,把数据写入inFrame通道
Handle(stream:pb.Service_StreamServer)
这个函数处理inFrame和MQ这两个通道,其中inFrame是作为服务器接受的消息,MQ是作为客户端的消息,收到消息后都是先进入路由rout方法处理,然后返回参数,如果参数为空表示这个包不在本机处理,已经在rout中转发到新的目的地了,如果不为空则表示已经在本机是这个包的重点,已经被处理过了,需要返回新包,这时候有个微妙的区别,如果是inFrame的新包,就直接通过本session的客户端通道返回了,如果是MQ的包表示这个是客户端收到的消息,在本机处理过后有可能接着往下个服务器中转,就调用sessionSendToServerUseProto来中转
route(p:[]byte)proto.Message
路由中转方法,首先解析这个包的目的地,
sessionSendToServerUseProto(data:proto.Message)
根据消息本身目的地,把这个消息中转往对应的服务器
close(stream:pb.Service_StreamServer)
当session被系统关闭,执行cloaseBallBackList的回调函数
客户端连接
list:map[string]map[string]*RPCStream
存储客户端连接的map,结构是map[serverType][serverKey]=RPCClient,例如map[“game”][“/backends/game/game1”] = RPCClient
lock *sync.RWMutex
操作map的锁
GetPRCStream(serverType:string)*RPCStrame
根据serverType,从list中取到一个客户端
DelPRCStream(serverType:string)
从map中删除serverType的客户端列表
Close()
循环关闭list中的所有session
TryGetRPCStream(serverType:string,myServerKey:string,userid:string,fr:chan pb.Frame)*RPCStream
关键方法,获取一个通往serverType服务器的客户端连接,userid是session维护的角色id,fr是用来传输数据的MQ通道,会先检查serverType的链接有没有,有的话直接返回,没有的话会从etcd中找到一组serverType服务器,并且从中挑选一个并发起连接
getRandomRPCStream(serverType:string,myServerKey:string,userid:string,fr:chan pb.Frame)*RPCStream
关键方法,类似TryGetRPCStream,这个是从serverType中强制随机一个
TryGetRPCStreamWithID(serverType:string,myServerKey:string,userid:string,fr:chan pb.Frame)*RPCStream
关键方法,和TryGetRPCStream是类似方法,只不过这个是指定ID服务器
GRPC客户端连接
GSID:string
连接的服务端的serverKey
UserID:string
如果是从客户端来的session需要记录客户端的userID
Stream:pb.Service_StreamClient
原生的客户端连接
close()
关闭客户端
rpcClientRecv(mq chan pb.Frame)
[go]方法,收取来自服务端的消息,并写入mq