import happybase
import pandas as pd
def get_hbase_pool(host, size=5):
"""獲取一個連接池
:param host: hbase主機ip
:return: 連接池
"""
pool = happybase.ConnectionPool(host=host, size=int(size)) # 因R傳遞的size過來非int陪捷,需用int轉(zhuǎn)換
return pool
def read_hbase_data(pool, table_name, col_filter, col, row_prefix=None):
"""讀取hbase的數(shù)據(jù)表并轉(zhuǎn)換成dataframe輸出
:param pool: 連接池
:param col_filter: 過濾器(數(shù)據(jù)篩選)
:param col: dataframe的列名與read_col對應(yīng)
:param row_prefix: 指定row_key的前綴
:param table_name: 要讀取hbase的表名
:return: 數(shù)據(jù)讀取結(jié)果dataframe
"""
read_col = [i.encode() for i in ['f:{}'.format(col[i]) for i in range(len(col))]] #指定讀取hbase數(shù)據(jù)表的列名(base64編碼)
row_prefix = row_prefix.encode()
result = pd.DataFrame(columns=col)
with pool.connection() as connection:
try:
#print(connection.tables()) #所有數(shù)據(jù)表
tab = connection.table(table_name)
for key, value in tab.scan(row_prefix=row_prefix, columns=read_col, filter=col_filter):
#print('key= ', key, '\nvalue= ', value)
col_value = pd.DataFrame.from_dict(value, orient='index').T # dict轉(zhuǎn)dataframe
col_value = col_value.applymap(lambda x: str(x, 'utf-8')) # 將bytes解碼為utf-8
col_value.columns = col
result = result.append(col_value)
connection.close()
except Exception as e:
connection.close()
print('Error:', e)
return result
def read_hbase_data_nopool(host, table_name, col_filter, col, row_prefix=None):
"""讀取hbase的數(shù)據(jù)表并轉(zhuǎn)換成dataframe輸出
:param host: hbase主機ip
:param col_filter: 過濾器(數(shù)據(jù)篩選)
:param col: dataframe的列名與read_col對應(yīng)
:param row_prefix: 指定row_key的前綴
:param table_name: 要讀取hbase的表名
:return: 數(shù)據(jù)讀取結(jié)果dataframe
"""
read_col = [i.encode() for i in ['f:{}'.format(col[i]) for i in range(len(col))]] #指定讀取hbase數(shù)據(jù)表的列名(base64編碼)
row_prefix = row_prefix.encode()
result = pd.DataFrame(columns=col)
connection = happybase.Connection(host, autoconnect=False) # ip
connection.open()
try:
#print(connection.tables()) #所有數(shù)據(jù)表
tab = connection.table(table_name)
for key, value in tab.scan(row_prefix=row_prefix, columns=read_col, filter=col_filter):
#print('key= ', key, '\nvalue= ', value)
col_value = pd.DataFrame.from_dict(value, orient='index').T # dict轉(zhuǎn)dataframe
col_value = col_value.applymap(lambda x: str(x, 'utf-8')) # 將bytes解碼為utf-8
col_value.columns = col
result = result.append(col_value)
connection.close()
except Exception as e:
connection.close()
print('Error:', e)
return result
if __name__ == "__main__":
host = 'ip'
table_name = '表名'
row_prefix = None # row_key的前綴
col_filter = "SingleColumnValueFilter('f', 'x', =, 'binary:a')" # 過濾器 篩選x=a的
col = ['id', 'x', 'y'] # dataframe的列名
pool = get_hbase_pool(host, size=5)
result = read_hbase_data(pool=pool, table_name=table_name, col_filter=col_filter,
col=col, row_prefix=row_prefix)
python讀取hbase
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
- 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來润文,“玉大人姐呐,你說我怎么就攤上這事〉潋颍” “怎么了曙砂?”我有些...
- 文/不壞的土叔 我叫張陵,是天一觀的道長骏掀。 經(jīng)常有香客問我鸠澈,道長柱告,這世上最難降的妖魔是什么? 我笑而不...
- 正文 為了忘掉前任笑陈,我火速辦了婚禮际度,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘涵妥。我一直安慰自己乖菱,他們只是感情好,可當我...
- 文/花漫 我一把揭開白布蓬网。 她就那樣靜靜地躺著窒所,像睡著了一般。 火紅的嫁衣襯著肌膚如雪拳缠。 梳的紋絲不亂的頭發(fā)上墩新,一...
- 文/蒼蘭香墨 我猛地睜開眼徙菠,長吁一口氣:“原來是場噩夢啊……” “哼讯沈!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起婿奔,我...
- 正文 年R本政府宣布,位于F島的核電站审轮,受9級特大地震影響肥哎,放射性物質(zhì)發(fā)生泄漏辽俗。R本人自食惡果不足惜疾渣,卻給世界環(huán)境...
- 文/蒙蒙 一篡诽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧榴捡,春花似錦杈女、人聲如沸。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至项乒,卻和暖如春啰劲,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背檀何。 一陣腳步聲響...
推薦閱讀更多精彩內(nèi)容
- 1.Hbase安裝: 可以參考這篇文章,寫的很詳細:https://blog.csdn.net/wuruijie3...
- 準備數(shù)據(jù): 上傳到hdfs 編寫mapper: 編寫reducer: 編寫driver: 打包運行主類: yarn...
- 一、版本信息和環(huán)境 1概作、版本信息(全是Apache版本): hadoop-2.6.0 hbase-1.2.6.1 ...
- 1 通過 scan 讀取 hbase 表 應(yīng)用場景: 讀取方法: 直到讀取數(shù)據(jù)的inputformat是 Tabl...