您当前的位置: 首页 >  Java
  • 0浏览

    0关注

    1477博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

fisco bcos java-sdk-demo ParallelOkDemo.java 源码解析阅读注释

软件工程小施同学 发布时间:2021-04-30 11:34:31 ,浏览量:0

 

/**
 * Copyright 2014-2020 [fisco-dev]
 *
 * 

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at * *

http://www.apache.org/licenses/LICENSE-2.0 * *

Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing permissions and * limitations under the License. */ package org.fisco.bcos.sdk.demo.perf.parallel; import com.google.common.util.concurrent.RateLimiter; import java.io.IOException; import java.math.BigInteger; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import org.fisco.bcos.sdk.demo.contract.ParallelOk; import org.fisco.bcos.sdk.demo.perf.callback.ParallelOkCallback; import org.fisco.bcos.sdk.demo.perf.collector.PerformanceCollector; import org.fisco.bcos.sdk.demo.perf.model.DagTransferUser; import org.fisco.bcos.sdk.demo.perf.model.DagUserInfo; import org.fisco.bcos.sdk.model.TransactionReceipt; import org.fisco.bcos.sdk.transaction.model.exception.ContractException; import org.fisco.bcos.sdk.utils.ThreadPoolService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ParallelOkDemo { private static final Logger logger = LoggerFactory.getLogger(ParallelOkDemo.class); private static AtomicInteger sended = new AtomicInteger(0); private AtomicInteger getted = new AtomicInteger(0); private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private final ParallelOk parallelOk; private final ThreadPoolService threadPoolService; private final PerformanceCollector collector; private final DagUserInfo dagUserInfo; public ParallelOkDemo( ParallelOk parallelOk, DagUserInfo dagUserInfo, ThreadPoolService threadPoolService) { this.threadPoolService = threadPoolService; this.parallelOk = parallelOk; this.dagUserInfo = dagUserInfo; this.collector = new PerformanceCollector(); } /** * 验证交易是否正确 * @param qps [description] * @throws InterruptedException [description] */ public void veryTransferData(BigInteger qps) throws InterruptedException { RateLimiter rateLimiter = RateLimiter.create(qps.intValue()); System.out.println("==================================================================="); AtomicInteger verifyFailed = new AtomicInteger(0); AtomicInteger verifySuccess = new AtomicInteger(0); final List userInfo = dagUserInfo.getUserList(); int userSize = userInfo.size(); for (int i = 0; i < userSize; i++) { rateLimiter.acquire(); final int userIndex = i; threadPoolService .getThreadPool() .execute( new Runnable() { @Override public void run() { try { String user = userInfo.get(userIndex).getUser(); // 链上数据 BigInteger balance = parallelOk.balanceOf(user); // 本地数据 BigInteger localAmount = userInfo.get(userIndex).getAmount(); // 线上数据是否和本地数据一致 if (localAmount.compareTo(balance) != 0) { logger.error( "local balance is not the same as the remote, user: {}, local balance: {}, remote balance: {}", user, localAmount, balance); verifyFailed.incrementAndGet(); } else { verifySuccess.incrementAndGet(); } } catch (ContractException exception) { verifyFailed.incrementAndGet(); logger.error( "get remote balance failed, error info: " + exception.getMessage()); } } }); } while (verifySuccess.get() + verifyFailed.get() < userSize) { Thread.sleep(40); } System.out.println("validation:"); System.out.println(" \tuser count is " + userSize); System.out.println(" \tverify_success count is " + verifySuccess); System.out.println(" \tverify_failed count is " + verifyFailed); } /** * 新增用户 * @param userCount [用户总数] * @param qps [发送速率] * @throws InterruptedException [description] * @throws IOException [description] */ public void userAdd(BigInteger userCount, BigInteger qps) throws InterruptedException, IOException { System.out.println("==================================================================="); System.out.println("Start UserAdd test, count " + userCount); // 设置速度 RateLimiter limiter = RateLimiter.create(qps.intValue()); // 当前时间 long currentSeconds = System.currentTimeMillis() / 1000L; // 分类门槛 Integer area = userCount.intValue() / 10; // 开始时间 long startTime = System.currentTimeMillis(); // 设置收集器需要搜集的交易总数 collector.setTotal(userCount.intValue()); // 设置收集器的开始时间 collector.setStartTimestamp(startTime); // 发送失败的总数 AtomicInteger sendFailed = new AtomicInteger(0); for (Integer i = 0; i < userCount.intValue(); i++) { final Integer index = i; limiter.acquire(); threadPoolService .getThreadPool() .execute( new Runnable() { @Override public void run() { // generate the user according to currentSeconds // 账户地址 String user = Long.toHexString(currentSeconds) + Integer.toHexString(index); // 账户余额 BigInteger amount = new BigInteger("1000000000"); // 用户对象 DagTransferUser dtu = new DagTransferUser(); dtu.setUser(user); dtu.setAmount(amount); // 设置回调 ParallelOkCallback callback = new ParallelOkCallback( collector, dagUserInfo, ParallelOkCallback.ADD_USER_CALLBACK); callback.setTimeout(0); callback.setUser(dtu); try { callback.recordStartTime(); // 发送交易 parallelOk.set(user, amount, callback); // 当前已经发送的交易总数 int current = sended.incrementAndGet(); if (current >= area && ((current % area) == 0)) { long elapsed = System.currentTimeMillis() - startTime; double sendSpeed = current / ((double) elapsed / 1000); System.out.println( "Already sended: " + current + "/" + userCount + " transactions" + ",QPS=" + sendSpeed); } } catch (Exception e) { logger.warn( "addUser failed, error info: {}", e.getMessage()); sendFailed.incrementAndGet(); TransactionReceipt receipt = new TransactionReceipt(); receipt.setStatus("-1"); receipt.setMessage( "userAdd failed, error info: " + e.getMessage()); callback.onResponse(receipt); } } }); } while (collector.getReceived().intValue() != userCount.intValue()) { logger.info( " sendFailed: {}, received: {}, total: {}", sendFailed.get(), collector.getReceived().intValue(), collector.getTotal()); Thread.sleep(100); } dagUserInfo.setContractAddr(parallelOk.getContractAddress()); dagUserInfo.writeDagTransferUser(); System.exit(0); } /** * 查询账户余额 * @param qps [description] * @throws InterruptedException [description] */ public void queryAccount(BigInteger qps) throws InterruptedException { final List allUsers = dagUserInfo.getUserList(); RateLimiter rateLimiter = RateLimiter.create(qps.intValue()); AtomicInteger sent = new AtomicInteger(0); for (Integer i = 0; i < allUsers.size(); i++) { final Integer index = i; rateLimiter.acquire(); threadPoolService .getThreadPool() .execute( new Runnable() { @Override public void run() { try { BigInteger result = parallelOk.balanceOf(allUsers.get(index).getUser()); allUsers.get(index).setAmount(result); int all = sent.incrementAndGet(); if (all >= allUsers.size()) { System.out.println( dateFormat.format(new Date()) + " Query account finished"); } } catch (ContractException exception) { logger.warn( "queryAccount for {} failed, error info: {}", allUsers.get(index).getUser(), exception.getMessage()); System.exit(0); } } }); } while (sent.get() < allUsers.size()) { Thread.sleep(50); } } /** * 转账交易 * @param count [转账交易的数量] * @param qps [发送速率] * @throws InterruptedException [description] * @throws IOException [description] */ public void userTransfer(BigInteger count, BigInteger qps) throws InterruptedException, IOException { System.out.println("Querying account info..."); // 查询账户余额 queryAccount(qps); System.out.println("Sending transfer transactions..."); // 设置交易发送速率 RateLimiter limiter = RateLimiter.create(qps.intValue()); // 分类门槛 int division = count.intValue() / 10; // 开始时间 long startTime = System.currentTimeMillis(); collector.setStartTimestamp(startTime); collector.setTotal(count.intValue()); // 发送失败的总数 AtomicInteger sendFailed = new AtomicInteger(0); for (Integer i = 0; i < count.intValue(); i++) { limiter.acquire(); final int index = i; threadPoolService .getThreadPool() .execute( new Runnable() { @Override public void run() { try { // 要转账的金额 Random random = new Random(); int r = random.nextInt(100); BigInteger amount = BigInteger.valueOf(r); ParallelOkCallback callback = new ParallelOkCallback( collector, dagUserInfo, ParallelOkCallback.TRANS_CALLBACK); callback.setTimeout(0); // 转账人 DagTransferUser from = dagUserInfo.getFrom(index); // 收账人 DagTransferUser to = dagUserInfo.getTo(index); callback.setFromUser(from); callback.setToUser(to); callback.setAmount(amount); callback.recordStartTime(); // 发送交易 parallelOk.transfer( from.getUser(), to.getUser(), amount, callback); int current = sended.incrementAndGet(); if (current >= division && ((current % division) == 0)) { long elapsed = System.currentTimeMillis() - startTime; double sendSpeed = current / ((double) elapsed / 1000); System.out.println( "Already sent: " + current + "/" + count + " transactions" + ",QPS=" + sendSpeed); } } catch (Exception e) { logger.error( "call transfer failed, error info: {}", e.getMessage()); TransactionReceipt receipt = new TransactionReceipt(); receipt.setStatus("-1"); receipt.setMessage( "call transfer failed, error info: " + e.getMessage()); collector.onMessage(receipt, Long.valueOf(0)); sendFailed.incrementAndGet(); } } }); } while (collector.getReceived().intValue() != count.intValue()) { Thread.sleep(3000); logger.info( "userTransfer: sendFailed: {}, received: {}, total: {}", sendFailed.get(), collector.getReceived().intValue(), collector.getTotal()); } veryTransferData(qps); System.exit(0); } }

 

关注
打赏
1665320866
查看更多评论
立即登录/注册

微信扫码登录

0.0421s