Juggling Data with Spark

We focus on data harvested from Twitter about Apache Spark, with the goal of preparing that data for later use in machine learning and streaming applications. The emphasis is on how code and data are exchanged across the distributed network, and on getting practical experience with serialization, persistence, marshalling, and caching.

We will put Spark SQL to serious use for the interactive exploration of structured and semi-structured data. The fundamental data structure of Spark SQL is the Spark dataframe, which is inspired by the Python Pandas dataframe and the R dataframe. It is a powerful data structure, readily understood and appreciated by data scientists with an R or Python background.

This chapter covers the following points:

? Connecting to Twitter, collecting the relevant data, and persisting it in formats such as JSON and CSV, and in data stores such as MongoDB

? Analyzing the data with Blaze and Odo, a spin-off library of Blaze that connects and transfers data between a wide variety of sources and targets

? Introducing Spark dataframes as the basis for data interchange between the Spark modules, and exploring data interactively with Spark SQL

Revisiting the data-intensive app architecture

Let's first review the data-intensive app architecture, concentrating on the integration layer and the iterative cycle of acquiring, refining, and persisting data. This cycle is known as the five Cs: connect, collect, correct, compose, and consume. These are the essential processes the integration layer runs through to ensure the quality and quantity of the data harvested from Twitter. We will also dive into the persistence layer and set up a data store such as MongoDB to facilitate the processing of the data later on.

We will explore the data with Blaze, a Python library for data manipulation, and with Spark dataframes via Spark SQL for interactive data discovery, getting a feel for the subtle differences between the three dataframe flavors.

The following diagram sets the context for the chapter's focus, highlighting the integration layer and the persistence layer:


3-1 Architecture review

Serializing and deserializing data

Harvesting data from web APIs under rate-limit constraints means we need to store it. Since the data is processed on a distributed cluster, we need consistent ways to save state so that it can be retrieved later. Let's now define serialization, persistence, marshalling, and caching.

Serializing a Python object converts it into a stream of bytes, so that it can be recreated by deserialization even after the original program has exited. A serialized Python object can be transmitted over a network or stored in persistent storage. Deserialization is the inverse operation: it turns the byte stream back into the original Python object so that a program can resume from the saved state. The most popular serialization library in Python is Pickle; in fact, PySpark relies on pickled data being shipped to the worker nodes.
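As a minimal, illustrative sketch of serialization with Pickle (the sample dictionary below is made up for this example):

import pickle

tweet = {'id': 598831111406510082, 'user_name': 'raulsaeztapia',
         'tweet_text': 'RT @pacoid: Great recap of @StrataConf EU ...'}

blob = pickle.dumps(tweet)        # serialize the Python object into a byte stream
restored = pickle.loads(blob)     # deserialize the byte stream back into an equivalent object
assert restored == tweet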

  • **Persistence** saves a program's state data to disk or memory, so that it can carry on where it left off upon restart. A Python object is saved from memory into a file or a database and later loaded with the same state.

**Marshalling** sends Python code or data over a network TCP connection in a multicore or distributed system.

**Caching** converts a Python object to a string in memory so that it can be used as a dictionary key. Spark supports pulling datasets into a cluster-wide, in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small reference dataset or running an iterative algorithm like Google PageRank. Caching is a crucial concept in Spark, as it allows us to keep RDDs in memory and spill them to disk when needed. The caching strategy should be chosen according to the lineage of the data, or the DAG of RDD transformations, so as to minimize shuffling or cross-network data exchange. To get good performance out of Spark, beware of data shuffling: a sound partitioning policy, careful RDD caching, and avoiding unnecessary operations all lead to better Spark performance.
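As a hedged sketch of caching in PySpark (assuming an existing SparkContext sc and an illustrative file path), an RDD that is reused across several actions can be kept in memory and spilled to disk if it does not fit:

from pyspark import StorageLevel

tweets_rdd = sc.textFile('/path/to/twtr15051401.json')
tweets_rdd.persist(StorageLevel.MEMORY_AND_DISK)   # cache in memory, spill to disk when too large

n_total = tweets_rdd.count()                                             # first action materializes the cache
n_spark = tweets_rdd.filter(lambda line: 'ApacheSpark' in line).count()  # reuses the cached partitions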

Harvesting and storing data

Before diving into database storage such as MongoDB, let's look at two widely used file formats: CSV and JSON. The enduring popularity of these two formats comes down to a few key reasons: they are human readable, simple, relatively lightweight, and easy to use.

Persisting data in CSV

CSV is a lightweight, human-readable, and easy-to-use format. It has delimited text columns and an inherent tabular schema. Python offers a robust csv library that can serialize a CSV file into a Python dictionary. For the purposes of our program, we have written a small Python class that manages persisting data in CSV format and reading data back from a given CSV.

Let's look at the code of the IO_csv class. The __init__ section of the class instantiates the file path, the filename, and the file suffix (in this case, .csv):

import os
import csv
from collections import namedtuple

class IO_csv(object):

    def __init__(self, filepath, filename, filesuffix='csv'):
        self.filepath = filepath      # /path/to/file without the '/' at the end
        self.filename = filename      # FILE_NAME
        self.filesuffix = filesuffix

The save method of the class uses a named tuple and the header fields of the CSV file as the schema to persist the data. If the CSV file already exists, it is appended to; otherwise, it is created:

def save(self, data, NTname, fields):
    # NTname = Name of the NamedTuple
    # fields = header of CSV - list of the field names
    NTuple = namedtuple(NTname, fields)

    if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix)):
        # Append to the existing file
        with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'ab') as f:
            writer = csv.writer(f)
            # writer.writerow(fields)  # fields = header of CSV
            writer.writerows([row for row in map(NTuple._make, data)])
            # list comprehension using map on the NamedTuple._make() iterable and the data to be saved
            # Notice writer.writerows and not writer.writerow (i.e. a list of multiple rows is sent to the csv file)
    else:
        # Create a new file
        with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'wb') as f:
            writer = csv.writer(f)
            writer.writerow(fields)  # fields = header of CSV - list of the field names
            writer.writerows([row for row in map(NTuple._make, data)])
            # list comprehension using map on the NamedTuple._make() iterable and the data to be saved
            # Notice writer.writerows and not writer.writerow (i.e. a list of multiple rows is sent to the csv file)

The load method of the class also uses a named tuple and the header fields of the CSV file to retrieve the data with a consistent schema. The load method is memory efficient: it is a generator that yields rows one at a time:

def  load(self,  NTname,  fields):
    #  NTname  =  Name  of  the  NamedTuple
    #  fields  =  header  of  CSV  -  list  of  the  fields  name
    NTuple  =  namedtuple(NTname,  fields)
    with open('{0}/{1}.{2}'.format(self.filepath,self.filename,self.filesuffix),'rU')  as  f:
        reader  =  csv.reader(f)
        for  row  in  map(NTuple._make,  reader):
            #  Using  map  on  the  NamedTuple._make()  iterable  and  the reader  file  to  be  loaded
            yield  row

We parse each tweet into a named tuple before saving it to, or retrieving it from, the CSV file:

fields01  =  ['id',  'created_at',  'user_id',  'user_name',  'tweet_text','url']
Tweet01  =  namedtuple('Tweet01',fields01)

def  parse_tweet(data):
    """
    Parse  a  ``tweet``  from  the  given  response  data.
    """
    return  Tweet01(
        id=data.get('id',  None),
        created_at=data.get('created_at',  None),
        user_id=data.get('user_id',  None),
        user_name=data.get('user_name',  None),
        tweet_text=data.get('tweet_text',  None),
        url=data.get('url')
    )
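As an illustrative sketch (the sample dictionary and file path are placeholders, not the book's data), IO_csv and parse_tweet can be combined as follows:

raw = {'id': 598831111406510082, 'created_at': '2015-05-14 12:43:57',
       'user_id': 14755521, 'user_name': 'raulsaeztapia',
       'tweet_text': 'RT @pacoid: Great recap of @StrataConf EU ...',
       'url': 'http://www.mango-solutions.com/wp/2015/05/the-...'}

csv_io = IO_csv('/path/to/data', 'twtr15051401')
csv_io.save([parse_tweet(raw)], 'Tweet01', fields01)   # persist one parsed tweet
for row in csv_io.load('Tweet01', fields01):           # stream the rows back as named tuples
    print(row.user_name, row.tweet_text)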

Persisting data in JSON

JSON is one of the most widely used data formats for Internet-based applications. All the APIs we deal with (Twitter, GitHub, and Meetup) deliver their data in JSON. JSON is lighter than XML, more readable, and its schema is embedded in the record itself. In contrast to CSV, where every record follows exactly the same tabular structure, JSON records can vary in structure: JSON is semi-structured, and a single JSON record maps onto a Python dictionary.
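For instance, a single JSON record maps directly onto a Python dictionary; a minimal sketch with a made-up record:

import json

record = '{"id": 598831111406510082, "user_name": "raulsaeztapia", "tweet_text": "RT @pacoid: ..."}'
tweet = json.loads(record)             # JSON record -> Python dict
print(tweet['user_name'])              # raulsaeztapia
print(json.dumps(tweet, indent=2))     # Python dict -> JSON string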
Let's look at the code of the IO_json class. The __init__ section of the class instantiates the file path, the filename, and the file suffix (in this case, .json):

import io
import json
import os

class IO_json(object):
    def __init__(self, filepath, filename, filesuffix='json'):
        self.filepath = filepath      # /path/to/file without the '/' at the end
        self.filename = filename      # FILE_NAME
        self.filesuffix = filesuffix
        # self.file_io = os.path.join(dir_name, '.'.join((base_filename, filename_suffix)))

The save method of the class writes with utf-8 encoding to ensure read/write compatibility of the data. If the JSON file exists, it is appended to; otherwise, it is created:

def save(self, data):
    if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix)):
        # Append to the existing file
        with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'a', encoding='utf-8') as f:
            f.write(unicode(json.dumps(data, ensure_ascii=False)))  # In Python 3, there is no "unicode" function
            # f.write(json.dumps(data, ensure_ascii=False))  # creates a \" escape char for " in the saved file
    else:
        # Create a new file
        with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'w', encoding='utf-8') as f:
            f.write(unicode(json.dumps(data, ensure_ascii=False)))
            # f.write(json.dumps(data, ensure_ascii=False))

The load method of the class simply returns the contents of the file that was read; the json.loads function needs to be applied to the result to obtain the JSON objects:

def  load(self):
    with  io.open('{0}/{1}.{2}'.format(self.filepath, self.filename,  self.filesuffix),  encoding='utf-8')  as  f:
        return  f.read()
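A minimal, illustrative round trip with IO_json (Python 2 style, matching the class above; the path and payload are placeholders):

json_io = IO_json('/path/to/data', 'twtr15051401_sample')
json_io.save([{'id': 1, 'tweet_text': 'hello spark'}])   # creates the file on first use, appends afterwards

import json
tweets = json.loads(json_io.load())   # load() returns the raw text; json.loads rebuilds the objects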

Setting up MongoDB

Given how important it is to store the harvested information, we set up MongoDB as our document-store database. Since all the collected information is in JSON format and MongoDB stores information in BSON (short for Binary JSON), it is a natural choice.

We will now run through the following steps:

? Installing the MongoDB server and client
? Running the MongoDB server
? Running the Mongo client
? Installing the PyMongo driver
? Creating the Python Mongo client

Installing the MongoDB server and client

Perform the following steps to install the MongoDB packages:

  1. Import the public key used by the package management system (in our case, Ubuntu's apt), with the following command:

sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10

  2. Create a list file for MongoDB with the following command:

echo "deb http://repo.mongodb.org/apt/ubuntu "$(lsb_release -sc)"/ mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.0.list

  3. Update the local package database:

sudo apt-get update

  4. Install the latest stable version of MongoDB:

sudo apt-get install -y mongodb-org

Running the MongoDB server

Let's start the MongoDB server:

  1. Start the MongoDB server with the following command:

sudo service mongodb start

  2. Check that mongod started properly:

an@an-VB:/usr/bin$ ps -ef | grep mongo
mongodb    967     1  4 07:03 ?      00:02:02 /usr/bin/mongod --config /etc/mongod.conf
an        3143  3085  0 07:45 pts/3  00:00:00 grep --color=auto mongo

In this example, the mongodb process is running as PID 967.

  3. The mongod server listens on the default port 27017; this can be changed in the configuration file.
  4. Check the contents of the /var/log/mongod/mongod.log log file:

an@an-VB:/var/lib/mongodb$ ls -lru
total 81936
drwxr-xr-x  2  mongodb  nogroup      4096  Apr 25 11:19  _tmp
-rw-r--r--  1  mongodb  nogroup        69  Apr 25 11:19  storage.bson
-rwxr-xr-x  1  mongodb  nogroup         5  Apr 25 11:19  mongod.lock
-rw-------  1  mongodb  nogroup  16777216  Apr 25 11:19  local.ns
-rw-------  1  mongodb  nogroup  67108864  Apr 25 11:19  local.0
drwxr-xr-x  2  mongodb  nogroup      4096  Apr 25 11:19  journal

  5. Stop the mongodb server with the following command:

sudo service mongodb stop

Running the Mongo client

Running the Mongo client in the console is straightforward, using the following command:

an@an-VB:/usr/bin$ mongo
MongoDB shell version: 3.0.2
connecting to: test
Server has startup warnings:
2015-05-30T07:03:49.387+0200 I CONTROL  [initandlisten]
2015-05-30T07:03:49.388+0200 I CONTROL  [initandlisten]

At the Mongo client console prompt, we can check the databases with the following commands:


>  show  dbs
local  0.078GB
test   0.078GB

We select the test database:

>  use  test
  
switched  to  db  test
 

We show the collections in the test database:


>  show  collections
restaurants
system.indexes

We check a record in the restaurants collection:


>  db.restaurants.find()

{ "_id" : ObjectId("553b70055e82e7b824ae0e6f"), "address" : { "building"
: "1007", "coord" : [ -73.856077, 40.848447 ], "street" : "Morris Park
Ave", "zipcode" : "10462" }, "borough" : "Bronx", "cuisine" : "Bakery",
"grades" : [ { "grade" : "A", "score" : 2, "date" : ISODate("2014-
03-03T00:00:00Z") }, { "date" : ISODate("2013-09-11T00:00:00Z"),
"grade" : "A", "score" : 6 }, { "score" : 10, "date" : ISODate("2013-
01-24T00:00:00Z"), "grade" : "A" }, { "date" : ISODate("2011-11-
23T00:00:00Z"), "grade" : "A", "score" : 9 }, { "date" : ISODate("2011-
03-10T00:00:00Z"), "grade" : "B", "score" : 14 } ], "name" : "Morris
Park Bake Shop", "restaurant_id" : "30075445" }

Installing the PyMongo driver

Installing the Python driver for MongoDB with Anaconda is also easy:

conda install pymongo
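As a quick, hedged sanity check that the driver can reach the local server (the database and collection names here are arbitrary):

from pymongo import MongoClient

client = MongoClient('localhost', 27017)
db = client['test_db']
db['test_coll'].insert({'ping': 'ok'})            # insert() matches the PyMongo 2.x style used below
print(db['test_coll'].find_one({'ping': 'ok'}))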

Creating the Python client for MongoDB

We create an IO_mongo class that will be used in our harvesting program to store the collected data and to retrieve the saved information. In order to create the Mongo client, we import pymongo. We connect to the MongoDB server on localhost at port 27017 with the following code:

from pymongo import MongoClient as MCli

class IO_mongo(object):
    conn = {'host': 'localhost', 'ip': '27017'}

Our class initializes the client connection, the database (in this case, twtr_db), and the collection (in this case, twtr_coll) to be accessed:

def  __init__(self,  db='twtr_db',  coll='twtr_coll',  **conn  ):
    #  Connects  to  the  MongoDB  server
    self.client  =  MCli(**conn)
    self.db  =  self.client[db]
    self.coll  =  self.db[coll]

The save method inserts new records in the pre-initialized collection and database:

def  save(self,  data):
    #Insert  to  collection  in  db
    return  self.coll.insert(data)    

The load method retrieves saved records according to criteria and a projection. For large amounts of data, it can return a cursor instead:

def load(self, return_cursor=False, criteria=None, projection=None):

    if criteria is None:
        criteria = {}

    if projection is None:
        cursor = self.coll.find(criteria)
    else:
        cursor = self.coll.find(criteria, projection)

    # Return a cursor for large amounts of data
    if return_cursor:
        return cursor
    else:
        return [item for item in cursor]
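An illustrative use of the class (the criteria and projection values are examples only):

mongo_io = IO_mongo(db='twtr01_db', coll='twtr01_coll')
mongo_io.save({'id': 598831111406510082, 'user_name': 'raulsaeztapia'})

# Retrieve only the two fields we care about, for one user
docs = mongo_io.load(criteria={'user_name': 'raulsaeztapia'},
                     projection={'id': 1, 'user_name': 1, '_id': 0})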

Harvesting data from Twitter

Each social network poses its own limitations and challenges; one of the main obstacles is the imposed rate limit. We have to pause between long-running or repeated executions, and we must avoid collecting duplicate data. We have redesigned our connection program to take care of the rate limits.

The TwitterAPI class searches and harvests tweets according to the query we specify. We have added the following capabilities to it:

? Logging capability, using the Python logging library, to record errors and warnings when the program fails

? Persistence capability in MongoDB, via the IO_mongo class, and in JSON files, via the IO_json class

? API rate-limit and error management, so that we can call Twitter resiliently without being flagged for abuse

The steps are as follows:

  1. Instantiate the Twitter API with our credentials:
import twitter

class TwitterAPI(object):
    """
    TwitterAPI class allows the Connection to Twitter via OAuth
    once you have registered with Twitter and receive the
    necessary credentials
    """

    def __init__(self):
        consumer_key = 'get_your_credentials'
        consumer_secret = 'get_your_credentials'
        access_token = 'get_your_credentials'
        access_secret = 'get_your_credentials'
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.access_token = access_token
        self.access_secret = access_secret
        self.retries = 3
        self.auth = twitter.oauth.OAuth(access_token, access_secret,
                                        consumer_key, consumer_secret)
        self.api = twitter.Twitter(auth=self.auth)

  2. Set the log level and initialize the logger; the usual levels are available for tracking messages:

°   logger.debug(debug  message)
°   logger.info(info  message)
°   logger.warn(warn  message)
°   logger.error(error  message)
°   logger.critical(critical  message)

  3. Set the log file path and the message format:

# logger initialisation
appName = 'twt150530'
self.logger = logging.getLogger(appName)
# self.logger.setLevel(logging.DEBUG)
# create console handler and set level to debug
logPath = '/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data'
fileName = appName
fileHandler = logging.FileHandler("{0}/{1}.log".format(logPath, fileName))
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fileHandler.setFormatter(formatter)
self.logger.addHandler(fileHandler)
self.logger.setLevel(logging.DEBUG)

  4. Initialize the JSON file persistence instruction:

# Save to JSON file initialisation
jsonFpath = '/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data'
jsonFname = 'twtr15053001'
self.jsonSaver = IO_json(jsonFpath, jsonFname)

  5. Initialize the MongoDB database and collection for persistence:

# Save to MongoDB initialisation
self.mongoSaver = IO_mongo(db='twtr01_db', coll='twtr01_coll')

  6. The searchTwitter method launches the search according to the specified query:

def searchTwitter(self, q, max_res=10, **kwargs):
    search_results = self.api.search.tweets(q=q, count=10, **kwargs)
    statuses = search_results['statuses']
    max_results = min(1000, max_res)

    for _ in range(10):
        try:
            next_results = search_results['search_metadata']['next_results']
            # self.logger.info('info in searchTwitter - next_results: %s', next_results[1:])
        except KeyError as e:
            self.logger.error('error in searchTwitter: %s', e)
            break

        # next_results = urlparse.parse_qsl(next_results[1:])  # python 2.7
        next_results = urllib.parse.parse_qsl(next_results[1:])
        # self.logger.info('info in searchTwitter - next_results[max_id]: %s', next_results[0:])
        kwargs = dict(next_results)
        # self.logger.info('info in searchTwitter - next_results[max_id]: %s', kwargs['max_id'])
        search_results = self.api.search.tweets(**kwargs)
        statuses += search_results['statuses']
        self.saveTweets(search_results['statuses'])

        if len(statuses) > max_results:
            self.logger.info('info in searchTwitter - got %i tweets - max: %i',
                             len(statuses), max_results)
            break
    return statuses

  7. The saveTweets method saves the selected tweets in JSON and in MongoDB:

def saveTweets(self, statuses):
    # Saving to JSON File
    self.jsonSaver.save(statuses)

    # Saving to MongoDB
    for s in statuses:
        self.mongoSaver.save(s)

  8. The parseTweets method extracts the key tweet information from the vast amount of data provided by the Twitter API:

def parseTweets(self, statuses):
    return [(status['id'],
             status['created_at'],
             status['user']['id'],
             status['user']['name'],
             status['text'],
             url['expanded_url'])
            for status in statuses
                for url in status['entities']['urls']]

  9. The getTweets method calls searchTwitter, ensuring reliable API calls and paying particular attention to the rate limit. The code is as follows:

def getTweets(self, q, max_res=10):
    """
    Make a Twitter API call whilst managing rate limit and errors.
    """
    def handleError(e, wait_period=2, sleep_when_rate_limited=True):
        if wait_period > 3600:  # Seconds
            self.logger.error('Too many retries in getTweets: %s', e)
            raise e
        if e.e.code == 401:
            self.logger.error('error 401 * Not Authorised * in getTweets: %s', e)
            return None
        elif e.e.code == 404:
            self.logger.error('error 404 * Not Found * in getTweets: %s', e)
            return None
        elif e.e.code == 429:
            self.logger.error('error 429 * API Rate Limit Exceeded * in getTweets: %s', e)
            if sleep_when_rate_limited:
                self.logger.error('error 429 * Retrying in 15 minutes * in getTweets: %s', e)
                sys.stderr.flush()
                time.sleep(60*15 + 5)
                self.logger.info('error 429 * Retrying now * in getTweets: %s', e)
                return 2
            else:
                raise e  # Caller must handle the rate limiting issue
        elif e.e.code in (500, 502, 503, 504):
            self.logger.info('Encountered %i Error. Retrying in %i seconds'
                             % (e.e.code, wait_period))
            time.sleep(wait_period)
            wait_period *= 1.5
            return wait_period
        else:
            self.logger.error('Exit - aborting - %s', e)
            raise e

  10. Call searchTwitter with the relevant query parameters. If any error comes back from the provider, it is dealt with by the handleError method:

while  True:
    try:
        self.searchTwitter(  q,  max_res=10)
    except  twitter.api.TwitterHTTPError  as  e:
        error_count  =  0
        wait_period  =  handleError(e,  wait_period)
        if  wait_period  is  None:
            return
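Putting the class to work might look like the following sketch (the query string and result size are arbitrary):

twitter_api = TwitterAPI()
statuses = twitter_api.searchTwitter('ApacheSpark', max_res=100)   # harvest, saving to JSON and MongoDB as it goes
tweets = twitter_api.parseTweets(statuses)                         # keep only the fields we need downstream
print(len(tweets), tweets[:1])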

Exploring data using Blaze

Blaze is a Python library developed by Continuum.io that leverages Python NumPy arrays and Pandas dataframes. While Pandas and NumPy are single-core, Blaze extends to multicore computation.

Blaze offers an adaptable, unified, and consistent user interface across a variety of backends. Blaze orchestrates the following:

? Data: seamless exchange of data across different storages such as CSV, JSON, HDF5, HDFS, and Bcolz files
? Computation: computation against different backends such as Spark, MongoDB, Pandas, or SQL Alchemy, using the same query expressions
? Symbolic expressions: abstract expressions such as join, group-by, filter, selection, and projection, with a syntax similar to Pandas but limited in scope, implementing the split-apply-combine approach pioneered by R

Blaze expressions are lazily evaluated and, in that respect, share a similar processing paradigm with Spark RDD transformations.

Let's dive into Blaze by first importing the required libraries: numpy, pandas, blaze, and odo. Odo is a spin-off of Blaze that ensures data migration across a variety of data backends. The commands are as follows:

import numpy as np
import pandas as pd
from blaze import Data, by, join, merge
from odo import odo

BokehJS successfully loaded.

We create a Pandas dataframe by reading the parsed tweets saved in the CSV file:

twts_pd_df  =  pd.DataFrame(twts_csv_read,  columns=Tweet01._fields)

twts_pd_df.head()


Out[65]:
id created_at user_id user_name tweet_text url

1 598831111406510082 2015-05-14  12:43:57 14755521
raulsaeztapia RT  @pacoid:  Great  recap  of  @StrataConf  EU  in  L...
http://www.mango-solutions.com/wp/2015/05/the-...

2 598831111406510082 2015-05-14  12:43:57 14755521
raulsaeztapia RT  @pacoid:  Great  recap  of  @StrataConf  EU  in  L...
http://www.mango-solutions.com/wp/2015/05/the-...

3 598808944719593472 2015-05-14  11:15:52 14755521
raulsaeztapia RT  @alvaroagea:  Simply  @ApacheSpark  http://t.c...
http://www.webex.com/ciscospark/

4 598808944719593472 2015-05-14  11:15:52 14755521
raulsaeztapia RT  @alvaroagea:  Simply  @ApacheSpark  http://t.c...
http://sparkjava.com/

We run the describe() function on the Tweets Pandas dataframe to get some overall information on the dataset:

twts_pd_df.describe()
Out[66]:
id  created_at  user_id user_name   tweet_text  url
count   19  19  19  19  19  19
unique  7   7   6   6   6   7
top 598808944719593472  2015-05-14  11:15:52    14755521
raulsaeztapia   RT  @alvaroagea:  Simply  @ApacheSpark  http://t.c...
http://bit.ly/1Hfd0Xm
freq    6   6   9   9   6   6

We convert the Pandas dataframe into a Blaze dataframe by simply calling the Data() function on it:

#
#  Blaze  dataframe
#
twts_bz_df  =  Data(twts_pd_df)

We can retrieve the schema representation of the Blaze dataframe by inspecting its schema attribute:

twts_bz_df.schema
Out[73]:
dshape("""{
id:  ?string,
created_at:  ?string,
user_id:  ?string,
user_name:  ?string,
tweet_text:  ?string,
url:  ?string
}""")

The .dshape attribute gives both the record count and the schema:

twts_bz_df.dshape
Out[74]:
dshape("""19  *  {
    id:  ?string,
    created_at:  ?string,
    user_id:  ?string,
    user_name:  ?string,
    tweet_text:  ?string,
    url:  ?string
    }""")

We print the content of the Blaze dataframe:

twts_bz_df.data
Out[75]:
id  created_at  user_id user_name   tweet_text  url
1   598831111406510082  2015-05-14  12:43:57    14755521
raulsaeztapia   RT  @pacoid:  Great  recap  of  @StrataConf  EU  in  L...
http://www.mango-solutions.com/wp/2015/05/the-...
2   598831111406510082  2015-05-14  12:43:57    14755521
raulsaeztapia   RT  @pacoid:  Great  recap  of  @StrataConf  EU  in  L...
http://www.mango-solutions.com/wp/2015/05/the-...
...
18  598782970082807808  2015-05-14  09:32:39    1377652806
embeddedcomputer.nl RT  @BigDataTechCon:  Moving  Rating
Prediction  w...    http://buff.ly/1QBpk8J
19  598777933730160640  2015-05-14  09:12:38    294862170   Ellen
Friedman    I'm  still  on  Euro  time.  If  you  are  too  check  o...
http://bit.ly/1Hfd0Xm

We extract the column tweet_text and get its unique values:

twts_bz_df.tweet_text.distinct()
Out[76]:
tweet_text
0   RT  @pacoid:  Great  recap  of  @StrataConf  EU  in  L...
1   RT  @alvaroagea:  Simply  @ApacheSpark  http://t.c...
2   RT  @PrabhaGana:  What  exactly  is  @ApacheSpark  a...
3   RT  @Ellen_Friedman:  I'm  still  on  Euro  time.  If...
4   RT  @BigDataTechCon:  Moving  Rating  Prediction  w...
5   I'm  still  on  Euro  time.  If  you  are  too  check  o...

We extract multiple columns, ['id', 'user_name', 'tweet_text'], from the dataframe and get the unique records:

twts_bz_df[['id',  'user_name','tweet_text']].distinct()
Out[78]:
id  user_name   tweet_text
0   598831111406510082  raulsaeztapia   RT  @pacoid:  Great  recap  of  @
StrataConf  EU  in  L...
1   598808944719593472  raulsaeztapia   RT  @alvaroagea:  Simply  @
ApacheSpark  http://t.c...
2   598796205091500032  John  Humphreys RT  @PrabhaGana:  What  exactly
is  @ApacheSpark  a...
3   598788561127735296  Leonardo  D'Ambrosi RT  @Ellen_Friedman:  I'm
still  on  Euro  time.  If...
4   598785545557438464  Alexey  Kosenkov    RT  @Ellen_Friedman:  I'm
still  on  Euro  time.  If...
5   598782970082807808  embeddedcomputer.nl RT  @BigDataTechCon:
Moving  Rating  Prediction  w...
6   598777933730160640  Ellen  Friedman I'm  still  on  Euro  time.  If
you  are  too  check  o...
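Since Blaze expressions follow the split-apply-combine idiom, a group-by over the same dataframe is a one-liner; a sketch using the columns above:

# Count tweets per user with a Blaze group-by expression
by(twts_bz_df.user_name, tweet_count=twts_bz_df.id.count())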

Transferring data using Odo

Odo is a spin-off project of Blaze. It is dedicated to data exchange and guarantees the migration of data across different formats (CSV, JSON, HDFS, and more) and across different databases (SQL databases, MongoDB, and so on). Its usage is simple: Odo(source, target).

To transfer to a database, the target is specified with a URL. For example, for a MongoDB database, the URL looks like this:


mongodb://username:password@hostname:port/database_name::collection_name
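For example, pushing the parsed tweets CSV straight into a MongoDB collection could look like the following sketch (the file path, database, and collection names are made up, following the URL pattern above):

odo('/path/to/twtr15051401.csv',
    'mongodb://localhost:27017/twtr01_db::twtr01_coll')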

  

Let's run some examples with Odo. Here, we illustrate Odo by reading a CSV file and creating a Blaze dataframe:

filepath   = csvFpath
filename   = csvFname
filesuffix = csvSuffix

twts_odo_df = Data('{0}/{1}.{2}'.format(filepath, filename, filesuffix))

Count the number of records in the dataframe:


twts_odo_df.count()
Out[81]:
19

Display the first five records of the dataframe:


twts_odo_df.head(5)
Out[82]:
id created_at user_id user_name tweet_text url

0 598831111406510082 2015-05-14  12:43:57 14755521
raulsaeztapia RT  @pacoid:  Great  recap  of  @StrataConf  EU  in  L...
http://www.mango-solutions.com/wp/2015/05/the-...


2 598808944719593472 2015-05-14  11:15:52 14755521
raulsaeztapia RT  @alvaroagea:  Simply  @ApacheSpark  http://t.c...  http://www.webex.com/ciscospark/  
3   598808944719593472  2015-05-14  11:15:52    14755521
raulsaeztapia   RT  @alvaroagea:  Simply  @ApacheSpark  http://t.c...
http://sparkjava.com/
4   598808944719593472  2015-05-14  11:15:52    14755521
raulsaeztapia   RT  @alvaroagea:  Simply  @ApacheSpark  http://t.c...
https://www.sparkfun.com/

Get the dshape information from the dataframe, which gives us the number of records and the schema:

twts_odo_df.dshape
Out[83]:
dshape("""var  *  {
    id:  int64,
    created_at:  ?datetime,
    user_id:  int64,
    user_name:  ?string,
    tweet_text:  ?string,
    url:  ?string
    }""")

Save a processed Blaze dataframe into JSON:

odo(twts_odo_distinct_df,  '{0}/{1}.{2}'.format(jsonFpath,  jsonFname,
jsonSuffix))
Out[92]:
<odo.backends.json.JSONLines  at  0x7f77f0abfc50>

Convert the JSON file to a CSV file:

odo('{0}/{1}.{2}'.format(jsonFpath,  jsonFname,  jsonSuffix),  '{0}/{1}.{2}'.format(csvFpath,  csvFname,  csvSuffix))
Out[94]:
<odo.backends.csv.CSV  at  0x7f77f0abfe10>

Exploring data using Spark SQL

Spark SQL is a relational query engine built on top of Spark Core. Its query optimizer is called Catalyst.

Relational queries are expressed in SQL or HiveQL and executed against JSON, CSV, and various databases. Spark SQL gives us the full expressiveness of declarative programming with Spark dataframes, on top of the functional programming with RDDs.

Understanding Spark dataframes

Here is a tweet from @bigdata announcing that Spark SQL and dataframes are available; the various data sources appear at the bottom of the diagram. At the top, R is mentioned as a newly supported language alongside Scala, Java, and Python. Ultimately, the Data Frame philosophy is pervasive across R, Python, and Spark.

3-2 Spark SQL

Spark dataframes originate from SchemaRDDs. They combine an RDD with a schema that can be inferred by Spark when the dataframe is registered; once registered, a dataframe can be queried, which allows complex nested JSON data to be interrogated with straightforward SQL statements while retaining lazy evaluation, lineage, partitioning, and persistence.

To query data with Spark SQL, we first import SparkContext and SQLContext:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
In [95]:
sc
Out[95]:
<pyspark.context.SparkContext at 0x7f7829581890>
In [96]:
sc.master
Out[96]:
u'local[*]'
In [98]:
# Instantiate Spark SQL context
sqlc = SQLContext(sc)
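Before moving on to the real tweets, here is a small hedged sketch of how a dataframe carries its schema: build one from an RDD of Row objects (Row comes from the import above) and query it declaratively. The sample rows are made up:

rows_rdd = sc.parallelize([
    Row(id=598831111406510082, user_name='raulsaeztapia'),
    Row(id=598777933730160640, user_name='Ellen Friedman'),
])
mini_df = sqlc.createDataFrame(rows_rdd)     # the schema is inferred from the Row fields
mini_df.registerTempTable('mini_tweets')
sqlc.sql("SELECT user_name FROM mini_tweets").show()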

We read the JSON file that we saved with Odo:

twts_sql_df_01  =  sqlc.jsonFile  ("/home/an/spark/spark-1.3.0-bin-
hadoop2.4/examples/AN_Spark/data/twtr15051401_distinct.json")
In  [101]:
twts_sql_df_01.show()
created_at  id  tweet_text  user_id
user_name
2015-05-14T12:43:57Z  598831111406510082  RT  @pacoid:  Great...  14755521
raulsaeztapia
2015-05-14T11:15:52Z  598808944719593472  RT  @alvaroagea:  S...  14755521
raulsaeztapia
2015-05-14T10:25:15Z  598796205091500032  RT  @PrabhaGana:  W...  48695135
John  Humphreys
2015-05-14T09:54:52Z  598788561127735296  RT  @Ellen_Friedma...
2385931712  Leonardo  D'Ambrosi
2015-05-14T09:42:53Z  598785545557438464  RT  @Ellen_Friedma...  461020977
Alexey  Kosenkov
2015-05-14T09:32:39Z  598782970082807808  RT  @BigDataTechCo...
1377652806  embeddedcomputer.nl
2015-05-14T09:12:38Z  598777933730160640  I'm  still  on  Euro...  294862170
Ellen  Friedman

We print the schema of the Spark dataframe:

twts_sql_df_01.printSchema()
root
|--  created_at:  string  (nullable  =  true)
|--  id:  long  (nullable  =  true)
|--  tweet_text:  string  (nullable  =  true)
|--  user_id:  long  (nullable  =  true)
|--  user_name:  string  (nullable  =  true)

We select the user_name field from the dataframe:

twts_sql_df_01.select('user_name').show()
user_name
raulsaeztapia
raulsaeztapia
John  Humphreys
Leonardo  D'Ambrosi
Alexey  Kosenkov
embeddedcomputer.nl
Ellen  Friedman

We register the dataframe as a table so that we can run SQL queries against it:

twts_sql_df_01.registerAsTable('tweets_01')

We execute a SQL query against the registered table:

twts_sql_df_01_selection  =  sqlc.sql("SELECT  *  FROM  tweets_01 WHERE
user_name  =  'raulsaeztapia'")
In  [109]:
twts_sql_df_01_selection.show()
created_at  id  tweet_text  user_id
user_name
2015-05-14T12:43:57Z  598831111406510082  RT  @pacoid:  Great... 14755521
raulsaeztapia
2015-05-14T11:15:52Z  598808944719593472  RT  @alvaroagea:  S... 14755521
raulsaeztapia

Let's process some more complex JSON; we read the original Twitter JSON file:

tweets_sqlc_inf = sqlc.jsonFile(infile)

Spark SQL is able to infer the schema of a complex nested JSON file:

tweets_sqlc_inf.printSchema()
root
|--  contributors:  string  (nullable  =  true)
|--  coordinates:  string  (nullable  =  true)
|--  created_at:  string  (nullable  =  true)
|--  entities:  struct  (nullable  =  true)
|   |--  hashtags:  array  (nullable  =  true)
|   |   |--  element:  struct  (containsNull  =  true)
|   |   |   |--  indices:  array  (nullable  =  true)
|   |   |   |   |--  element:  long  (containsNull  =  true)
|   |   |   |--  text:  string  (nullable  =  true)
|   |--  media:  array  (nullable  =  true)
|   |   |--  element:  struct  (containsNull  =  true)
|   |   |   |--  display_url:  string  (nullable  =  true)
|   |   |   |--  expanded_url:  string  (nullable  =  true)
|   |   |   |--  id:  long  (nullable  =  true)
|   |   |   |--  id_str:  string  (nullable  =  true)
|   |   |   |--  indices:  array  (nullable  =  true)
...     (snip)  ...
|   |--  statuses_count:  long  (nullable  =  true)
|   |--  time_zone:  string  (nullable  =  true)
|   |--  url:  string  (nullable  =  true)
|   |--  utc_offset:  long  (nullable  =  true)
|   |--  verified:  boolean  (nullable  =  true)

We extract the key information of interest from this wall of data (in this case, ['created_at', 'id', 'text', 'user.id', 'user.name', 'entities.urls.expanded_url']):

tweets_extract_sqlc  =  tweets_sqlc_inf[['created_at',  'id',  'text',
'user.id',  'user.name',  'entities.urls.expanded_url']].distinct()
In  [145]:
tweets_extract_sqlc.show()
created_at  id  text    id
name    expanded_url
Thu  May  14  09:32:...  598782970082807808  RT  @BigDataTechCo...
1377652806  embeddedcomputer.nl  ArrayBuffer(http:...
Thu  May  14  12:43:...  598831111406510082  RT  @pacoid:  Great...  14755521
raulsaeztapia   ArrayBuffer(http:...
Thu  May  14  12:18:...  598824733086523393  @rabbitonweb  spea...

...
Thu  May  14  12:28:...  598827171168264192  RT  @baandrzejczak...  20909005
Pawe?  Szulc    ArrayBuffer()

Understanding the Spark SQL query optimizer

We execute a SQL query against the dataframe registered as a table:

tweets_extract_sqlc_sel  =  sqlc.sql("SELECT  *  from  Tweets_xtr_001  WHERE
name='raulsaeztapia'")

Let's take a closer look at how Spark SQL executes this query. The query plan goes through the following phases:

? Parsing

? Analysis

? Optimization

? Physical planning

The query plan is handled by Spark SQL's Catalyst optimizer. In order to generate the compiled bytecode from the parts of the query, the Catalyst optimizer runs through logical plan parsing and optimization before evaluating the physical plan on a cost basis.

This is illustrated in the following tweet:

3-3 Spark query optimizer

Going back to our code, we call the .explain function on the Spark SQL query we just executed; it delivers the full details of the steps taken by the Catalyst optimizer:

tweets_extract_sqlc_sel.explain(extended  =  True)
==  Parsed  Logical  Plan  ==
'Project  [*]
'Filter  ('name  =  raulsaeztapia)'name'    'UnresolvedRelation'  [Tweets_
xtr_001],  None
==  Analyzed  Logical  Plan  ==
Project  [created_at#7,id#12L,text#27,id#80L,name#81,expanded_url#82]
Filter  (name#81  =  raulsaeztapia)
Distinct
Project  [created_at#7,id#12L,text#27,user#29.id  AS  id#80L,user#29.
name  AS  name#81,entities#8.urls.expanded_url  AS  expanded_url#82]
Relation[contributors#5,coordinates#6,created_
at#7,entities#8,favorite_count#9L,favorited#10,geo#11,id#12L,id_
str#13,in_reply_to_screen_name#14,in_reply_to_status_id#15,in_reply_
to_status_id_str#16,in_reply_to_user_id#17L,in_reply_to_user_id_str#
18,lang#19,metadata#20,place#21,possibly_sensitive#22,retweet_count#2
3L,retweeted#24,retweeted_status#25,source#26,text#27,truncated#28,us
er#29]  JSONRelation(/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/
AN_Spark/data/twtr15051401.json,1.0,None)
==  Optimized  Logical  Plan  ==
Filter  (name#81  =  raulsaeztapia)
Distinct
Project  [created_at#7,id#12L,text#27,user#29.id  AS  id#80L,user#29.
name  AS  name#81,entities#8.urls.expanded_url  AS  expanded_url#82]
Relation[contributors#5,coordinates#6,created_
at#7,entities#8,favorite_count#9L,favorited#10,geo#11,id#12L,id_
str#13,in_reply_to_screen_name#14,in_reply_to_status_id#15,in_reply_
to_status_id_str#16,in_reply_to_user_id#17L,in_reply_to_user_id_str#
18,lang#19,metadata#20,place#21,possibly_sensitive#22,retweet_count#2
3L,retweeted#24,retweeted_status#25,source#26,text#27,truncated#28,us
er#29]  JSONRelation(/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/
AN_Spark/data/twtr15051401.json,1.0,None)
==  Physical  Plan  ==
Filter  (name#81  =  raulsaeztapia)
Distinct  false
Exchange  (HashPartitioning  [created_at#7,id#12L,text#27,id#80L,name#
81,expanded_url#82],  200)
Distinct  true
Project  [created_at#7,id#12L,text#27,user#29.id  AS  id#80L,user#29.
name  AS  name#81,entities#8.urls.expanded_url  AS  expanded_url#82]
PhysicalRDD  [contributors#5,coordinates#6,created_
at#7,entities#8,favorite_count#9L,favorited#10,geo#11,id#12L,id_str#13,in_reply_to_screen_name#14,in_reply_to_status_id#15,in_reply_to_status_id_str#16,in_reply_to_user_id#17L,in_reply_to_user_id_str#
18,lang#19,metadata#20,place#21,possibly_sensitive#22,retweet_count#2
3L,retweeted#24,retweeted_status#25,source#26,text#27,truncated#28,us
er#29],  MapPartitionsRDD[165]  at  map  at  JsonRDD.scala:41
Code  Generation:  false
==  RDD  ==

Finally, here is the result of the query:

tweets_extract_sqlc_sel.show()
created_at  id  text    id
name    expanded_url
Thu  May  14  12:43:...  598831111406510082  RT  @pacoid:  Great...  14755521
raulsaeztapia  ArrayBuffer(http:...
Thu  May  14  11:15:...  598808944719593472  RT  @alvaroagea:  S...  14755521
raulsaeztapia  ArrayBuffer(http:...
In  [148]:

Loading and processing CSV files with Spark SQL

We will use Spark's spark-csv_2.11:1.2.0 package. When launching PySpark with the IPython Notebook, the spark-csv package must be explicitly specified with the --packages argument:

$  IPYTHON_OPTS='notebook'  /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/
pyspark  --packages  com.databricks:spark-csv_2.11:1.2.0

  

This triggers the following output; you can see all the dependencies pulled in by the spark-csv package:

an@an-VB:~/spark/spark-1.5.0-bin-hadoop2.6/examples/AN_Spark$  IPYTHON_
OPTS='notebook'  /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark
--packages  com.databricks:spark-csv_2.11:1.2.0
...     (snip)  ...
Ivy  Default  Cache  set  to:  /home/an/.ivy2/cache
The  jars  for  the  packages  stored  in:  /home/an/.ivy2/jars
::  loading  settings  ::  url  =  jar:file:/home/an/spark/spark-1.5.0-bin-
hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar!/org/apache/ivy/
core/settings/ivysettings.xml
com.databricks#spark-csv_2.11  added  as  a  dependency
::  resolving  dependencies  ::  org.apache.spark#spark-submit-parent;1.0
confs:  [default]
found  com.databricks#spark-csv_2.11;1.2.0  in  central
found  org.apache.commons#commons-csv;1.1  in  central
found  com.univocity#univocity-parsers;1.5.1  in  central
::  resolution  report  ::  resolve  835ms  ::  artifacts  dl  48ms
::  modules  in  use:
com.databricks#spark-csv_2.11;1.2.0  from  central  in  [default]
com.univocity#univocity-parsers;1.5.1  from  central  in  [default]
org.apache.commons#commons-csv;1.1  from  central  in  [default]
----------------------------------------------------------------
|   |   modules ||  artifacts   |
|   conf    |  number|  search|dwnlded|evicted||  number|dwnlded|
----------------------------------------------------------------
|   default |   3   |   0   |   0   |   0   ||  3   |   0
----------------------------------------------------------------
::  retrieving  ::  org.apache.spark#spark-submit-parent
confs:  [default]
0  artifacts  copied,  3  already  retrieved  (0kB/45ms)

We are now ready to load our csv file and process it. Let's first import the
SQLContext:

#
# Read csv in a Spark DF
#
sqlContext = SQLContext(sc)
spdf_in = sqlContext.read.format('com.databricks.spark.csv')\
                    .options(delimiter=";")\
                    .options(header='true')\
                    .load(csv_in)
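The same data source can also write a dataframe back out; a hedged sketch (the output path is illustrative):

# Write the dataframe back to CSV with the spark-csv package
spdf_in.write.format('com.databricks.spark.csv') \
       .options(header='true') \
       .save('/path/to/twtr_out_csv')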

We access the schema of the dataframe created from the loaded CSV:

In  [10]:
spdf_in.printSchema()
root
|--  :  string  (nullable  =  true)
|--  id:  string  (nullable  =  true)
|--  created_at:  string  (nullable  =  true)
|--  user_id:  string  (nullable  =  true)
|--  user_name:  string  (nullable  =  true)
|--  tweet_text:  string  (nullable  =  true)

We check the columns of the dataframe:

In  [12]:
spdf_in.columns
Out[12]:
['',  'id',  'created_at',  'user_id',  'user_name',  'tweet_text']

We take a look at the dataframe content:

In  [13]:
spdf_in.show()
+---+------------------+--------------------+----------+--------------
----+--------------------+
|   |   id| created_at| user_id|    user_
name|   tweet_text|
+---+------------------+--------------------+----------+--------------
----+--------------------+
|   0|638830426971181057|Tue  Sep  01  21:46:...|3276255125|    True
Equality|ernestsgantt:  Bey...|
|   1|638830426727911424|Tue  Sep  01  21:46:...|3276255125|    True
Equality|ernestsgantt:  Bey...|
|   2|638830425402556417|Tue  Sep  01  21:46:...|3276255125|    True
Equality|ernestsgantt:  Bey...|
...     (snip)  ...
|  41|638830280988426250|Tue  Sep  01  21:46:...|  951081582|   Jack
Baldwin|RT  @cloudaus:  We  ...|
|  42|638830276626399232|Tue  Sep  01  21:46:...|   6525302|Masayoshi
Nakamura|PynamoDB使使使使使使使    |
+---+------------------+--------------------+----------+--------------
----+--------------------+
only  showing  top  20  rows

Querying MongoDB from Spark SQL

There are two major ways to interact with MongoDB from Spark: the first is through the Hadoop MongoDB connector, and the second is directly from Spark to MongoDB.

The first approach requires setting up a Hadoop environment in order to be able to query via the Hadoop MongoDB connector. The connector is hosted on GitHub at https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage. An actual use case is described in the series of official blog posts from MongoDB:

? Using MongoDB with Hadoop & Spark: Part 1 - Introduction & Setup (https://www.mongodb.com/blog/post/using-mongodb-hadoop-spark-part-1-introduction-setup)

? Using MongoDB with Hadoop and Spark: Part 2 - Hive Example (https://www.mongodb.com/blog/post/using-mongodb-hadoop-spark-part-2-hive-example)

? Using MongoDB with Hadoop & Spark: Part 3 - Spark Example & Key Takeaways (https://www.mongodb.com/blog/post/using-mongodb-hadoop-spark-part-3-spark-example-key-takeaways)

Setting up a full Hadoop environment is a bit involved. We will favor the second approach and use the spark-mongodb connector developed and maintained by Stratio, which is the Stratio spark-mongodb package hosted on spark-packages.org.
The package version and related information can be found on spark-packages.org:

Releases
Version: 0.10.1 ( 8263c8 | zip | jar ) / Date: 2015-11-18 / License:
Apache-2.0 / Scala version: 2.10
(http://spark-packages.org/package/Stratio/spark-mongodb)

We launch PySpark with the IPython Notebook and the spark-mongodb package explicitly specified:

$  IPYTHON_OPTS='notebook'  /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/
pyspark  --packages  com.stratio.datasource:spark-mongodb_2.10:0.10.1

This will trigger the following output; you can see all the dependencies pulled in by the spark-mongodb package:

an@an-VB:~/spark/spark-1.5.0-bin-hadoop2.6/examples/AN_Spark$  IPYTHON_
OPTS='notebook'  /home/an/spark/spark-1.5.0-bin-hadoop2.6/bin/pyspark
--packages  com.stratio.datasource:spark-mongodb_2.10:0.10.1
...     (snip)  ...
Ivy  Default  Cache  set  to:  /home/an/.ivy2/cache
The  jars  for  the  packages  stored  in:  /home/an/.ivy2/jars
::  loading  settings  ::  url  =  jar:file:/home/an/spark/spark-1.5.0-bin-
hadoop2.6/lib/spark-assembly-1.5.0-hadoop2.6.0.jar!/org/apache/ivy/
core/settings/ivysettings.xml
com.stratio.datasource#spark-mongodb_2.10  added  as  a  dependency
::  resolving  dependencies  ::  org.apache.spark#spark-submit-parent;1.0
confs:  [default]
found  com.stratio.datasource#spark-mongodb_2.10;0.10.1  in  central
[W  22:10:50.910  NotebookApp]  Timeout  waiting  for  kernel_info  reply
from  764081d3-baf9-4978-ad89-7735e6323cb6
found  org.mongodb#casbah-commons_2.10;2.8.0  in  central
found  com.github.nscala-time#nscala-time_2.10;1.0.0  in  central
found  joda-time#joda-time;2.3  in  central
found  org.joda#joda-convert;1.2  in  central
found  org.slf4j#slf4j-api;1.6.0  in  central
found  org.mongodb#mongo-java-driver;2.13.0  in  central
found  org.mongodb#casbah-query_2.10;2.8.0  in  central
found  org.mongodb#casbah-core_2.10;2.8.0  in  central
downloading  https://repo1.maven.org/maven2/com/stratio/datasource/
park-mongodb_2.10/0.10.1/spark-mongodb_2.10-0.10.1.jar...
[SUCCESSFUL  ]  com.stratio.datasource#spark-
mongodb_2.10;0.10.1!spark-mongodb_2.10.jar  (3130ms)
downloading  https://repo1.maven.org/maven2/org/mongodb/casbah-
ommons_2.10/2.8.0/casbah-commons_2.10-2.8.0.jar...
[SUCCESSFUL  ]  org.mongodb#casbah-commons_2.10;2.8.0!casbah-
commons_2.10.jar  (2812ms)
downloading  https://repo1.maven.org/maven2/org/mongodb/casbah-
uery_2.10/2.8.0/casbah-query_2.10-2.8.0.jar...
[SUCCESSFUL  ]  org.mongodb#casbah-query_2.10;2.8.0!casbah-query_2.10.
jar  (1432ms)
downloading  https://repo1.maven.org/maven2/org/mongodb/casbah-
ore_2.10/2.8.0/casbah-core_2.10-2.8.0.jar...
[SUCCESSFUL  ]  org.mongodb#casbah-core_2.10;2.8.0!casbah-core_2.10.
jar  (2785ms)
downloading  https://repo1.maven.org/maven2/com/github/nscala-time/
scala-time_2.10/1.0.0/nscala-time_2.10-1.0.0.jar...
[SUCCESSFUL  ]  com.github.nscala-time#nscala-time_2.10;1.0.0!nscala-
time_2.10.jar  (2725ms)
downloading  https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.6.0/
slf4j-api-1.6.0.jar  ...
[SUCCESSFUL  ]  org.slf4j#slf4j-api;1.6.0!slf4j-api.jar  (371ms)
downloading  https://repo1.maven.org/maven2/org/mongodb/mongo-java-
driver/2.13.0/mongo-java-driver-2.13.0.jar  ...
[SUCCESSFUL  ]  org.mongodb#mongo-java-driver;2.13.0!mongo-java-
driver.jar  (5259ms)
downloading  https://repo1.maven.org/maven2/joda-time/joda-time/2.3/
joda-time-2.3.jar  ...
[SUCCESSFUL  ]  joda-time#joda-time;2.3!joda-time.jar  (6949ms)
downloading  https://repo1.maven.org/maven2/org/joda/joda-convert/1.2/
joda-convert-1.2.jar  ...
[SUCCESSFUL  ]  org.joda#joda-convert;1.2!joda-convert.jar  (548ms)
::  resolution  report  ::  resolve  11850ms  ::  artifacts  dl  26075ms
::  modules  in  use:
com.github.nscala-time#nscala-time_2.10;1.0.0  from  central  in
[default]
com.stratio.datasource#spark-mongodb_2.10;0.10.1  from  central  in
[default]
joda-time#joda-time;2.3  from  central  in  [default]
org.joda#joda-convert;1.2  from  central  in  [default]
org.mongodb#casbah-commons_2.10;2.8.0  from  central  in  [default]
org.mongodb#casbah-core_2.10;2.8.0  from  central  in  [default]
org.mongodb#casbah-query_2.10;2.8.0  from  central  in  [default]
org.mongodb#mongo-java-driver;2.13.0  from  central  in  [default]
org.slf4j#slf4j-api;1.6.0  from  central  in  [default]
-----------------------------------------------------------------
|   |   modules ||  artifacts
|
|   conf    |  number|  search|dwnlded|evicted||
number|dwnlded|
-------------------------------------------------------------------
--
|   default |   9   |   9   |   9   |   0   ||  9   |   9
|
-------------------------------------------------------------------
--
::  retrieving  ::  org.apache.spark#spark-submit-parent
confs:  [default]
9  artifacts  copied,  0  already  retrieved  (2335kB/51ms)
...     (snip)  ...

We query MongoDB on localhost port 27017, reading from the collection twtr01_coll in the database twtr01_db.
We first import the SQLContext:

In [5]:
from pyspark.sql import SQLContext
sqlContext.sql("CREATE TEMPORARY TABLE tweet_table USING com.stratio.datasource.mongodb OPTIONS (host 'localhost:27017', database 'twtr01_db', collection 'twtr01_coll')")
sqlContext.sql("SELECT * FROM tweet_table where id=598830778269769728").collect()

Here is the query output:

Out[5]:
[Row(text=u'@spark_io  is  now  @particle  -  awesome  news  -  now  I  can enjoy  my  Particle  Cores/Photons  +  @sparkfun  sensors  +  @ApacheSpark analytics  :-)',  _id=u'55aa640fd770871cba74cb88',  contributors=None,
retweeted=False,  user=Row(contributors_enabled=False,  created_at=u'Mon Aug  25  14:01:26  +0000  2008',  default_profile=True,  default_profile_image=False,  description=u'Building  open  source  tools  for  and  teaching
enterprise  software  developers',  entities=Row(description=Row(urls=[]),  url=Row(urls=[Row(url=u'http://t.co/TSHp13EWeu',  indices=[0,22],

...     (snip)  ... 

9],  name=u'Spark  is  Particle',  screen_name=u'spark_io'),
Row(id=487010011,  id_str=u'487010011',  indices=[17,  26],
name=u'Particle',  screen_name=u'particle'),  Row(id=17877351,
id_str=u'17877351',  indices=[88,  97],  name=u'SparkFun
Electronics',  screen_name=u'sparkfun'),  Row(id=1551361069,  id_
str=u'1551361069',  indices=[108,  120],  name=u'Apache  Spark',  screen_name=u'ApacheSpark')]),  is_quote_status=None,  lang=u'en',  quoted_status_id_str=None,  quoted_status_id=None,  created_at=u'Thu  May 14  12:42:37  +0000  2015',  retweeted_status=None,  truncated=False,
place=None,  id=598830778269769728,  in_reply_to_user_id=3187046084,
retweet_count=0,  in_reply_to_status_id=None,  in_reply_to_screen_name=u'spark_io',  in_reply_to_user_id_str=u'3187046084',  source=u'<a   rel="nofollow">Twitter  Web  Client</a>',
id_str=u'598830778269769728',  coordinates=None,  metadata=Row(iso_language_code=u'en',  result_type=u'recent'),  quoted_status=None)]
#

In summary: once the data has been harvested from Twitter, we can explore the information with the Blaze and Odo libraries. Spark SQL is a central module for interactive data exploration, analysis, and transformation, making full use of the Spark dataframe data structure. The dataframe concept originates from R and was then adopted with great success by Python Pandas. The dataframe is the workhorse of the data scientist.

The combination of Spark SQL and dataframes gives us a powerful engine for data processing, and it leaves us well prepared to extract insights with machine learning using Spark MLlib.
