本文首發(fā)于 Nebula Graph Community 公眾號
最近我試著搭建了方便大家一鍵試玩的 Nebula Graph 中的 Spark 相關(guān)的項目,今天就把它們整理成文分享給大家授霸。而且,我趟出來了 PySpark 下的 Nebula Spark Connector 的使用方式启涯,后邊也會一并貢獻到文檔里猴抹。
NebulaGraph 的三個 Spark 子項目
我曾經(jīng)圍繞 NebulaGraph 的所有數(shù)據(jù)導(dǎo)入方法畫過一個草圖,其中已經(jīng)包含了 Spark Connector锁荔,Nebula Exchange 的簡單介紹蟀给。在這篇文章中我將它們和另外的 Nebula Algorithm 進行稍微深入的探討。
注:這篇文檔 也很清楚為我們列舉了不同導(dǎo)入工具的選擇阳堕。
TL;DR
- Nebula Spark Connector 是一個 Spark Lib跋理,它能讓 Spark 應(yīng)用程序能夠以
dataframe
的形式從 NebulaGraph 中讀取和寫入圖數(shù)據(jù)。 - Nebula Exchange 建立在 Nebula Spark Connector 之上恬总,作為一個 Spark Lib 同時可以直接被 Spark 提交 JAR 包執(zhí)行的應(yīng)用程序前普,它的設(shè)計目標(biāo)是和 NebulaGraph 交換不同的數(shù)據(jù)源(對于開源版本,它是單向的:寫入壹堰,而對于企業(yè)版本拭卿,它是雙向的)。Nebula Exchange 支持的很多不同類型的數(shù)據(jù)源如:MySQL贱纠、Neo4j峻厚、PostgreSQL、ClickHouse谆焊、Hive 等惠桃。除了直接寫入 NebulaGraph,它還可以選擇生成 SST 文件辖试,并將其注入 NebulaGraph辜王,以便使用 NebulaGraph 集群之外算力幫助排序底層。
- Nebula Algorithm罐孝,建立在 Nebula Spark Connector 和 GraphX 之上呐馆,也是一個Spark Lib 和 Spark 上的應(yīng)用程序,它用來在 NebulaGraph 的圖上運行常用的圖算法(pagerank莲兢,LPA等)摹恰。
Nebula Spark Connector
- 代碼:https://github.com/vesoft-inc/nebula-spark-connector
- 文檔:https://docs.nebula-graph.io/3.1.0/nebula-spark-connector/
- JAR 包:https://repo1.maven.org/maven2/com/vesoft/nebula-spark-connector/
- 代碼例子:example
NebulaGraph Spark Reader
為了從 NebulaGraph 中讀取數(shù)據(jù),比如讀 vertex怒见,Nebula Spark Connector 將掃描所有帶有給定 TAG 的 Nebula StorageD俗慈,比如這樣表示掃描 player
這個 TAG :withLabel("player")
,我們還可以指定 vertex 的屬性:withReturnCols(List("name", "age"))
遣耍。
指定好所有的讀 TAG 相關(guān)的配置之后闺阱,調(diào)用 spark.read.nebula.loadVerticesToDF
返回得到的就是掃描 NebulaGraph 之后轉(zhuǎn)換為 Dataframe 的圖數(shù)據(jù),像這樣:
def readVertex(spark: SparkSession): Unit = {
LOG.info("start to read nebula vertices")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("basketballplayer")
.withLabel("player")
.withNoColumn(false)
.withReturnCols(List("name", "age"))
.withLimit(10)
.withPartitionNum(10)
.build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertex.printSchema()
vertex.show(20)
println("vertex count: " + vertex.count())
}
寫入的例子我這里不列出舵变,不過酣溃,前邊給出的代碼示例的鏈接里是有更詳細的例子瘦穆,這里值得一提的是,Spark Connector 讀數(shù)據(jù)為了滿足圖分析赊豌、圖計算的大量數(shù)據(jù)場景扛或,和大部分其他客戶端非常不同,它直接繞過了 GraphD碘饼,通過掃描 MetaD 和 StorageD 獲得數(shù)據(jù)熙兔,但是寫入的情況則是通過 GraphD 發(fā)起 nGQL DML 語句寫入的。
接下來我們來做一個上手練習(xí)吧艾恼。
上手 Nebula Spark Connector
先決條件:假設(shè)下面的程序是在一臺有互聯(lián)網(wǎng)連接的 Linux 機器上運行的住涉,最好是預(yù)裝了 Docker 和 Docker-Compose。
拉起環(huán)境
首先钠绍,讓我們用 Nebula-Up 部署基于容器的 NebulaGraph Core v3舆声、Nebula Studio、Nebula Console 和 Spark柳爽、Hadoop 環(huán)境媳握,如果還沒安裝好它也會嘗試為我們安裝 Docker 和 Docker-Compose。
# Install Core with Spark Connector, Nebula Algorithm, Nebula Exchange
curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark
你知道嗎 Nebula-UP 可以一鍵裝更多東西磷脯,如果你的環(huán)境配置大一點(比如 8 GB RAM)
curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash
可以裝更多東西毙芜,但是請注意 Nebula-UP 不是為生產(chǎn)環(huán)境準(zhǔn)備的。
上述邊腳本執(zhí)行后争拐,讓我們用 Nebula-Console(Nebula Graph 的命令行客戶端)來連接它腋粥。
# Connect to nebula with console
~/.nebula-up/console.sh
# Execute any queryies like
~/.nebula-up/console.sh -e "SHOW HOSTS"
加載一份數(shù)據(jù)進去,并執(zhí)行一個圖查詢:
# Load the sample dataset
~/.nebula-up/load-basketballplayer-dataset.sh
# 等一分鐘左右
# Make a Graph Query the sample dataset
~/.nebula-up/console.sh -e 'USE basketballplayer; FIND ALL PATH FROM "player100" TO "team204" OVER * WHERE follow.degree is EMPTY or follow.degree >=0 YIELD path AS p;'
進入 Spark 環(huán)境
執(zhí)行下面這一行架曹,我們就可以進入到 Spark 環(huán)境:
docker exec -it spark_master_1 bash
如果我們想執(zhí)行編譯隘冲,可以在里邊安裝 mvn
:
docker exec -it spark_master_1 bash
# in the container shell
export MAVEN_VERSION=3.5.4
export MAVEN_HOME=/usr/lib/mvn
export PATH=$MAVEN_HOME/bin:$PATH
wget http://archive.apache.org/dist/maven/maven-3/$MAVEN_VERSION/binaries/apache-maven-$MAVEN_VERSION-bin.tar.gz && \
tar -zxvf apache-maven-$MAVEN_VERSION-bin.tar.gz && \
rm apache-maven-$MAVEN_VERSION-bin.tar.gz && \
mv apache-maven-$MAVEN_VERSION /usr/lib/mvn
跑 Spark Connector 的例子
選項 1(推薦):通過 PySpark
- 進入 PySpark Shell
~/.nebula-up/nebula-pyspark.sh
- 調(diào)用 Nebula Spark Reader
# call Nebula Spark Connector Reader
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()
# show the dataframe with limit of 2
df.show(n=2)
- 返回結(jié)果例子
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Python version 2.7.16 (default, Jan 14 2020 07:22:06)
SparkSession available as 'spark'.
>>> df = spark.read.format(
... "com.vesoft.nebula.connector.NebulaDataSource").option(
... "type", "vertex").option(
... "spaceName", "basketballplayer").option(
... "label", "player").option(
... "returnCols", "name,age").option(
... "metaAddress", "metad0:9559").option(
... "partitionNumber", 1).load()
>>> df.show(n=2)
+---------+--------------+---+
|_vertexId| name|age|
+---------+--------------+---+
|player105| Danny Green| 31|
|player109|Tiago Splitter| 34|
+---------+--------------+---+
only showing top 2 rows
選項 2:編譯、提交示例 JAR 包
- 先克隆 Spark Connector 和它示例代碼的代碼倉庫绑雄,然后編譯:
注意展辞,我們使用了 master 分支,因為當(dāng)下 master 分支是兼容 3.x 的万牺,一定要保證 spark connector 和數(shù)據(jù)庫內(nèi)核版本是匹配的罗珍,版本對應(yīng)關(guān)系參考代碼倉庫的
README.md
。
cd ~/.nebula-up/nebula-up/spark
git clone https://github.com/vesoft-inc/nebula-spark-connector.git
docker exec -it spark_master_1 bash
cd /root/nebula-spark-connector
- 替換示例項目的代碼
echo > example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
vi example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
- 把如下的代碼粘貼進去脚粟,這里邊我們對前邊加載的圖:
basketballplayer
上做了頂點和邊的讀操作:分別調(diào)用readVertex
和readEdges
覆旱。
package com.vesoft.nebula.examples.connector
import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object NebulaSparkReaderExample {
private val LOG = LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf
sparkConf
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
val spark = SparkSession
.builder()
.master("local")
.config(sparkConf)
.getOrCreate()
readVertex(spark)
readEdges(spark)
spark.close()
sys.exit()
}
def readVertex(spark: SparkSession): Unit = {
LOG.info("start to read nebula vertices")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
.withConenctionRetry(2)
.build()
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("basketballplayer")
.withLabel("player")
.withNoColumn(false)
.withReturnCols(List("name", "age"))
.withLimit(10)
.withPartitionNum(10)
.build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
vertex.printSchema()
vertex.show(20)
println("vertex count: " + vertex.count())
}
def readEdges(spark: SparkSession): Unit = {
LOG.info("start to read nebula edges")
val config =
NebulaConnectionConfig
.builder()
.withMetaAddress("metad0:9559,metad1:9559,metad2:9559")
.withTimeout(6000)
.withConenctionRetry(2)
.build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace("basketballplayer")
.withLabel("follow")
.withNoColumn(false)
.withReturnCols(List("degree"))
.withLimit(10)
.withPartitionNum(10)
.build()
val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
edge.printSchema()
edge.show(20)
println("edge count: " + edge.count())
}
}
- 然后打包成 JAR 包
/usr/lib/mvn/bin/mvn install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true
- 最后,把它提交到 Spark 里執(zhí)行:
cd example
/spark/bin/spark-submit --master "local" \
--class com.vesoft.nebula.examples.connector.NebulaSparkReaderExample \
--driver-memory 4g target/example-3.0-SNAPSHOT.jar
# 退出 spark 容器
exit
- 成功之后核无,我們會得到返回結(jié)果:
22/04/19 07:29:34 INFO DAGScheduler: Job 1 finished: show at NebulaSparkReaderExample.scala:57, took 0.199310 s
+---------+------------------+---+
|_vertexId| name|age|
+---------+------------------+---+
|player105| Danny Green| 31|
|player109| Tiago Splitter| 34|
|player111| David West| 38|
|player118| Russell Westbrook| 30|
|player143|Kristaps Porzingis| 23|
|player114| Tracy McGrady| 39|
|player150| Luka Doncic| 20|
|player103| Rudy Gay| 32|
|player113| Dejounte Murray| 29|
|player121| Chris Paul| 33|
|player128| Carmelo Anthony| 34|
|player130| Joel Embiid| 25|
|player136| Steve Nash| 45|
|player108| Boris Diaw| 36|
|player122| DeAndre Jordan| 30|
|player123| Ricky Rubio| 28|
|player139| Marc Gasol| 34|
|player142| Klay Thompson| 29|
|player145| JaVale McGee| 31|
|player102| LaMarcus Aldridge| 33|
+---------+------------------+---+
only showing top 20 rows
22/04/19 07:29:36 INFO DAGScheduler: Job 4 finished: show at NebulaSparkReaderExample.scala:82, took 0.135543 s
+---------+---------+-----+------+
| _srcId| _dstId|_rank|degree|
+---------+---------+-----+------+
|player105|player100| 0| 70|
|player105|player104| 0| 83|
|player105|player116| 0| 80|
|player109|player100| 0| 80|
|player109|player125| 0| 90|
|player118|player120| 0| 90|
|player118|player131| 0| 90|
|player143|player150| 0| 90|
|player114|player103| 0| 90|
|player114|player115| 0| 90|
|player114|player140| 0| 90|
|player150|player120| 0| 80|
|player150|player137| 0| 90|
|player150|player143| 0| 90|
|player103|player102| 0| 70|
|player113|player100| 0| 99|
|player113|player101| 0| 99|
|player113|player104| 0| 99|
|player113|player105| 0| 99|
|player113|player106| 0| 99|
+---------+---------+-----+------+
only showing top 20 rows
事實上扣唱,在這個代碼倉庫下還有更多的例子,特別是 GraphX 的例子,你可以嘗試自己去探索這部分噪沙。
請注意炼彪,在 GraphX 假定頂點 ID 是數(shù)字類型的,因此對于字符串類型的頂點 ID 情況正歼,需要進行實時轉(zhuǎn)換辐马,請參考 Nebula Algorithom 中的例子,了解如何繞過這一問題局义。
Nebula Exchange
- 代碼:https://github.com/vesoft-inc/nebula-exchange/
- 文檔:https://docs.nebula-graph.com.cn/3.1.0/nebula-exchange/about-exchange/ex-ug-what-is-exchange/
- JAR 包:https://github.com/vesoft-inc/nebula-exchange/releases
- 配置例子: exchange-common/src/test/resources/application.conf
Nebula Exchange 是一個 Spark Lib喜爷,也是一個可以直接提交執(zhí)行的 Spark 應(yīng)用,它被用來從多個數(shù)據(jù)源讀取數(shù)據(jù)寫入 NebulaGraph 或者輸出 Nebula Graph SST 文件旭咽。
[圖片上傳失敗...(image-d06d2e-1658140185739)]
通過 spark-submit 的方式使用 Nebula Exchange 的方法很直接:
- 首先創(chuàng)建配置文件,讓 Exchange 知道應(yīng)該如何獲取和寫入數(shù)據(jù)
- 然后用指定的配置文件調(diào)用 Exchange 包
現(xiàn)在赌厅,讓我們用上一章中創(chuàng)建的相同環(huán)境做一個實際測試穷绵。
一鍵試玩 Exchange
先跑起來看看吧
請參考前邊拉起環(huán)境這一章節(jié),先一鍵裝好環(huán)境特愿。
一鍵執(zhí)行:
~/.nebula-up/nebula-exchange-example.sh
恭喜你仲墨,已經(jīng)第一次執(zhí)行成功一個 Exchange 的數(shù)據(jù)導(dǎo)入任務(wù)啦!
再看看一些細節(jié)
這個例子里揍障,我們實際上是用 Exchange 從 CSV 文件這一其中支持的數(shù)據(jù)源中讀取數(shù)據(jù)寫入 NebulaGraph 集群的目养。這個 CSV 文件中第一列是頂點 ID,第二和第三列是 "姓名 "和 "年齡 "的屬性:
player800,"Foo Bar",23
player801,"Another Name",21
- 咱們可以進到 Spark 環(huán)境里看看
docker exec -it spark_master_1 bash
cd /root
- 可以看到我們提交 Exchange 任務(wù)時候指定的配置文件
exchange.conf
它是一個HOCON
格式的文件:- 在
.nebula
中描述了 NebulaGraph 集群的相關(guān)信息 - 在
.tags
中描述了如何將必填字段對應(yīng)到我們的數(shù)據(jù)源(這里是 CSV 文件)等有關(guān) Vertecies 的信息毒嫡。
- 在
{
# Spark relation config
spark: {
app: {
name: Nebula Exchange
}
master:local
driver: {
cores: 1
maxResultSize: 1G
}
executor: {
memory: 1G
}
cores:{
max: 16
}
}
# Nebula Graph relation config
nebula: {
address:{
graph:["graphd:9669"]
meta:["metad0:9559", "metad1:9559", "metad2:9559"]
}
user: root
pswd: nebula
space: basketballplayer
# parameters for SST import, not required
path:{
local:"/tmp"
remote:"/sst"
hdfs.namenode: "hdfs://localhost:9000"
}
# nebula client connection parameters
connection {
# socket connect & execute timeout, unit: millisecond
timeout: 30000
}
error: {
# max number of failures, if the number of failures is bigger than max, then exit the application.
max: 32
# failed import job will be recorded in output path
output: /tmp/errors
}
# use google's RateLimiter to limit the requests send to NebulaGraph
rate: {
# the stable throughput of RateLimiter
limit: 1024
# Acquires a permit from RateLimiter, unit: MILLISECONDS
# if it can't be obtained within the specified timeout, then give up the request.
timeout: 1000
}
}
# Processing tags
# There are tag config examples for different dataSources.
tags: [
# HDFS csv
# Import mode is client, just change type.sink to sst if you want to use client import mode.
{
name: player
type: {
source: csv
sink: client
}
path: "file:///root/player.csv"
# if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields
fields: [_c1, _c2]
nebula.fields: [name, age]
vertex: {
field:_c0
}
separator: ","
header: false
batch: 256
partition: 32
}
]
}
- 我們應(yīng)該能看到那個 CSV 數(shù)據(jù)源和這個配置文件都在同一個目錄下了:
bash-5.0# ls -l
total 24
drwxrwxr-x 2 1000 1000 4096 Jun 1 04:26 download
-rw-rw-r-- 1 1000 1000 1908 Jun 1 04:23 exchange.conf
-rw-rw-r-- 1 1000 1000 2593 Jun 1 04:23 hadoop.env
drwxrwxr-x 7 1000 1000 4096 Jun 6 03:27 nebula-spark-connector
-rw-rw-r-- 1 1000 1000 51 Jun 1 04:23 player.csv
- 然后癌蚁,實際上我們可以手動再次提交一下這個 Exchange 任務(wù)
/spark/bin/spark-submit --master local \
--class com.vesoft.nebula.exchange.Exchange download/nebula-exchange.jar \
-c exchange.conf
- 部分返回結(jié)果
22/06/06 03:56:26 INFO Exchange$: Processing Tag player
22/06/06 03:56:26 INFO Exchange$: field keys: _c1, _c2
22/06/06 03:56:26 INFO Exchange$: nebula keys: name, age
22/06/06 03:56:26 INFO Exchange$: Loading CSV files from file:///root/player.csv
...
22/06/06 03:56:41 INFO Exchange$: import for tag player cost time: 3.35 s
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchSuccess.player: 2
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchFailure.player: 0
...
更多的數(shù)據(jù)源,請參考文檔和配置的例子兜畸。
關(guān)于 Exchange 輸出 SST 文件的實踐努释,你可以參考文檔和我的舊文 Nebula Exchange SST 2.x實踐指南。
Nebula Algorithm
- 代碼倉庫: https://github.com/vesoft-inc/nebula-algorithm
- 文檔:https://docs.nebula-graph.com.cn/3.1.0/nebula-algorithm/
- JAR 包:https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/
- 示例代碼:example/src/main/scala/com/vesoft/nebula/algorithm
通過 spark-submit 提交任務(wù)
我在這個代碼倉庫里給出了例子咬摇,今天我們借助 Nebula-UP 可以更方便體驗它伐蒂。
參考前邊拉起環(huán)境這一章節(jié),先一鍵裝好環(huán)境肛鹏。
在如上通過 Nebula-UP 的 Spark 模式部署了需要的依賴之后
- 加載 LiveJournal 數(shù)據(jù)集
~/.nebula-up/load-LiveJournal-dataset.sh
- 在 LiveJournal 數(shù)據(jù)集上執(zhí)行一個 PageRank 算法逸邦,結(jié)果輸出到 CSV 文件中
~/.nebula-up/nebula-algo-pagerank-example.sh
- 檢查輸出結(jié)果:
docker exec -it spark_master_1 bash
head /output/part*000.csv
_id,pagerank
637100,0.9268620883822242
108150,1.1855749056722755
957460,0.923720299211093
257320,0.9967932799358413
配置文件解讀
完整文件在這里,這里在扰,我們介紹一下主要的字段:
-
.data
指定了源是 Nebula缕减,表示從集群獲取圖數(shù)據(jù),輸出sink
是csv
芒珠,表示寫到本地文件里烛卧。
data: {
# data source. optional of nebula,csv,json
source: nebula
# data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text
sink: csv
# if your algorithm needs weight
hasWeight: false
}
-
.nebula.read
規(guī)定了讀 NebulaGraph 集群的對應(yīng)關(guān)系,這里是讀取所有 edge type:follow
的邊數(shù)據(jù)為一整張圖
nebula: {
# algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid.
read: {
# Nebula metad server address, multiple addresses are split by English comma
metaAddress: "metad0:9559"
# Nebula space
space: livejournal
# Nebula edge types, multiple labels means that data from multiple edges will union together
labels: ["follow"]
# Nebula edge property name for each edge type, this property will be as weight col for algorithm.
# Make sure the weightCols are corresponding to labels.
weightCols: []
}
-
.algorithm
里配置了我們要調(diào)用的算法,和算法的配置
algorithm: {
executeAlgo: pagerank
# PageRank parameter
pagerank: {
maxIter: 10
resetProb: 0.15 # default 0.15
}
作為一個庫在 Spark 中調(diào)用 Nebula Algoritm
請注意另一方面总放,我們可以將 Nebula Algoritm 作為一個庫調(diào)用呈宇,它的好處在于:
對算法的輸出格式有更多的控制/定制功能
可以對非數(shù)字 ID 的情況進行轉(zhuǎn)換,見這里
這里我先不給出例子了局雄,如果大家感興趣可以給 Nebula-UP 提需求甥啄,我也會增加相應(yīng)的例子。
交流圖數(shù)據(jù)庫技術(shù)炬搭?加入 Nebula 交流群請先填寫下你的 Nebula 名片蜈漓,Nebula 小助手會拉你進群~~