您当前的位置: 首页 >  大数据

庄小焱

暂无认证

  • 1浏览

    0关注

    805博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

大数据云计算——Spark实战(常用数据库的读写实战)

庄小焱 发布时间:2020-11-06 14:18:43 ,浏览量:1

Spark常用数据库的数据的读写

Spark可以从外部存储系统读取数据,比如RDBMs表中或者HBase表中读写数据,这也是企业中常常使用,如下两个场景:

1)、要分析的数据存储在HBase表中,需要从其中读取数据数据分析

           1日志数据:电商网站的商家操作日志

           2订单数据:保险行业订单数据

2)、使用Spark进行离线分析以后,往往将报表结果保存到MySQL

           1表中网站基本分析(pv、uVv。。。。。)

HBase数据源

Spark可以从HBase表中读写(Read/Write)数据,底层采用TablelnputFormat和TableOutputFormat方式,与MapReduce与HBase集成完全一样,使用输入格式InputFormat和输格式OutPutFormat

Spar数据的写入Hbase
package hbase

import java.util

import com.hankcs.hanlp.HanLP
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import com.hankcs.hanlp.seg.common.Term
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import search.SougouRecord


/**
 * Hbase的数据连接
 */
object HbaseConnection {
  def main(args: Array[String]): Unit = {
    def main(args: Array[String]): Unit = {
      //TODO 构建一个spark的对象
      //构建Spark Application 应用的入口实例
      val sc: SparkContext = {
        val sparkConf: SparkConf = new SparkConf()
          .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
          .setMaster("local[2]")
        SparkContext.getOrCreate(sparkConf)
      }
      //TODO:1 加载搜狗的数据的集合使用小数据集合
      val inputpath = "E:\\GItHub_project\\Big_Data\\Spark\\Sparkday02_2.11\\src\\main\\resources\\SogouQ.sample"
      val sougouRDD = sc.textFile(inputpath, minPartitions = 2)
      print(s"count=${sougouRDD.count()}")
      println(sougouRDD.first())

      //TODO 2:数据的ETL操作的
      val etlRDD: RDD[SougouRecord] = sougouRDD
        .filter(line => null != line && line.trim.split("\\s+").length == 6)
        .mapPartitions { iter =>
          iter.map { line =>
            val array = line.trim.split("\\s+")
            //构建一个对象
            SougouRecord(
              array(0), array(1),
              array(2).replace("\\[\\]", ""),
              array(3).toInt, array(4).toInt,
              array(5)
            )
          }
        }
      //由于数据使用多次 需要缓存数据
      etlRDD.persist(StorageLevel.MEMORY_AND_DISK)

      //TODO:搜索关键次统计
      val resultRDD: RDD[(String, Int)] = etlRDD
        .filter(recode => null != recode.queryWords && recode.queryWords.trim.length > 0)
        .flatMap { record =>
          //360安全卫士
          val words = record.queryWords.trim
          //使用的HanLP分词进行中文分词 360  安全  卫士
          val terms: util.List[Term] = HanLP.segment(words)
          //将java中的list转化为的scala中的list
          import scala.collection.JavaConverters._
          //封装到二元组的中的表示每一个搜索单词的出现的一次
          val result = terms.asScala.map {
            term => (term.word, 1)
          }
          //返回的结果
          result
        }
        //分组聚合
        .reduceByKey((tmp, item) => tmp + item)

      //TODO:将来结果的数据保存都hbase
      /**
       * 表名:htb_wordCount
       * rowkey  word
       * columFamily :info
       * columns:count
       *
       * 创建表的语句是
       */
      //第一步是将RDD转换为的RDD[(IMMutableByWriteTable,Put)]
      val putsRDD: RDD[(ImmutableBytesWritable, Put)] = resultRDD.map { case (word, count) =>
        //创建的rowkey
        val rowkey = new ImmutableBytesWritable(Bytes.toBytes(word))
        //创建put对象
        val put: Put = new Put(rowkey.get())
        //添加column
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count.toString))
        //返回二元组
        (rowkey, put)
      }

      //TODO 保存数据的Hbase
      /**
       * def saveAsNewAPIHadoopFile(
       * path: String,
       * keyClass: Class[_],
       * valueClass: Class[_],
       * outputFormatClass: Class[_  line.trim.split("\\s+"))
      //转为二元组 表示的是每一个单词的出现的次数
      .map(word => word -> 1)
      //分组聚合
      .reduceByKey((tmp, item) => tmp + item)

    resultRDD
      //降低分区数
      .coalesce(numPartitions = 1)
      .foreachPartition({ iter => saveToMYSQL(iter)
      })

    //TODO 关闭的spark资源
    sc.stop()
  }

  /**
   * 将RDD每一个分区数据的保存到MYSQL表汇总
   *
   * @param datas
   */
  def saveToMYSQL(datas: Iterator[(String, Int)]) = {
    //加载驱动
    Class.forName("")
    var conn: Connection = null
    var pstmt: PreparedStatement = null;
    try {
      //获取连接
      conn = DriverManager.getConnection(
        "",
        "",
        ""
      )

      //获取事务级别
      val autoConmmit = conn.getAutoCommit
      conn.setAutoCommit(false)
      //插入数据
      val insertsql = ""
      pstmt = conn.prepareStatement(insertsql)
      datas.foreach({ case (word, count) =>
        pstmt.setString(1, word)
        pstmt.setString(2, count.toString)
        //加入批量操作
        pstmt.addBatch()
      })
      //批次插入
      pstmt.executeBatch()
      //手动提交
      conn.commit()
      //还原数据库的原来的事务级别状态
      conn.setAutoCommit(autoConmmit)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      //关闭连接
      if (null != pstmt) pstmt.close()
      if (null != conn) conn.close()
    }
  }

}
关注
打赏
1657692713
查看更多评论
立即登录/注册

微信扫码登录

0.0645s