使用 Pandas 操作 Hive | MangoDB | Redis | HBase|Memcache |HDFS |Spark

image.png

使用 python 操作 hadoop 好像只有 少量的功能,使用python 操作 hive 其實還有一個hiveserver 的一個包,不過 看這個 pyhive 應該是比較好用的碱茁。

安裝依賴

pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive

操作

from pyhive import hive
conn = hive.Connection(host='xxxx', port=10000, username='xxx', database='default')
cursor.execute('select * from url_log limit 10')
for result in cursor.fetchall():
    print result
##真實 內(nèi)網(wǎng) 測試
from pyhive import hive
conn = hive.Connection(host='172.16.16.32', port=10000, username='zhuzheng',auth='LDAP',password="abc123." ,database='fkdb')
cursor=conn.cursor()
cursor.execute('select * from fkdb.tab_client_label limit 10')
for result in cursor.fetchall():
    print(result)
###如果 hive 有賬號密碼  你需要  寫上,如果   hive 不在 同一臺機器 也要寫明  ip  和port哗蜈,
###授權模式 需要選擇合適的属划,我這里使用的上 LDAP同眯, 數(shù)據(jù)庫呢 ,你 需要連接你自己 正確的唯鸭。
####其實在搗鼓是 各種報錯,有賬號密碼 寫錯 和 授權模式錯誤  數(shù)據(jù)庫不存在 明肮,thift 報錯 等的菱农,整的人心 煩躁

from impala.dbapi import connect
#需要注意的是這里的auth_mechanism必須有,但database不必須
conn = connect(host='127.0.0.1', port=10000, database='default', auth_mechanism='PLAIN')
cur = conn.cursor()

cur.execute('SHOW DATABASES')
print(cur.fetchall())

cur.execute('SHOW Tables')
print(cur.fetchall())

使用 impala的 python客戶端連接柿估,我自己 測試 到現(xiàn)在還沒有成功

參考
https://blog.csdn.net/Gamer_gyt/article/details/52564335
impala python 依賴的thrift 版本有問題 秫舌,
thrift-sasl==0.2.1
pip uninstall thrift

pip uninstall impyla

pip install thrift==0.9.3

pip install impyla==0.13.8

https://blog.csdn.net/kkevinyang/article/details/79273106
https://github.com/cloudera/impyla/issues/268
https://github.com/cloudera/impyla/issues/234
http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Python-Error-TSaslClientTransport-object-has-no-attribute-trans/td-p/58033


from pyhive import hive
conn = hive.Connection(host="YOUR_HIVE_HOST", port=PORT, username="YOU")

cursor = conn.cursor()
cursor.execute("SELECT cool_stuff FROM hive_table")
for result in cursor.fetchall():
  use_result(result)

  import pandas as pd

  df = pd.read_sql("SELECT cool_stuff FROM hive_table", conn)
from pyhive import hive
import pandas as pd

def getData():
    conn = hive.Connection(host="1.0.1.38", auth="CUSTOM", username='hive', password="pvXxHTsdqrt8", port=10000, database='tapro_atg')
    df = pd.read_sql("select * from sales_data_leisure_view", conn)
    records = df.head(n=100000)
    print(records.to_json(orient='records'))

getData();
import pandas as pd
from pyhive import hive

conn = hive.connect('192.168.72.135')
cursor = conn.cursor()
sql = "select * from t2 where city='Shanghai'"
cursor.execute(sql)
res = cursor.fetchall()
df = pd.DataFrame(res, columns=['id', 'name', 'year', 'city'])

df1 = pd.read_sql(sql, conn, chunksize=3)
for chunk in df1:
    print(chunk)

# -*- coding:utf-8 -*-
import pandas as pd
from pyhive import hive
import time
import datetime
import os


def rfail(s, file_path):
    with open(file_path, "a+") as f:
        f.write(s + "\n")


def read_query(sql):
    hive_line = '''hive -e "set hive.cli.print.header=true;set mapreduce.job.queuename=hl_report;%s";''' % (sql)
    data_buffer = os.popen(hive_line)
    data = pd.read_table(data_buffer, sep="\t", chunksize=10000)
    return data


