This post uses overlay analysis as an example to walk through Sedona's two styles of spatial analysis, RDD and SQL. The data source is PostgreSQL, holding OSM building data that has already been loaded. Some may ask why not just run the analysis in PostGIS; mainly because, at the tens-of-millions-of-rows scale, I don't know what other data source I would pick…
Enough talk, let's get started.
Preparation
Spark-Shell
If you go the spark-shell route, download the pre-built Sedona jars in advance, then run the command below after substituting your own jar paths and master address (note: you need to supply the PostgreSQL JDBC jar yourself).
spark-shell \
  --jars file:///opt/sedona/sedona-core-3.0_2.12-1.1.0-incubating.jar,file:///opt/sedona/sedona-sql-3.0_2.12-1.1.0-incubating.jar,file:///opt/sedona/sedona-python-adapter-3.0_2.12-1.1.0-incubating.jar,file:///opt/sedona/sedona-viz-3.0_2.12-1.1.0-incubating.jar,file:///opt/sedona/postgresql-42.2.24.jar,file:///opt/sedona/jts-core-1.18.0.jar,file:///opt/sedona/geotools-wrapper-1.1.0-25.2.jar \
  --driver-class-path file:///opt/sedona/postgresql-42.2.24.jar \
  --master spark://master:7077
For the IDE route, I'll cover it in the next post, once the project is cleaned up.
SQL implementation
For overlay analysis at this scale, it's best to give the PG table an indexed ID column so you can exploit Spark's parallel reads. The simplest approach is to add an auto-increment column and then index it:
alter table [your_table_name] add column pid bigserial primary key;
create index [your_index_name] on [your_table_name] ("[column_name]");
Import the packages
import org.apache.log4j.{Level, Logger}
import org.apache.sedona.core.enums.{GridType, IndexType}
import org.apache.sedona.core.spatialOperator.JoinQuery
import org.apache.sedona.core.spatialRDD.{CircleRDD, SpatialRDD}
import org.apache.sedona.core.utils.SedonaConf
import org.apache.sedona.sql.utils.{Adapter, SedonaSQLRegistrator}
import org.apache.sedona.viz.core.{ImageGenerator, RasterOverlayOperator}
import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator
import org.apache.sedona.viz.utils.ImageType
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.locationtech.jts.geom.{Coordinate, Envelope, Geometry, GeometryFactory}
Register Sedona and set a few global parameters (the `sc` in the code is the SparkContext; in an IDE you need to initialize it yourself):
sc.getConf.set("spark.serializer", classOf[KryoSerializer].getName)
sc.getConf.set("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName)
// The larger this number, the more tasks you get and the more cluster
// capacity you need; tune it to your data volume.
sc.getConf.set("sedona.join.numpartition", "6000")
sc.getConf.set("sedona.global.index", "true")
SedonaSQLRegistrator.registerAll(spark)
SedonaVizRegistrator.registerAll(spark)
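For reference, in an IDE (where no pre-built `spark`/`sc` exists) the same settings go on the `SparkSession` builder; keep in mind that serializer-related options only take effect when supplied before the session is created. A minimal sketch, with a placeholder master URL and app name:

```scala
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.apache.sedona.sql.utils.SedonaSQLRegistrator
import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator

// Build the session with Kryo + Sedona settings up front,
// instead of mutating sc.getConf after the fact.
val spark = SparkSession.builder()
  .master("local[*]")                 // placeholder; point at your cluster
  .appName("sedona-overlay")
  .config("spark.serializer", classOf[KryoSerializer].getName)
  .config("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName)
  .config("sedona.join.numpartition", "6000")
  .config("sedona.global.index", "true")
  .getOrCreate()

SedonaSQLRegistrator.registerAll(spark)
SedonaVizRegistrator.registerAll(spark)
```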
并行讀取PG數(shù)據(jù)
val pgSourceDF = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://[ip]:[port]/[database]?user=[user]&password=[password]")
  .option("dbtable", "[table]")
  // how many tasks read in parallel; I used 1200, and this also determines
  // how parallel the later spatial analysis is
  .option("numPartitions", [number_of_read_tasks])
  .option("partitionColumn", "pid")
  .option("lowerBound", 1L)
  .option("upperBound", [the pid of your last row])
  .option("fetchSize", 10000)
  .load()
pgSourceDF.createOrReplaceTempView("pgsource")
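As a side note on how `numPartitions`/`lowerBound`/`upperBound` interact: Spark slices the pid range into one WHERE clause per task using a fixed stride. A simplified pure-Scala sketch of that slicing (hypothetical numbers; the real logic lives inside Spark's JDBC source):

```scala
// Build one range predicate per read task, the way Spark's JDBC source
// partitions a numeric column: fixed stride, open-ended first/last slices.
def partitionPredicates(column: String, lowerBound: Long, upperBound: Long,
                        numPartitions: Int): Seq[String] = {
  val stride = upperBound / numPartitions - lowerBound / numPartitions
  var current = lowerBound
  (0 until numPartitions).map { i =>
    val lo = if (i == 0) "" else s"$column >= $current"
    current += stride
    val hi = if (i == numPartitions - 1) "" else s"$column < $current"
    Seq(lo, hi).filter(_.nonEmpty).mkString(" AND ")
  }
}

// e.g. 1,000,000 rows split across 4 read tasks
val preds = partitionPredicates("pid", 1L, 1000000L, 4)
// each task then runs: SELECT ... FROM [table] WHERE <its predicate>
```

This is why `upperBound` should be the pid of your last row: a too-small value dumps the tail of the table onto the final task's open-ended slice.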
Because the geom column read from PG arrives as WKB, we need to convert it into a geometry using Sedona's SQL function:
val pgSourceMapDF = spark.sql("select pid, st_geomfromwkb(geom) as geom from pgsource")
pgSourceMapDF.createOrReplaceTempView("pgsource")
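For intuition about what `st_geomfromwkb` is undoing: WKB is just a byte-order flag, a geometry-type integer, and IEEE-754 coordinates. A minimal pure-Scala decoder for a 2D Point, purely illustrative (real geometries should of course go through JTS/Sedona):

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Decode a WKB Point from its hex form, the encoding PostGIS
// stores in the geom column.
def parseWkbPoint(hex: String): (Double, Double) = {
  val bytes = hex.sliding(2, 2).map(Integer.parseInt(_, 16).toByte).toArray
  val buf = ByteBuffer.wrap(bytes)
  // byte 0 is the endianness flag: 0 = big-endian, 1 = little-endian
  buf.order(if (bytes(0) == 1) ByteOrder.LITTLE_ENDIAN else ByteOrder.BIG_ENDIAN)
  buf.position(1)
  val geomType = buf.getInt()          // 1 = Point in the WKB type codes
  require(geomType == 1, s"not a Point: $geomType")
  (buf.getDouble(), buf.getDouble())   // x, y as IEEE-754 doubles
}

// POINT(1 2) serialized as little-endian WKB
val (x, y) = parseWkbPoint("0101000000000000000000F03F0000000000000040")
```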
Run the overlay analysis with Sedona's SQL functions (here the table is overlaid with itself):
// "left"/"right" are join keywords in Spark SQL and can't be used as table
// aliases, so use a/b; count the result on the DataFrame side
val joinDF = spark.sql("select a.pid, b.pid from pgsource a, pgsource b where st_intersects(a.geom, b.geom)")
joinDF.count
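Note that this self-overlay count includes every polygon intersecting itself, and counts each overlapping pair twice, once in each order; a pid inequality in the join condition removes both. A tiny pure-Scala illustration with bounding boxes (not Sedona; `Rect` is a made-up stand-in for a building polygon):

```scala
// Axis-aligned bounding box with the standard overlap test.
final case class Rect(pid: Long, xmin: Double, ymin: Double,
                      xmax: Double, ymax: Double) {
  def intersects(o: Rect): Boolean =
    xmin <= o.xmax && o.xmin <= xmax && ymin <= o.ymax && o.ymin <= ymax
}

val rects = Seq(
  Rect(1, 0, 0, 2, 2),     // overlaps rect 2
  Rect(2, 1, 1, 3, 3),
  Rect(3, 10, 10, 11, 11)  // disjoint from the others
)

// analogue of: ... where st_intersects(a.geom, b.geom)
val raw = (for (a <- rects; b <- rects if a.intersects(b))
             yield (a.pid, b.pid)).size
// analogue of adding "and a.pid < b.pid" to report each pair once
val deduped = (for (a <- rects; b <- rects
                    if a.pid < b.pid && a.intersects(b))
                 yield (a.pid, b.pid)).size
```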
OK, that's the core of the overlay implementation; from here you can adapt the analysis to your own needs.