GeoSpark源碼解析(三)
本節(jié)我們在來看一個SpatialRDD
的成員indexedRawRDD
public class SpatialRDD<T extends Geometry>
implements Serializable
{
/**
* The raw spatial RDD.
*/
public JavaRDD<T> rawSpatialRDD;
/**
* The spatial partitioned RDD.
*/
public JavaRDD<T> spatialPartitionedRDD;
...
}}
分區(qū)可以說是Spark的一個重要特性怨酝,幸運的是傀缩,GeoSpark自定義了分區(qū)策略,以支持空間對象分區(qū)农猬。rawSpatialRDD
和spatialPartitionedRDD
的區(qū)別就是spatialPartitionedRDD
保存的是rawSpatialRDD
分區(qū)后的RDD赡艰。我們來看下GeoSpark是如何實現(xiàn)自定義分區(qū)策略的。
我們首先從SpatialRDD
的spatialPartitioning
方法看斤葱,這里首先要傳入一個SpatialPartitioner
對象慷垮。
public void spatialPartitioning(SpatialPartitioner partitioner)
{
this.partitioner = partitioner;
this.spatialPartitionedRDD = partition(partitioner);
}
SpatialPartitioner
是一個抽象類揖闸,繼承了Spark中的Partitioner
方法,可以看到料身,若想自定義分區(qū)策略汤纸,那么只需要實現(xiàn)這兩個函數(shù),第一個函數(shù)是告訴Spark要分成多少區(qū)芹血,第二個函數(shù)是將對象與分區(qū)ID對應起來贮泞。
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
GeoSpark實現(xiàn)了三種分區(qū)策略,分別是QuadTreePartitioner
,KDBTreePartitioner
,FlatGridPartitioner
幔烛。在選定分區(qū)策略后啃擦,Geospark就開始調(diào)用private JavaRDD<T> partition(final SpatialPartitioner partitioner)
方法來進行分區(qū),它是一個私有方法饿悬,我們來看他的實現(xiàn)(這里截取了實現(xiàn)的上半部分)
private JavaRDD<T> partition(final SpatialPartitioner partitioner)
{
return this.rawSpatialRDD.flatMapToPair(
new PairFlatMapFunction<T, Integer, T>()
{
@Override
public Iterator<Tuple2<Integer, T>> call(T spatialObject)
throws Exception
{
return partitioner.placeObject(spatialObject);
}
}
).partitionBy(partitioner)
...
}
因為Spark的paritionBy
需要一個PairRDD(實際上令蛉,Spark的paritionBy
函數(shù)也就是將PariRDD的第一個值傳給partitioner
獲得一個分區(qū)ID),所以GeoSpark就先將RDD轉(zhuǎn)為PairRDD狡恬,這里注意placeObject
這個方法
@Override
public <T extends Geometry> Iterator<Tuple2<Integer, T>> placeObject(T spatialObject)
throws Exception
{
Objects.requireNonNull(spatialObject, "spatialObject");
final int overflowContainerID = grids.size();
final Envelope envelope = spatialObject.getEnvelopeInternal();
Set<Tuple2<Integer, T>> result = new HashSet();
boolean containFlag = false;
for (int i = 0; i < grids.size(); i++) {
final Envelope grid = grids.get(i);
if (grid.covers(envelope)) {
result.add(new Tuple2(i, spatialObject));
containFlag = true;
}
else if (grid.intersects(envelope) || envelope.covers(grid)) {
result.add(new Tuple2<>(i, spatialObject));
}
}
if (!containFlag) {
result.add(new Tuple2<>(overflowContainerID, spatialObject));
}
return result.iterator();
}
以第12行為例珠叔,partitioner
會首先建好格網(wǎng),然后對格網(wǎng)進行遍歷弟劲,若這個格網(wǎng)范圍包含或與這個Geometry相交祷安,那就將這格網(wǎng)ID和Geometry構(gòu)造成一個Tuple并返回,這里的格網(wǎng)ID就是分區(qū)ID了函卒。
然后再將PairRDD轉(zhuǎn)為RDD辆憔,就完成了分區(qū)操作撇眯。
那這里有個問題报嵌,就是partitioner
中的格網(wǎng)是如何構(gòu)建的?并且我們常常調(diào)用的是public void spatialPartitioning(GridType gridType, int numPartitions)
這個方法熊榛,那GeoSpark是如何根據(jù)GridType和numPartitions構(gòu)建格網(wǎng)呢锚国? 我們下節(jié)再來分析。