def get_from_hive(query, mode, engine_hive):
    #engine_hive = hive.Connection(host="xxxxx", port=10000, username="xxxx")
    if mode == "pyhive":
        data = pd.read_sql(query, engine_hive)
        return data
    elif mode == "raw":
        data = read_query(query)
        return data
    else:
        print("mode: pyhive or raw")
        return None


def gen_date(bdate, days):
    day = datetime.timedelta(days=1)
    for i in range(days):
        s = bdate + day * i
        # print(type(s))
        yield s.strftime("%Y-%m-%d")


def get_date_list(start=None, end=None):
    if (start is None) | (end is None):
        return []
    else:
        data = []
        for d in gen_date(start, (end - start).days):
            data.append(d)
        return data

import pandas as pd 
from pyhive import presto

cursor = presto.connect(host='presto-master-lb.prod.hulu.com', port = 8080, 
                        catalog = 'hive', username = 'xiaomeng.yang@hulu.com').cursor()
cursor.execute('select * from zzz_emma_genre_16H1_2 limit 10')
result = cursor.fetchall()
result

df = pd.DataFrame(result, columns = ['userid','genre', 'rnk', 'dependency_ratio', 'cumu_ratio'])
import numpy
from pyhive import hive
import pandas as pd
conn = hive.Connection(host="localhost", port=10000, database ='project')
cursor = conn.cursor()
murderdf=pd.read_sql_query("select povertyrate from poverty ",conn)
murdertar=pd.read_sql_query("select murder from crime where district ='TOTAL' and year = 2011",conn)

a=murderdf.as_matrix()
b=murdertar.as_matrix()

print(a)
print(b)
#
# murder = [871, 970, 1095, 1171, 1238, 1244, 1437, 1438, 1631, 1721]
#
# a = [1523, 112]
# b = [10, 20]

print(numpy.corrcoef(a,b))

另外可以借助 pyspark的 hivecontext

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
        
conf = SparkConf().set("spark.executor.memory", "2g") \
  .set("spark.dynamicAllocation.initialExecutors", "2") \
  .set("spark.driver.memory", "2g") \
  .set("spark.kryoserializer.buffer.max", "1g") \
  .set("spark.driver.cores", "4") \
  .set("spark.yarn.queue", "ace") \
  .set("spark.dynamicAllocation.maxExecutors", "32")
  
  
sparkContext = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sparkContext)
hiveContext = HiveContext(sparkContext)


import os
import pandas as pd


def hdfs_to_csv(hdfs_table, csv_name):
  query = "SELECT * FROM prod_rwi_batch.%s" % hdfs_table
  query_df = hiveContext.sql(query)
  query_df.cache()
  df = query_df.toPandas()  

  # save it locally
  csv_file = "%s.csv" % csv_name
  df.to_csv(csv_file)

  # copy it over to BDF
  os.system("hdfs dfs -copyFromLocal %s /user/dhomola" % csv_file) 
  # this didn't work due to access right issues: 
  # hdfs dfs -copyFromLocal initial_pos.csv /production/ace/data/dpt/dhomola

  # delete locally
  os.system("rm %s" % csv_file)

hdfs_to_csv("initialcohort_cp01_1508281365499_1142", "initial_pos")
hdfs_to_csv("restrictivecohort_cp02_1508281365499_1142", "restricted_pos")
hdfs_to_csv("randomsamplecohort_cs03_1508281365499_1142", "random_sample_scoring")
hdfs_to_csv("negativecohort_cn01_1508281365499_1142", "initial_negative")
hdfs_to_csv("cohort_v01_1508281365499_1142", "v01")
hdfs_to_csv("cohort_v02_1508281365499_1142", "v02")

參考

在使用Pandas進行數(shù)據(jù)處理的時候,我們通常從CSV或EXCEL中導入數(shù)據(jù)星虹,但有的時候數(shù)據(jù)都存在數(shù)據(jù)庫內(nèi)宽涌,除了 mysql 還有 其他的 nosql 狠毯,我們并沒有現(xiàn)成的數(shù)據(jù)文件嚼松,這時候可以通過Pymongo這個庫献酗,從mongoDB中讀取數(shù)據(jù),然后載入到Pandas中罕偎,只需要簡單的三步。

第一步 安裝依賴 導入包

