Using Python to work with Hadoop seems to expose only a limited set of features. For working with Hive from Python there is also an older hiveserver package, but PyHive looks like the more usable option.
Install the dependencies
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
Usage
from pyhive import hive
conn = hive.Connection(host='xxxx', port=10000, username='xxx', database='default')
cursor = conn.cursor()
cursor.execute('select * from url_log limit 10')
for result in cursor.fetchall():
    print(result)
## Real test on an internal network
from pyhive import hive
conn = hive.Connection(host='172.16.16.32', port=10000, username='zhuzheng', auth='LDAP', password="abc123.", database='fkdb')
cursor = conn.cursor()
cursor.execute('select * from fkdb.tab_client_label limit 10')
for result in cursor.fetchall():
    print(result)
### If Hive requires a username and password you need to supply them, and if Hive is not on the same machine you also have to spell out the IP and port.
### Pick the right authentication mode (I use LDAP here) and connect to a database that actually exists on your side.
#### While getting this to work I hit all kinds of errors: wrong credentials, wrong auth mode, nonexistent database, thrift errors and so on. Quite frustrating.
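For reference, here is a minimal sketch of the pyhive connection variants involved; the hosts, ports and credentials below are placeholders, not real values.
from pyhive import hive

# no authentication configured on HiveServer2
conn = hive.Connection(host='10.0.0.1', port=10000, username='hadoop', database='default')

# LDAP: pass auth='LDAP' together with a password
conn = hive.Connection(host='10.0.0.1', port=10000, username='hadoop',
                       password='secret', auth='LDAP', database='default')

# Kerberos: no password, but kerberos_service_name must match the HiveServer2 principal
conn = hive.Connection(host='10.0.0.1', port=10000, auth='KERBEROS',
                       kerberos_service_name='hive', database='default')

cursor = conn.cursor()
The impyla client below takes similar parameters, but uses auth_mechanism instead of auth: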
from impala.dbapi import connect
# Note: auth_mechanism is required here, but database is optional
conn = connect(host='127.0.0.1', port=10000, database='default', auth_mechanism='PLAIN')
cur = conn.cursor()
cur.execute('SHOW DATABASES')
print(cur.fetchall())
cur.execute('SHOW Tables')
print(cur.fetchall())
I have not yet managed to connect successfully with the impala Python client (impyla) in my own tests.
References
https://blog.csdn.net/Gamer_gyt/article/details/52564335
The thrift version that impyla depends on is problematic; pinning the versions below helps:
thrift-sasl==0.2.1
pip uninstall thrift
pip uninstall impyla
pip install thrift==0.9.3
pip install impyla==0.13.8
https://blog.csdn.net/kkevinyang/article/details/79273106
https://github.com/cloudera/impyla/issues/268
https://github.com/cloudera/impyla/issues/234
http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Python-Error-TSaslClientTransport-object-has-no-attribute-trans/td-p/58033
from pyhive import hive
conn = hive.Connection(host="YOUR_HIVE_HOST", port=PORT, username="YOU")
cursor = conn.cursor()
cursor.execute("SELECT cool_stuff FROM hive_table")
for result in cursor.fetchall():
    use_result(result)
import pandas as pd
df = pd.read_sql("SELECT cool_stuff FROM hive_table", conn)
from pyhive import hive
import pandas as pd
def getData():
    conn = hive.Connection(host="1.0.1.38", auth="CUSTOM", username='hive', password="pvXxHTsdqrt8", port=10000, database='tapro_atg')
    df = pd.read_sql("select * from sales_data_leisure_view", conn)
    records = df.head(n=100000)
    print(records.to_json(orient='records'))

getData()
import pandas as pd
from pyhive import hive
conn = hive.connect('192.168.72.135')
cursor = conn.cursor()
sql = "select * from t2 where city='Shanghai'"
cursor.execute(sql)
res = cursor.fetchall()
df = pd.DataFrame(res, columns=['id', 'name', 'year', 'city'])
df1 = pd.read_sql(sql, conn, chunksize=3)
for chunk in df1:
    print(chunk)
# -*- coding:utf-8 -*-
import pandas as pd
from pyhive import hive
import time
import datetime
import os
def rfail(s, file_path):
    with open(file_path, "a+") as f:
        f.write(s + "\n")

def read_query(sql):
    hive_line = '''hive -e "set hive.cli.print.header=true;set mapreduce.job.queuename=hl_report;%s";''' % (sql)
    data_buffer = os.popen(hive_line)
    data = pd.read_table(data_buffer, sep="\t", chunksize=10000)
    return data

def get_from_hive(query, mode, engine_hive):
    # engine_hive = hive.Connection(host="xxxxx", port=10000, username="xxxx")
    if mode == "pyhive":
        data = pd.read_sql(query, engine_hive)
        return data
    elif mode == "raw":
        data = read_query(query)
        return data
    else:
        print("mode: pyhive or raw")
        return None

def gen_date(bdate, days):
    day = datetime.timedelta(days=1)
    for i in range(days):
        s = bdate + day * i
        # print(type(s))
        yield s.strftime("%Y-%m-%d")

def get_date_list(start=None, end=None):
    if (start is None) | (end is None):
        return []
    else:
        data = []
        for d in gen_date(start, (end - start).days):
            data.append(d)
        return data
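A hedged usage sketch of the helpers above; the connection details, table name and partition column are placeholders carried over from the earlier examples.
if __name__ == "__main__":
    engine_hive = hive.Connection(host="xxxxx", port=10000, username="xxxx")

    for dt in get_date_list(datetime.date(2018, 1, 1), datetime.date(2018, 1, 8)):
        # the 'dt' partition column is hypothetical
        sql = "select * from fkdb.tab_client_label where dt = '%s'" % dt
        df = get_from_hive(sql, mode="pyhive", engine_hive=engine_hive)
        if df is None or len(df) == 0:
            rfail("no data for %s" % dt, "failed_days.log")
        else:
            print(dt, df.shape)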
import pandas as pd
from pyhive import presto
cursor = presto.connect(host='presto-master-lb.prod.hulu.com', port = 8080,
catalog = 'hive', username = 'xiaomeng.yang@hulu.com').cursor()
cursor.execute('select * from zzz_emma_genre_16H1_2 limit 10')
result = cursor.fetchall()
result
df = pd.DataFrame(result, columns = ['userid','genre', 'rnk', 'dependency_ratio', 'cumu_ratio'])
import numpy
from pyhive import hive
import pandas as pd
conn = hive.Connection(host="localhost", port=10000, database ='project')
cursor = conn.cursor()
murderdf=pd.read_sql_query("select povertyrate from poverty ",conn)
murdertar=pd.read_sql_query("select murder from crime where district ='TOTAL' and year = 2011",conn)
a=murderdf.as_matrix()
b=murdertar.as_matrix()
print(a)
print(b)
#
# murder = [871, 970, 1095, 1171, 1238, 1244, 1437, 1438, 1631, 1721]
#
# a = [1523, 112]
# b = [10, 20]
print(numpy.corrcoef(a,b))
Alternatively, you can go through pyspark's HiveContext.
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
conf = SparkConf().set("spark.executor.memory", "2g") \
.set("spark.dynamicAllocation.initialExecutors", "2") \
.set("spark.driver.memory", "2g") \
.set("spark.kryoserializer.buffer.max", "1g") \
.set("spark.driver.cores", "4") \
.set("spark.yarn.queue", "ace") \
.set("spark.dynamicAllocation.maxExecutors", "32")
sparkContext = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sparkContext)
hiveContext = HiveContext(sparkContext)
import os
import pandas as pd
def hdfs_to_csv(hdfs_table, csv_name):
    query = "SELECT * FROM prod_rwi_batch.%s" % hdfs_table
    query_df = hiveContext.sql(query)
    query_df.cache()
    df = query_df.toPandas()

    # save it locally
    csv_file = "%s.csv" % csv_name
    df.to_csv(csv_file)

    # copy it over to BDF
    os.system("hdfs dfs -copyFromLocal %s /user/dhomola" % csv_file)
    # this didn't work due to access right issues:
    # hdfs dfs -copyFromLocal initial_pos.csv /production/ace/data/dpt/dhomola

    # delete locally
    os.system("rm %s" % csv_file)
hdfs_to_csv("initialcohort_cp01_1508281365499_1142", "initial_pos")
hdfs_to_csv("restrictivecohort_cp02_1508281365499_1142", "restricted_pos")
hdfs_to_csv("randomsamplecohort_cs03_1508281365499_1142", "random_sample_scoring")
hdfs_to_csv("negativecohort_cn01_1508281365499_1142", "initial_negative")
hdfs_to_csv("cohort_v01_1508281365499_1142", "v01")
hdfs_to_csv("cohort_v02_1508281365499_1142", "v02")
References
- https://github.com/dropbox/PyHive
- https://stackoverflow.com/questions/21370431/how-to-access-hive-via-python
When doing data processing with Pandas we usually import data from CSV or Excel files, but sometimes the data lives in a database (besides MySQL there are also various NoSQL stores) and there is no ready-made data file. In that case you can use the Pymongo library to read the data from MongoDB and load it into Pandas in just three simple steps.
Step 1: install the dependency and import the packages
pip3 install pymongo
import pymongo
import pandas as pd
Step 2: set up the MongoDB connection
# MongoDB connection info
client = pymongo.MongoClient('localhost',27017)
cn_78 = client['cn_78']
project_info = cn_78['project_info']
Step 3: load the data into pandas
data = pd.DataFrame(list(project_info.find()))
# drop the MongoDB _id field
del data['_id']
# select the fields to display
data = data[['aear','cate','subcate','name','maxmoney','minmoney','time']]
print(data)
Another example for reference:
import pandas as pd
from pymongo import MongoClient
# establish the MongoDB connection
client = MongoClient('192.168.248.52',12000)
# connect to the target database; locateInfo is the database name
db = client.locateInfo
db.authenticate("me", "me")
# the collection to use
collection = db.dataCollect
# fetch data from MongoDB
gpsData = pd.DataFrame(list(collection.find({"deviceId":"05792"})))
# drop the _id field
del gpsData['_id']
# select the fields to display
gpsData = gpsData[['deviceId','lating','lnging','gnssTime','locateType']]
# sort the data by time in ascending order (DataFrame.sort is gone in recent pandas; use sort_values)
gpsData = gpsData.sort_values('gnssTime')
print(gpsData)
With that, you can easily read data from MongoDB into Pandas for analysis.
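The reverse direction, writing a pandas DataFrame back into MongoDB, is just as short. A minimal sketch, assuming a local MongoDB; the target collection name is a placeholder.
import pandas as pd
import pymongo

client = pymongo.MongoClient('localhost', 27017)
collection = client['cn_78']['project_info_copy']   # hypothetical target collection

df = pd.DataFrame({'name': ['a', 'b'], 'maxmoney': [100, 200]})
# to_dict('records') yields one dict per row, which insert_many accepts
collection.insert_many(df.to_dict('records'))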
Loading Redis data into pandas
First of all there is an impressive aggregation package, pandas-redistrict:
https://github.com/correctiv/pandas-redistrict
There is also Redis's own Python client:
https://github.com/andymccurdy/redis-py
pandas has msgpack serialization: pd.read_msgpack()/to_msgpack(). Support is still experimental, but it should be the most concise approach, and reading supports iteration.
import redis
import pandas as pd

redis_db = redis.StrictRedis(host="localhost", port=6379, db=0)

key = "df_cache"        # cache key (placeholder)
expire_time = 600       # TTL in seconds (placeholder)
data = data.to_msgpack(compress='zlib')   # here data is a pandas DataFrame
# note: StrictRedis.setex takes (name, time, value)
redis_db.setex(key, expire_time, data)
cached_data = redis_db.get(key)
df = pd.read_msgpack(cached_data)
Another idea: see timeseries2redis, which can load Tick or Bar data from Redis; the implementation is quite interesting.
However, looking at its performance, it is not as fast as pd.read_csv: pandas' CSV reader is implemented in C and can finish in tens of milliseconds. If you want to be several times faster still, consider HDF5; see the pandas read/write performance comparison: performance-considerations.
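For the HDF5 route mentioned above, a minimal sketch (it requires the PyTables package; the file name and key are placeholders):
import pandas as pd

df = pd.DataFrame({'a': range(1000)})
df.to_hdf('cache.h5', key='df', mode='w')    # write
df_cached = pd.read_hdf('cache.h5', 'df')    # read back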
The Redis msgpack approach below is still fairly solid:
set:
redisConn.set("key", df.to_msgpack(compress='zlib'))
get:
pd.read_msgpack(redisConn.get("key"))
The redis + pickle solution from Stack Overflow:
from redis import StrictRedis
import cPickle as pickle
# A subclass of StrictRedis that can pickle and unpickle complex objects;
# "pset" and "pget" stand in for StrictRedis's "set" and "get" methods.
class PickledRedis(StrictRedis):

    def pset(self, key, value, ex=None, px=None, nx=False, xx=False):
        value_pickled = pickle.dumps(value, 2)
        return self.set(key, value_pickled, ex=ex, px=px, nx=nx, xx=xx)

    def pget(self, key):
        value_pickled = self.get(key)
        return pickle.loads(value_pickled)
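A short usage sketch of the class above; the host, port and key name are placeholders.
import pandas as pd

r = PickledRedis(host='localhost', port=6379, db=0)
df = pd.DataFrame({'a': [1, 2, 3]})
r.pset('df_key', df, ex=600)   # pickle and store with a 600-second TTL
df2 = r.pget('df_key')         # fetch and unpickle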
Also see this reference on using Redis from Python:
http://www.reibang.com/p/2639549bedc8
A write-up on using memcache from Python is also worth a look:
https://docs.lvrui.io/2016/07/24/Python%E6%93%8D%E4%BD%9Cmemcache%E8%AF%A6%E8%A7%A3/
Operating HBase from pandas
There are two approaches:
1. hbase ---> pyspark ---> pandas dataframe
2. hbase ---> msgpack ---> pandas dataframe
A reliable option is to install happybase and pdhbase.
Writing DataFrame to HBase
Establish hbase connection using happybase and write the dataframe.
import happybase
import numpy as np
import pandas as pd
import pdhbase as pdh
connection = None
try:
    connection = happybase.Connection('127.0.0.1')
    connection.open()
    df = pd.DataFrame(np.random.randn(10, 5), columns=['a', 'b', 'c', 'd', 'e'])
    df['f'] = 'hello world'
    pdh.to_hbase(df, connection, 'sample_table', 'df_key', cf='cf')
finally:
    if connection:
        connection.close()
Reading DataFrame from HBase
Establish hbase connection using happybase and read the dataframe.
import happybase
import numpy as np
import pandas as pd
import pdhbase as pdh
connection = None
try:
    connection = happybase.Connection('127.0.0.1')
    connection.open()
    df = pdh.read_hbase(connection, 'sample_table', 'df_key', cf='cf')
    print(df)
finally:
    if connection:
        connection.close()
Operating Spark from pandas
There is an existing but rather old package, pyspark_pandas.
Beyond that, PySpark itself supports converting between Spark and pandas DataFrames, as the sketch and the longer ALS example below show.
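A minimal round-trip sketch of that conversion:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

pdf = pd.DataFrame({'user': [1, 2], 'rating': [3.0, 4.5]})
sdf = spark.createDataFrame(pdf)   # pandas -> Spark DataFrame
pdf2 = sdf.toPandas()              # Spark -> pandas DataFrame
print(pdf2)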
import pyspark
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import pyspark.sql.functions as F
# import seaborn as sns
# import matplotlib.pyplot as plt
import sys
import numpy as np
from surprise import AlgoBase, Dataset, evaluate
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# import alert_program as ap
spark = SparkSession.builder.getOrCreate()
ratings_df = pd.read_table("data/ratings.dat", delimiter = '::',
names=["user", "movie", "rating",
"timestamp"], engine = 'python')
spark_df = spark.createDataFrame(ratings_df)
spark_df = spark_df.drop("timestamp")
train, test = spark_df.randomSplit([0.8, 0.2], seed=427471138)
als = ALS(
userCol="user",
itemCol="movie",
ratingCol="rating",
nonnegative=False,
regParam=0.1,
rank=10
)
model = als.fit(train)
predictions = model.transform(test)
pandas_df = predictions.toPandas()
pandas_df_clean=pandas_df.fillna(pandas_df.mean())
pandas_df_clean['RMSE']=np.power(pandas_df_clean['rating']-pandas_df_clean['prediction'],2)
RMSE = np.sqrt(sum(pandas_df_clean['RMSE']) / len(pandas_df_clean))
print (RMSE)
from pyspark.sql import SQLContext
from pandas import DataFrame, Series
import pandas
sqlContext = SQLContext(sc)
df = sqlContext.load(source="org.apache.phoenix.spark", zKUrl="localhost:2181:/hbase-unsecure", table="doctors")
pandas_df = df.toPandas()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import functions as F
import pandas as pd
spark=SparkSession.builder.master("local").appName("my app").config(conf=SparkConf()).getOrCreate()
sc=SparkSession.builder.config(conf=SparkConf())
df=spark.read.format('json').load(['/home/xs/Documents/Weblog.1457006400155.gz','/home/xs/Documents/Weblog.1457006400158.gz',
'/home/xs/Documents/Weblog.1457006401774.gz'])
g1=df.groupBy("captcha_id",F.substring("request_time",1,19).alias("time")).count().filter(df.captcha_id!='')
pandas_df = g1.toPandas()  # convert to a pandas DataFrame
data_pivot=pandas_df.pivot_table(index=["captcha_id"],columns=["time"],values=["count"])
data_pivot.to_csv("/home/xs/Documents/data.csv",header=True)
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pyspark
from pyspark.sql.functions import *
import pandas as pd
import numpy as np
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
sc = pyspark.SparkContext()
sqlcontext = SQLContext(sc)
pandas_df = pd.read_csv('rank.csv')
# split into train/test with a random mask below (randomSplit is a Spark DataFrame method, not a pandas one)
pandas_df['split'] = np.random.randn(pandas_df.shape[0], 1)
msk = np.random.rand(len(pandas_df)) <= 0.7
train = pandas_df[msk]
test = pandas_df[~msk]
s_df = sqlcontext.createDataFrame(train)
trainingData=s_df.rdd.map(lambda x:(Vectors.dense(x[2:3]), x[1])).toDF(["features", "label"])
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(trainingData)
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")
pipeline = Pipeline(stages=[featureIndexer, dt])
model = pipeline.fit(trainingData)
test_df = sqlcontext.createDataFrame(test)
testData=test_df.rdd.map(lambda x:(Vectors.dense(x[2:3]), x[1])).toDF(["features", "label"])
predictions = model.transform(testData)
predictions.select("prediction", "label", "features").show(5)
Operating HDFS from pandas
This mostly goes through WebHDFS.
The most impressive project is a real hidden gem:
https://github.com/ibis-project/ibis
Ibis: Python data analysis framework for Hadoop and SQL engines
One-stop support for:
- Apache Impala (incubating)
- Apache Kudu
- Hadoop Distributed File System (HDFS)
- PostgreSQL
- MySQL (Experimental)
- SQLite
- Pandas DataFrames (Experimental)
- Clickhouse
- BigQuery
pip install ibis-framework
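A hedged sketch of what a connection looks like with the older ibis API; the hosts and ports are placeholders, and newer ibis releases have reshuffled this interface.
import ibis

hdfs = ibis.hdfs_connect(host='namenode-host', port=50070)                     # WebHDFS
con = ibis.impala.connect(host='impalad-host', port=21050, hdfs_client=hdfs)

table = con.table('url_log', database='default')
df = table.limit(10).execute()   # execute() returns a pandas DataFrame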
Another excellent one:
https://github.com/RaRe-Technologies/smart_open
>>> # stream lines from an S3 object
>>> for line in smart_open.smart_open('s3://mybucket/mykey.txt'):
... print line
>>> # using a completely custom s3 server, like s3proxy:
>>> for line in smart_open.smart_open('s3u://user:secret@host:port@mybucket/mykey.txt'):
... print line
>>> # you can also use a boto.s3.key.Key instance directly:
>>> key = boto.connect_s3().get_bucket("my_bucket").get_key("my_key")
>>> with smart_open.smart_open(key) as fin:
... for line in fin:
... print line
>>> # can use context managers too:
>>> with smart_open.smart_open('s3://mybucket/mykey.txt') as fin:
... for line in fin:
... print line
... fin.seek(0) # seek to the beginning
... print fin.read(1000) # read 1000 bytes
>>> # stream from HDFS
>>> for line in smart_open.smart_open('hdfs://user/hadoop/my_file.txt'):
... print line
>>> # stream from HTTP
>>> for line in smart_open.smart_open('http://example.com/index.html'):
... print line
>>> # stream from WebHDFS
>>> for line in smart_open.smart_open('webhdfs://host:port/user/hadoop/my_file.txt'):
... print line
>>> # stream content *into* S3 (write mode):
>>> with smart_open.smart_open('s3://mybucket/mykey.txt', 'wb') as fout:
... for line in ['first line', 'second line', 'third line']:
... fout.write(line + '\n')
>>> # stream content *into* HDFS (write mode):
>>> with smart_open.smart_open('hdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
... for line in ['first line', 'second line', 'third line']:
... fout.write(line + '\n')
>>> # stream content *into* WebHDFS (write mode):
>>> with smart_open.smart_open('webhdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
... for line in ['first line', 'second line', 'third line']:
... fout.write(line + '\n')
>>> # stream from/to local compressed files:
>>> for line in smart_open.smart_open('./foo.txt.gz'):
... print line
>>> with smart_open.smart_open('/home/radim/foo.txt.bz2', 'wb') as fout:
... fout.write("some content\n")
Another very good one:
https://github.com/spotify/snakebite
Snakebite is a Python library that provides a pure Python HDFS client and a wrapper around Hadoop's minicluster. The client uses protobuf for communicating with the NameNode and comes in the form of a library and a command line interface. Currently, the snakebite client supports most actions that involve the Namenode and reading data from DataNodes.
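A small hedged sketch of the client; the namenode host and port are placeholders, and the classic snakebite releases are Python 2 only.
from snakebite.client import Client

client = Client('namenode-host', 8020, use_trash=False)

# list a directory; each entry is a dict with keys such as 'path' and 'length'
for entry in client.ls(['/user/hadoop']):
    print(entry['path'], entry['length'])

# copy a file from HDFS to the local filesystem
for result in client.copyToLocal(['/user/hadoop/my_file.txt'], '/tmp'):
    print(result)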
This one is also decent:
https://github.com/crs4/pydoop
Official site: http://crs4.github.io/pydoop/
The following works as well.
Official docs:
https://hdfscli.readthedocs.io/en/latest/
This one is also excellent:
https://github.com/HariSekhon/pytools
The one below is relatively new:
https://github.com/dask/hdfs3
https://hdfs3.readthedocs.io/en/latest/
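A minimal hdfs3 sketch, assuming direct access to the namenode RPC port; the host, port and path are placeholders.
import pandas as pd
from hdfs3 import HDFileSystem

hdfs = HDFileSystem(host='namenode-host', port=8020)
print(hdfs.ls('/user/hadoop'))

with hdfs.open('/user/hadoop/data.csv', 'rb') as f:
    df = pd.read_csv(f)
The WebHDFS-based hdfs client below is another route: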
from hdfs.client import Client
client = Client("http://host6.cloud.sinocbd.com:50070/") # 50070: Hadoop默認namenode
dir(client)
# Methods used here:
# walk(): like os.walk, returns tuples of (path, directory names, file names), iterating level by level.
# read(): like file.read; according to the official docs, client.read must be used inside a with block:
# path=[]
# for i in client.walk('/tempfiles/temp',depth=1):
# for item in i:
# path.append(item)
# print(item)
# print(path)
with client.read('/tempfiles/1.csv', encoding='gbk') as fs:
    content = fs.read()
    print(content)
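The same client can feed pandas directly, since client.read() yields a file-like object; the path below is the one used above.
import pandas as pd

with client.read('/tempfiles/1.csv', encoding='gbk') as reader:
    df = pd.read_csv(reader)
print(df.head())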
import webhdfspy
import pandas as pd
webHDFS = webhdfspy.WebHDFSClient("host6.cloud.sinocbd.com", 50070,username='root')
data=pd.DataFrame(webHDFS.listdir('/'))
print(data)
pathlist=data['pathSuffix']
for i in pathlist:
    path = "/" + i
    # print(path)
    # print(webHDFS.listdir(path))
import os
import pickle
from pathlib import PurePath
import hdfs3
import pandas as pd
from ufuncs.storage.utils import check_abs_path
def hdfs_chmod_dirs(path, *, permission_code, raise_errors=False, hdfs3_conn=None):
    """Try to change permissions of each part of the path.

    Args:
        path (str): Path in HDFS
        permission_code (int/str): Permission to set on each part of the
            path (eg. 777, 775).
        hdfs3_conn (hdfs3.core.HDFileSystem): HDFS connector.

    Raises:
        IOError: If not possible to change permission of a path part.
    """
    check_abs_path(path)
    hdfs = hdfs3_conn if hdfs3_conn else hdfs3.HDFileSystem()
    # change permissions starting with top dir
    _path = '/'
    for d in PurePath(path).parts[1:]:
        _path = os.path.join(_path, d)
        try:
            hdfs.chmod(_path, int(str(permission_code), 8))
        except IOError as e:
            if raise_errors:
                raise e
def hdfs_put_object(obj, storage_path,
                    *, permission_code=755, overwrite=True, hdfs_conn=None):
    """Store a python object to HDFS in pickle file.

    Args:
        obj: Python object.
        storage_path (str): HDFS full path of the file to write to.
        permission_code (int/str): Permission to set on the pickle file
            (eg. 777, 775). Defaults to 755.
        overwrite (bool): Overwrite if file already exists.
            Defaults to True.
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.

    Raises:
        FileExistsError: If file exists and overwrite is set to False.
    """
    # create connector if not exists
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    if not overwrite and hdfs.exists(storage_path):
        raise FileExistsError("HDFS file '{}' already exists. "
                              "Argument overwrite is {}."
                              .format(storage_path, overwrite))
    hdfs.mkdir(os.path.dirname(storage_path))
    with hdfs.open(storage_path, 'wb') as f:
        pickle.dump(obj, f)
    hdfs.chmod(storage_path, int(str(permission_code), 8))
def hdfs_get_object(storage_path, *, hdfs_conn=None):
    """Retrieve a python object from a pickle file in HDFS.

    Args:
        storage_path (str): HDFS full path of the file to write to.
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.

    Returns:
        The python object that was loaded from the pickle file.

    Raises:
        NameError: If the object's class is not defined in the namespace.
        FileNotFoundError: If file is not found.
    """
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    try:
        with hdfs.open(storage_path, 'rb') as f:
            obj = pickle.load(f)
    except IOError as _:
        raise FileNotFoundError("No such file or directory in HDFS: '{}'."
                                .format(storage_path))
    except AttributeError as _:
        raise NameError("Pickle file object class not found in the namespace.")
    return obj
def hdfs_df2csv(df, storage_path, *,
                permission_code=755, overwrite=True, hdfs_conn=None, **kwargs):
    """Save pandas dataframe to csv in HDFS.

    The kwargs are used to represent any argument from the known
    pandas.DataFrame.to_csv function.

    Args:
        df (pandas.DataFrame): Dataframe to write as csv
        permission_code (int/str): Permission to set on the csv file
            (eg. 777, 775). Defaults to 755.
        overwrite (bool): Overwrite if file already exists.
            Defaults to True.
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.

    Raises:
        TypeError: If df is not a pandas DataFrame
        FileExistsError: If file exists and overwrite is set to False.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Expected pandas Dataframe, got {}"
                        .format(type(df).__name__))
    check_abs_path(storage_path)
    # make hdfs connection
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    # check if there is a file to overwrite
    if not overwrite and hdfs.exists(storage_path):
        raise FileExistsError("HDFS file '{}' already exists. "
                              "Argument overwrite is {}."
                              .format(storage_path, overwrite))
    # make directories
    hdfs.mkdir(os.path.dirname(storage_path))
    # write csv bytes to HDFS file
    with hdfs.open(storage_path, 'wb') as f:
        f.write(df.to_csv(**kwargs))
    # change permission
    hdfs.chmod(storage_path, int(str(permission_code), 8))
def hdfs_csv2df(storage_path, *, hdfs_conn=None, **kwargs):
    """Read .csv from HDFS into a pandas dataframe.

    The kwargs are used to represent any argument from the known
    pandas.read_csv function.

    Args:
        storage_path (str): Location of .csv file in HDFS
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.

    Returns:
        pd.DataFrame: Dataframe with .csv data
    """
    check_abs_path(storage_path)
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    with hdfs.open(storage_path, 'rb') as f:
        df = pd.read_csv(f, **kwargs)
    return df
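A hedged usage sketch of these helpers; the HDFS paths are placeholders.
if __name__ == '__main__':
    conn = hdfs3.HDFileSystem()

    df = pd.DataFrame({'a': [1, 2, 3]})
    hdfs_df2csv(df, '/user/dhomola/example.csv', hdfs_conn=conn, index=False)
    df_back = hdfs_csv2df('/user/dhomola/example.csv', hdfs_conn=conn)

    hdfs_put_object(df, '/user/dhomola/example.pkl', hdfs_conn=conn)
    obj = hdfs_get_object('/user/dhomola/example.pkl', hdfs_conn=conn)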
import warnings
warnings.filterwarnings('ignore')
import sys
import random
import numpy as np
from sklearn import linear_model, cross_validation, metrics, svm
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import pydoop.hdfs as hdfs
def read_csv_from_hdfs(path, cols, col_types=None):
    files = hdfs.ls(path)
    pieces = []
    for f in files:
        fhandle = hdfs.open(f)
        pieces.append(pd.read_csv(fhandle, names=cols, dtype=col_types))
        fhandle.close()
    return pd.concat(pieces, ignore_index=True)
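A hedged usage example for read_csv_from_hdfs above; the HDFS directory and column names are hypothetical.
cols = ['user_id', 'item_id', 'rating']
df = read_csv_from_hdfs('/user/hadoop/ratings/', cols,
                        col_types={'user_id': int, 'item_id': int})
print(df.shape)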
import os
import pytest
import fastparquet
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pandas.util.testing as tm
def hdfs_test_client(driver='libhdfs'):
    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
    user = os.environ['ARROW_HDFS_TEST_USER']
    try:
        port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 20500))
    except ValueError:
        raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
                         'an integer')
    return pa.HdfsClient(host, port, user, driver=driver)

def test_fastparquet_read_with_hdfs():
    fs = hdfs_test_client()
    df = tm.makeDataFrame()
    table = pa.Table.from_pandas(df)
    path = '/tmp/testing.parquet'
    with fs.open(path, 'wb') as f:
        pq.write_table(table, f)
    parquet_file = fastparquet.ParquetFile(path, open_with=fs.open)
    result = parquet_file.to_pandas()
    tm.assert_frame_equal(result, df)
Operating Flink from Python
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/python.html
https://github.com/wdm0006/flink-python-examples
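The old Flink 1.2 Python batch API linked above looks roughly like the sketch below; treat the method names as something to verify against those docs rather than as a definitive example.
from flink.plan.Environment import get_environment

env = get_environment()
data = env.from_elements("foo", "bar", "foo")

# pair each element with a count of 1 and write the result out
data.map(lambda x: (x, 1)).output()

env.execute(local=True)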