写点什么

Apache Sqoop 中最重要的核心概念 - 导入导出

发布于: 2021 年 03 月 09 日
Apache Sqoop中最重要的核心概念-导入导出

一、Sqoop 导入


“导入工具”导入单个表从 RDBMS 到 HDFS。表中的每一行被视为 HDFS 的记录。所有记录都存储为文本文件的文本数据


下面的语法用于将数据导入 HDFS。


$ sqoop import (generic-args) (import-args)


Sqoop 测试表数据


在 mysql 中创建数据库 userdb,然后执行参考资料中的 sql 脚本:


创建三张表: emp 雇员表、 emp_add 雇员地址表、emp_conn 雇员联系表

1. 全量导入 mysql 表数据到 HDFS


下面的命令用于从 MySQL 数据库服务器中的 emp 表导入 HDFS。


bin/sqoop import \
--connect jdbc:mysql://192.168.109.1:3306/userdb \
--username root \
--password root \
--delete-target-dir \
--target-dir /sqoopresult \
--table emp --m 1
复制代码


其中--target-dir 可以用来指定导出数据存放至 HDFS 的目录;


mysql jdbc url 请使用 ip 地址


为了验证在 HDFS 导入的数据,请使用以下命令查看导入的数据:


hdfs dfs -cat /sqoopresult/part-m-00000


可以看出它会在 HDFS 上默认用逗号,分隔 emp 表的数据和字段。可以通过


--fields-terminated-by '\t'来指定分隔符


shell1201,gopal,manager,50000,TP
1202,manisha,Proof reader,50000,TP
1203,khalil,php dev,30000,AC
1204,prasanth,php dev,30000,AC
1205,kranthi,admin,20000,TP
复制代码


2. 全量导入 mysql 表数据到 HIVE

2.1. 方式一:先复制表结构到 hive 中再导入数据


将关系型数据的表结构复制到 hive 中


bin/sqoop create-hive-table \
--connect jdbc:mysql://192.168.109.1:3306/userdb \
--table emp_add \
--username root \
--password hadoop \
--hive-table test.emp_add_sp
复制代码


其中:


--table emp_add 为 mysql 中的数据库 sqoopdb 中的表。


--hive-table empaddsp 为 hive 中新建的表名称。


从关系数据库导入文件到 hive 中


bin/sqoop import \
--connect jdbc:mysql://192.168.109.1:3306/userdb \
--username root \
--password root \
--table emp_add \
--hive-table test.emp_add_sp \
--hive-import \
--m 1
复制代码


2.2 方式二:直接复制表结构数据到 hive 中


bin/sqoop import \
--connect jdbc:mysql://192.168.109.1:3306/userdb \
--username root \
--password root \
--table emp_conn \
--hive-import \
--m 1 \
--hive-database test;
复制代码


3. 导入表数据子集(where 过滤)


--where 可以指定从关系数据库导入数据时的查询条件。它执行在数据库服务器相应的 SQL 查询,并将结果存储在 HDFS 的目标目录。


bin/sqoop import \
--connect jdbc:mysql://192.168.109.1:3306/userdb \
--username root \
--password root \
--where "city ='sec-bad'" \
--target-dir /wherequery \
--table emp_add --m 1
复制代码


4. 导入表数据子集(query 查询)


注意事项:


使用 query sql 语句来进行查找不能加参数--table ;


并且必须要添加 where 条件;


并且 where 条件后面必须带一个 $CONDITIONS 这个字符串;


并且这个 sql 语句必须用单引号,不能用双引号;


bin/sqoop import \
--connect jdbc:mysql://192.168.109.1:3306/userdb \
--username root \
--password root \
--target-dir /wherequery12 \
--query 'select id,name,deg from emp WHERE id>1203 and $CONDITIONS' \
--split-by id \
--fields-terminated-by '\t' \
--m 2
复制代码


