Hbase的主要客戶端接口通過(guò)org.apache.hadoop.hbase.client包中的HTable類來(lái)實(shí)現(xiàn),用戶可以通過(guò)它實(shí)現(xiàn)對(duì)Hbase進(jìn)行數(shù)據(jù)的增查改刪(CRUD)操作磺送。
需要注意的是HTable實(shí)例的創(chuàng)建是有代價(jià)的忧勿,實(shí)例化時(shí)會(huì)進(jìn)行一些檢查和其他操作龙屉,這些操作都是比較耗時(shí)的。因此,建議給每個(gè)線程創(chuàng)建一個(gè)HTable實(shí)例,并在客戶端應(yīng)用的生存周期內(nèi)復(fù)用這個(gè)實(shí)例羊娃。
put方法
put方法實(shí)現(xiàn)向Hbase存入數(shù)據(jù),可以分為兩類:一類作用于單行埃跷;另一類作用于多行
單行put
void put(Put put) throws IOException
該方法以單個(gè)Put或存儲(chǔ)在列表中的一組Put對(duì)象作為入?yún)⑷镧瑁琍ut對(duì)象有如下幾個(gè)構(gòu)造函數(shù):
Put(byte[] row)
Put(byte[] row, RowLock rowLock)
Put(byte[] row, long ts)
Put(byte[] row, long ts, RowLock rowLock)
創(chuàng)建Put實(shí)例用戶需要提供一個(gè)行鍵,Hbase中每行數(shù)據(jù)都有唯一的行鍵弥雹,就像關(guān)系型數(shù)據(jù)庫(kù)的主鍵
創(chuàng)建完P(guān)ut實(shí)例后垃帅,調(diào)用下面的幾個(gè)方法向?qū)嵗刑砑訑?shù)據(jù):
Put add(byte[] family, byte[] qualifier, byte[] value)
Put add(byte[] family, byte[] qualifier, long ts, byte[] value)
Put add(KeyValue kv)
每次調(diào)用add方法可以向?qū)嵗刑砑右涣袛?shù)據(jù),如果不指定時(shí)間戳ts參數(shù)剪勿,會(huì)使用構(gòu)造Put實(shí)例時(shí)指定的時(shí)間戳挺智,如果也沒(méi)指定,將會(huì)由region server指定
獲取Put實(shí)例內(nèi)部添加的數(shù)據(jù)通過(guò)get方法:
List<KeyValue> get(byte[] family, byte[] qualifier)
Map<byte[], List<KeyValue>> getFamilyMap()
如果想不通過(guò)遍歷整個(gè)集合而確定數(shù)據(jù)是否存在窗宦,可以使用:
boolean has(byte[] family, byte[] qualifier)
boolean has(byte[] family, byte[] qualifier, long ts)
boolean has(byte[] family, byte[] qualifier, byte[] value)
boolean has(byte[] family, byte[] qualifier, long ts, byte[] value)
如果找到匹配的列返回true
Put類還提供了一些其他常用方法:
方法 | 用途 |
---|---|
getRow() | 返回創(chuàng)建Put實(shí)例時(shí)指定的行鍵 |
getRowLock() | 返回當(dāng)前Put實(shí)例的RowLock實(shí)例 |
getTimeStamp() | 返回Put實(shí)例的時(shí)間戳,未被設(shè)定時(shí)返回Long.MAX_VALUE |
heapSize() | 計(jì)算當(dāng)前Put實(shí)例的堆大小二鳄,既包括數(shù)據(jù)也包含數(shù)據(jù)結(jié)構(gòu)所占空間 |
isEmpty() | 檢查FamilyMap是否含有任何KeyValue實(shí)例 |
numFamilies() | FamilyMap的大小赴涵,即所有KeyValue實(shí)例中列族的個(gè)數(shù) |
size() | 本次Put會(huì)添加的KeyValue(單元格)實(shí)例的數(shù)量 |
調(diào)用實(shí)例
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class PutExample {
public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "testtable");
Put put = new Put(Bytes.toBytes("row1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2")
Bytes.toBytes("val2"));
table.put(put);
}
}
數(shù)據(jù)版本化
Hbase的一個(gè)特殊功能是可以為一個(gè)單元格(一個(gè)特定列的值)存儲(chǔ)多個(gè)版本的數(shù)據(jù),每個(gè)版本的數(shù)據(jù)都有一個(gè)時(shí)間戳订讼,并且按照降序存儲(chǔ)髓窜。
舉個(gè)例子:
> create "test", "cf1"
> put "test", "row1", "cf1", "val1"
> put "test", "row1", "cf1", "val2"
> scan "test"
> scan "test",{ VERSIONS => 3}
上面這些shell命令創(chuàng)建了一個(gè)test表和cf1列族,緊接著使用兩個(gè)put命令向同一個(gè)行鍵和列鍵中存入數(shù)據(jù)欺殿,但是值不同寄纵;然后用scan命令查看數(shù)據(jù)。正常情況下脖苏,你可能認(rèn)為val2會(huì)覆蓋val1程拭,但Hbase中不是的,默認(rèn)情況Hbase會(huì)保留3個(gè)版本的數(shù)據(jù)棍潘。scan操作和get操作只返回最新版本的數(shù)據(jù)恃鞋,調(diào)用時(shí)加入最大版本參數(shù)就會(huì)得到多版本大數(shù)據(jù)
寫緩沖區(qū)
在Hbase中,每個(gè)put操作都會(huì)被轉(zhuǎn)換成一次 RPC 操作亦歉,從而將客戶端的數(shù)據(jù)提交給服務(wù)器并接受服務(wù)器的處理結(jié)果恤浪。如果某個(gè)應(yīng)用程序需要每秒提交上千行數(shù)據(jù)到Hbase中,這樣使用一行已提交的方式就太過(guò)于不合理了
Hbase的api提供了一個(gè)客戶端的寫緩沖區(qū)肴楷,它負(fù)責(zé)收集客戶端提交的put操作水由,在合適的時(shí)機(jī)通過(guò)一次RPC調(diào)用將所有put送往服務(wù)器,通過(guò)下面的方法可以控制和查看緩沖區(qū)的使用狀態(tài):
void setAutoFlush(boolean autoFlush)
boolean isAutoFlush()
Hbase客戶端默認(rèn)情況下是禁用客戶端緩沖區(qū)的赛蔫,可以顯示地將 autoFlush 設(shè)置成 false 來(lái)激活緩沖區(qū):
table.setAutoFlush(false)
同時(shí)可以通過(guò)調(diào)用 isAutoFlush() 方法來(lái)查看緩沖區(qū)是否啟用砂客,返回false表示已啟用緩沖區(qū)
顯示激活緩沖區(qū)后泥张,客戶端單行put操作不會(huì)直接產(chǎn)生RPC調(diào)用,此時(shí)提交的put實(shí)例被保存在客戶端進(jìn)程的內(nèi)存中鞭盟,如果想強(qiáng)制把數(shù)據(jù)刷新到Hbase服務(wù)器圾结,可以調(diào)用api方法:
void flushCommits() throws IOException
該方法會(huì)將所有的修改提交給遠(yuǎn)程Hbase服務(wù)器,被緩沖的put實(shí)例可以跨多行
通常用戶不需要自己手動(dòng)的強(qiáng)制刷緩沖區(qū)齿诉,Hbase客戶端api會(huì)跟蹤緩沖區(qū)中數(shù)據(jù)大小筝野,一旦超出配置的大小限制,會(huì)自動(dòng)隱式調(diào)用寫命令粤剧,可以通過(guò)下面的方法查看和設(shè)置緩沖區(qū)大小(單位是字節(jié)歇竟,默認(rèn)是2097152字節(jié),即2M):
long getWriteBufferSize()
void setWriteBufferSize(long writeBufferSize) throws IOException
此外還可以通過(guò)修改Hbase的配置文件 hbase-site.xml 來(lái)設(shè)置緩沖區(qū)大小抵恋,這樣可以避免給每個(gè)HTable實(shí)例都設(shè)置一次大谢酪椤:
<property>
<name>hbase.client.write.buffer</name>
<value>20971520</value>
</property>
寫緩沖區(qū)實(shí)例
HTable table = new HTable(conf, "testtable");
System.out.println("Auto flush:" + table.isAutoFlush());
table.setAutoFlush(false);
Put put1 = new Put(Bytes.toBytes("row1));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
table.put(put1);
Put pu2 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val2"));
table.put(put2);
Put pu3 = new Put(Bytes.toBytes("row3"));
put3.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val3"));
table.put(put3);
Get get = new Get(Bytes.toBytes("row1"));
Result res1 = table.get(get);
System.out.println("Result:" + res1);
table.flushCommits();
Result res2 = table.get(get);
System.out.println("Result:" + res2);
可以通過(guò) getWriteBuffer() 方法獲取提交到緩沖區(qū)的put實(shí)例列表,不過(guò)直接操作這個(gè)列表是比較危險(xiǎn)的弧关,因?yàn)檫@將繞過(guò)堆大小的檢查盅安,并且有可能這個(gè)緩沖區(qū)正在刷寫內(nèi)容到服務(wù)器
批量Put
Hbase客戶端api提供批量提交put實(shí)例的方法,調(diào)用方法如下:
void put(List<Put> puts) throws IOException
由于批量操作可能會(huì)涉及多行世囊,有可能部分修改會(huì)失敗别瞭,那么Hbase服務(wù)器將放棄當(dāng)前修改的處理,繼續(xù)處理下一個(gè)株憾,最后通過(guò)一個(gè)IOException反饋給客戶端蝙寨。服務(wù)器處理失敗的put實(shí)例會(huì)被保存在客戶端的寫緩沖區(qū)中,再下一次刷寫緩沖區(qū)的時(shí)候會(huì)重試
值得一提的是嗤瞎,客戶端會(huì)做一些基本的檢查工作墙歪,例如put實(shí)例內(nèi)容是否為空,是否制定了列等贝奇。如果檢查不通過(guò)虹菲,客戶端會(huì)拋出異常,同時(shí)刷寫數(shù)據(jù)的時(shí)候這些put實(shí)例不會(huì)被處理
使用批量put時(shí)弃秆,需要注意的是:客戶端無(wú)法控制服務(wù)器端執(zhí)行put的順序届惋,即用戶無(wú)法保證數(shù)據(jù)的寫入順序!
批量put實(shí)例
List<Put> puts = new ArrayList<Put>();
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
puts.add(put1);
Put put2 = new Put(Bytes.toBytes("row2"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val2"));
puts.add(put2);
table.put(puts);
原子性操作
Hbase客戶端api提供了一種特殊的put調(diào)用菠赚,它能夠保證操作的原子性:(檢查寫 check and put):
boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Put put) throws IOException
該方法自帶檢查功能脑豹,檢查通過(guò)才執(zhí)行put操作,否則放棄本次修改衡查,可以保證服務(wù)端put操作的原子性瘩欺。
此原子性操作api限制了檢查和修改的數(shù)據(jù)必須屬于同一行,因此只提供了同一行數(shù)據(jù)的原子性保證;檢查和修改分別針對(duì)不同行時(shí)會(huì)拋出異常
原子操作實(shí)例
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
// 值不存在的情況下執(zhí)行
boolean res1 = table.checkAndPut(Bytes.toBytes("row1"),
Bytes.toBytes("colfam1"),
Bytes.toBytes("qual1"),
null, put1);
Put put2 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"),
Bytes.toBytes("val2"));
// 當(dāng)上一次提交的值存在俱饿,則執(zhí)行
boolean res2 = table.checkAndPut(Bytes.toBytes("row1"),
Bytes.toBytes("colfam1"),
Bytes.toBytes("qual1"),
Bytes.toBytes("val1"),
put2);
原子性操作使Hbase提供了不同客戶端并發(fā)修改數(shù)據(jù)的功能