您当前的位置: 首页 >  阿里云云栖号 ar

Spark On MaxCompute如何访问Phonix数据

阿里云云栖号 发布时间:2020-10-14 13:40:22 ,浏览量:1

简介: 如何使用Spark On MaxCompute连接Phonix,将Hbase的数据写入到MaxCompute的对应表中,目前没有对应的案例,为了满足用户的需求。本文主要讲解使用Spark连接Phonix访问Hbase的数据再写入到MaxCompute方案实践。该方案的验证是使用hbase1.1对应Phonix为4.12.0。本文从阿里云Hbase版本的选择、确认VPC、vswitchID、设置白名单和访问方式,Phonix4.12.0的客户端安装,在客户端实现Phonix表的创建和写入,Spark代码在本地IDEA的编写以及pom文件以及vpcList的配置,打包上传jar包并进行冒烟测试。

一、购买Hbase1.1并设置对应资源 1.1购买hbase

hbase主要版本为2.0与1.1,这边选择对应hbase对应的版本为1.1 Hbase与Hbase2.0版本的区别HBase1.1版本 1.1版本基于HBase社区1.1.2版本开发。HBase2.0版本 2.0版本是基于社区2018年发布的HBase2.0.0版本开发的全新版本。同样,在此基础上,做了大量的改进和优化,吸收了众多阿里内部成功经验,比社区HBase版本具有更好的稳定性和性能。1600152836102-e55b53ac-380f-468f-8abb-d858bd02f6d8.png

1.2确认VPC,vsWitchID

确保测试联通性的可以方便可行,该hbase的VPCId,vsWitchID尽量与购买的独享集成资源组的为一致的,独享集成资源的文档可以参考https://help.aliyun.com/document_detail/137838.html1600152882272-d5f5849e-b0b8-4485-bcb3-c74fe4ff3169.png

1.3设置hbase白名单,其中DataWorks白名单如下,个人ECS也可添加

image.png 根据文档链接选择对应的DataWorks的region下的白名单进行添加https://help.aliyun.com/document_detail/137792.html1600153040149-a1350bfb-febc-43cd-bd8b-5c1a7b0a2e03.png

1.4查看hbase对应的版本和访问地址

打开数据库链接的按钮,可以查看到Hbase的主版本以及Hbase的专有网络访问地址,以及是否开通公网访问的方式进行连接。

1600152748791-1a0c3c5d-80a8-4296-9210-90d459d80480.png

二、安装Phonix客户端,并创建表和插入数据 2.1安装客户端

根据hbase的版本为1.1选择Phonix的版本为4.12.0根据文档https://help.aliyun.com/document_detail/53600.html 下载对应的客户端文件ali-phoenix-4.12.0-AliHBase-1.1-0.9.tar.gz 登陆客户端执行命令

./bin/sqlline.py 172.16.0.13,172.16.0.15,172.16.0.12:2181

1600077441347-fde10ae4-91d0-4ca3-99d5-5fd89a75be6b.png 创建表:

CREATE TABLE IF NOT EXISTS users_phonix
(
    id       INT   ,
    username STRING,
    password STRING
) ;

插入数据:

UPSERT INTO users (id, username, password) VALUES (1, 'admin', 'Letmein');
2.2查看是否创建和插入成功

在客户端执行命令,查看当前表与数据是否上传成功

select * from users;

1600078146386-cbadedf0-702a-4e3b-9ff1-2176cb85bfcd.png

三、编写对应代码逻辑 3.1编写代码逻辑

在IDEA按照对应得Pom文件进行配置本地得开发环境,将代码涉及到得配置信息填写完整,进行编写测试,这里可以先使用Hbase得公网访问链接进行测试,代码逻辑验证成功后可调整配置参数,具体代码如下


package com.git.phonix
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.phoenix.spark._
/**
  * 本实例适用于Phoenix 4.x版本
  */
