Using GeoSpark to compute the area of a region.
The test data is as follows (each row is four x,y coordinate pairs forming a closed ring, followed by a name column):
10.5,32.11,30.50,60.21,33.50,60.21,10.5,32.11,china1
9.51,30.11,32.50,62.21,34.50,62.21,9.51,30.11,china2
11.5,32.11,31.50,64.21,33.50,64.21,11.5,32.11,china3
10.5,31.16,32.51,63.21,35.51,63.21,10.5,31.16,china4
11.5,32.11,30.50,59.21,33.50,59.21,11.5,32.11,china5
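As a sanity check (not part of the original post), the planar area of the first ring can be worked out by hand with the shoelace formula; a minimal Scala sketch:

// Shoelace-formula check for the first row (china1), with vertices
// (10.5, 32.11), (30.50, 60.21), (33.50, 60.21).
val pts = Seq((10.5, 32.11), (30.50, 60.21), (33.50, 60.21))
val doubledArea = pts.indices.map { i =>
  val (x1, y1) = pts(i)
  val (x2, y2) = pts((i + 1) % pts.length)
  x1 * y2 - x2 * y1
}.sum
val area = math.abs(doubledArea) / 2.0 // ≈ 42.15, matching the ST_Area output below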
The test code is as follows:
package txt_demo

import org.apache.spark.SparkContext
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.datasyslab.geospark.enums.FileDataSplitter
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator
import org.datasyslab.geospark.spatialRDD.PolygonRDD
import org.datasyslab.geosparksql.utils.{Adapter, GeoSparkSQLRegistrator}

object area_demo {

  def main(args: Array[String]): Unit = {
    // Build a SparkSession with GeoSpark's Kryo serializer registered.
    val sparkSession = SparkSession.builder()
      .master("local[*]") // Delete this if run in cluster mode
      .appName("SpatialRangeQueryApp") // Change this to a proper name
      .config("spark.serializer", classOf[KryoSerializer].getName)
      .config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
      .getOrCreate()
    implicit val sc: SparkContext = sparkSession.sparkContext

    // Register GeoSpark's SQL functions (ST_GeomFromWKT, ST_Area, ...) with the session.
    GeoSparkSQLRegistrator.registerAll(sparkSession)

    // Load the CSV into a PolygonRDD and print the raw polygons.
    val polygonRDD = createPolygonRDD
    polygonRDD.rawSpatialRDD.rdd.collect().foreach(println(_))

    // Convert the PolygonRDD to a DataFrame: the geometry becomes a WKT string column
    // named "geometry", and the carried name column becomes _c1.
    val spatialDf = Adapter.toDf(polygonRDD, sparkSession)
    spatialDf.printSchema()
    spatialDf.show()

    // Parse the WKT string back into a geometry column ...
    spatialDf.createOrReplaceTempView("p_view")
    val p_view = sparkSession.sql(""" select _c1, ST_GeomFromWKT(p_view.geometry) as area from p_view """)
    p_view.show(truncate = false)

    // ... and compute each polygon's planar area.
    p_view.createOrReplaceTempView("area_view")
    val areaDf = sparkSession.sql(""" select _c1, ST_Area(area_view.area) from area_view """)
    areaDf.show(truncate = false)
  }
  // Read the CSV: columns 0..7 hold the polygon coordinates (a closed ring),
  // and the trailing name column is carried along as an extra attribute.
  def createPolygonRDD(implicit sc: SparkContext): PolygonRDD = {
    val polygonRDDInputLocation = "D:\\idea\\demo_spark\\es_demo\\src\\data\\area.csv"
    val polygonRDDStartOffset = 0
    val polygonRDDEndOffset = 7
    val polygonRDDSplitter = FileDataSplitter.CSV // or use FileDataSplitter.TSV
    val carryOtherAttributes = true
    new PolygonRDD(sc, polygonRDDInputLocation, polygonRDDStartOffset, polygonRDDEndOffset, polygonRDDSplitter, carryOtherAttributes)
  }
}
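For reference, the same areas can also be read straight off the RDD without going through Spark SQL, since each element of the PolygonRDD is a JTS polygon object. A minimal sketch, assuming the polygonRDD built above and that getUserData carries the trailing "chinaN" column:

// Cross-check: compute the planar area of every polygon directly on the raw RDD.
// getUserData holds the non-geometry columns kept by carryOtherAttributes.
polygonRDD.rawSpatialRDD.rdd
  .map(p => (p.getUserData, p.getArea))
  .collect()
  .foreach(println)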
The test results are as follows:
+------+------------------+
|_c1 |st_area(area) |
+------+------------------+
|china1|42.150000000000034|
|china2|32.10000000000002 |
|china3|32.099999999999966|
|china4|48.07499999999999 |
|china5|40.650000000000034|
+------+------------------+