声明:链码开发语言是golang,源码分析是基于fabric 1.4.0版本
用户链码与peer的关系
用户链码是一个独立的进程,使用docker封装(非dev模式下)。
链码容器由peer创建,在启动容器时指定了peer的地址,所以链码容器启动后能够找到peer,并建立tcp长连接,其中peer为服务端,协议是:grpc->http2->tcp。
switch ccType {
case pb.ChaincodeSpec_GOLANG.String(), pb.ChaincodeSpec_CAR.String():
lc.Args = []string{"chaincode", fmt.Sprintf("-peer.address=%s", c.PeerAddress)}
case pb.ChaincodeSpec_JAVA.String():
lc.Args = []string{"/root/chaincode-java/start", "--peerAddress", c.PeerAddress}
case pb.ChaincodeSpec_NODE.String():
lc.Args = []string{"/bin/sh", "-c", fmt.Sprintf("cd /usr/local/src; npm start -- --peer.address %s", c.PeerAddress)}
default:
return nil, errors.Errorf("unknown chaincodeType: %s", ccType)
}
链码进程在和peer建立长连接之后,会有一个注册的动作,注册完成之后,链码处于ready状态,开始接收背书请求。 注:由于链码容器只会和一个peer建立长连接,所以一个链码容器不可能被多个peer使用,但可以被同一个peer的多个通道共用。
背书过程分析
peer在同时收到多个client的交易提案时(接收动作是并行,因为是不同的tcp长连接),
而把提案信息发送给链码,这个发送动作必然是串行的,因为只有一个tcp长连接!不过这些都不影响我们的链码编写。
接下来看链码侧逻辑。
链码进程启动之后,核心代码:
负责与peer交互的
函数:func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode) error {}
位置:core/chaincode/shim/chaincode.go: 321
函数:func (handler *Handler) handleReady(msg *pb.ChaincodeMessage, errc chan error) error {}
位置:core/chaincode/shim/handler.go: 774
函数:func (handler *Handler) handleTransaction(msg *pb.ChaincodeMessage, errc chan error) {}
位置:core/chaincode/shim/handler.go: 238
在chatWithPeer函数里面,使用一个无限for循环,不停的接收peer发送的消息。
如果peer是ready状态,调用handleTransaction函数,这个函数里启动了一个goroutine(关键点),在这个goroutine函数里调用了我们的Invoke函数,并把invoke的结果返回给peer!
如果极短的时间内,链码收到了多个背书请求,就会创建多个goroutine,那么这些goroutine就可能在同一时刻被执行。更多细节可以了解goroutine调度相关资料。 现在确认了多次调用Invoke是并行的(至少开始执行是并行的),不管是否是同一个接口!
在我们的业务逻辑中,不可避免的会读写世界状态,两个goroutine中同时调用GetState/PutState,是否会互相影响,最终导致串行呢? 分析源码,以GetState为例,调用堆栈如下:
func (stub *ChaincodeStub) GetState(key string) ([]byte, error) {}
func (handler *Handler) handleGetState(collection string, key string, channelId string, txid string) ([]byte, error) {}
func (handler *Handler) callPeerWithChaincodeMsg(msg *pb.ChaincodeMessage, channelID, txid string) (pb.ChaincodeMessage, error) {}
func (handler *Handler) createChannel(channelID, txid string) (chan pb.ChaincodeMessage, error) {}
func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) {}
func (handler *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan error) {}
callPeerWithChaincodeMsg源码如下:
func (handler *Handler) callPeerWithChaincodeMsg(msg *pb.ChaincodeMessage, channelID, txid string) (pb.ChaincodeMessage, error) {
// Create the channel on which to communicate the response from the peer
var respChan chan pb.ChaincodeMessage
var err error
if respChan, err = handler.createChannel(channelID, txid); err != nil { // 创建管道
return pb.ChaincodeMessage{}, err
}
defer handler.deleteChannel(channelID, txid) // 删除管道
return handler.sendReceive(msg, respChan)
}
createChannel会创建一个管道,并以channelID+txid为key,缓存到Handler对象中,在收到peer返回的数据后,根据channelID+txid找到对应的管道,将数据放入管道中。
注意:创建管道时,如果channelID+txid对应的管道已存在,就返回失败,这意味着Invoke中无法通过创建goroutine来调用GetState等涉及与peer交互的接口。
sendReceive的源码如下:
func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) {
errc := make(chan error, 1)
handler.serialSendAsync(msg, errc) // 异步发送,发送动作的结果通过errc管道返回
for { // 为了处理serialSendAsync发送失败的情况,才使用for
select {
case err :=
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?