一乍赫、使用sqoop從mysql中抽取數(shù)據(jù)到hive匆篓,查看sqoop官方文檔暂吉,有如下兩種方案:
7.2.9. Incremental Imports
Sqoop provides an incremental import mode which can be used to retrieve only rows newer than some previously-imported set of rows.
The following arguments control incremental imports:
Table 5. Incremental import arguments:
Argument Description
--check-column (col) Specifies the column to be examined when determining which rows to import. (the column should not be of type CHAR/NCHAR/VARCHAR/VARNCHAR/ LONGVARCHAR/LONGNVARCHAR)
--incremental (mode) Specifies how Sqoop determines which rows are new. Legal values for mode include append and lastmodified.
--last-value (value) Specifies the maximum value of the check column from the previous import.
Sqoop supports two types of incremental imports: append and lastmodified. You can use the --incremental argument to specify the type of incremental import to perform.
You should specify append mode when importing a table where new rows are continually being added with increasing row id values. You specify the column containing the row’s id with --check-column. Sqoop imports rows where the check column has a value greater than the one specified with --last-value.
An alternate table update strategy supported by Sqoop is called lastmodified mode. You should use this when rows of the source table may be updated, and each such update will set the value of a last-modified column to the current timestamp. Rows where the check column holds a timestamp more recent than the timestamp specified with --last-value are imported.
At the end of an incremental import, the value which should be specified as --last-value for a subsequent import is printed to the screen. When running a subsequent import, you should specify --last-value in this way to ensure you import only the new or updated data. This is handled automatically by creating an incremental import as a saved job, which is the preferred mechanism for performing a recurring incremental import. See the section on saved jobs later in this document for more information.
示例:
簡(jiǎn)單說(shuō)胖秒,sqoop支持兩種增量MySql導(dǎo)入到hive的模式借笙,
一種是 append业稼,即通過(guò)指定一個(gè)遞增的列,比如:
--incremental append --check-column id --last-value 0
導(dǎo)入id>0的數(shù)
另一種是可以根據(jù)時(shí)間戳的模式叫 lastmodified ,比如:
--incremental lastmodified --check-column createTime --last-value '2012-02-01 11:0:00'
就是只導(dǎo)入createTime 比'2012-02-01 11:0:00'更大的數(shù)據(jù)。
--check-column (col) --檢查的列
--incremental (mode) --所選模式吩抓,append或者lastmodified
--last-value (value) -- 最后一次的值
```
本次采用的是時(shí)間戳方案疹娶,每天導(dǎo)入全量數(shù)據(jù),在hive中抽取最新的數(shù)據(jù)
#####二、hive增量SQL實(shí)現(xiàn)锹安。
數(shù)據(jù):data.txt
```
1,mary,18,2017-06-26 10:00:00
2,lucy,29,2017-06-26 10:00:00
3,jack,18,2017-06-26 10:00:00
4,nick,25,2017-06-26 10:00:00
4,nick,18,2017-06-27 10:00:00
5,tom,26,2017-06-26 10:00:00
5,tom,26,2017-06-27 12:00:00
```
1. 建表語(yǔ)句:
```
create table mytable(id int,name string,age int,createTime string) partitioned by (dt string) row format delimited fields terminated by ',';
```
2. 導(dǎo)入數(shù)據(jù):
```
load data local inpath '/home/ubuntu/data.txt' into table mytable partition(dt='20170626');
```
3. 查看數(shù)據(jù)
```
hive> select * from mytable where dt='20170626';
OK
1 mary 18 2017-06-26 10:00:00 20170626
2 lucy 29 2017-06-26 10:00:00 20170626
3 jack 18 2017-06-26 10:00:00 20170626
4 nick 25 2017-06-26 10:00:00 20170626
4 nick 18 2017-06-27 10:00:00 20170626
5 tom 26 2017-06-26 10:00:00 20170626
5 tom 26 2017-06-27 12:00:00 20170626
Time taken: 0.364 seconds, Fetched: 7 row(s)
hive>
```
4. 我們發(fā)現(xiàn)20170626中有27號(hào)的增量數(shù)據(jù),所以應(yīng)該將數(shù)據(jù)更新到20160627分區(qū)超升,保留最新的數(shù)據(jù)
(注:hive中刪除分區(qū)方法:alter table mytable drop partition(dt='20170627'))
#####查詢語(yǔ)句如下:
```
SELECT id, name, age, createTime
FROM (SELECT id, name, age, createTime, row_number() OVER (PARTITION BY id ORDER BY createTime DESC) AS rn
FROM mytable
) t
WHERE t.rn = 1;
```
其中:
```
select id,name,age,createTime,row_number() over (partition by id order by createTime DESC) AS rn from mytable
```
使用的hive的窗口函數(shù)row_number()盈滴,該函數(shù)作用是將原表按partition后面的字段分區(qū)后,并且按照createTime字段降序排列后,對(duì)分組內(nèi)部的行記錄進(jìn)行標(biāo)記行號(hào)咬展,分別從1-n順序標(biāo)號(hào)济瓢,
該句的查詢結(jié)果如下:
```
Total MapReduce CPU Time Spent: 2 seconds 250 msec
OK
1 mary 18 2017-06-26 10:00:00 1
2 lucy 29 2017-06-26 10:00:00 1
3 jack 18 2017-06-26 10:00:00 1
4 nick 18 2017-06-27 10:00:00 1
4 nick 25 2017-06-26 10:00:00 2
5 tom 26 2017-06-27 12:00:00 1
5 tom 26 2017-06-26 10:00:00 2
Time taken: 24.823 seconds, Fetched: 7 row(s)
hive>
```
因此我們很容易得出20170627號(hào)有效的最新數(shù)據(jù)為行號(hào)rn為1的數(shù)據(jù)
#####三箕宙、更新數(shù)據(jù)
最后將數(shù)據(jù)更新到20170627分區(qū),SQL如下
```
INSERT INTO TABLE mytable PARTITION(dt='20170627')
SELECT id, name, age, createTime
FROM (SELECT id, name, age, createTime, row_number() OVER (PARTITION BY id ORDER BY createTime DESC) AS rn
FROM mytable
) t
WHERE t.rn = 1;
```
查看數(shù)據(jù)
```
> select * from mytable where dt='20170627';
OK
1 mary 18 2017-06-26 10:00:00 20170627
2 lucy 29 2017-06-26 10:00:00 20170627
3 jack 18 2017-06-26 10:00:00 20170627
4 nick 18 2017-06-27 10:00:00 20170627
5 tom 26 2017-06-27 12:00:00 20170627
Time taken: 0.121 seconds, Fetched: 5 row(s)
hive>
```
對(duì)比后發(fā)現(xiàn)柬帕,數(shù)據(jù)確實(shí)是最新的哟忍。