object SparkOnPhoenix4xSparkSession {
  def main(args: Array[String]): Unit = {
    //HBase集群的ZK链接地址。
    //格式为:xxx-002.hbase.rds.aliyuncs.com,xxx-001.hbase.rds.aliyuncs.com,xxx-003.hbase.rds.aliyuncs.com:2181
    val zkAddress = args(0)
    //Phoenix侧的表名,需要在Phoenix侧提前创建。Phoenix表创建可以参考:https://help.aliyun.com/document_detail/53716.html?spm=a2c4g.11186623.4.2.4e961ff0lRqHUW
    val phoenixTableName = args(1)
    //Spark侧的表名。
    val ODPSTableName = args(2)
    val sparkSession = SparkSession
      .builder()
      .appName("SparkSQL-on-MaxCompute")
      .config("spark.sql.broadcastTimeout", 20 * 60)
      .config("spark.sql.crossJoin.enabled", true)
      .config("odps.exec.dynamic.partition.mode", "nonstrict")
      //.config("spark.master", "local[4]") // 需设置spark.master为local[N]才能直接运行,N为并发数
      .config("spark.hadoop.odps.project.name", "***")
      .config("spark.hadoop.odps.access.id", "***")
      .config("spark.hadoop.odps.access.key", "***")
      //.config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.end.point", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api")
      .config("spark.sql.catalogImplementation", "odps")
      .getOrCreate()
    //第一种插入方式
    var df = sparkSession.read.format("org.apache.phoenix.spark").option("table", phoenixTableName).option("zkUrl",zkAddress).load()
    df.show()
    df.write.mode("overwrite").insertInto(ODPSTableName)
  }
}
3.2对应Pom文件

pom文件中分为Spark依赖,与ali-phoenix-spark相关的依赖,由于涉及到ODPS的jar包,会在集群中引起jar冲突,所以要将ODPS的包排除掉




    4.0.0
    
        2.3.0
        3.3.8-public
        2.11.8
        2.11
        4.12.0-HBase-1.1
    
    com.aliyun.odps
    Spark-Phonix
    1.0.0-SNAPSHOT
    jar
    
        
            org.jpmml
            pmml-model
            1.3.8
        
        
            org.jpmml
            pmml-evaluator
            1.3.10
        
        
            org.apache.spark
            spark-core_${scala.binary.version}
            ${spark.version}
            provided
            
                
                    org.scala-lang
                    scala-library
                
                
                    org.scala-lang
                    scalap
                
            
        
        
            org.apache.spark
            spark-sql_${scala.binary.version}
            ${spark.version}
            provided
        
        
            org.apache.spark
            spark-mllib_${scala.binary.version}
            ${spark.version}
            provided
        
        
            org.apache.spark
            spark-streaming_${scala.binary.version}
            ${spark.version}
            provided
        
        
            com.aliyun.odps
            cupid-sdk
            ${cupid.sdk.version}
            provided
        
        
            com.aliyun.phoenix
            ali-phoenix-core
            4.12.0-AliHBase-1.1-0.8
            
                
                    com.aliyun.odps
                    odps-sdk-mapred
                
                
                    com.aliyun.odps
                    odps-sdk-commons
                
            
        
        
            com.aliyun.phoenix
            ali-phoenix-spark
            4.12.0-AliHBase-1.1-0.8
            
                
                    com.aliyun.phoenix
                    ali-phoenix-core
                
            
        
    
    
        
            
                org.apache.maven.plugins
                maven-shade-plugin
                2.4.3
                
                    
                        package
                        
                            shade
                        
                        
                            false
                            true
                            
                                
                                    
                                    *:*
                                
                            
                            
                                
                                    *:*
                                    
                                        META-INF/*.SF
                                        META-INF/*.DSA
                                        META-INF/*.RSA
                                        **/log4j.properties
                                    
                                
                            
                            
                                
                                    reference.conf
                                
                                
                                    META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
                                
                            
                        
                    
                
            
            
                net.alchim31.maven
                scala-maven-plugin
                3.3.2
                
                    
                        scala-compile-first
                        process-resources
                        
                            compile
                        
                    
                    
                        scala-test-compile-first
                        process-test-resources
                        
                            testCompile
                        
                    
                
            
        
    
