Spark常用数据库的数据的读写
Spark可以从外部存储系统读取数据,比如RDBMs表中或者HBase表中读写数据,这也是企业中常常使用,如下两个场景:
1)、要分析的数据存储在HBase表中,需要从其中读取数据数据分析
1日志数据:电商网站的商家操作日志
2订单数据:保险行业订单数据
2)、使用Spark进行离线分析以后,往往将报表结果保存到MySQL
1表中网站基本分析(pv、uVv。。。。。)
HBase数据源Spark可以从HBase表中读写(Read/Write)数据,底层采用TablelnputFormat和TableOutputFormat方式,与MapReduce与HBase集成完全一样,使用输入格式InputFormat和输格式OutPutFormat
package hbase
import java.util
import com.hankcs.hanlp.HanLP
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import com.hankcs.hanlp.seg.common.Term
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import search.SougouRecord
/**
* Hbase的数据连接
*/
object HbaseConnection {
def main(args: Array[String]): Unit = {
def main(args: Array[String]): Unit = {
//TODO 构建一个spark的对象
//构建Spark Application 应用的入口实例
val sc: SparkContext = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
SparkContext.getOrCreate(sparkConf)
}
//TODO:1 加载搜狗的数据的集合使用小数据集合
val inputpath = "E:\\GItHub_project\\Big_Data\\Spark\\Sparkday02_2.11\\src\\main\\resources\\SogouQ.sample"
val sougouRDD = sc.textFile(inputpath, minPartitions = 2)
print(s"count=${sougouRDD.count()}")
println(sougouRDD.first())
//TODO 2:数据的ETL操作的
val etlRDD: RDD[SougouRecord] = sougouRDD
.filter(line => null != line && line.trim.split("\\s+").length == 6)
.mapPartitions { iter =>
iter.map { line =>
val array = line.trim.split("\\s+")
//构建一个对象
SougouRecord(
array(0), array(1),
array(2).replace("\\[\\]", ""),
array(3).toInt, array(4).toInt,
array(5)
)
}
}
//由于数据使用多次 需要缓存数据
etlRDD.persist(StorageLevel.MEMORY_AND_DISK)
//TODO:搜索关键次统计
val resultRDD: RDD[(String, Int)] = etlRDD
.filter(recode => null != recode.queryWords && recode.queryWords.trim.length > 0)
.flatMap { record =>
//360安全卫士
val words = record.queryWords.trim
//使用的HanLP分词进行中文分词 360 安全 卫士
val terms: util.List[Term] = HanLP.segment(words)
//将java中的list转化为的scala中的list
import scala.collection.JavaConverters._
//封装到二元组的中的表示每一个搜索单词的出现的一次
val result = terms.asScala.map {
term => (term.word, 1)
}
//返回的结果
result
}
//分组聚合
.reduceByKey((tmp, item) => tmp + item)
//TODO:将来结果的数据保存都hbase
/**
* 表名:htb_wordCount
* rowkey word
* columFamily :info
* columns:count
*
* 创建表的语句是
*/
//第一步是将RDD转换为的RDD[(IMMutableByWriteTable,Put)]
val putsRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.map { case (word, count) =>
//创建的rowkey
val rowkey = new ImmutableBytesWritable(Bytes.toBytes(word))
//创建put对象
val put: Put = new Put(rowkey.get())
//添加column
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count.toString))
//返回二元组
(rowkey, put)
}
//TODO 保存数据的Hbase
/**
* def saveAsNewAPIHadoopFile(
* path: String,
* keyClass: Class[_],
* valueClass: Class[_],
* outputFormatClass: Class[_ line.trim.split("\\s+"))
//转为二元组 表示的是每一个单词的出现的次数
.map(word => word -> 1)
//分组聚合
.reduceByKey((tmp, item) => tmp + item)
resultRDD
//降低分区数
.coalesce(numPartitions = 1)
.foreachPartition({ iter => saveToMYSQL(iter)
})
//TODO 关闭的spark资源
sc.stop()
}
/**
* 将RDD每一个分区数据的保存到MYSQL表汇总
*
* @param datas
*/
def saveToMYSQL(datas: Iterator[(String, Int)]) = {
//加载驱动
Class.forName("")
var conn: Connection = null
var pstmt: PreparedStatement = null;
try {
//获取连接
conn = DriverManager.getConnection(
"",
"",
""
)
//获取事务级别
val autoConmmit = conn.getAutoCommit
conn.setAutoCommit(false)
//插入数据
val insertsql = ""
pstmt = conn.prepareStatement(insertsql)
datas.foreach({ case (word, count) =>
pstmt.setString(1, word)
pstmt.setString(2, count.toString)
//加入批量操作
pstmt.addBatch()
})
//批次插入
pstmt.executeBatch()
//手动提交
conn.commit()
//还原数据库的原来的事务级别状态
conn.setAutoCommit(autoConmmit)
} catch {
case e: Exception => e.printStackTrace()
} finally {
//关闭连接
if (null != pstmt) pstmt.close()
if (null != conn) conn.close()
}
}
}