pip3 install  pymongo
import pymongo
import pandas as pd

第二步 設置 MongoDB的連接信息

# 設置MongoDB連接信息
client = pymongo.MongoClient('localhost',27017)
cn_78 = client['cn_78']
project_info = cn_78['project_info']

第三步 加載數(shù)據(jù)到pandas

data = pd.DataFrame(list(project_info.find()))
# 刪除mongodb中的_id字段
del data['_id']
# 選擇需要顯示的字段
data = data[['aear','cate','subcate','name','maxmoney','minmoney','time']]
print(data)

另外參考

import pandas as pd
from pymongo import MongoClient

#建立MongoDB數(shù)據(jù)庫連接
client = MongoClient('192.168.248.52',12000)
#連接所需數(shù)據(jù)庫,locateInfo為數(shù)據(jù)庫名
db = client.locateInfo
db.authenticate("me", "me")
#連接所用集合
collection = db.dataCollect
#從mongodb獲取數(shù)據(jù)
gpsData = pd.DataFrame(list(collection.find({"deviceId":"05792"})))
#刪除數(shù)據(jù)Id字段
del gpsData['_id']
# 選擇需要顯示的字段
gpsData = gpsData[['deviceId','lating','lnging','gnssTime','locateType']]
#對數(shù)據(jù)按照時間的升序排序
gpsData = gpsData.sort('gnssTime')
print(gpsData)

這樣就可以輕松地從MongoDB中讀取數(shù)據(jù)到Pandas中進行數(shù)據(jù)分析了。

pandas 加載 redis 數(shù)據(jù)

首先有一個 牛逼的聚合 pandas-redistrict
https://github.com/correctiv/pandas-redistrict

還有 redis 自己的 包
https://github.com/andymccurdy/redis-py

pandas序列化方法msgpack:pd.read_msgpack()/to_msgpack()讯蒲。雖然目前是實驗性支持墨林,但應該是最簡潔的方法。在讀取時其支持迭代化序列酌呆。

redis_db = redis.StrictRedis(host="localhost", port=6379, db=0)
data = data.to_msgpack(compress='zlib')
# 
redis_db.setex(key, data, expire_time)

cached_data = redis_db.get(key)
df = pd.read_msgpack(cached_data)

另一種思路:參考timeseries2redis隙袁,可以將Tick或Bar數(shù)據(jù)在redis中讀取藤乙,實現(xiàn)方法很有趣惭墓。

不過我在看其performance時發(fā)現(xiàn)并沒有pd.read_csv快,pandas的csv讀取底層是C實現(xiàn)的划咐,可以達到幾十ms量級褐缠,如果希望再快幾倍风瘦,可以考慮用HDF5万搔,pandas讀寫性能的比較:performance-considerations瞬雹。


這個還是比較靠譜

set:

redisConn.set("key", df.to_msgpack(compress='zlib'))
get:

pd.read_msgpack(redisConn.get("key"))


stackoverflow上的redis-pickle方案):

from redis import StrictRedis
import cPickle as pickle

# StrictRedis類的子類酗捌,可以pickling和unpickling復雜對象,
# "pset"和"pget"方法代替StrictRedis類的"set"和"get"方法

class PickledRedis(StrictRedis):

    def pset(self, key, value, ex=None, px=None, nx=False, xx=False):
        value_pickled = pickle.dumps(value, 2)
        return self.set(key, value_pickled, ex=None, px=None, nx=False, xx=False)

    def pget(self, key):
        value_pickled = self.get(key)
        return pickle.loads(value_pickled)

另外參考 python操作redis
http://www.reibang.com/p/2639549bedc8

另外 python 操作 memcache 的也可以看看
https://docs.lvrui.io/2016/07/24/Python%E6%93%8D%E4%BD%9Cmemcache%E8%AF%A6%E8%A7%A3/

pandas 操作 Hbase

有兩種方案 尚镰,

  1. hbase ---> pyspark -->pandas dataframe
    2.hbase ---> mgpack--> pandas dataframe

靠譜的有一個 安裝 happybase 和 pdhbase

Writing DataFrame to HBase

