您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Protocol Buffers

宝哥大数据 发布时间:2021-02-04 10:05:29 ,浏览量:0

Protocol Buffers介绍
  • Protocal Buffers(简称protobuf)是谷歌的一项技术,用于结构化的数据序列化、反序列化,常用于RPC 系统和持续数据存储系统。
  • 其类似于XML生成和解析,但protobuf的效率高于XML,不过protobuf生成的是字节码,可读性比XML差,类似的还有json、Java的Serializable等。
  • 很适合做数据存储或 RPC 数据交换格式。可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。
  • 参考:https://zhuanlan.zhihu.com/p/53339153
Idea 安装protobuf插件

安装插件protobuf Support,之后重启

  • 找到资料包中的protobuf-jetbrains-plugin-0.13.0.zip,在IDEA中安装插件即可

在这里插入图片描述

使用ProtoBuf序列化数据 配置Maven依赖与插件

        
            com.google.protobuf
            protobuf-java
            3.4.0
        


    
        
            
                kr.motd.maven
                os-maven-plugin
                1.6.2
            
        
        
            
            
                org.xolstice.maven.plugins
                protobuf-maven-plugin
                0.5.0
                
                    ${project.basedir}/src/main/proto
                    
                        com.google.protobuf:protoc:3.1.0:exe:${os.detected.classifier}
                    
                
                
                    
                        
                            compile
                        
                    
                
            
        
    
编写 proto 文件
  • protobuf3的语法参考讲义中的「 protobuf3 语法」

  • 在main文件夹下,创建 proto 目录,并编写proto文件

syntax = "proto3";
option java_package = "com.itheima.protobuf";
option java_outer_classname = "DemoModel";

message User {
    int32 id = 1;
    string name = 2;
    string sex = 3;
}

注意:classname不能与message name一样

protobuf与java类型对照表 .proto TypeJava Type备注doubledoublefloatfloatint32int使用可变长度编码。负数编码效率低下–如果您的字段可能具有负值,请改用sint32。int64long使用可变长度编码。负数编码效率低下–如果您的字段可能具有负值,请改用sint64。uint32int使用可变长度编码。uint64long使用可变长度编码。sint32int使用可变长度编码。有符号的int值。与常规int32相比,它们更有效地编码负数。sint64long使用可变长度编码。有符号的int值。与常规int64相比,它们更有效地编码负数。fixed32int始终为四个字节。如果值通常大于2^28,则比uint32更有效。fixed64long始终为八个字节。如果值通常大于2^56,则比uint64更有效。sfixed32int始终为四个字节。sfixed64long始终为八个字节。boolbooleanstringString字符串必须始终包含UTF-8编码或7位ASCII文本。bytesByteString可以包含任意字节序列。 执行protobuf:compile编译命令
  • 将 proto 文件编译成java代码

在这里插入图片描述

在这里插入图片描述

编写代码使用ProtoBuf序列化、反序列化
public class ProtoBufDemo {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        DemoModel.User.Builder builder = DemoModel.User.newBuilder();
        builder.setId(1);
        builder.setName("张三");
        builder.setSex("男");

        byte[] bytes = builder.build().toByteArray();
        System.out.println("--protobuf---");
        for (byte b : bytes) {
            System.out.print(b);
        }
        System.out.println();
        System.out.println("---");

        DemoModel.User user = DemoModel.User.parseFrom(bytes);

        System.out.println(user.getName());
    }
}
BINLOG转换为ProtoBuf消息 编写proto描述文件:CanalModel.proto
syntax = "proto3";
option java_package = "com.itheima.canal_demo";
option java_outer_classname = "CanalModel";

/* 行数据 */
message RowData {
    string logfilename = 15;
    uint64 logfileoffset = 14;
    uint64 executeTime = 1;
    string schemaName = 2;
    string tableName = 3;
    string eventType = 4;

    /* 列数据 */
    map columns = 5;
}
添加binglogToProtoBuf序列化消息为Protobuf
    // binlog解析为ProtoBuf
    private static byte[] binlogToProtoBuf(Message message) throws InvalidProtocolBufferException {
        // 1. 构建CanalModel.RowData实体
        CanalModel.RowData.Builder rowDataBuilder = CanalModel.RowData.newBuilder();

        // 1. 遍历message中的所有binlog实体
        for (CanalEntry.Entry entry : message.getEntries()) {
            // 只处理事务型binlog
            if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // 获取binlog文件名
            String logfileName = entry.getHeader().getLogfileName();
            // 获取logfile的偏移量
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // 获取sql语句执行时间戳
            long executeTime = entry.getHeader().getExecuteTime();
            // 获取数据库名
            String schemaName = entry.getHeader().getSchemaName();
            // 获取表名
            String tableName = entry.getHeader().getTableName();
            // 获取事件类型 insert/update/delete
            String eventType = entry.getHeader().getEventType().toString().toLowerCase();

            rowDataBuilder.setLogfilename(logfileName);
            rowDataBuilder.setLogfileoffset(logfileOffset);
            rowDataBuilder.setExecuteTime(executeTime);
            rowDataBuilder.setSchemaName(schemaName);
            rowDataBuilder.setTableName(tableName);
            rowDataBuilder.setEventType(eventType);

            // 获取所有行上的变更
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List columnDataList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : columnDataList) {
                if(eventType.equals("insert") || eventType.equals("update")) {
                    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                        rowDataBuilder.putColumns(column.getName(), column.getValue().toString());
                    }
                }
                else if(eventType.equals("delete")) {
                    for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                        rowDataBuilder.putColumns(column.getName(), column.getValue().toString());
                    }
                }
            }
        }

        return rowDataBuilder.build().toByteArray();
    } 
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0767s