学习之前需要了解几个内容:
1.Scala目前的地位
为大数据而生
2.Scala可以做什么
spark底層代碼是scala編寫,所以可以編寫spark
3.Scala與java的區(qū)別
Scala是一个函数式语言，在某种意义上来讲，所有函数都是值。
开始学习
一.Scala基础
1.需要先集成scala環(huán)境
参考：《如何用 Intellij IDEA 写 Scala——搭建 Scala 开发环境》（百度经验）
2.直接上代碼了(必須有c或者java基礎(chǔ))
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
/**
 * Scala basics walkthrough: expressions, for-comprehensions, function
 * literals, mutable collections (ArrayBuffer / Map), tuples and Lists.
 *
 * The original paste contained mojibake characters (`?` where full-width
 * spaces used to be, e.g. `=? x *y`) that made it non-compilable; this is
 * the cleaned-up version with identical intent.
 */
object Hello {

  /** Entry point: shows that `if` is an expression whose value can be bound. */
  def main(args: Array[String]): Unit = {
    println("hehe")
    val x = 10
    // The two branches have different types (Int vs String), so `result`
    // is inferred as their least upper bound, Any.
    val result = {
      if (x > 5) 1
      else "haha"
    }
    println(result)
  }

  // --- Everything below lives in the object body, so it runs once when
  // --- the Hello object is first initialized (before/alongside main).

  // Two generators in one `for` give the nested-loop behavior.
  for (i <- 1 to 3; j <- 1 to 3) {
    println("i=" + i + "j=" + j)
  }

  // `for ... yield` builds a new collection of the same kind.
  val v = for (i <- 1 to 10) yield i * 10
  println(v)

  val s = Array(1, 2, 3)
  val t = for (i <- s) yield i * 10
  for (k <- 0 until t.length)
    println(t(k))

  /** Multiplies its two arguments. */
  def m1(x: Int, y: Int): Int = x * y
  println(m1(8, 9))

  val r = 1 to 10
  // Function literals (lambdas) bound to vals.
  val f1 = (x: Int) => x * 10
  val f2 = (y: Int) => y * 10
  val w = r.map(f1)
  for (i <- 0 until w.length)
    print(w(i))
  // `_` is shorthand for the single lambda parameter.
  val u = r.map(_ * 100)
  for (i <- 0 until w.length)
    print(u(i))
  print(u.toBuffer)

  // Function values with explicit function types.
  val func: Int => String = { y => y.toString }
  println(func(10))
  val reverse: (Int, Double) => (Double, Int) = { (x, y) => (y, x) }
  print(reverse(3, 3.01))

  // "The magic underscore": eta-expansion turns method m1 into a function value.
  val m2 = m1 _
  print(m2(3, 4))

  val arr3 = Array(1, 2, 3)
  val arr4 = for (i <- arr3) yield i * 10
  print(arr4.toBuffer)

  // Mutable ArrayBuffer: append, bulk-append, insert at an index.
  val ab = ArrayBuffer[Int]()
  ab += 1
  ab ++= Array(2, 3)
  ab.insert(0, 0)
  ab += 4
  print(ab)
  // Keep the even elements, scale them, sort descending.
  val ac = ab.filter(_ % 2 == 0).map(_ * 10).sortWith(_ > _)
  print(ac)

  // These are scala.collection.mutable.Map (imported at the top of the file),
  // so in-place update and += work even though the vals are immutable bindings.
  val map1 = Map("i" -> 1)
  val map2 = Map("j" -> 2)
  map1("k") = 3
  map1 += ("o" -> 4)
  map2("j") = 10
  println("map:" + map1.toBuffer)
  println("map:" + map2.toBuffer)

  // Tuple fields are 1-based: _1, _2, _3 ...
  val vs = (1, "spark", 3.0)
  print(vs._2)
  val pa = ("l", 3)
  map2 += pa
  // Use ++= to add several pairs at once; the multi-argument form of +=
  // is deprecated in modern Scala.
  map2 ++= List(("h", 10), ("g", 11))
  println("map:" + map2.toBuffer)

  // `::` prepends to an (immutable) List, leaving the original unchanged.
  val list1 = List(1, 2.3)
  val list2 = 0 :: list1
  println(list1)
  println(list2)
}
二.Spark之第一个RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demonstrates RDD.mapPartitionsWithIndex: tags every element with the id
 * of the partition it lives in.
 *
 * NOTE(review): the object name keeps the original's "HigthRdd" spelling
 * (presumably "HighRdd") so any external references still resolve.
 */
object HigthRdd {
  def main(args: Array[String]): Unit = {
    // NOTE(review): no master URL is set here, so one must be supplied
    // externally (e.g. spark-submit --master ...).
    val conf = new SparkConf().setAppName("high-rdd")
    val sc = new SparkContext(conf)
    /*
    Equivalent lambda form:
    val func = (index: Int, iter: Iterator[(Int)]) => {
      iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
    }
    rdd1.mapPartitionsWithIndex(func).collect
    */
    // Formats each element of a partition as "[partID:<index>, val: <x>]".
    def func1(index: Int, iter: Iterator[(Int)]): Iterator[String] = {
      iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
    }
    // Two partitions, so elements are tagged with partID 0 or 1.
    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
    val a = rdd1.mapPartitionsWithIndex(func1).collect()
    // Fix: printing the Array directly only shows its JVM reference
    // (e.g. "[Ljava.lang.String;@1a2b3c"); print the contents instead.
    println(a.mkString(", "))
    // Release the SparkContext's resources before exiting.
    sc.stop()
  }
}
三.Spark之wordcount
/**
 * Word-count demo on an in-memory List, first built up step by step and
 * then repeated as a single chained pipeline.
 */
object Worlds {
  def main(args: Array[String]): Unit = {
    val words = List("haha hehe heihei heihei", "hehe heihei")
    // 1. Split every line on spaces and flatten into one word list.
    val a1 = words.flatMap(_.split(" "))
    println(a1)
    // 2. Pair each word with an initial count of 1.
    val a2 = a1.map((_, 1))
    println(a2)
    // 3. Group the pairs by the word itself.
    val a3 = a2.groupBy(_._1)
    println(a3)
    // 4. Collapse each group to (word, number of occurrences).
    val a4 = a3.map(t => (t._1, t._2.size))
    println(a4)
    // 5. Sort by count, descending.
    val a5 = a4.toList.sortBy(_._2).reverse
    println(a5)
    // The same computation as one pipeline. Fix: the original chained
    // version omitted `.reverse`, so it sorted ascending and disagreed
    // with a5 above; restore the descending order.
    val aa = words.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).map(t => (t._1, t._2.size)).toList.sortBy(_._2).reverse
    println("aa" + aa)
  }
}