Establish hbase connection using happybase and write the dataframe.

 import happybase
    import numpy as np
    import pandas as pd
    import pdhbase as pdh
    connection = None
    try:
        connection = happybase.Connection('127.0.0.1')
        connection.open()
        df = pd.DataFrame(np.random.randn(10, 5), columns=['a', 'b', 'c', 'd', 'e'])
        df['f'] = 'hello world'
        pdh.to_hbase(df, connection, 'sample_table', 'df_key', cf='cf')
    finally:
        if connection:
            connection.close()

Reading DataFrame from HBase

Establish hbase connection using happybase and read the dataframe.

import happybase
import numpy as np
import pandas as pd
import pdhbase as pdh
connection = None
try:
    connection = happybase.Connection('127.0.0.1')
    connection.open()
    df = pdh.read_hbase(connection, 'sample_table', 'df_key', cf='cf')
    print df
finally:
    if connection:
        connection.close()

pandas 操作 spark

有一個現(xiàn)成 的 包 比較老 pyspark_pandas

另外其實pyspark rdd 支持與 pandas的dataframe 交互

import pyspark
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import pyspark.sql.functions as F
# import seaborn as sns
# import matplotlib.pyplot as plt
import sys
import numpy as np
from surprise import AlgoBase, Dataset, evaluate
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# import alert_program as ap

spark = SparkSession.builder.getOrCreate()


ratings_df = pd.read_table("data/ratings.dat", delimiter = '::',
                                     names=["user", "movie", "rating",
                                            "timestamp"], engine = 'python')
spark_df = spark.createDataFrame(ratings_df)

spark_df = spark_df.drop("timestamp")
train, test = spark_df.randomSplit([0.8, 0.2], seed=427471138)

als = ALS(
          userCol="user",
          itemCol="movie",
          ratingCol="rating",
          nonnegative=False,
          regParam=0.1,
          rank=10
         )
model = als.fit(train)

predictions = model.transform(test)

pandas_df = predictions.toPandas()
pandas_df_clean=pandas_df.fillna(pandas_df.mean())
pandas_df_clean['RMSE']=np.power(pandas_df_clean['rating']-pandas_df_clean['prediction'],2)
RMSE = np.sqrt(sum(pandas_df_clean['RMSE']) / len(pandas_df_clean))

print (RMSE)
from pyspark.sql import SQLContext
from pandas import DataFrame, Series
import pandas
sqlContext = SQLContext(sc)

df = sqlContext.load(source="org.apache.phoenix.spark", zKUrl="localhost:2181:/hbase-unsecure", table="doctors")
pandas_df = df.toPandas()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import functions as F
import pandas as pd
spark=SparkSession.builder.master("local").appName("my app").config(conf=SparkConf()).getOrCreate()
sc=SparkSession.builder.config(conf=SparkConf())
df=spark.read.format('json').load(['/home/xs/Documents/Weblog.1457006400155.gz','/home/xs/Documents/Weblog.1457006400158.gz',
'/home/xs/Documents/Weblog.1457006401774.gz'])
g1=df.groupBy("captcha_id",F.substring("request_time",1,19).alias("time")).count().filter(df.captcha_id!='')
pandas_df=g1.toPandas()#轉換成pandas的dataframe
data_pivot=pandas_df.pivot_table(index=["captcha_id"],columns=["time"],values=["count"])
data_pivot.to_csv("/home/xs/Documents/data.csv",header=True)

from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pyspark
from pyspark.sql.functions import *
import pandas as pd

import numpy as np

from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

sc = pyspark.SparkContext()
sqlcontext = SQLContext(sc)


pandas_df = pd.read_csv('rank.csv')

(trainingData, testData) = pandas_df.randomSplit([0.7, 0.3])

pandas_df['split'] = np.random.randn(pandas_df.shape[0], 1)

msk = np.random.rand(len(pandas_df)) <= 0.7

train = pandas_df[msk]
test = pandas_df[~msk]


s_df = sqlcontext.createDataFrame(train)


trainingData=s_df.rdd.map(lambda x:(Vectors.dense(x[2:3]), x[1])).toDF(["features", "label"])

featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(trainingData)

dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

pipeline = Pipeline(stages=[featureIndexer, dt])

model = pipeline.fit(trainingData)