四、打包上传到DataWorks进行冒烟测试 4.1创建要传入的MaxCompute表
CREATE TABLE IF NOT EXISTS users_phonix
(
    id       INT   ,
    username STRING,
    password STRING
) ;
4.2打包上传到MaxCompute

在IDEA打包要打成shaded包,将所有的依赖包,打入jar包中,由于DatadWork界面方式上传jar包有50M的限制,因此采用MaxCompute客户端进行jar包1600073394094-678da37c-b504-432b-a430-a022d302bfec.png

4.3选择对应的project环境,查看上传资源,并点击添加到数据开发

进入DataWorks界面选择左侧资源图标,选择对应的环境位开发换进,输入删除文件时的文件名称进行搜索,列表中展示该资源已经上传成,点击提交到数据开发

1600073680606-96fb9057-f274-4afa-a98c-b2c4d94c2c33.png点击提交按钮

1600073719127-33ab00de-0a81-4b3c-a76f-24cadc8e0176.png

4.4配置对应的vpcList参数并提交任务测试

其中的配置vpcList文件的配置信息如下,可具体根据个人hbase的链接,进行配置

{
    "regionId":"cn-beijing",
    "vpcs":[
        {
            "vpcId":"vpc-2ze7cqx2bqodp9ri1vvvk",
            "zones":[
                {
                    "urls":[
                        {
                            "domain":"172.16.0.12",
                            "port":2181
                        },
                        {
                            "domain":"172.16.0.13",
                            "port":2181
                        },
                        {
                            "domain":"172.16.0.15",
                            "port":2181
                        },
                        {
                            "domain":"172.16.0.14",
                            "port":2181
                        },
                        {
                            "domain":"172.16.0.12",
                            "port":16000
                        },
                        {
                            "domain":"172.16.0.13",
                            "port":16000
                        },
                        {
                            "domain":"172.16.0.15",
                            "port":16000
                        },
                        {
                            "domain":"172.16.0.14",
                            "port":16000
                        },
                        {
                            "domain":"172.16.0.12",
                            "port":16020
                        },
                        {
                            "domain":"172.16.0.13",
                            "port":16020
                        },
                        {
                            "domain":"172.16.0.15",
                            "port":16020
                        },
                        {
                            "domain":"172.16.0.14",
                            "port":16020
                        }
                    ]
                }
            ]
        }
    ]
}

Spark任务提交任务的配置参数,主类,以及对应的参数 该参数主要为3个参数第一个为Phonix的链接,第二个为Phonix的表名称,第三个为传入的MaxCompute表name.png

点击冒烟测试按钮,可以看到任务执行成功1600071547065-f1b1ff1e-8460-4bfc-9e42-7dc1e30f9d0f.png 在临时查询节点中执行查询语句,可以得到数据已经写入MaxCompute的表中1600756783637-33baf55a-e314-4370-83f8-a97180679bfa.png

总结:

使用Spark on MaxCompute访问Phonix的数据,并将数据写入到MaxCompute的表中经过实践,该方案时可行的。但在实践的时有几点注意事项: 1.结合实际使用情况选择对应的Hbase以及Phonix版本,对应的版本一致,并且所使用的客户端,以及代码依赖都会有所改变。 2.使用公网在IEAD进行本地测试,要注意Hbase白名单,不仅要设置DataWorks的白名单,还需将自己本地的地址加入到白名单中。 3.代码打包时需要将pom中的依赖关系进行梳理,避免ODPS所存在的包在对应的依赖中,进而引起jar包冲突,并且打包时打成shaded包,避免缺失遗漏对应的依赖。

 

 

 

原文链接 本文为阿里云原创内容,未经允许不得转载。

关注
打赏
1688896170
查看更多评论

阿里云云栖号

暂无认证

  • 1浏览

    0关注

    4522博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.0618s