GeoMesa Spark
一诞外、Spark JTS
1.1 示例
1.2配置
1.3 地理空間用戶定義的類型和功能
1.4 geojson輸出
1.5 Building
二、Spark Core
2.1 示例
2.2 配置
2.3 簡單功能序列化
2.4 使用
三开瞭、空間RDD提供程序
3.1 Accumulo RDD Provider
3.2 HBase RDD Provider
3.3 FileSystem RDD Provider
3.4 Converter RDD Provider
3.5 GeoTools RDD Provider
四、SparkSQL
4.1 示例
4.2 配置
4.3 使用
4.4 地理空間用戶定義的類型和功能
4.5 內(nèi)存索引
4.6 空間分區(qū)和更快的連接
五虹蓄、sparkSQL函數(shù)
geomesa目前支持spark版本2.2.x痹届、2.3.x或2.4.x。
geomesa spark允許使用存儲在geomesa中的數(shù)據(jù)笛粘、其他geotools數(shù)據(jù)存儲或geomesa轉(zhuǎn)換器庫可讀的文件在apache spark上執(zhí)行作業(yè)趁怔。該庫允許創(chuàng)建spark RDD和數(shù)據(jù)幀湿硝,將spark RDD和數(shù)據(jù)幀寫入geomesa accumulo和其他地理工具數(shù)據(jù)存儲,并使用kryo對簡單功能進行序列化润努。
1关斜、GeoMesa Spark最底層為geomesa-spark-jts模塊
2、geomesa-spark-core模塊是spark core的擴展任连,支持支持geotools的Query蚤吹,生成系列化好的simplefeature類型的rdd
3例诀、geomesa-spark-sql模塊允許使用sql方式進行查詢随抠,會將sql語句轉(zhuǎn)換為Query對象進行查詢
一、Spark JTS
spark JTS模塊提供了一組用戶定義函數(shù)(UDF)和用戶定義類型(UDT)繁涂,這些函數(shù)允許在spark中執(zhí)行SQL查詢拱她,從而對地理空間數(shù)據(jù)類型執(zhí)行地理空間操作。
geomesa spark sql支持基于spark sql模塊中存在的數(shù)據(jù)集/數(shù)據(jù)幀API扔罪,以提供地理空間功能秉沼。這包括自定義地理空間數(shù)據(jù)類型和函數(shù)、從地理工具數(shù)據(jù)存儲創(chuàng)建數(shù)據(jù)幀的能力矿酵,以及改進SQL查詢性能的優(yōu)化唬复。
此功能位于geomesa spark/geomesa spark jts模塊中:
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-spark-jts_2.11</artifactId>
// version, etc.
</dependency>
1
2
3
4
5
1.1 示例
以下是使用用戶定義類型加載數(shù)據(jù)幀的scala示例:
import org.locationtech.jts.geom.import org.apache.spark.sql.types.import org.locationtech.geomesa.spark.jts.
import spark.implicits.
val schema = StructType(Array(
StructField("name",StringType, nullable=false),
StructField("pointText", StringType, nullable=false),
StructField("polygonText", StringType, nullable=false),
StructField("latitude", DoubleType, nullable=false),
StructField("longitude", DoubleType, nullable=false)))
val dataFile = this.getClass.getClassLoader.getResource("jts-example.csv").getPathval df = spark.read
.schema(schema)
.option("sep", "-")
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
.csv(dataFile)
val alteredDF = df
.withColumn("polygon", st_polygonFromText("latitude", $"longitude"))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
請注意全肮,最初的模式?jīng)]有userdefinedType敞咧,但是在將用戶定義的函數(shù)應用到相應的列之后,我們將得到一個具有地理空間列類型的數(shù)據(jù)框架辜腺。
還可以從地理空間對象列表中構(gòu)建數(shù)據(jù)幀:
import spark.implicits._
val point = new GeometryFactory().createPoint(new Coordinate(3.4, 5.6))
val df = Seq(point).toDF("point")
1
2
1.2配置
若要啟用此行為休建,請導入org.locationtech.geomesa.spark.jts.,創(chuàng)建一個sparksession并調(diào)用.withjts评疗。這將為這些操作注冊UDF和UDT以及一些催化劑優(yōu)化测砂。或者百匆,可以調(diào)用initjts(sqlcontext)砌些。
import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.SQLContextimport org.locationtech.geomesa.spark.jts._
val spark: SparkSession = SparkSession.builder() // ... initialize spark session
spark.withJTS
1
2
3
1.3 地理空間用戶定義的類型和功能
Spple JTS模塊需要代表幾何對象的幾個類(如OGC OpenGISS簡單特征訪問通用體系結(jié)構(gòu)規(guī)范和Java拓撲結(jié)構(gòu)套件實現(xiàn)的),并將它們注冊為用戶定義類型(UDT)在SparkSQL中加匈。例如存璃,geometry類注冊為geometryudt。已注冊以下類型:
幾何圖形 點udt 線形UDT 多邊形 多點式
多重刪除 多元論 幾何集合
spark jts還實現(xiàn)了OGC OpenGIS Simple Feature Access SQL選項規(guī)范中描述的作為sparkSQL用戶定義函數(shù)(UDF)的函數(shù)子集矩动。這些功能包括創(chuàng)建幾何圖形有巧、訪問幾何圖形的屬性、將幾何圖形對象鑄造到更具體的子類悲没、以其他格式輸出幾何圖形篮迎、測量幾何圖形之間的空間關(guān)系以及處理幾何圖形男图。
例如,下面的SQL查詢:
select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)
1
使用兩個UDF(st_contains和st_makebbox)查找芝加哥數(shù)據(jù)框中的行甜橱,其中列g(shù)eom包含在指定的邊界框中逊笆。
UDF還公開用于數(shù)據(jù)框架API,這意味著上述示例也可以通過以下代碼實現(xiàn):
import org.locationtech.geomesa.spark.jts._
import spark.implicits. _
chicagoDF.where(st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), $"geom"))
1
2
3
1.4 geojson輸出
spark jts模塊還提供了將數(shù)據(jù)幀導出到geojson字符串的方法岂傲。這允許在許多支持geojson輸入的前端映射庫(如傳單或開放層)中快速可視化數(shù)據(jù)难裆。
要轉(zhuǎn)換數(shù)據(jù)幀,請導入隱式轉(zhuǎn)換并調(diào)用togeojson方法镊掖。
import org.locationtech.geomesa.spark.jts.util.GeoJSONExtensions._
val df : DataFrame = // Some data frame
val geojsonDf = df.toGeoJSON
1
2
3
如果只給出模式乃戈,轉(zhuǎn)換器可以推斷哪個字段保存幾何圖形,但是在多個幾何字段的情況下亩进,它默認為第一個這樣的字段症虑。通過在模式中提供所需幾何體的索引(從0開始),可以覆蓋此行為归薛。例如谍憔,如果所需的幾何圖形是模式的第三個字段,則為df.togeojson(2)主籍。
如果結(jié)果可以存儲在內(nèi)存中习贫,那么可以將其收集到驅(qū)動程序中并寫入文件。如果不是這樣千元,每個執(zhí)行器都可以寫入像hdfs這樣的分布式文件系統(tǒng)苫昌。
val geoJsonString = geojsonDF.collect.mkString("[",",","]")
1
NOTE:為了實現(xiàn)這一點,數(shù)據(jù)幀應該有一個幾何字段诅炉,這意味著它的模式應該有一個結(jié)構(gòu)字段蜡歹,它是本模塊中提供的JTS幾何類型之一。但是涕烧,如果某些行的幾何圖形為空月而,則可以接受。在這種情況下议纯,空值將作為geojson中幾何體的值寫入父款。
1.5 Building
通過以下命令,可以獨立于geomesa構(gòu)建和使用此模塊:
$ mvn install -pl geomesa-spark/geomesa-spark-jts
1
二瞻凤、Spark Core
geomesa spark core用于直接處理geomesa和其他地理空間數(shù)據(jù)存儲中的特征RDD憨攒。
2.1 示例
以下是通過地理空間查詢對geomesa數(shù)據(jù)存儲創(chuàng)建RDD的完整scala示例:
// DataStore params to a hypothetical GeoMesa Accumulo tableval dsParams = Map(
"accumulo.instance.id" -> "instance",
"accumulo.zookeepers" -> "zoo1,zoo2,zoo3",
"accumulo.user" -> "user",
"accumulo.password" -> "*****",
"accumulo.catalog" -> "geomesa_catalog",
"geomesa.security.auths" -> "USER,ADMIN")
// set SparkContext
val conf = new SparkConf().setMaster("local[*]").setAppName("testSpark")
val sc = SparkContext.getOrCreate(conf)
// create RDD with a geospatial query using GeoMesa functions
val spatialRDDProvider = GeoMesaSpark(dsParams)
val filter = ECQL.toFilter("CONTAINS(POLYGON((0 0, 0 90, 90 90, 90 0, 0 0)), geom)")
val query = new Query("chicago", filter)
val resultRDD = spatialRDDProvider.rdd(new Configuration, sc, dsParams, query)
resultRDD.collect
// Array[org.opengis.feature.simple.SimpleFeature] = Array(
// ScalaSimpleFeature:4, ScalaSimpleFeature:5, ScalaSimpleFeature:6,
// ScalaSimpleFeature:7, ScalaSimpleFeature:9)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2.2 配置
geomesa spark core通過定義一個名為spaceialrddprovider的接口,提供了一個用于訪問spark中地理空間數(shù)據(jù)的API阀参。此接口的不同實現(xiàn)連接到不同的輸入源肝集。這些不同的提供者在下面的用法中有更詳細的描述。
geomesa為幾個jar提供了依賴項蛛壳,以簡化Spark類路徑的設置杏瞻。要在spark中使用這些庫所刀,可以通過–jars選項將適當?shù)年幱癹ar(例如)傳遞給spark submit命令:
--jars file://path/to/geomesa-accumulo-spark-runtime_2.11-$VERSION.jar
1
或者通過筆記本服務器(如Jupyter)中的適當機制傳遞給Spark(請參見部署geomesa spark with Jupyter筆記本)或Zeppelin。
陰影JAR還應提供轉(zhuǎn)換器RDD提供程序和geotools RDD提供程序所需的依賴項捞挥,因此這些JAR也可以簡單地添加到–jar中(盡管在后一種情況下浮创,可能需要額外的JAR來實現(xiàn)訪問的geotools數(shù)據(jù)存儲)。
2.3 簡單功能序列化
要在集群節(jié)點之間序列化SimpleFeatures的RDD砌函,spark必須配置geomesa spark core中提供的kryo序列化注冊器斩披。
Note:在本地模式下運行spark時不需要配置kryo序列化,因為作業(yè)將在單個JVM中執(zhí)行讹俊。
將這兩個條目添加到$spark\u home/conf/spark-defaults.conf(或?qū)⑺鼈冏鳛楱Cconf參數(shù)傳遞給spark submit):
spark.serializer org.apache.spark.serializer.KryoSerializerspark.kryo.registrator
org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator
1
2
或者垦沉,可以在用于創(chuàng)建SparkContext的SparkConf對象中設置這些參數(shù):
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[GeoMesaSparkKryoRegistrator].getName)
1
2
在筆記本服務器中使用Spark時,需要禁用SparkContext的自動創(chuàng)建劣像。
設置配置選項后乡话,幾何空間RDDProvider實現(xiàn)創(chuàng)建的RDD將正確注冊到序列化程序提供程序摧玫。
2.4 使用
geomesa spark core提供的功能的主要入口點是geomesaspark對象:
val spatialRDDProvider = GeoMesaSpark(params)
1
當類路徑中包含適當?shù)腏AR時耳奕,geomesaspark通過SPI加載一個spaceardProvider實現(xiàn)。geomesaspark返回的實現(xiàn)是根據(jù)作為參數(shù)傳遞的參數(shù)選擇的诬像,如下面的scala代碼所示:
// parameters to pass to the SpatialRDDProvider implementation
val params = Map(
"param1" -> "foo",
"param2" -> "bar")
// GeoTools Query; may be used to filter results retrieved from the data store
val query = new Query("foo")
// val query = new Query("foo", ECQL.toFilter("name like 'A%'"))
// get the RDD, using the SparkContext configured as above
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)
1
2
3
4
5
6
7
8
9
要保存功能屋群,請使用save()方法:
GeoMesaSpark(params).save(rdd, params, "gdelt")
1
請注意,某些提供程序可能是只讀的坏挠。
三芍躏、空間RDD提供程序
3.1 Accumulo RDD Provider
AccumuloSpatialRDD提供程序是Accumulo數(shù)據(jù)存儲的空間RDD提供程序。核心代碼在geomesa accumulo spark模塊中降狠,并且geomesa accumulo spark運行時模塊中提供帶依賴項(包含執(zhí)行所需的所有依賴項)的帶陰影JAR对竣。
此提供程序可以讀寫geomesa accumulodatastore。配置參數(shù)與傳遞給datastorefinder.getdatastore()的參數(shù)相同榜配。有關(guān)詳細信息否纬,請參閱Accumulo數(shù)據(jù)存儲參數(shù)。
要在geomesa中訪問的功能類型作為傳遞給rdd()方法的查詢的類型名稱傳遞蛋褥。例如临燃,要從geomesa accumulo表中加載gdelt類型特征的RDD:
val params = Map(
"accumulo.instance.id" -> "mycloud",
"accumulo.user" -> "user",
"accumulo.password" -> "password",
"accumulo.zookeepers" -> "zoo1,zoo2,zoo3",
"accumulo.catalog" -> "geomesa")
val query = new Query("gdelt")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)
1
2
3
4
5
6
7
8
3.2 HBase RDD Provider
hbaspatialrddprovider是用于hbase數(shù)據(jù)存儲的空間RDD提供程序。核心代碼在geomesa-hbase-spark模塊中烙心,并且geomesa-hbase-spark運行時模塊中提供帶依賴項(包含執(zhí)行所需的所有依賴項)的帶陰影JAR膜廊。
此提供程序可以讀取和寫入geomesa hbaedatastore。配置參數(shù)與傳遞給datastorefinder.getdatastore()的參數(shù)相同淫茵。有關(guān)詳細信息爪瓜,請參閱HBase數(shù)據(jù)存儲參數(shù)。
Note:
連接到HBase通常需要在Spark類路徑上提供hbase-site.xml文件匙瘪。這可以通過指定–jars來完成铆铆。例如:
{VERSION}.jar,file:///usr/lib/hbase/conf/hbase-site.xml
1
或者炬转,您可以在數(shù)據(jù)存儲參數(shù)映射中指定zookee。但是算灸,這可能不適用于每個HBase設置扼劈。
要在geomesa中訪問的功能類型作為傳遞給rdd()方法的查詢的類型名稱傳遞。例如菲驴,要從geomesa hbase表中加載gdelt類型的功能的RDD:
val params = Map("hbase.zookeepers" -> "zoo1,zoo2,zoo3", "hbase.catalog" -> "geomesa")
val query = new Query("gdelt")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)
1
2
3
3.3 FileSystem RDD Provider
filesystemrddprovider是用于geomesa文件系統(tǒng)數(shù)據(jù)存儲的空間RDD提供程序荐吵。核心代碼在geomesa fs spark模塊中,并且geomesa fs spark運行時模塊中提供帶依賴項(其中包含執(zhí)行所需的所有依賴項)的帶陰影JAR赊瞬。
此提供程序可以讀取和寫入geomesa文件系統(tǒng)數(shù)據(jù)存儲區(qū)先煎。配置參數(shù)與傳遞給datastorefinder.getdatastore()的參數(shù)相同。有關(guān)詳細信息巧涧,請參閱文件系統(tǒng)數(shù)據(jù)存儲參數(shù)薯蝎。
要在geomesa中訪問的功能類型作為傳遞給rdd()方法的查詢的類型名稱傳遞。例如谤绳,要從S3存儲桶加載gdelt類型功能的RDD:
val params = Map("fs.path" -> "s3a://mybucket/geomesa/datastore")
val query = new Query("gdelt")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)
1
2
3
3.4 Converter RDD Provider
變矩器空間RDD提供器由幾何A火花變矩器模塊提供占锯。
ConvertersPatialRDDProvider從一個或多個數(shù)據(jù)文件中讀取幾何轉(zhuǎn)換庫可讀取格式的功能,包括定界和固定寬度的文本缩筛、AVRO消略、JSON和XML文件。它采用以下配置參數(shù):
geomesa.converter-轉(zhuǎn)換器定義為類型安全配置字符串
geomesa.converter.inputs-輸入文件路徑瞎抛,逗號分隔
geomeria.sft-simpleFeatureType艺演,作為規(guī)范字符串、配置字符串或環(huán)境查找名稱
geomesa.sft.name-(可選)simpleFeatureType的名稱
考慮geomesa convert文檔的示例用法部分中描述的示例數(shù)據(jù)桐臊。如果文件example.csv包含示例數(shù)據(jù)胎撤,example.conf包含轉(zhuǎn)換器的類型安全配置文件,則可以使用以下scala代碼將此數(shù)據(jù)加載到RDD中:
val exampleConf = ConfigFactory.load("example.conf").root().render()
val params = Map(
"geomesa.converter" -> exampleConf,
"geomesa.converter.inputs" -> "example.csv",
"geomesa.sft" -> "phrase:String,dtg:Date,geom:Point:srid=4326",
"geomesa.sft.name" -> "example")
val query = new Query("example")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)
1
2
3
4
5
6
7
8
也可以通過maven或sbt為公共數(shù)據(jù)源(gdelt断凶、地名等)加載預打包的轉(zhuǎn)換器伤提。有關(guān)詳細信息,請參閱預打包轉(zhuǎn)換器定義懒浮。
警告:
ConvertSpatialRDDProvider是只讀的飘弧,不支持向數(shù)據(jù)文件寫入功能。
3.5 GeoTools RDD Provider
Geotoolsspatialrddprovider由Geometsa Spark Geotools模塊提供砚著。
geotoolspatialrddprovider生成并保存存儲在通用geotools數(shù)據(jù)存儲中的功能的RDD次伶。傳遞的配置參數(shù)與傳遞給datastorefinder.getdatastore()以創(chuàng)建感興趣的數(shù)據(jù)存儲的配置參數(shù)相同,還需要一個名為“geotools”的布爾參數(shù)來指示SPI加載geotoolspatialrddprovider稽穆。例如冠王,geotools contentdatastore教程中描述的csvdatastore采用一個名為“file”的參數(shù)。要將此數(shù)據(jù)存儲與geomesa spark一起使用舌镶,請執(zhí)行以下操作:
val params = Map(
"geotools" -> "true",
"file" -> "locations.csv")
val query = new Query("locations")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)
1
2
3
4
5
要在數(shù)據(jù)存儲中訪問的功能類型的名稱將作為傳遞給rdd()方法的查詢的類型名稱傳遞柱彻。在csvdatastore的示例中豪娜,這是作為參數(shù)傳遞的文件名的基名稱。
警告:
不要將geotools rdd提供程序與具有提供程序?qū)崿F(xiàn)的geomesa數(shù)據(jù)存儲一起使用哟楷。上面描述的提供程序提供了額外的優(yōu)化以提高讀寫性能瘤载。
如果數(shù)據(jù)存儲支持,請使用save()方法保存功能:
GeoMesaSpark(params).save(rdd, params, "locations")
1
四卖擅、SparkSQL
geomesa spark sql支持基于spark sql模塊中存在的數(shù)據(jù)集/數(shù)據(jù)幀API鸣奔,以提供地理空間功能。這包括自定義地理空間數(shù)據(jù)類型和函數(shù)惩阶、從地理工具數(shù)據(jù)存儲創(chuàng)建數(shù)據(jù)幀的能力挎狸,以及改進SQL查詢性能的優(yōu)化。
geomesa spark sql代碼由geomesa spark sql模塊提供:
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-spark-sql_2.11</artifactId>
// version, etc.
</dependency>
1
2
3
4
5
4.1 示例
以下是通過sparksql連接到geomesa accumulo的scala示例:
// DataStore params to a hypothetical GeoMesa Accumulo table
val dsParams = Map(
"accumulo.instance.id" -> "instance",
"accumulo.zookeepers" -> "zoo1,zoo2,zoo3",
"accumulo.user" -> "user",
"accumulo.password" -> "*****",
"accumulo.catalog" -> "geomesa_catalog",
"geomesa.security.auths" -> "USER,ADMIN")
// Create SparkSession
val sparkSession = SparkSession.builder()
.appName("testSpark")
.config("spark.sql.crossJoin.enabled", "true")
.master("local[]")
.getOrCreate()
// Create DataFrame using the "geomesa" format
val dataFrame = sparkSession.read
.format("geomesa")
.options(dsParams)
.option("geomesa.feature", "chicago")
.load()dataFrame.createOrReplaceTempView("chicago")
// Query against the "chicago" schema
val sqlQuery = "select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)"
val resultDataFrame = sparkSession.sql(sqlQuery)
resultDataFrame.show
/+-------+------+-----------+--------------------+-----------------+
|fid|arrest|case_number| dtg| geom|
+-------+------+-----------+--------------------+-----------------+
| 4| true| 4|2016-01-04 00:00:...|POINT (76.5 38.5)|
| 5| true| 5|2016-01-05 00:00:...| POINT (77 38)|
| 6| true| 6|2016-01-06 00:00:...| POINT (78 39)|
| 7| true| 7|2016-01-07 00:00:...| POINT (20 20)|
| 9| true| 9|2016-01-09 00:00:...| POINT (50 50)|
+-------+------+-----------+--------------------+-----------------+*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
4.2 配置
由于geomesa sparksql堆棧位于geomesa spark core模塊的頂部断楷,因此類路徑中必須包含一個或多個spaceardprovider實現(xiàn)锨匆。有關(guān)設置Spark類路徑的詳細信息,請參見配置冬筒。
注意:
在大多數(shù)情況下恐锣,在使用sparksql時,不需要設置簡單功能序列化中描述的kryo序列化账千。但是侥蒙,使用geotools RDD提供程序時可能需要這樣做。
如果要將多個數(shù)據(jù)幀連接在一起匀奏,則在創(chuàng)建SparkSession對象時需要添加spark.sql.crossjoin.enabled屬性:
val spark = SparkSession.builder().
// ...
config("spark.sql.crossJoin.enabled", "true").
// ...
getOrCreate()
1
2
3
4
5
警告:
交叉連接可能非常、非常低效学搜。注意確保連接的一組或兩組數(shù)據(jù)非常小娃善,并考慮使用broadcast()方法確保至少一個連接的數(shù)據(jù)幀在內(nèi)存中。
4.3 使用
要使用與特定功能類型對應的數(shù)據(jù)創(chuàng)建geomesa sparksql啟用的數(shù)據(jù)幀瑞佩,請執(zhí)行以下操作:
// dsParams contains the parameters to pass to the data store
val dataFrame = sparkSession.read
.format("geomesa")
.options(dsParams)
.option("geomesa.feature", typeName)
.load()
1
2
3
4
5
6
具體來說聚磺,調(diào)用格式(“geomesa”)注冊geomesa sparksql數(shù)據(jù)源,選項(“geomesa.feature”炬丸,typename)告訴geomesa使用名為typename的功能類型瘫寝。這還注冊在geomesa sparksql中實現(xiàn)的自定義用戶定義類型和函數(shù)。
通過將數(shù)據(jù)幀注冊為臨時視圖稠炬,可以在隨后的SQL調(diào)用中訪問此數(shù)據(jù)幀焕阿。例如:
dataFrame.createOrReplaceTempView("chicago")
1
可以通過別名“chicago”調(diào)用此數(shù)據(jù)幀:
val sqlQuery = "select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)"
val resultDataFrame = sparkSession.sql(sqlQuery)
1
2
也可以通過在Spark會話的sqlContext對象上調(diào)用sqltypes.init()手動注冊用戶定義的類型和函數(shù):
SQLTypes.init(sparkSession.sqlContext)
1
還可以將Spark數(shù)據(jù)框?qū)懭霂缀伪恚渲邪ǎ?/p>
dataFrame.write.format("geomesa").options(dsParams).option("geomesa.feature", "featureName").save()
1
這將自動將數(shù)據(jù)幀的基礎(chǔ)RDD[行]轉(zhuǎn)換為RDD[簡單功能]首启,并并行寫入數(shù)據(jù)存儲暮屡。要使其工作,功能類型featurename必須已經(jīng)存在于數(shù)據(jù)存儲中毅桃。
寫回功能時褒纲,可以通過特殊的_uuu fid_uuuu列指定功能ID:
dataFrame
.withColumn("fid", $"custom_fid")
.write
.format("geomesa")
.options(dsParams)
.option("geomesa.feature", "featureName")
.save
1
2
3
4
5
6
7
4.4 地理空間用戶定義的類型和功能
GeMeSSa SPARKSQL模塊采用幾何對象表示的幾個類(如OGC OpenGISS簡單特征訪問通用體系結(jié)構(gòu)規(guī)范和Java拓撲結(jié)構(gòu)套件實現(xiàn)的)准夷,并將其注冊為用戶定義類型(UDT)在SparkSQL中。例如莺掠,geometry類注冊為geometryudt衫嵌。在geomesa sparksql中,注冊了以下類型:
幾何圖形
點udt
線形UDT
多邊形
多點式
多重刪除
多元論
幾何集合
geomesa sparksql還實現(xiàn)了OGC OpenGIS Simple Feature Access SQL選項規(guī)范中描述的作為sparksql用戶定義函數(shù)(UDF)的函數(shù)子集彻秆。這些功能包括創(chuàng)建幾何圖形渐扮、訪問幾何圖形的屬性、將幾何圖形對象鑄造到更具體的子類掖棉、以其他格式輸出幾何圖形墓律、測量幾何圖形之間的空間關(guān)系以及處理幾何圖形。
例如幔亥,下面的SQL查詢:
select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)
1
使用兩個UDF(st_contains和st_makebbox)查找芝加哥數(shù)據(jù)框中的行耻讽,其中列g(shù)eom包含在指定的邊界框中。
4.5 內(nèi)存索引
如果您的數(shù)據(jù)足夠小帕棉,可以放入執(zhí)行器的內(nèi)存中针肥,那么您可以告訴geomesa sparksql在內(nèi)存中持久化RDD,并利用cqengine作為內(nèi)存索引數(shù)據(jù)存儲香伴。為此慰枕,請在創(chuàng)建數(shù)據(jù)幀時添加option(“cache”, “true”)。這將對每個屬性(不包括FID和幾何體)放置索引即纲。要基于幾何圖形進行索引具帮,請?zhí)砑觨ption(“indexGeom”, “true”)。對此關(guān)系的查詢將自動命中緩存的RDD低斋,并查詢位于每個分區(qū)上的內(nèi)存中數(shù)據(jù)存儲蜂厅,這可能會顯著加快速度。
考慮到您對數(shù)據(jù)的一些了解膊畴,還可以通過應用初始查詢來確保數(shù)據(jù)適合內(nèi)存掘猿。這可以通過query來完成。例如唇跨,
option("query", "dtg AFTER 2016-12-31T23:59:59Z")
1
4.6 空間分區(qū)和更快的連接
還可以通過對數(shù)據(jù)進行空間分區(qū)來實現(xiàn)額外的加速稠通。添加option(“spatial”, “true”)將確保空間上彼此相鄰的數(shù)據(jù)將放置在同一分區(qū)上买猖。默認情況下改橘,您的數(shù)據(jù)將被分區(qū)到一個NXN網(wǎng)格中,但總共有4個分區(qū)策略政勃,每個分區(qū)策略都可以用option(“strategy”, strategyName)的名稱指定唧龄。
相等-計算數(shù)據(jù)的邊界并將其劃分為大小相等的nxn網(wǎng)格,其中N = sqrt(numPartitions)
加權(quán)類似于相等,但確保沿每個軸的數(shù)據(jù)在每個網(wǎng)格單元中的比例相等既棺。
像地球一樣平等讽挟,但使用整個地球作為邊界,而不是根據(jù)數(shù)據(jù)計算它們丸冕。
r tree-基于數(shù)據(jù)樣本構(gòu)造一個R樹耽梅,并使用邊界矩形的子集作為分區(qū)信封。
空間分區(qū)的優(yōu)點有兩個方面:
1)具有完全位于一個分區(qū)中的空間謂詞的查詢可以直接轉(zhuǎn)到該分區(qū)胖烛,跳過掃描分區(qū)的開銷眼姐,這些分區(qū)肯定不會包含所需的數(shù)據(jù)。
2)如果兩個數(shù)據(jù)集按相同的方案進行分區(qū)佩番,導致兩個關(guān)系的分區(qū)信封相同众旗,那么空間聯(lián)接可以使用分區(qū)信封作為聯(lián)接中的鍵。這大大減少了完成連接所需的比較數(shù)量趟畏。
其他數(shù)據(jù)幀選項允許更好地控制分區(qū)的創(chuàng)建方式贡歧。對于需要數(shù)據(jù)樣本(加權(quán)和rtree)的策略,可以使用sampleSize和threshold乘數(shù)來控制決策過程中使用了多少基礎(chǔ)數(shù)據(jù)赋秀,以及rtree信封中允許多少項利朵。
其他有用的選項如下:
option(“partitions”, “n”)—指定基礎(chǔ)RDD的分區(qū)數(shù)(覆蓋默認并行度)
option(“bounds”, “POLYGON in WellKnownText”)—限制WEIGHTED和EQUAL策略使用的網(wǎng)格的邊界。所有不在這些邊界內(nèi)的數(shù)據(jù)都將放置在單獨的分區(qū)中猎莲。
option(“cover”, “true”)—由于只有相等和地球分割策略才能保證跨關(guān)系的分區(qū)信封是相同的绍弟,因此具有此選項集的數(shù)據(jù)幀將強制與其連接的數(shù)據(jù)幀的分區(qū)方案與其自身匹配。
五著洼、sparkSQL函數(shù)
下面是由geomesa spark sql模塊定義的空間sparksql用戶定義函數(shù)的列表樟遣。
https://www.geomesa.org/documentation/user/spark/sparksql_functions.html
————————————————
版權(quán)聲明:本文為CSDN博主「愛是與世界平行」的原創(chuàng)文章,遵循 CC 4.0 BY-SA 版權(quán)協(xié)議郭脂,轉(zhuǎn)載請附上原文出處鏈接及本聲明年碘。
原文鏈接:https://blog.csdn.net/An1090239782/article/details/95352104