十余天沒有學(xué)習(xí)Spark了糊治,不是我在偷懶猜惋,而是前段時(shí)間一直在研究形態(tài)學(xué)算法提取波形的問題⊥璺眨現(xiàn)在算法差不多搞定啦,但是用Python開發(fā)的著摔,有空有能力時(shí)再補(bǔ)上Spark的程序缓窜。還是先來記錄一下分布式矩陣的用法吧~
一般采用分布式矩陣進(jìn)行存儲(chǔ)都在數(shù)據(jù)量非常大的情況下進(jìn)行,處理速度和效率與其存儲(chǔ)格式息息相關(guān)。MLlib提供了四種分布式矩陣存儲(chǔ)形式雹洗,分別為:行矩陣香罐,帶有行索引的行矩陣,坐標(biāo)矩陣和塊矩陣时肿,據(jù)說分塊矩陣并不常用庇茫。
行矩陣
行矩陣以行作為基本的矩陣存儲(chǔ)格式,每一行的內(nèi)容都可以單獨(dú)取出來進(jìn)行操作螃成,列的作用相較小旦签。
帶索引的行矩陣
為了方便在系統(tǒng)調(diào)試的過程中對(duì)行矩陣的內(nèi)容進(jìn)行觀察和顯示,MLlib提供了帶索引的行矩陣寸宏。
坐標(biāo)矩陣
坐標(biāo)矩陣是一種帶有坐標(biāo)標(biāo)記的矩陣宁炫,其中的每一個(gè)具體數(shù)據(jù)都有一組坐標(biāo)進(jìn)行標(biāo)示,類型格式如下:
(x: Long, y: Long, values: Double)
分塊矩陣
顧名思義氮凝,就是將矩陣分塊(好廢話哦)羔巢。分塊矩陣可由帶索引的行矩陣IndexedRowMatrix或坐標(biāo)矩陣CoordinateMatrix調(diào)用toBlockMatrix()方法來進(jìn)行轉(zhuǎn)換。
例程
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.{Matrix, Matrices, Vectors, Vector}
import org.apache.spark.mllib.linalg.distributed.{RowMatrix, IndexedRowMatrix, IndexedRow, CoordinateMatrix, MatrixEntry}
object RDDMatrix {
def main(args: Array[String]): Unit = {
println("--------------------------本地矩陣-------------------------------")
val mx = Matrices.dense(2, 3, Array(1, 2, 3, 4, 5, 6)) // 創(chuàng)建2行3列的本地矩陣罩阵,Matrices.dense是矩陣重組的調(diào)用方法
println(mx)
println("--------------------------分布式行矩陣------------------------------")
val conf = new SparkConf().setAppName("Distributed matrix").setMaster("local")
val sc = new SparkContext(conf)
val path = "F:/ScalaProject/test/collaborativeFilter/src/main/resources/Kmeans.txt"
val rdd = sc.textFile(path).map(_.split(" ").map(_.toDouble)) // 轉(zhuǎn)化成Double類型的向量存儲(chǔ)
val rdd1 = rdd.map(line => Vectors.dense(line)) // 轉(zhuǎn)換成向量存儲(chǔ)
val rm = new RowMatrix(rdd1) // 讀入行矩陣
// 如果打印rm中的具體內(nèi)容竿秆,結(jié)果顯示是數(shù)據(jù)的內(nèi)存地址。這表明RowMatrix只是一個(gè)轉(zhuǎn)化操作稿壁,并不運(yùn)行最終結(jié)果幽钢。
println(rm.numRows()) // 打印行數(shù)
println(rm.numCols()) // 打印列數(shù)
println("--------------------------帶索引的行矩陣---------------------------")
val rdd2 = rdd1.map(vd => new IndexedRow(vd.size, vd)) // 轉(zhuǎn)化格式
val irm = new IndexedRowMatrix(rdd2) // 建立索引行矩陣實(shí)例
println(irm.getClass) // 打印類型
irm.rows.foreach(println)// 打印內(nèi)容數(shù)據(jù)
println("---------------------------坐標(biāo)矩陣--------------------------------")
val rdd3 = rdd.map(vue => (vue(0).toLong, vue(1).toLong, vue(2))). // 轉(zhuǎn)化成坐標(biāo)格式
map(vue2 => new MatrixEntry(vue2._1, vue2._2, vue2._3)) // 轉(zhuǎn)化成坐標(biāo)矩陣格式
// vue(0)和vue(1)分別是行和列坐標(biāo)的坐標(biāo)軸標(biāo)號(hào),vue(2)是具體內(nèi)容
// ._1 和 ._2 是scala語句中元組參數(shù)的序數(shù)專用標(biāo)號(hào)傅是,分別是傳入第二個(gè)和第三個(gè)值
val crm = new CoordinateMatrix(rdd3) // 直接打印CoordinateMatrix實(shí)例的對(duì)象也僅僅是內(nèi)存地址
crm.entries.foreach(println)
println("--------------------------分塊矩陣---------------------------")
// 將坐標(biāo)矩陣轉(zhuǎn)換成2x2的分塊矩陣并存儲(chǔ)匪燕,尺寸通過參數(shù)傳入
val matA = irm.toBlockMatrix(2,2).cache()
// 查看其分塊情況
matA.blocks.collect.foreach(println)
println(matA.numColBlocks)
println(matA.numRowBlocks)
}
}
原始數(shù)據(jù):
1 2 2
1 1 1
1 3 3
2 2 2
3 4 5
4 3 3
2 2 2
4 4 1
運(yùn)行結(jié)果:
--------------------------本地矩陣-------------------------------
1.0 3.0 5.0
2.0 4.0 6.0
--------------------------分布式行矩陣------------------------------
8
3
--------------------------帶索引的行矩陣---------------------------
class org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix
IndexedRow(3,[1.0,2.0,2.0])
IndexedRow(3,[1.0,1.0,1.0])
IndexedRow(3,[1.0,3.0,3.0])
IndexedRow(3,[2.0,2.0,2.0])
IndexedRow(3,[3.0,4.0,5.0])
IndexedRow(3,[4.0,3.0,3.0])
IndexedRow(3,[2.0,2.0,2.0])
IndexedRow(3,[4.0,4.0,1.0])
---------------------------坐標(biāo)矩陣--------------------------------
MatrixEntry(1,2,2.0)
MatrixEntry(1,1,1.0)
MatrixEntry(1,3,3.0)
MatrixEntry(2,2,2.0)
MatrixEntry(3,4,5.0)
MatrixEntry(4,3,3.0)
MatrixEntry(2,2,2.0)
MatrixEntry(4,4,1.0)
--------------------------分塊矩陣---------------------------
((1,1),2 x 1 CSCMatrix
(1,0) 19.0)
((1,0),2 x 2 CSCMatrix
(1,0) 18.0
(1,1) 21.0)
2
2
至于分塊矩陣為什么會(huì)輸出這么奇怪的結(jié)果,還沒有研究明白喧笔,等搞明白再補(bǔ)上吧帽驯!