您当前的位置: 首页 >  区块链
  • 0浏览

    0关注

    1477博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

区块链Hyperledger Fabric背书过程中链码是并行还是串行?

软件工程小施同学 发布时间:2021-03-14 12:27:29 ,浏览量:0

声明:链码开发语言是golang,源码分析是基于fabric 1.4.0版本

 

用户链码与peer的关系

用户链码是一个独立的进程,使用docker封装(非dev模式下)。

链码容器由peer创建,在启动容器时指定了peer的地址,所以链码容器启动后能够找到peer,并建立tcp长连接,其中peer为服务端,协议是:grpc->http2->tcp。

switch ccType {
	case pb.ChaincodeSpec_GOLANG.String(), pb.ChaincodeSpec_CAR.String():
		lc.Args = []string{"chaincode", fmt.Sprintf("-peer.address=%s", c.PeerAddress)}
	case pb.ChaincodeSpec_JAVA.String():
		lc.Args = []string{"/root/chaincode-java/start", "--peerAddress", c.PeerAddress}
	case pb.ChaincodeSpec_NODE.String():
		lc.Args = []string{"/bin/sh", "-c", fmt.Sprintf("cd /usr/local/src; npm start -- --peer.address %s", c.PeerAddress)}
	default:
		return nil, errors.Errorf("unknown chaincodeType: %s", ccType)
	}

 

链码进程在和peer建立长连接之后,会有一个注册的动作,注册完成之后,链码处于ready状态,开始接收背书请求。 注:由于链码容器只会和一个peer建立长连接,所以一个链码容器不可能被多个peer使用,但可以被同一个peer的多个通道共用。

 

 

背书过程分析

peer在同时收到多个client的交易提案时(接收动作是并行,因为是不同的tcp长连接),

而把提案信息发送给链码,这个发送动作必然是串行的,因为只有一个tcp长连接!不过这些都不影响我们的链码编写。

接下来看链码侧逻辑。

链码进程启动之后,核心代码:

负责与peer交互的
函数:func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode) error {}
位置:core/chaincode/shim/chaincode.go: 321

函数:func (handler *Handler) handleReady(msg *pb.ChaincodeMessage, errc chan error) error {}
位置:core/chaincode/shim/handler.go: 774

函数:func (handler *Handler) handleTransaction(msg *pb.ChaincodeMessage, errc chan error) {}
位置:core/chaincode/shim/handler.go: 238

在chatWithPeer函数里面,使用一个无限for循环,不停的接收peer发送的消息。

如果peer是ready状态,调用handleTransaction函数,这个函数里启动了一个goroutine(关键点),在这个goroutine函数里调用了我们的Invoke函数,并把invoke的结果返回给peer!

 

如果极短的时间内,链码收到了多个背书请求,就会创建多个goroutine,那么这些goroutine就可能在同一时刻被执行。更多细节可以了解goroutine调度相关资料。 现在确认了多次调用Invoke是并行的(至少开始执行是并行的),不管是否是同一个接口!

在我们的业务逻辑中,不可避免的会读写世界状态,两个goroutine中同时调用GetState/PutState,是否会互相影响,最终导致串行呢? 分析源码,以GetState为例,调用堆栈如下:

func (stub *ChaincodeStub) GetState(key string) ([]byte, error) {}
	func (handler *Handler) handleGetState(collection string, key string, channelId string, txid string) ([]byte, error) {}
		func (handler *Handler) callPeerWithChaincodeMsg(msg *pb.ChaincodeMessage, channelID, txid string) (pb.ChaincodeMessage, error) {}
			func (handler *Handler) createChannel(channelID, txid string) (chan pb.ChaincodeMessage, error) {}
			func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) {}
				func (handler *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan error) {}

callPeerWithChaincodeMsg源码如下:

func (handler *Handler) callPeerWithChaincodeMsg(msg *pb.ChaincodeMessage, channelID, txid string) (pb.ChaincodeMessage, error) {
	// Create the channel on which to communicate the response from the peer
	var respChan chan pb.ChaincodeMessage
	var err error
	if respChan, err = handler.createChannel(channelID, txid); err != nil { // 创建管道
		return pb.ChaincodeMessage{}, err
	}

	defer handler.deleteChannel(channelID, txid)  // 删除管道

	return handler.sendReceive(msg, respChan)
}

createChannel会创建一个管道,并以channelID+txid为key,缓存到Handler对象中,在收到peer返回的数据后,根据channelID+txid找到对应的管道,将数据放入管道中。

注意:创建管道时,如果channelID+txid对应的管道已存在,就返回失败,这意味着Invoke中无法通过创建goroutine来调用GetState等涉及与peer交互的接口。

 

sendReceive的源码如下:  

func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) {
	errc := make(chan error, 1)
	handler.serialSendAsync(msg, errc) // 异步发送,发送动作的结果通过errc管道返回

	for { // 为了处理serialSendAsync发送失败的情况,才使用for
		select {
		case err :=             
关注
打赏
1665320866
查看更多评论
0.0429s