Sqoop Installation
- Hadoop configuration
Because Sqoop accesses Hadoop's MapReduce through a proxy, the proxy user and groups it is allowed to use must be configured in Hadoop.
Edit hadoop-2.6.5/etc/hadoop/core-site.xml.
Add the following to it:
<!-- sqoop2 -->
<property>
    <name>hadoop.proxyuser.hadoop.hosts</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.hadoop.groups</name>
    <value>*</value>
</property>
The property names hadoop.proxyuser.hadoop.hosts (and .groups) follow the pattern hadoop.proxyuser.${SYSTEM_USER}.hosts, where ${SYSTEM_USER} is the system user that runs Hadoop (and the Sqoop2 server). Here I run everything as the hadoop user, hence the configuration above.
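For example, if the Sqoop2 server were started by a different system user, say a hypothetical user named sqoop2 (just to illustrate the naming pattern), the properties would become:
<property>
    <name>hadoop.proxyuser.sqoop2.hosts</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.sqoop2.groups</name>
    <value>*</value>
</property>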
Sqoop2 configuration
# Set the Sqoop environment variables
# etc/profile
export SQOOP_HOME=/usr/local/sqoop
# Path for third-party jars used by Sqoop; create this directory manually and copy the MySQL JDBC driver jar into it
export SQOOP_SERVER_EXTRA_LIB=$SQOOP_HOME/extra
export PATH=$PATH:$SQOOP_HOME/bin
# Configure sqoop.properties under server/conf
# The path is Hadoop's configuration directory
org.apache.sqoop.submission.engine.mapreduce.configuration.directory=/usr/local/hadoop/etc/hadoop
# Configure catalina.properties under server/conf
# Change the jar directories it references to the Hadoop paths (I skipped this at first and sqoop2-tool verify failed; after configuring it, everything ran fine)
common.loader=${catalina.base}/lib,${catalina.base}/lib/*.jar,${catalina.home}/lib,${catalina.home}/lib/*.jar,${catalina.home}/../lib/*.jar,/usr/local/hadoop/share/hadoop/common/*.jar,/usr/local/hadoop/share/hadoop/common/lib/*.jar,/usr/local/hadoop/share/hadoop/hdfs/*.jar,/usr/local/hadoop/share/hadoop/hdfs/lib/*.jar,/usr/local/hadoop/share/hadoop/mapreduce/*.jar,/usr/local/hadoop/share/hadoop/mapreduce/lib/*.jar,/usr/local/hadoop/share/hadoop/tools/*.jar,/usr/local/hadoop/share/hadoop/tools/lib/*.jar,/usr/local/hadoop/share/hadoop/yarn/*.jar,/usr/local/hadoop/share/hadoop/yarn/lib/*.jar
Sqoop initialization (on first start)
# The script is under $SQOOP_HOME/bin
sqoop2-tool upgrade
Verify the configuration and start the server
# The script is under $SQOOP_HOME/bin
sqoop2-tool verify
# The script is under $SQOOP_HOME/bin
sqoop.sh server start
sqoop2-server start
Drivers
Adding the MySQL driver
This post mainly imports data from a MySQL database, so the official MySQL JDBC driver is needed.
Download it from the official site: (https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.41.zip)
Unzip it, take out the jar file inside, and copy it into Sqoop's server/lib and shell/lib directories, as sketched below.
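A sketch of those copy steps, assuming the zip unpacks to a mysql-connector-java-5.1.41 directory containing mysql-connector-java-5.1.41-bin.jar and that Sqoop is installed at /usr/local/sqoop:
$ unzip mysql-connector-java-5.1.41.zip
$ cp mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar /usr/local/sqoop/server/lib/
$ cp mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar /usr/local/sqoop/shell/lib/
# also keep a copy in the extra-lib directory configured earlier ($SQOOP_SERVER_EXTRA_LIB)
$ cp mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar /usr/local/sqoop/extra/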
Using Sqoop2
First the Hadoop services need to be running. Change into hadoop-2.6.5/sbin/ and start DFS, YARN and the JobHistory server.
$ ./start-dfs.sh
$ ./start-yarn.sh
$ ./mr-jobhistory-daemon.sh start historyserver
Start the Sqoop2 service: change into sqoop-1.99.7-bin-hadoop200/bin/ and run
$ ./sqoop.sh server start
Run the following command to enter the Sqoop2 interactive shell:
$ ./sqoop2-shell
Connect to the server by setting its parameters:
sqoop:000> set server --host 127.0.0.1 --port 12000 --webapp sqoop
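To confirm the shell is really talking to the server, listing the connectors registered on it is a quick check; the output should include the generic-jdbc-connector and hdfs-connector used later:
sqoop:000> show connector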
For operating Sqoop2 from the command line, see (https://blog.csdn.net/lusyoe/article/details/60478226).
- Operating Sqoop2 from Java
First add the Maven dependencies:
<dependency>
    <groupId>org.apache.sqoop</groupId>
    <artifactId>sqoop-client</artifactId>
    <version>1.99.7</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/lib/sqoop-client-1.99.7.jar</systemPath>
</dependency>
<dependency>
    <groupId>org.apache.sqoop</groupId>
    <artifactId>sqoop-common</artifactId>
    <version>1.99.7</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/lib/sqoop-common-1.99.7.jar</systemPath>
</dependency>
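Before wiring up the full transfer job, a quick connectivity check can save time. The sketch below is only an example under assumptions: the class name is arbitrary, the server URL is the one used in the code further down, and it relies on the client's getConnectors() call to list the connectors registered on the server.
import java.util.Collection;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MConnector;

public class SqoopConnectivityCheck {
    public static void main(String[] args) {
        // REST endpoint of the Sqoop2 server (adjust host and port to your environment)
        SqoopClient client = new SqoopClient("http://10.4.20.136:12000/sqoop/");
        // A successful call proves the client can reach the server over its REST API
        Collection<MConnector> connectors = client.getConnectors();
        for (MConnector connector : connectors) {
            System.out.println(connector.getUniqueName());
        }
    }
}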
Below is the Java code that drives Sqoop2 to move data from MySQL to HDFS:
import java.util.UUID;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

public class SqoopUtil {
    public static void mysqlToHDFS() {
        // Initialization
        String url = "http://10.4.20.136:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);
        System.out.println(client);
        // Create the source link (generic JDBC)
        MLink fromLink = client.createLink("generic-jdbc-connector");
        fromLink.setName("jdbc-link" + UUID.randomUUID().toString().substring(0, 8));
        fromLink.setCreationUser("liuzhangmin");
        MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://10.4.20.151:3306/spring?useUnicode=true&characterEncoding=UTF8&useSSL=false");
        fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        fromLinkConfig.getStringInput("linkConfig.username").setValue("root");
        fromLinkConfig.getStringInput("linkConfig.password").setValue("Root@123");
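        // Note: the generic JDBC connector encloses identifiers in double quotes by default,
        // which MySQL rejects; setting the enclose character to a space (next line) or to a
        // backtick avoids the resulting SQL syntax errors.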
        fromLinkConfig.getStringInput("dialect.identifierEnclose").setValue(" ");
        Status fromStatus = client.saveLink(fromLink);
        if (fromStatus.canProceed()) {
            System.out.println("JDBC link created, ID: " + fromLink.getPersistenceId());
        } else {
            System.out.println("Failed to create the JDBC link");
        }
        // Create the destination link (HDFS)
        MLink toLink = client.createLink("hdfs-connector");
        toLink.setName("hdfs-link" + UUID.randomUUID().toString().substring(0, 8));
        toLink.setCreationUser("liuzhangmin");
        MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://10.4.20.136:9000");
        Status toStatus = client.saveLink(toLink);
        if (toStatus.canProceed()) {
            System.out.println("HDFS link created, ID: " + toLink.getPersistenceId());
        } else {
            System.out.println("Failed to create the HDFS link");
        }
        // Create a job
        String fromLinkName = fromLink.getName();
        String toLinkName = toLink.getName();
        MJob job = client.createJob(fromLinkName, toLinkName);
        job.setName("mysqltohdfs2");
        job.setCreationUser("liuzhangmin");
        // Configure the FROM side of the job
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("test1");
        // fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
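        // The partition column defaults to the table's primary key; for a table without a
        // primary key, set fromJobConfig.partitionColumn explicitly (as in the commented line above).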
        // Configure the TO side of the job
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/hdfs4jdbc");
        toJobConfig.getBooleanInput("toJobConfig.appendMode").setValue(true);
        // toJobConfig.getEnumInput("toJobConfig.storageType").setValue("HDFS");
        toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
        // Job resources
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
        // driverConfig.getIntegerInput("throttlingConfig.numLoaders").setValue(1);
        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Job created, ID: " + job.getPersistenceId());
        } else {
            System.out.println("Failed to create the job.");
        }
        // Start the job
        String jobName = job.getName();
        MSubmission submission = client.startJob(jobName);
        System.out.println(jobName);
        System.out.println("Job submission status: " + submission.getStatus());
        while (submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
            // report progress every three seconds
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // refresh the submission so the loop actually observes new status and progress
            submission = client.getJobStatus(jobName);
        }
        System.out.println("Job finished... ...");
        System.out.println("Hadoop job ID: " + submission.getExternalJobId());
        Counters counters = submission.getCounters();
        if (counters != null) {
            System.out.println("Counters:");
            for (CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for (Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }
        System.out.println("MySQL-to-HDFS transfer via Sqoop finished");
    }
}
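For completeness, a minimal way to run the method above (the class name is arbitrary; it only assumes SqoopUtil and the two Sqoop client jars are on the classpath):
public class SqoopDemo {
    public static void main(String[] args) {
        SqoopUtil.mysqlToHDFS();
    }
}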
Problems encountered while using Sqoop2
- Error when creating the JDBC link
The driver in question (e.g. the MySQL JDBC driver) is not set up correctly.
- Runtime error "spark is not allowed to simulate ${user}"
Hadoop's proxy user and groups are not configured correctly.
- A Sqoop job can only be run once, and it cannot be paused or have its status queried
This is because Hadoop's JobHistory service is not running; a sketch of enabling it follows.
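A minimal sketch of enabling it on Hadoop 2.6.5 (the addresses below are the Hadoop defaults; adjust them to your cluster): add these properties to etc/hadoop/mapred-site.xml, then start the daemon from sbin/.
<!-- etc/hadoop/mapred-site.xml -->
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>0.0.0.0:10020</value>
</property>
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>0.0.0.0:19888</value>
</property>
# then start the JobHistory daemon
$ ./mr-jobhistory-daemon.sh start historyserver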