Using Hive

Introduction to Hive

Hive is a data warehouse tool built on top of Hadoop. It maps structured data files to database tables and offers a simple SQL-like query capability, translating those SQL statements into MapReduce jobs for execution. Its main advantage is a low learning curve: simple MapReduce-style statistics can be expressed in SQL-like queries without writing a dedicated MapReduce application, which makes it well suited to statistical analysis in a data warehouse.
Hive is a data warehouse infrastructure built on Hadoop. It provides a set of tools for extract-transform-load (ETL) work and a mechanism for storing, querying, and analyzing large-scale data kept in Hadoop. Hive defines a simple SQL-like query language called HQL, which lets users familiar with SQL query the data, and it also allows MapReduce developers to plug in custom mappers and reducers for complex analysis that the built-in ones cannot handle.
For installation details, see the separate installation guide.

Using Hive via the Java API

First, the Maven configuration:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>2.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.eclipse.jetty.aggregate</groupId>
            <artifactId>jetty-all</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-shims</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.3</version>
</dependency>

Second, configure the Hive-related parameters in application.yml:

hive:
  url: jdbc:hive2://10.4.20.136:10000
  driver-class-name: org.apache.hive.jdbc.HiveDriver
  type: com.alibaba.druid.pool.DruidDataSource
  user: root
  password: root
  initialSize: 1
  minIdle: 3
  maxActive: 20
  maxWait: 6000
  timeBetweenEvictionRunsMillis: 60000
  minEvictableIdleTimeMillis: 30000
  validationQuery: select 1
  testWhileIdle: true
  testOnBorrow: false
  testOnReturn: false
  poolPreparedStatements: true
  maxPoolPreparedStatementPerConnectionSize: 20
  connectionErrorRetryAttempts: 0
  breakAfterAcquireFailure: true

Third, configure the Hive data source and connection pool (the data source can also be used directly):

package gbdpcloudproviderspark.gbdpcloudproviderspark.configurer;


import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;

/**
 * @author liuzhangmin
 * @date 2019/1/12 10:40
 */
@Configuration
@ConfigurationProperties(prefix = "hive")
public class HiveDruidConfig {

    private String url;
    private String user;
    private String password;
    private String driverClassName;
    private int initialSize;
    private int minIdle;
    private int maxActive;
    private int maxWait;
    private int timeBetweenEvictionRunsMillis;
    private int minEvictableIdleTimeMillis;
    private String validationQuery;
    private boolean testWhileIdle;
    private boolean testOnBorrow;
    private boolean testOnReturn;
    private boolean poolPreparedStatements;
    private int maxPoolPreparedStatementPerConnectionSize;
    private int connectionErrorRetryAttempts;
    private boolean breakAfterAcquireFailure;

    @Bean(name = "hiveDruidDataSource")
    @Qualifier("hiveDruidDataSource")
    public DataSource dataSource() {
        DruidDataSource datasource = new DruidDataSource();
        datasource.setUrl(url);
        datasource.setUsername(user);
        datasource.setPassword(password);
        datasource.setDriverClassName(driverClassName);
        datasource.setInitialSize(initialSize);
        datasource.setMinIdle(minIdle);
        datasource.setMaxActive(maxActive);
        datasource.setMaxWait(maxWait);
        datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        datasource.setValidationQuery(validationQuery);
        datasource.setTestWhileIdle(testWhileIdle);
        datasource.setTestOnBorrow(testOnBorrow);
        datasource.setTestOnReturn(testOnReturn);
        datasource.setPoolPreparedStatements(poolPreparedStatements);
        datasource.setMaxPoolPreparedStatementPerConnectionSize(maxPoolPreparedStatementPerConnectionSize);
        datasource.setConnectionErrorRetryAttempts(connectionErrorRetryAttempts);
        datasource.setBreakAfterAcquireFailure(breakAfterAcquireFailure);
        return datasource;
    }

    @Bean(name = "hiveDruidTemplate")
    public JdbcTemplate hiveDruidTemplate(@Qualifier("hiveDruidDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    // getters and setters omitted; they are required for @ConfigurationProperties binding
}

With the configuration in place, Hive can be used from Java. The sections below walk through the Java API calls for a series of common Hive operations; the endpoint methods are assumed to live in a controller along the lines of the sketch that follows.
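The snippets below reference logger, hiveDruidTemplate, and hiveDruidDataSource without showing how they are obtained. A minimal sketch of the surrounding controller is given here; the package and class name are hypothetical, while the bean names match the HiveDruidConfig above:

package gbdpcloudproviderspark.gbdpcloudproviderspark.controller; // hypothetical package

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.RestController;

import javax.sql.DataSource;

@RestController
public class HiveController { // hypothetical class name

    private static final Logger logger = LoggerFactory.getLogger(HiveController.class);

    // JdbcTemplate bean declared in HiveDruidConfig
    @Autowired
    @Qualifier("hiveDruidTemplate")
    private JdbcTemplate hiveDruidTemplate;

    // Druid DataSource bean declared in HiveDruidConfig
    @Autowired
    @Qualifier("hiveDruidDataSource")
    private DataSource hiveDruidDataSource;

    // ... the endpoint methods from the sections below go here
}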

1. Create Table

/**
 * Example: create a new table
 */
@RequestMapping("/create")
public String createTable() {
    StringBuffer sql = new StringBuffer("CREATE TABLE IF NOT EXISTS ");
    sql.append("user_sample");
    sql.append("(user_num BIGINT, user_name STRING, user_gender STRING, user_age INT)");
    sql.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' "); // define field and line delimiters
    sql.append("STORED AS TEXTFILE"); // store as plain text

    logger.info("Running: " + sql);
    String result = "Create table successfully...";
    try {
        // hiveJdbcTemplate.execute(sql.toString());
        hiveDruidTemplate.execute(sql.toString());
    } catch (DataAccessException dae) {
        result = "Create table encounter an error: " + dae.getMessage();
        logger.error(result);
    }
    return result;
}

A Hive table can also be mapped to an HBase table when it is created.

  • When the HBase table does not exist yet:
    Creating a Hive table hive_hbase_table mapped to an HBase table hbase_table automatically creates the HBase table hbase_table, and that table is dropped when the Hive table is dropped. The mapping from the Hive schema to the HBase schema must be specified here.
CREATE TABLE hive_hbase_table(key int, value string) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
TBLPROPERTIES ("hbase.table.name" = "hbase_table",
"hbase.mapred.output.outputtable" = "hbase_table");

Note: the first hbase_table (hbase.table.name) is the name of the table as defined in HBase;
the second hbase_table (hbase.mapred.output.outputtable) is the table that the data is written to. The "hbase.mapred.output.outputtable" = "hbase_table" property can be omitted, in which case the data is simply stored in the first table.

Data can be added to the corresponding HBase table by operating on the Hive table:
insert into table hive_hbase_table select * from hive_data;
  • When the HBase table already exists:
    Create a Hive external table mapped to the HBase table; again, mind the mapping from the Hive schema to the HBase schema. Dropping the external table does not drop the underlying HBase table.
CREATE EXTERNAL TABLE hive_hbase_external_table(key int, value string) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
TBLPROPERTIES ("hbase.table.name" = "hbase_table",
"hbase.mapred.output.outputtable" = "hbase_table");

2. Viewing Tables and Table Data

  Listing tables:

@RequestMapping("/show")
public List<String> showtables() {
List<String> list = new ArrayList<String>();
Statement statement = null;
try {
statement = hiveDruidDataSource.getConnection().createStatement();
String sql = "show tables";
logger.info("Running: " + sql);
ResultSet res = statement.executeQuery(sql);
while (res.next()) {
list.add(res.getString(1));
}
} catch (SQLException e) {
e.printStackTrace();
}
return list;
}

  Describing a table's structure:

/**
 * Query the column information of a table in the Hive database
 */
@RequestMapping("/describe")
public List<String> describeTable(String tableName) throws SQLException {
    List<String> list = new ArrayList<String>();
    String sql = "describe " + tableName;
    logger.info("Running: " + sql);
    // try-with-resources releases the connection back to the pool
    try (Connection conn = hiveDruidDataSource.getConnection();
         Statement statement = conn.createStatement();
         ResultSet res = statement.executeQuery(sql)) {
        while (res.next()) {
            list.add(res.getString(1));
        }
    }
    return list;
}


@RequestMapping("/metadata")
public List<Map<String, String>> metadata(String tableName) throws SQLException {
    List<Map<String, String>> metadataList = new ArrayList<>();
    String sql = "DESC " + tableName;
    logger.info("Running: " + sql);
    try (Connection conn = hiveDruidDataSource.getConnection();
         Statement statement = conn.createStatement();
         ResultSet res = statement.executeQuery(sql)) {
        while (res.next()) {
            Map<String, String> colAndType = new HashMap<>();
            String column = res.getString(1);
            String type = res.getString(2);
            colAndType.put("column", column);
            colAndType.put("type", type);
            metadataList.add(colAndType);
        }
    }
    return metadataList;
}

  Querying table data:

/**
 * Query the data in the table named tableName
 */
@RequestMapping("/select")
public List<String> selectFromTable(String tableName) throws SQLException {
    List<String> list = new ArrayList<String>();
    String sql = "select * from " + tableName;
    logger.info("Running: " + sql);
    try (Connection conn = hiveDruidDataSource.getConnection();
         Statement statement = conn.createStatement();
         ResultSet res = statement.executeQuery(sql)) {
        int count = res.getMetaData().getColumnCount();
        String str = null;
        while (res.next()) {
            str = "";
            for (int i = 1; i < count; i++) {
                str += res.getString(i) + " ";
            }
            str += res.getString(count);
            logger.info(str);
            list.add(str);
        }
    }
    return list;
}

3. Loading and Inserting Data

When Hive loads data it performs no transformation: a Load operation is a pure copy/move that places the data file into the location corresponding to the Hive table.
For example, import data from a file on the local filesystem into a table, appending to the existing contents:

/**
 * Example: load data from a file on the Hive server's local filesystem into a Hive table.
 * The file must match the table's delimiters: comma-separated fields, one record per line.
 */
@RequestMapping("/load")
public String loadIntoTable() {
    String filepath = "/home/hadoop/user_sample.txt";
    String sql = "load data local inpath '" + filepath + "' into table user_sample";
    String result = "Load data into table successfully...";
    try {
        // hiveJdbcTemplate.execute(sql);
        hiveDruidTemplate.execute(sql);
    } catch (DataAccessException dae) {
        result = "Load data into table encounter an error: " + dae.getMessage();
        logger.error(result);
    }
    return result;
}
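The LOCAL keyword reads the file from the Hive server's local filesystem; without LOCAL, the path is interpreted as an HDFS path and the file is moved (not copied) into the table's directory. A small sketch under that assumption, with a hypothetical HDFS path and endpoint:

/**
 * Sketch: load a file that already resides in HDFS (no LOCAL keyword).
 * The HDFS path below is hypothetical; the file is moved into the table's location.
 */
@RequestMapping("/loadFromHdfs")
public String loadFromHdfs() {
    String hdfsPath = "/data/user_sample.txt"; // hypothetical HDFS path
    String sql = "load data inpath '" + hdfsPath + "' into table user_sample";
    String result = "Load data from HDFS successfully...";
    try {
        hiveDruidTemplate.execute(sql);
    } catch (DataAccessException dae) {
        result = "Load data from HDFS encounter an error: " + dae.getMessage();
        logger.error(result);
    }
    return result;
}

Individual rows can also be written with a plain INSERT statement, as in the following endpoint: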

@RequestMapping("/insert")
public String insertIntoTable() {
String sql = "INSERT INTO TABLE user_sample(user_num,user_name,user_gender,user_age) VALUES(888,'Plum','M',32)";
String result = "Insert into table successfully...";
try {
// hiveJdbcTemplate.execute(sql);
hiveDruidTemplate.execute(sql);
} catch (DataAccessException dae) {
result = "Insert into table encounter an error: " + dae.getMessage();
logger.error(result);
}
return result;
}

4. Dropping Tables

/**
 * Example: drop a table
 */
@RequestMapping("/delete")
public String delete(String tableName) {
    String sql = "DROP TABLE IF EXISTS " + tableName;
    String result = "Drop table successfully...";
    logger.info("Running: " + sql);
    try {
        // hiveJdbcTemplate.execute(sql);
        hiveDruidTemplate.execute(sql);
    } catch (DataAccessException dae) {
        result = "Drop table encounter an error: " + dae.getMessage();
        logger.error(result);
    }
    return result;
}