sqoop 命令中,--split-by id 通常配合 -m 10 参数使用。用于指定根据哪个字段进行划分并启动多少个 maptask。

5. 增量导入


在实际工作当中,数据的导入,很多时候都是只需要导入增量数据即可,并不需要将表中的数据每次都全部导入到 hive 或者 hdfs 当中去,这样会造成数据重复的问题。因此一般都是选用一些字段进行增量的导入, sqoop 支持增量的导入数据。


增量导入是仅导入新添加的表中的行的技术。


--check-column (col)


用来指定一些列,这些列在增量导入时用来检查这些数据是否作为增量数据进行导入,和关系型数据库中的自增字段及时间戳类似。


注意:这些被指定的列的类型不能使任意字符类型,如 char、varchar 等类型都是不可以的,同时-- check-column 可以去指定多个列。


--incremental (mode)


append:追加,比如对大于 last-value 指定的值之后的记录进行追加导入。lastmodified:最后的修改时间,追加 last-value 指定的日期之后的记录


--last-value (value)


指定自从上次导入后列的最大值(大于该指定的值),也可以自己设定某一值

5.1. Append 模式增量导入


  • 执行以下指令先将我们之前的数据导入:


bin/sqoop import \
--connect jdbc:mysql://192.168.109.1:3306/userdb \
--username root \
--password root \
--target-dir /appendresult \
--table emp --m 1
复制代码


  • 使用 hadoop fs -cat 查看生成的数据文件,发现数据已经导入到 hdfs 中。


  • 然后在 mysql 的 emp 中插入 2 条增量数据:


 insert into `userdb`.`emp` (`id`, `name`,   `deg`, `salary`, `dept`) values ('1206', 'allen', 'admin', '30000', 'tp');     insert into `userdb`.`emp` (`id`, `name`,   `deg`, `salary`, `dept`) values ('1207', 'woon', 'admin', '40000', 'tp');   
复制代码


  • 执行如下的指令,实现增量的导入:


bin/sqoop import \
--connect jdbc:mysql://192.168.109.1:3306/userdb \
--username root --password root \
--table emp --m 1 \
--target-dir /appendresult \
--incremental append \
--check-column id \
--last-value 1205
复制代码


  • 最后验证导入数据目录 可以发现多了一个文件 里面就是增量数据

5.2. Lastmodified 模式增量导入


  • 首先创建一个 customer 表,指定一个时间戳字段:


create table customertest(id int,name varchar(20),last_mod timestamp default   current_timestamp on update current_timestamp);   
复制代码


此处的时间戳设置为在数据的产生和更新时都会发生改变.


  • 分别插入如下记录:


insert into customertest(id,name) values(1,'neil');insert into customertest(id,name) values(2,'jack');insert into customertest(id,name) values(3,'martin');insert into customertest(id,name) values(4,'tony');insert into customertest(id,name) values(5,'eric');
复制代码


  • 执行 sqoop 指令将数据全部导入 hdfs:


bin/sqoop import \
--connect jdbc:mysql://192.168.109.1:3306/userdb \
--username root \
--password root \
--target-dir /lastmodifiedresult \
--table customertest --m 1
复制代码


  • 查看此时导出的结果数据:


  • 再次插入一条数据进入 customertest 表


 insert into customertest(id,name) values(6,'james')   
复制代码


  • 使用 incremental 的方式进行增量的导入:


bin/sqoop import \
--connect jdbc:mysql://192.168.109.1:3306/userdb \
--username root \
--password root \
--table customertest \
--target-dir /lastmodifiedresult \
--check-column last_mod \
--incremental lastmodified \
--last-value "2019-09-03 22:59:45" \
--m 1 \
--append
复制代码


此处已经会导入我们最后插入的一条记录,但是我们却发现此处插入了 2 条数据,这是为什么呢?


这是因为采用 lastmodified 模式去处理增量时,会将大于等于 last-value 值的数据当做增量插入。

