1.1、创建一个SQLContext, SQLContext(及其子类, 如本节的HiveContext)是Spark SQL所有功能的入口
SqlContext: 应该是对应spark-sql这个project; 与hive解耦,不支持hql查询; HiveContext:应该是对应spark-hive这个项目; 与hive有部分耦合, 支持hql,是SqlContext的子类,也就是说兼容SqlContext; 此处需要注意: 如果使用SQLContext sqlContext = new SQLContext(sc);
会报错。[1.13] failure: ``table'' expected but identifier t2 found
// 构建Spark上下文
SparkConf conf = new SparkConf()
.setAppName("spark-hive")
.setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
//创建SQLontext
SQLContext sqlContext = new HiveContext(sc);
通过HiveContext进行查询,创建表,插入数据
1.2、查询, 通过SQLContext查询, 返回一个DataFrame对象
private static void showTables(SQLContext sqlContext) {
DataFrame df = sqlContext.sql("show tables");
for( Row row:df.take(3)){
System.out.println(row);
}
}
1.3、创建表
private static void createTable(SQLContext sqlContext) {
sqlContext.sql("create table t2(a String, b String)");
}
1.4、插入数据
创建DataFrame, spark提供两种方式的RDD与DataFrame的转化
- 1.4.1、通过反射, 第一个方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以生成更简洁的代码,当您在编写Spark应用程序时已经了解了schema时,可以很好地工作。
- 1.4.2、通过接口, 使用RDD和schema构建DataFrame
- 创建RDD
- 创建schema
- 通过RDD和schemae构建DataFrame
- 注册临时表,这个时候的数据只是在SQLContext存活的周期可用, 并没有存入Hvie中
- 把临时表中的数据插入到Hive数据库中
private static void insert(SQLContext sqlContext, JavaSparkContext sc ) {
List rows = new ArrayList();
for(int i = 0; i < 10; i ++) {
//创建Row对象
Row row = RowFactory.create(i, "value" + i);
rows .add(row);
}
JavaRDD rowsRDD = sc.parallelize(rows);//构建RDD
StructType schema2 = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("a", DataTypes.IntegerType, true),
DataTypes.createStructField("b", DataTypes.StringType, true)
));
//创建DataFrame//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
DataFrame df2 = sqlContext.createDataFrame(rowsRDD, schema2);
//注册临时表,这个时候的数据只是在SQLContext存活的周期可用, 并没有存入Hvie中
df2.registerTempTable("t2_tmp");
// 把临时表中的数据插入到Hive数据库中
sqlContext.sql("insert into t2 select * from t2_tmp");
}
1.5、查询数据
/**
* 获取指定日期范围内的用户访问行为数据
* @param sqlContext SQLContext
* @param taskParam 任务参数
* @return 行为数据RDD
*/
private static JavaRDD getActionRDDByDateRange(
SQLContext sqlContext, JSONObject taskParam) {
//解析请求参数
String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);
String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);
String sql =
"select * from user_visit_action "
+ "where start>='" + startDate + "' "
+ "and start
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?