HBase數(shù)據(jù)插入使用Put對(duì)象,Put對(duì)象在進(jìn)行數(shù)據(jù)插入時(shí)拟淮,首先會(huì)向HBase集群發(fā)送一個(gè)RPC請(qǐng)求干茉,得到相應(yīng)之后將Put類中的數(shù)據(jù)通過序列化的方式傳給HBase集群,集群節(jié)點(diǎn)接收到數(shù)據(jù)之后進(jìn)行添加功能很泊。
單行插入
單行插入即每次只插入一行數(shù)據(jù)角虫,下面先看一個(gè)插入一條數(shù)據(jù)的代碼:
@Test
public void testPut() throws IOException {
Connection conn = ConnectionFactory.createConnection();
HTable table = (HTable) conn.getTable(TableName.valueOf("ns1:t1"));
Put put = new Put(Bytes.toBytes("row1"));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), Bytes.toBytes(1));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("tom"));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(15));
table.put(put);
table.close();
conn.close();
}
從上面的代碼可以看出插入一條數(shù)據(jù)是使用HTable對(duì)象的put方法添加,put方法中的參數(shù)是Put對(duì)象委造。好下面就來講講Put對(duì)象戳鹅。
Put的構(gòu)造函數(shù)
public Put(byte [] row) // 只指定rowKey
public Put(byte [] rowArray, int rowOffset, int rowLength) // 從一個(gè)字節(jié)數(shù)組中指定rowKey
public Put(byte [] rowArray, int rowOffset, int rowLength, long ts) // 從一個(gè)字節(jié)數(shù)組中指定rowKey,并且設(shè)置時(shí)間戳
public Put(byte[] row, long ts) // 指定rowKey和時(shí)間戳
public Put(ByteBuffer row)
public Put(ByteBuffer row, long ts)
public Put(Put putToCopy) // 從其他put對(duì)象拷貝
創(chuàng)建Put對(duì)象時(shí)用戶必須指定rowKey,在HBase中每行數(shù)據(jù)都有一個(gè)唯一的行鍵(rowKey)作為標(biāo)識(shí)昏兆,跟HBase的大多數(shù)數(shù)據(jù)類型一樣枫虏,它是一個(gè)字節(jié)數(shù)組。用戶可以按照自己的需求指定每行的行鍵。通常情況下模软,rowKey的含義與真實(shí)場(chǎng)景相關(guān),例如他的含義可以是一個(gè)用戶名或者訂單號(hào)饮潦,它的內(nèi)容可以是簡(jiǎn)單的數(shù)據(jù)燃异,也可以是比較復(fù)雜的UUID(全局統(tǒng)一標(biāo)識(shí)符)等。
HBase為用戶提供了一個(gè)包含很多靜態(tài)方法的輔助類继蜡,這個(gè)類可以將許多java數(shù)據(jù)類型轉(zhuǎn)換為字節(jié)數(shù)組回俐。
Bytes類中的靜態(tài)方法
public static byte[] toBytes(ByteBuffer bb)
public static byte[] toBytes(String s)
public static byte[] toBytes(boolean b)
public static byte[] toBytes(long val)
public static byte[] toBytes(float f)
public static byte[] toBytes(int val)
創(chuàng)建Put對(duì)象之后,就可以向該對(duì)象中添加數(shù)據(jù)了稀并,添加數(shù)據(jù)的方法如下:
public Put add(Cell kv)
public Put addColumn(byte [] family, byte [] qualifier, byte [] value)
public Put addColumn(byte [] family, byte [] qualifier, long ts, byte [] value)
public Put addColumn(byte[] family, ByteBuffer qualifier, long ts, ByteBuffer value)
每一次調(diào)用add()或者addColumn()方法都可以添加一行數(shù)據(jù)仅颇,如果再加上一個(gè)時(shí)間戳選項(xiàng),就能成為一個(gè)數(shù)據(jù)單元格碘举。注意忘瓦,當(dāng)不指定時(shí)間戳?xí)r,Put對(duì)象會(huì)使用來自構(gòu)造函數(shù)中的可選時(shí)間戳引颈,如果用戶在構(gòu)造Put對(duì)象的時(shí)候也沒有指定時(shí)間戳耕皮,則時(shí)間戳將會(huì)由regionServer設(shè)定。
系統(tǒng)為一些用戶提供了Cell實(shí)例的變種蝙场,這里說的高級(jí)用戶是指知道怎樣檢索或創(chuàng)建這個(gè)內(nèi)部類的用戶凌停。Cell實(shí)例代表了一個(gè)唯一的數(shù)據(jù)單元格,類似于一個(gè)協(xié)調(diào)系統(tǒng)售滤,該系統(tǒng)使用行鍵罚拟、列族、列限定符完箩、時(shí)間戳指向一個(gè)單元格的值赐俗,像一個(gè)三維立方體系統(tǒng)。
獲取Put對(duì)象內(nèi)部的Cell對(duì)象需要調(diào)用以下方法:
public List<Cell> get(byte[] family, byte[] qualifier)
public NavigableMap<byte [], List<Cell>> getFamilyCellMap()
以上兩個(gè)方法可以查詢用戶之間添加的內(nèi)容嗜憔,同時(shí)將特定單元格的信息轉(zhuǎn)換成Cell對(duì)象秃励。用戶可以選擇獲取整個(gè)列族的全部數(shù)據(jù)單元,一個(gè)列族中的特定列或者是全部數(shù)據(jù)吉捶。后面的getFamilyCellMap()方法可以遍歷Put對(duì)象中的每個(gè)可用的Cell對(duì)象夺鲜。
用戶可以采用以下方法檢測(cè)是否存在特定的單元格,而不需遍歷整個(gè)集合:
boolean has(byte[] family,byte[] qualifier)
boolean has(byte[] family,byte[] qualifier,long ts)
boolean has(byte[] family,byte[] qualifier,byte[] value)
boolean has(byte[] family,byte[] qualifier,long ts,byte[] value)
Put類的其它方法
方法 | 描述 |
---|---|
getRow() | 返回創(chuàng)建Put實(shí)例時(shí)所指定的行鍵 |
getRowLock() | 返回當(dāng)前Put實(shí)例的行RowLock實(shí)例 |
getLockId() | 返回使用rowlock參數(shù)傳遞給構(gòu)造函數(shù)的可選的鎖ID,當(dāng)未被指定時(shí)返回-1L |
setWriteToWAL() | 允許關(guān)閉默認(rèn)啟用的服務(wù)端預(yù)寫日志(WAL)功能 |
getWriteToWAL() | 返回代表是否啟用了WAL功能 |
getTimeStamp() | 返回相應(yīng)Put實(shí)例的時(shí)間戳呐舔,該值可在構(gòu)造函數(shù)中由ts參數(shù)傳入币励,當(dāng)未被設(shè)定時(shí)返回Long.MAX_VALUE |
heapSize() | 計(jì)算當(dāng)前Put實(shí)例所需的堆大小,既包含其中的數(shù)據(jù)珊拼,也包含內(nèi)部數(shù)據(jù)結(jié)構(gòu)所需的空間 |
isEmpty() | 檢查FamilyMap是否含有任何KeyValue實(shí)例 |
numFamilies() | 查詢FamilyMap的大小,即所有的KeyValue實(shí)例中列族的個(gè)數(shù) |
size() | 返回本次Put會(huì)添加的KeyValue的實(shí)例 |
客戶端的寫緩沖區(qū)
每一個(gè)put操作實(shí)際上都是一個(gè)RPC操作食呻,它將客戶端數(shù)據(jù)傳送到服務(wù)器然后返回。這只適合小數(shù)據(jù)量的操作,如果有個(gè)應(yīng)用程序需要每秒存儲(chǔ)上千行數(shù)據(jù)到HBase表中仅胞,這樣處理就不太合適了每辟。
Tip
減少獨(dú)立RPC調(diào)用的關(guān)鍵是限制往返時(shí)間,往返時(shí)間就是客戶端發(fā)送一個(gè)請(qǐng)求到服務(wù)器干旧,然后服務(wù)器通過網(wǎng)絡(luò)進(jìn)行相應(yīng)的時(shí)間渠欺。這個(gè)時(shí)間不包含 數(shù)據(jù)實(shí)際傳輸?shù)臅r(shí)間,它其實(shí)就是通過線路傳送網(wǎng)絡(luò)包的開銷椎眯。一般情況下挠将,在LAN網(wǎng)絡(luò)中大約要花1ms的時(shí)間,這意味著在一秒的時(shí)間內(nèi)只能完成1000次RPC往返相應(yīng)编整。
另一個(gè)重要的因素就是消息大小舔稀。如果通過網(wǎng)絡(luò)發(fā)送的請(qǐng)求內(nèi)容較大,那么需要請(qǐng)求返回的次數(shù)相應(yīng)較少掌测,這是因?yàn)闀r(shí)間主要花費(fèi)在數(shù)據(jù)傳遞上内贮。不過如果傳送的數(shù)據(jù)量很小,比如一個(gè)計(jì)數(shù)器遞增操作赏半,那么用戶把多次修改的數(shù)據(jù)批量提交給服務(wù)器并減少請(qǐng)求次數(shù)贺归,性能會(huì)有相應(yīng)提升。
HBase的API配備了一個(gè)客戶端的寫緩沖區(qū)断箫,緩沖區(qū)負(fù)責(zé)收集put操作拂酣,然后調(diào)用RPC操作一次性將put送往服務(wù)器。全局交換機(jī)控制著該緩沖區(qū)是否在使用仲义,以下是其方法:
void setAutoFlushTo(boolean autoFlush)
boolean isAutoFlush()
默認(rèn)情況下婶熬,客戶端緩沖區(qū)是禁用的“D欤可以通過將自動(dòng)刷新設(shè)置為false來激活緩沖區(qū)赵颅,調(diào)用如下:
table.setAutoFlushTo(false)
激活客戶端緩沖區(qū)之后,將數(shù)據(jù)存儲(chǔ)到HBase中暂刘。此時(shí)的操作不會(huì)產(chǎn)生RPC調(diào)用饺谬,因?yàn)榇鎯?chǔ)Put實(shí)例保存在客戶端進(jìn)程中的內(nèi)存中。當(dāng)需要強(qiáng)制把數(shù)據(jù)寫到服務(wù)器時(shí)谣拣,可以調(diào)用table.flushCommits()
募寨。
flushCommits()
方法將所有的修改傳送給遠(yuǎn)程服務(wù)器。被緩沖的Put實(shí)例可以跨多行森缠“斡ィ客戶端能夠批量處理這些更新,并把他們傳送到對(duì)應(yīng)regionServer贵涵。和調(diào)用單行put()方法一樣列肢,用戶不需要擔(dān)心數(shù)據(jù)分配到了哪里恰画,因?yàn)閷?duì)于用戶來說,HBase客戶端對(duì)這個(gè)方法的處理是透明的瓷马,下圖展示了在客戶端請(qǐng)求傳送到服務(wù)器之前是怎樣按regionServer排序分組拴还,并通過每個(gè)regionServer的RPC請(qǐng)求將數(shù)據(jù)傳送到服務(wù)器的。
緩沖區(qū)僅在兩種情況下會(huì)刷新
- 顯示刷新
- 用戶調(diào)用flushCommit()方法欧聘,把數(shù)據(jù)發(fā)送到服務(wù)器做永久存儲(chǔ)自沧。
- 隱式刷新
- 隱式刷新會(huì)在用戶調(diào)用put或setWriteBufferSize()方法時(shí)觸發(fā)。這兩個(gè)方法都會(huì)將目前占用的緩沖區(qū)大小與用戶配置的大小做比較树瞭,如果超出限制則會(huì)調(diào)用flushCommits()方法。如果緩沖區(qū)被禁用爱谁,可以設(shè)置setAutoFlushTo(true),這樣用戶每次調(diào)用put方法是都會(huì)觸發(fā)刷寫晒喷。
批量插入示例代碼
/**
* 批量添加100000條數(shù)據(jù),每2000條一刷寫
*/
@Test
public void testBatchPut() throws IOException {
Connection conn = ConnectionFactory.createConnection();
HTable table = (HTable) conn.getTable(TableName.valueOf("ns1:t1"));
table.setAutoFlushTo(false);
for (int i = 0; i < 100000; i++) {
Put put = new Put(Bytes.toBytes("row" + i));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), Bytes.toBytes(i));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("tom" + i));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(i % 100));
table.put(put);
if (i % 2000 == 0){
table.flushCommits();
}
}
table.flushCommits();
table.close();
conn.close();
}
Put列表
客戶端可以將一個(gè)Put實(shí)例的集合進(jìn)行插入操作访敌,實(shí)現(xiàn)代碼如下:
/**
* 批量插入1000條數(shù)據(jù)
*/
@Test
public void testBatchPut2() throws IOException {
Connection conn = ConnectionFactory.createConnection();
HTable table = (HTable) conn.getTable(TableName.valueOf("ns1:t1"));
List<Put> puts = new ArrayList<Put>();
for (int i = 0; i < 1000; i++) {
Put put = new Put(Bytes.toBytes("r" + i));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), Bytes.toBytes(i));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("tom" + i));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(i % 100));
puts.add(put);
}
table.put(puts);
table.close();
conn.close();
}
原子性操作compare-and-set
有一個(gè)特別的普通調(diào)用凉敲,其能保證自身操作的原子性,檢查和寫(check and put)方法如下:
public boolean checkAndPut(final byte [] row,
final byte [] family, final byte [] qualifier, final byte [] value,
final Put put)
public boolean checkAndPut(final byte [] row, final byte [] family,
final byte [] qualifier, final CompareOp compareOp, final byte [] value,
final Put put)
測(cè)試代碼如下
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
boolean res1 = table.checkAndPut(Bytes.toBytes("row1"),
Bytes.toBytes("qual1"), Bytes.toBytes("val1"), null, put1);
System.out.println("Put applied: " + res1);
table.put(put1);
boolean res2 = table.checkAndPut(Bytes.toBytes("row1"),
Bytes.toBytes("qual1"), Bytes.toBytes("val1"), null, put1);
System.out.println("Put applied: " + res2);
Put put2 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"),
Bytes.toBytes("val2"));
boolean res3 = table.checkAndPut(Bytes.toBytes("row1"),
Bytes.toBytes("qual1"), Bytes.toBytes("val1"), null, put2);
System.out.println("Put applied: " + res3);
table.put(put2);
Put put3 = new Put(Bytes.toBytes("row3"));
put3.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual3"),
Bytes.toBytes("val3"));
boolean res4 = table.checkAndPut(Bytes.toBytes("row1"),
Bytes.toBytes("qual1"), Bytes.toBytes("val1"), null, put3);
System.out.println("Put applied: " + res4);