test_df = sqlcontext.createDataFrame(test)
testData=test_df.rdd.map(lambda x:(Vectors.dense(x[2:3]), x[1])).toDF(["features", "label"])
predictions = model.transform(testData)

predictions.select("prediction", "label", "features").show(5)

pandas 操作 HDFS

主要是通過 webhdfs

最牛逼的 簡直是深藏不露
https://github.com/ibis-project/ibis

Ibis: Python data analysis framework for Hadoop and SQL engines
一站到底

pip install ibis-framework

另外一個 也非常棒
https://github.com/RaRe-Technologies/smart_open

>>> # stream lines from an S3 object
>>> for line in smart_open.smart_open('s3://mybucket/mykey.txt'):
...    print line

>>> # using a completely custom s3 server, like s3proxy:
>>> for line in smart_open.smart_open('s3u://user:secret@host:port@mybucket/mykey.txt'):
...    print line

>>> # you can also use a boto.s3.key.Key instance directly:
>>> key = boto.connect_s3().get_bucket("my_bucket").get_key("my_key")
>>> with smart_open.smart_open(key) as fin:
...     for line in fin:
...         print line

>>> # can use context managers too:
>>> with smart_open.smart_open('s3://mybucket/mykey.txt') as fin:
...     for line in fin:
...         print line
...     fin.seek(0)  # seek to the beginning
...     print fin.read(1000)  # read 1000 bytes

>>> # stream from HDFS
>>> for line in smart_open.smart_open('hdfs://user/hadoop/my_file.txt'):
...     print line

>>> # stream from HTTP
>>> for line in smart_open.smart_open('http://example.com/index.html'):
...     print line

>>> # stream from WebHDFS
>>> for line in smart_open.smart_open('webhdfs://host:port/user/hadoop/my_file.txt'):
...     print line

>>> # stream content *into* S3 (write mode):
>>> with smart_open.smart_open('s3://mybucket/mykey.txt', 'wb') as fout:
...     for line in ['first line', 'second line', 'third line']:
...          fout.write(line + '\n')