5.3. Lastmodified 模式:append、merge-key


使用 lastmodified 模式进行增量处理要指定增量数据是以 append 模式(附加)还是 merge-key(合并)模式添加


下面演示使用 merge-by 的模式进行增量更新,我们去更新 id 为 1 的 name 字段。


 update customertest set   name = 'Neil' where id = 1;   
复制代码


更新之后,这条数据的时间戳会更新为更新数据时的系统时间.


执行如下指令,把 id 字段作为 merge-key:


bin/sqoop import \
--connect jdbc:mysql://192.168.109.1:3306/userdb \
--username root \
--password root \
--table customertest \
--target-dir /lastmodifiedresult \
--check-column last_mod \
--incremental lastmodified \
--last-value "2019-09-03 22:59:45" \
--m 1 \
--merge-key id
复制代码


由于 merge-key 模式是进行了一次完整的 mapreduce 操作,


因此最终我们在 lastmodifiedresult 文件夹下可以看到生成的为 part-r-00000 这样的文件,会发现 id=1 的 name 已经得到修改,同时新增了 id=6 的数据。


二、 Sqoop 导出


将数据从 Hadoop 生态体系导出到 RDBMS 数据库导出前,目标表必须存在于目标数据库中。


export 模式:


默认操作是从将文件中的数据使用 INSERT 语句插入到表中。


更新模式:Sqoop 将生成 UPDATE 替换数据库中现有记录的语句。


以下是 export 命令语法:


$ sqoop export (generic-args) (export-args)

1. 默认模式导出 HDFS 数据到 mysql


默认情况下,sqoop export 将每行输入记录转换成一条 INSERT 语句,添加到目标数据库表中。如果数据库中的表具有约束条件(例如,其值必须唯一的主键列)并且已有数据存在,则必须注意避免插入违反这些约束条件的记录。如果 INSERT 语句失败,导出过程将失败。此模式主要用于将记录导出到可以接收这些结果的空表中。通常用于全表数据导出。


导出时可以是将 Hive 表中的全部记录或者 HDFS 数据(可以是全部字段也可以部分字段)导出到 Mysql 目标表。


1.1. 准备 HDFS 数据


在 HDFS 文件系统中“/emp/”目录的下创建一个文件 emp_data.txt:


1201,gopal,manager,50000,TP
1202,manisha,preader,50000,TP
1203,kalil,php dev,30000,AC
1204,prasanth,php dev,30000,AC
1205,kranthi,admin,20000,TP
1206,satishp,grpdes,20000,GR
复制代码


1.2. 手动创建 mysql 中的目标表


mysql> USE userdb;
mysql> CREATE TABLE employee (
id INT NOT NULL PRIMARY KEY,
name VARCHAR(20),
deg VARCHAR(20),
salary INT,
dept VARCHAR(10));
复制代码


1.3. 执行导出命令


bin/sqoop export \
--connect jdbc:mysql://192.168.109.1:3306/userdb \
--username root \
--password root \
--table employee \
--export-dir /emp/emp_data
复制代码


1.4 相关配置参数


-- input-fields-terminated-by '\t'


指定文件中的分隔符


-- columns


选择列并控制它们的排序。当导出数据文件和目标表字段列顺序完全一致的时候可以不写。否则以逗号为间隔选择和排列各个列。没有被包含在–columns 后面列名或字段要么具备默认值,要么就允许插入空值。否则数据库会拒绝接受 sqoop 导出的数据,导致 Sqoop 作业失败


-- export-dir 导出目录,在执行导出的时候,必须指定这个参数,同时需要具备--table 或--call 参数两者之一,--table 是指的导出数据库当中对应的表,


-- call 是指的某个存储过程。


用户头像

还未添加个人签名 2021.03.07 加入

还未添加个人简介

评论

发布
暂无评论
Apache Sqoop中最重要的核心概念-导入导出