>>> # stream content *into* HDFS (write mode):
>>> with smart_open.smart_open('hdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
...     for line in ['first line', 'second line', 'third line']:
...          fout.write(line + '\n')

>>> # stream content *into* WebHDFS (write mode):
>>> with smart_open.smart_open('webhdfs://host:port/user/hadoop/my_file.txt', 'wb') as fout:
...     for line in ['first line', 'second line', 'third line']:
...          fout.write(line + '\n')

>>> # stream from/to local compressed files:
>>> for line in smart_open.smart_open('./foo.txt.gz'):
...    print line

>>> with smart_open.smart_open('/home/radim/foo.txt.bz2', 'wb') as fout:
...    fout.write("some content\n")

另外一個也很牛逼
https://github.com/spotify/snakebite

Snakebite is a python library that provides a pure python HDFS client and a wrapper around Hadoops minicluster. The client uses protobuf for communicating with the NameNode and comes in the form of a library and a command line interface. Currently, the snakebite client supports most actions that involve the Namenode and reading data from DataNodes.

這個也不錯
https://github.com/crs4/pydoop
官方網(wǎng) http://crs4.github.io/pydoop/

下面的也可以的

https://github.com/mtth/hdfs

官網(wǎng)
https://hdfscli.readthedocs.io/en/latest/

這個也超級 叼哦

https://github.com/HariSekhon/pytools
下面的這個比較 新
https://github.com/dask/hdfs3
https://hdfs3.readthedocs.io/en/latest/

from hdfs.client import Client
client = Client("http://host6.cloud.sinocbd.com:50070/")  # 50070: Hadoop默認namenode
dir(client)
# 其中用到的方法有:
# walk() 類似os.walk澳迫,返回值也是包含(路徑橄登,目錄名讥此,文件名)元素的數(shù)組萄喳,每層迭代。
# read() 類似file.read充坑,官方文檔的說法是client.read必須在with塊里使用:
# path=[]
# for i in client.walk('/tempfiles/temp',depth=1):
#     for item in i:
#      path.append(item)
#      print(item)
# print(path)
with client.read('/tempfiles/1.csv', encoding='gbk') as fs:
    content = fs.read()
    print(content)
import webhdfspy
import pandas as pd
webHDFS = webhdfspy.WebHDFSClient("host6.cloud.sinocbd.com", 50070,username='root')
data=pd.DataFrame(webHDFS.listdir('/'))
print(data)
pathlist=data['pathSuffix']
for i in pathlist:
    path="/"+pathlist
    # print(path)
    # print(webHDFS.listdir(path))
import os
import pickle

from pathlib import PurePath

import hdfs3
import pandas as pd

from ufuncs.storage.utils import check_abs_path


def hdfs_chmod_dirs(path, *, permission_code, raise_errors=False, hdfs3_conn=None):
    """Try to change permissions of each part of the path.
    Args:
        path (str): Path in HDFS
        permission_code (int/str): Permission to set on each part of the
            path (eg. 777, 775).
        hdfs3_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Raises:
        IOError: If not possible to change permission of a path part.
    """
    check_abs_path(path)
    hdfs = hdfs3_conn if hdfs3_conn else hdfs3.HDFileSystem()
    # change permissions starting with top dir
    _path = '/'
    for d in PurePath(path).parts[1:]:
        _path = os.path.join(_path, d)
        try:
            hdfs.chmod(_path, int(str(permission_code), 8))
        except IOError as e:
            if raise_errors:
                raise e


def hdfs_put_object(obj, storage_path,
                    *, permission_code=755, overwrite=True, hdfs_conn=None):
    """Store a python object to HDFS in pickle file.
    Args:
        obj: Python object.
        storage_path (str): HDFS full path of the file to write to.
        permission_code (int/str): Permission to set on the pickle file
            (eg. 777, 775). Defaults to 755.
        overwrite (bool): Overwrite if file already exists.
            Defaults to True.
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Raises:
        FileExistsError: If file exists and overwrite is set to False.
    """
    # create connector if not exists
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    if not overwrite and hdfs.exists(storage_path):
        raise FileExistsError("HDFS file '{}' already exists. "
                              "Argument overwrite is {}."
                              .format(storage_path, overwrite))
    hdfs.mkdir(os.path.dirname(storage_path))
    with hdfs.open(storage_path, 'wb') as f:
        pickle.dump(obj, f)
    hdfs.chmod(storage_path, int(str(permission_code), 8))


def hdfs_get_object(storage_path, *, hdfs_conn=None):
    """Retrieve a python object from a pickle file in HDFS.
    Args:
        storage_path (str): HDFS full path of the file to write to.
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Returns:
        The python object that was loaded from the pickle file.
    Raises:
        NameError: If the object's class is not defined in the namespace.
        FileNotFoundError: If file is not found.
    """
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    try:
        with hdfs.open(storage_path, 'rb') as f:
            obj = pickle.load(f)
    except IOError as _:
        raise FileNotFoundError("No such file or directory in HDFS: '{}'."
                                .format(storage_path))
    except AttributeError as _:
        raise NameError("Pickle file object class not found in the namespace.")
    return obj


def hdfs_df2csv(df, storage_path, *,
                permission_code=755, overwrite=True, hdfs_conn=None, **kwargs):
    """Save pandas dataframe to csv in HDFS.
    The kwargs are used to represent any argument from the known
    pandas.DataFrame.to_csv function.
    Args:
        df (pandas.DataFrame): Dataframe to write as csv
        permission_code (int/str): Permission to set on the pickle file
            (eg. 777, 775). Defaults to 755.
        overwrite (bool): Overwrite if file already exists.
            Defaults to True.
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Raises:
        TypeError: If df is not a pandas DataFrame
        FileExistsError: If file exists and overwrite is set to False.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Expected pandas Dataframe, got {}"
                        .format(type(df).__name__))
    check_abs_path(storage_path)
    # make hdfs connection
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    # check if file to overrite
    if not overwrite and hdfs.exists(storage_path):
        raise FileExistsError("HDFS file '{}' already exists. "
                              "Argument overwrite is {}."
                              .format(storage_path, overwrite))
    # make directories
    hdfs.mkdir(os.path.dirname(storage_path))
    # write csv bytes to HDFS file
    with hdfs.open(storage_path, 'wb') as f:
        f.write(df.to_csv(**kwargs))
    # change permission
    hdfs.chmod(storage_path, int(str(permission_code), 8))


def hdfs_csv2df(storage_path, *, hdfs_conn=None, **kwargs):
    """Read .csv from HDFS into a pandas dataframe.
    The kwargs are used to represent any argument from the known
    pandas.DataFrame.read_csv function.
    Args:
        storage_path (str): Location of .csv file in HDFS
        hdfs_conn (hdfs3.core.HDFileSystem): HDFS connector.
    Returns:
        pd.DataFrame: Dataframe with .csv data
    """
    check_abs_path(storage_path)
    hdfs = hdfs_conn if hdfs_conn else hdfs3.HDFileSystem()
    with hdfs.open(storage_path, 'rb') as f:
        df = pd.read_csv(f, **kwargs)
    return df
import warnings
warnings.filterwarnings('ignore')

import sys
import random
import numpy as np

from sklearn import linear_model, cross_validation, metrics, svm
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler

import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import pydoop.hdfs as hdfs

def read_csv_from_hdfs(path, cols, col_types=None):
  files = hdfs.ls(path);
  pieces = []
  for f in files:
    fhandle = hdfs.open(f)
    pieces.append(pd.read_csv(fhandle, names=cols, dtype=col_types))
    fhandle.close()
  return pd.concat(pieces, ignore_index=True)
import os
import pytest

import fastparquet
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pandas.util.testing as tm


def hdfs_test_client(driver='libhdfs'):
    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
    user = os.environ['ARROW_HDFS_TEST_USER']
    try:
        port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 20500))
    except ValueError:
        raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
                         'an integer')

    return pa.HdfsClient(host, port, user, driver=driver)


def test_fastparquet_read_with_hdfs():
    fs = hdfs_test_client()

    df = tm.makeDataFrame()
    table = pa.Table.from_pandas(df)

    path = '/tmp/testing.parquet'
    with fs.open(path, 'wb') as f:
        pq.write_table(table, f)

    parquet_file = fastparquet.ParquetFile(path, open_with=fs.open)

    result = parquet_file.to_pandas()
    tm.assert_frame_equal(result, df)

python 操作 flink

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/python.html
https://github.com/wdm0006/flink-python-examples

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末司志,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子棵介,更是在濱河造成了極大的恐慌吧史,老刑警劉巖贸营,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件钞脂,死亡現(xiàn)場離奇詭異,居然都是意外死亡邓夕,警方通過查閱死者的電腦和手機焚刚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進店門矿咕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人捡絮,你說我怎么就攤上這事莲镣∪鹞辏” “怎么了?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵略板,是天一觀的道長叮称。 經(jīng)常有香客問我藐鹤,道長娱节,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任谴古,我火速辦了婚禮掰担,結果婚禮上怒炸,老公的妹妹穿的比我還像新娘阅羹。我一直安慰自己教寂,他們只是感情好孝宗,可當我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著猿诸,像睡著了一般狡忙。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上窜觉,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天禀挫,我揣著相機與錄音语婴,去河邊找鬼驶睦。 笑死场航,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的僻造。 我是一名探鬼主播孩饼,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼捣辆,長吁一口氣:“原來是場噩夢啊……” “哼汽畴!你這毒婦竟也來了耸序?” 一聲冷哼從身側響起坎怪,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤搅窿,失蹤者是張志新(化名)和其女友劉穎隙券,沒想到半個月后娱仔,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡耐朴,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年筛峭,在試婚紗的時候發(fā)現(xiàn)自己被綠了蜒滩。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片奶稠。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡锌订,死狀恐怖辆飘,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情芹关,我是刑警寧澤,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布侥衬,位于F島的核電站轴总,受9級特大地震影響,放射性物質發(fā)生泄漏功偿。R本人自食惡果不足惜械荷,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一虑灰、第九天 我趴在偏房一處隱蔽的房頂上張望瘩缆。 院中可真熱鬧佃蚜,春花似錦谐算、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽一铅。三九已至,卻和暖如春肮之,著一層夾襖步出監(jiān)牢的瞬間戈擒,已是汗流浹背艰毒。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留犬辰,地道東北人冰单。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓诫欠,卻偏偏與公主長得像荒叼,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子坏晦,可洞房花燭夜當晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內(nèi)容