From c281e7f21753510cb2d00066e691c324b626704b Mon Sep 17 00:00:00 2001 From: jpolchlo Date: Fri, 7 Jun 2019 13:52:28 -0400 Subject: [PATCH] Reorganize to include examples --- build.sbt | 15 ++- ...pache.spark.sql.sources.DataSourceRegister | 0 .../main/resources/microsite/data/menu.yml | 0 .../src}/main/scala/vectorpipe/OSM.scala | 0 .../main/scala/vectorpipe/VectorPipe.scala | 90 ++++++++----- .../vectorpipe/encoders/GTEncoders.scala | 22 ++++ .../vectorpipe/functions/osm/package.scala | 0 .../scala/vectorpipe/functions/package.scala | 0 .../scala/vectorpipe/internal/package.scala | 2 +- .../main/scala/vectorpipe/model/Actions.scala | 0 .../vectorpipe/model/AugmentedDiff.scala | 0 .../main/scala/vectorpipe/model/Change.scala | 0 .../scala/vectorpipe/model/Changeset.scala | 0 .../vectorpipe/model/ChangesetComment.scala | 0 .../model/ElementWithSequence.scala | 0 .../main/scala/vectorpipe/model/Member.scala | 0 .../src}/main/scala/vectorpipe/model/Nd.scala | 0 .../vectorpipe/relations/MultiPolygons.scala | 0 .../scala/vectorpipe/relations/Routes.scala | 0 .../scala/vectorpipe/relations/package.scala | 0 .../utils/PartialCoordinateSequence.scala | 0 .../utils/ReversedCoordinateSequence.scala | 0 .../utils/VirtualCoordinateSequence.scala | 0 .../vectorpipe/relations/utils/package.scala | 0 .../AugmentedDiffMicroBatchReader.scala | 0 .../sources/AugmentedDiffProvider.scala | 0 .../sources/AugmentedDiffReader.scala | 0 .../sources/AugmentedDiffSource.scala | 0 .../sources/ChangeMicroBatchReader.scala | 0 .../vectorpipe/sources/ChangeProvider.scala | 0 .../vectorpipe/sources/ChangeReader.scala | 0 .../vectorpipe/sources/ChangeSource.scala | 0 .../sources/ChangesetMicroBatchReader.scala | 0 .../sources/ChangesetProvider.scala | 0 .../vectorpipe/sources/ChangesetReader.scala | 0 .../vectorpipe/sources/ChangesetSource.scala | 0 .../sources/ReplicationReader.scala | 0 .../ReplicationStreamBatchReader.scala | 0 .../ReplicationStreamMicroBatchReader.scala | 0 .../vectorpipe/sources/SequenceOffset.scala | 0 .../scala/vectorpipe/sources/Source.scala | 0 .../main/scala/vectorpipe/util/Auth.scala | 0 .../main/scala/vectorpipe/util/DBUtils.scala | 0 .../main/scala/vectorpipe/util/Geocode.scala | 0 .../main/scala/vectorpipe/util/Resource.scala | 0 .../main/scala/vectorpipe/util/package.scala | 0 .../vectorpipe/vectortile/Clipping.scala | 0 .../vectorpipe/vectortile/Pipeline.scala | 8 +- .../vectorpipe/vectortile/Simplify.scala | 0 .../vectortile/export/SaveToHadoop.scala | 119 ++++++++++++++++++ .../vectortile/export/SaveToS3.scala | 109 ++++++++++++++++ .../vectortile/export/package.scala | 52 ++++++++ .../scala/vectorpipe/vectortile/package.scala | 62 ++++++++- {src => core/src}/main/tut/index.md | 0 {src => core/src}/main/tut/outputs.md | 0 {src => core/src}/main/tut/sources.md | 0 {src => core/src}/main/tut/usage.md | 0 {src => core/src}/main/tut/usage/concepts.md | 0 {src => core/src}/main/tut/usage/osm.md | 0 {src => core/src}/main/tut/usage/usage.md | 0 {src => core/src}/test/resources/.gitignore | 0 .../test/resources/isle-of-man-latest.osm.orc | Bin .../src}/test/resources/log4j.properties | 0 .../src}/test/resources/relation-110564.orc | Bin .../src}/test/resources/relation-110564.wkt | 0 .../src}/test/resources/relation-191199.orc | Bin .../src}/test/resources/relation-191199.wkt | 0 .../src}/test/resources/relation-191204.orc | Bin .../src}/test/resources/relation-191204.wkt | 0 .../src}/test/resources/relation-1949938.orc | Bin .../src}/test/resources/relation-1949938.wkt | 0 .../src}/test/resources/relation-2554903.orc | Bin .../src}/test/resources/relation-2554903.wkt | 0 .../src}/test/resources/relation-2580685.orc | Bin .../src}/test/resources/relation-2580685.wkt | 0 .../src}/test/resources/relation-3080946.orc | Bin .../src}/test/resources/relation-3080946.wkt | 0 .../src}/test/resources/relation-3105056.orc | Bin .../src}/test/resources/relation-3105056.wkt | 0 .../src}/test/resources/relation-333501.orc | Bin .../src}/test/resources/relation-333501.wkt | 0 .../src}/test/resources/relation-393502.orc | Bin .../src}/test/resources/relation-393502.wkt | 0 .../src}/test/resources/relation-5448156.orc | Bin .../src}/test/resources/relation-5448156.wkt | 0 .../src}/test/resources/relation-5448691.orc | Bin .../src}/test/resources/relation-5448691.wkt | 0 .../src}/test/resources/relation-5612959.orc | Bin .../src}/test/resources/relation-5612959.wkt | 0 .../src}/test/resources/relation-61315.orc | Bin .../src}/test/resources/relation-61315.wkt | 0 .../src}/test/resources/relation-6710544.orc | Bin .../src}/test/resources/relation-6710544.wkt | 0 .../test/resources/view/cluster-view.html | 0 .../src}/test/resources/view/layer-test.html | 0 ...ltiPolygonRelationReconstructionSpec.scala | 0 .../scala/vectorpipe/ProcessOSMTest.scala | 0 .../scala/vectorpipe/TestEnvironment.scala | 0 .../functions/osm/FunctionSpec.scala | 0 .../vectortile/LayerTestPipeline.scala | 6 +- .../vectorpipe/vectortile/PipelineSpec.scala | 3 +- .../vectorpipe/vectortile/TestPipeline.scala | 33 ++++- .../vectortile/WeightedCentroid.scala | 0 examples/build.sbt | 11 ++ .../vectorpipe/examples/BaselineMain.scala | 93 ++++++++++++++ .../vectorpipe/examples/OSMLayerMain.scala | 51 ++++++++ .../examples/OSMLayerPipeline.scala | 25 ++++ project/plugins.sbt | 3 + 108 files changed, 651 insertions(+), 53 deletions(-) rename {src => core/src}/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (100%) rename {src => core/src}/main/resources/microsite/data/menu.yml (100%) rename {src => core/src}/main/scala/vectorpipe/OSM.scala (100%) rename {src => core/src}/main/scala/vectorpipe/VectorPipe.scala (61%) create mode 100644 core/src/main/scala/vectorpipe/encoders/GTEncoders.scala rename {src => core/src}/main/scala/vectorpipe/functions/osm/package.scala (100%) rename {src => core/src}/main/scala/vectorpipe/functions/package.scala (100%) rename {src => core/src}/main/scala/vectorpipe/internal/package.scala (99%) rename {src => core/src}/main/scala/vectorpipe/model/Actions.scala (100%) rename {src => core/src}/main/scala/vectorpipe/model/AugmentedDiff.scala (100%) rename {src => core/src}/main/scala/vectorpipe/model/Change.scala (100%) rename {src => core/src}/main/scala/vectorpipe/model/Changeset.scala (100%) rename {src => core/src}/main/scala/vectorpipe/model/ChangesetComment.scala (100%) rename {src => core/src}/main/scala/vectorpipe/model/ElementWithSequence.scala (100%) rename {src => core/src}/main/scala/vectorpipe/model/Member.scala (100%) rename {src => core/src}/main/scala/vectorpipe/model/Nd.scala (100%) rename {src => core/src}/main/scala/vectorpipe/relations/MultiPolygons.scala (100%) rename {src => core/src}/main/scala/vectorpipe/relations/Routes.scala (100%) rename {src => core/src}/main/scala/vectorpipe/relations/package.scala (100%) rename {src => core/src}/main/scala/vectorpipe/relations/utils/PartialCoordinateSequence.scala (100%) rename {src => core/src}/main/scala/vectorpipe/relations/utils/ReversedCoordinateSequence.scala (100%) rename {src => core/src}/main/scala/vectorpipe/relations/utils/VirtualCoordinateSequence.scala (100%) rename {src => core/src}/main/scala/vectorpipe/relations/utils/package.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/AugmentedDiffMicroBatchReader.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/AugmentedDiffProvider.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/AugmentedDiffReader.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/AugmentedDiffSource.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/ChangeMicroBatchReader.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/ChangeProvider.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/ChangeReader.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/ChangeSource.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/ChangesetMicroBatchReader.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/ChangesetProvider.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/ChangesetReader.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/ChangesetSource.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/ReplicationReader.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/ReplicationStreamBatchReader.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/ReplicationStreamMicroBatchReader.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/SequenceOffset.scala (100%) rename {src => core/src}/main/scala/vectorpipe/sources/Source.scala (100%) rename {src => core/src}/main/scala/vectorpipe/util/Auth.scala (100%) rename {src => core/src}/main/scala/vectorpipe/util/DBUtils.scala (100%) rename {src => core/src}/main/scala/vectorpipe/util/Geocode.scala (100%) rename {src => core/src}/main/scala/vectorpipe/util/Resource.scala (100%) rename {src => core/src}/main/scala/vectorpipe/util/package.scala (100%) rename {src => core/src}/main/scala/vectorpipe/vectortile/Clipping.scala (100%) rename {src => core/src}/main/scala/vectorpipe/vectortile/Pipeline.scala (94%) rename {src => core/src}/main/scala/vectorpipe/vectortile/Simplify.scala (100%) create mode 100644 core/src/main/scala/vectorpipe/vectortile/export/SaveToHadoop.scala create mode 100644 core/src/main/scala/vectorpipe/vectortile/export/SaveToS3.scala rename {src => core/src}/main/scala/vectorpipe/vectortile/export/package.scala (52%) rename {src => core/src}/main/scala/vectorpipe/vectortile/package.scala (68%) rename {src => core/src}/main/tut/index.md (100%) rename {src => core/src}/main/tut/outputs.md (100%) rename {src => core/src}/main/tut/sources.md (100%) rename {src => core/src}/main/tut/usage.md (100%) rename {src => core/src}/main/tut/usage/concepts.md (100%) rename {src => core/src}/main/tut/usage/osm.md (100%) rename {src => core/src}/main/tut/usage/usage.md (100%) rename {src => core/src}/test/resources/.gitignore (100%) rename {src => core/src}/test/resources/isle-of-man-latest.osm.orc (100%) rename {src => core/src}/test/resources/log4j.properties (100%) rename {src => core/src}/test/resources/relation-110564.orc (100%) rename {src => core/src}/test/resources/relation-110564.wkt (100%) rename {src => core/src}/test/resources/relation-191199.orc (100%) rename {src => core/src}/test/resources/relation-191199.wkt (100%) rename {src => core/src}/test/resources/relation-191204.orc (100%) rename {src => core/src}/test/resources/relation-191204.wkt (100%) rename {src => core/src}/test/resources/relation-1949938.orc (100%) rename {src => core/src}/test/resources/relation-1949938.wkt (100%) rename {src => core/src}/test/resources/relation-2554903.orc (100%) rename {src => core/src}/test/resources/relation-2554903.wkt (100%) rename {src => core/src}/test/resources/relation-2580685.orc (100%) rename {src => core/src}/test/resources/relation-2580685.wkt (100%) rename {src => core/src}/test/resources/relation-3080946.orc (100%) rename {src => core/src}/test/resources/relation-3080946.wkt (100%) rename {src => core/src}/test/resources/relation-3105056.orc (100%) rename {src => core/src}/test/resources/relation-3105056.wkt (100%) rename {src => core/src}/test/resources/relation-333501.orc (100%) rename {src => core/src}/test/resources/relation-333501.wkt (100%) rename {src => core/src}/test/resources/relation-393502.orc (100%) rename {src => core/src}/test/resources/relation-393502.wkt (100%) rename {src => core/src}/test/resources/relation-5448156.orc (100%) rename {src => core/src}/test/resources/relation-5448156.wkt (100%) rename {src => core/src}/test/resources/relation-5448691.orc (100%) rename {src => core/src}/test/resources/relation-5448691.wkt (100%) rename {src => core/src}/test/resources/relation-5612959.orc (100%) rename {src => core/src}/test/resources/relation-5612959.wkt (100%) rename {src => core/src}/test/resources/relation-61315.orc (100%) rename {src => core/src}/test/resources/relation-61315.wkt (100%) rename {src => core/src}/test/resources/relation-6710544.orc (100%) rename {src => core/src}/test/resources/relation-6710544.wkt (100%) rename {src => core/src}/test/resources/view/cluster-view.html (100%) rename {src => core/src}/test/resources/view/layer-test.html (100%) rename {src => core/src}/test/scala/vectorpipe/MultiPolygonRelationReconstructionSpec.scala (100%) rename {src => core/src}/test/scala/vectorpipe/ProcessOSMTest.scala (100%) rename {src => core/src}/test/scala/vectorpipe/TestEnvironment.scala (100%) rename {src => core/src}/test/scala/vectorpipe/functions/osm/FunctionSpec.scala (100%) rename {src => core/src}/test/scala/vectorpipe/vectortile/LayerTestPipeline.scala (68%) rename {src => core/src}/test/scala/vectorpipe/vectortile/PipelineSpec.scala (95%) rename {src => core/src}/test/scala/vectorpipe/vectortile/TestPipeline.scala (57%) rename {src => core/src}/test/scala/vectorpipe/vectortile/WeightedCentroid.scala (100%) create mode 100644 examples/build.sbt create mode 100644 examples/src/main/scala/vectorpipe/examples/BaselineMain.scala create mode 100644 examples/src/main/scala/vectorpipe/examples/OSMLayerMain.scala create mode 100644 examples/src/main/scala/vectorpipe/examples/OSMLayerPipeline.scala diff --git a/build.sbt b/build.sbt index 4bcddf29..8240ff3b 100644 --- a/build.sbt +++ b/build.sbt @@ -147,11 +147,22 @@ val vpExtraSettings = Seq( // micrositeBaseUrl := "/vectorpipe" // micrositeDocumentationUrl := "/vectorpipe/latest/api/#vectorpipe.package" /* Location of Scaladocs */ +lazy val root = project + .in(file(".")) + .aggregate(vectorpipe, examples) + .settings(commonSettings, vpExtraSettings) + /* Main project */ lazy val vectorpipe = project - .in(file(".")) + .in(file("core")) .settings(commonSettings, vpExtraSettings, release) +/* Example projects */ +lazy val examples = project + .in(file("examples")) + .settings(commonSettings, vpExtraSettings) + .dependsOn(vectorpipe) + /* Benchmarking suite. * Benchmarks can be executed by first switching to the `bench` project and then by running: jmh:run -t 1 -f 1 -wi 5 -i 5 .*Bench.* @@ -162,7 +173,7 @@ lazy val bench = project .dependsOn(vectorpipe) .enablePlugins(JmhPlugin) - +onLoad in Global ~= (_ andThen ("project vectorpipe" :: _)) // assemblyShadeRules in assembly := { diff --git a/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister similarity index 100% rename from src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister rename to core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister diff --git a/src/main/resources/microsite/data/menu.yml b/core/src/main/resources/microsite/data/menu.yml similarity index 100% rename from src/main/resources/microsite/data/menu.yml rename to core/src/main/resources/microsite/data/menu.yml diff --git a/src/main/scala/vectorpipe/OSM.scala b/core/src/main/scala/vectorpipe/OSM.scala similarity index 100% rename from src/main/scala/vectorpipe/OSM.scala rename to core/src/main/scala/vectorpipe/OSM.scala diff --git a/src/main/scala/vectorpipe/VectorPipe.scala b/core/src/main/scala/vectorpipe/VectorPipe.scala similarity index 61% rename from src/main/scala/vectorpipe/VectorPipe.scala rename to core/src/main/scala/vectorpipe/VectorPipe.scala index c5d63c38..cd6fc565 100644 --- a/src/main/scala/vectorpipe/VectorPipe.scala +++ b/core/src/main/scala/vectorpipe/VectorPipe.scala @@ -15,6 +15,8 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.StringType import org.locationtech.jts.{geom => jts} +import scala.reflect.ClassTag + object VectorPipe { /** Vectortile conversion options. @@ -46,7 +48,10 @@ object VectorPipe { def forAllZoomsWithSrcProjection(zoom: Int, crs: CRS) = Options(zoom, Some(0), crs, None) } - def apply(input: DataFrame, pipeline: vectortile.Pipeline, options: Options): Unit = { + def apply[T: ClassTag](input: DataFrame, pipeline: vectortile.Pipeline, options: Options): Unit = { + import input.sparkSession.implicits._ + import vectorpipe.encoders._ + val geomColumn = pipeline.geometryColumn assert(input.columns.contains(geomColumn) && input.schema(geomColumn).dataType.isInstanceOf[org.apache.spark.sql.jts.AbstractGeometryUDT[jts.Geometry]], @@ -74,46 +79,49 @@ object VectorPipe { SpatialKey(k.col / 2, k.row / 2) }.toSeq } - def generateVectorTiles[G <: Geometry](df: DataFrame, level: LayoutLevel): RDD[(SpatialKey, VectorTile)] = { + def generateVectorTiles[G <: Geometry](df: DataFrame, level: LayoutLevel): Dataset[(SpatialKey, Array[Byte])] = { val zoom = level.zoom - val clip = udf { (g: jts.Geometry, key: GenericRowWithSchema) => - val k = getSpatialKey(key) - pipeline.clip(g, k, level) - } - val selectedGeometry = pipeline - .select(df, zoom, keyColumn) + val selectedGeometry = pipeline.select match { + case None => df + case Some(select) => select(df, zoom, keyColumn) + } - val clipped = selectedGeometry + val keyed = selectedGeometry .withColumn(keyColumn, explode(col(keyColumn))) - .repartition(col(keyColumn)) // spread copies of possibly ill-tempered geometries around cluster prior to clipping - .withColumn(geomColumn, clip(col(geomColumn), col(keyColumn))) + + val clipped = pipeline.clip match { + case None => keyed + case Some(clipper) => + val clip = udf { (g: jts.Geometry, key: GenericRowWithSchema) => + val k = getSpatialKey(key) + clipper(g, k, level) + } + val toClip = keyed.repartition(col(keyColumn)) // spread copies of possibly ill-tempered geometries around cluster prior to clipping + toClip.withColumn(geomColumn, clip(col(geomColumn), col(keyColumn))) + } pipeline.layerMultiplicity match { case SingleLayer(layerName) => clipped - .rdd - .map { r => (getSpatialKey(r, keyColumn), pipeline.pack(r, zoom)) } - .groupByKey - .map { case (key, feats) => + .map { r => SingleLayerEntry(getSpatialKey(r, keyColumn), pipeline.pack(r, zoom)) } + .groupByKey(_.key) + .mapGroups { (key: SpatialKey, sleIter: Iterator[SingleLayerEntry]) => val ex = level.layout.mapTransform.keyToExtent(key) - key -> buildVectorTile(feats, layerName, ex, options.tileResolution, options.orderAreas) + key -> buildVectorTile(sleIter.map(_.feature).toIterable, layerName, ex, options.tileResolution, options.orderAreas).toBytes } case LayerNamesInColumn(layerNameCol) => assert(selectedGeometry.schema(layerNameCol).dataType == StringType, s"layerMultiplicity=${pipeline.layerMultiplicity} requires String-type column of name ${layerNameCol}") + clipped - .rdd - .map { r => (getSpatialKey(r, keyColumn), r.getAs[String](layerNameCol) -> pipeline.pack(r, zoom)) } - .groupByKey - .mapPartitions{ iter: Iterator[(SpatialKey, Iterable[(String, VectorTileFeature[Geometry])])] => - iter.map{ case (key, groupedFeatures) => { - val layerFeatures: Map[String, Iterable[VectorTileFeature[Geometry]]] = - groupedFeatures.groupBy(_._1).mapValues(_.map(_._2)) - val ex = level.layout.mapTransform.keyToExtent(key) - key -> buildVectorTile(layerFeatures, ex, options.tileResolution, options.orderAreas) - }} - } + .map { r => MultipleLayerEntry(getSpatialKey(r, keyColumn), r.getAs[String](layerNameCol), pipeline.pack(r, zoom)) } + .groupByKey(_.key) + .mapGroups{ (key: SpatialKey, iter: Iterator[MultipleLayerEntry]) => + val ex = level.layout.mapTransform.keyToExtent(key) + val layerFeatures = iter.toSeq.groupBy(_.layer).mapValues(_.map(_.feature)) + key -> buildVectorTile(layerFeatures, ex, options.tileResolution, options.orderAreas).toBytes + } } } @@ -134,16 +142,30 @@ object VectorPipe { } else { df } - val simplify = udf { g: jts.Geometry => pipeline.simplify(g, level.layout) } - val reduced = pipeline - .reduce(working, level, keyColumn) - val prepared = reduced - .withColumn(geomColumn, simplify(col(geomColumn))) - val vts = generateVectorTiles(prepared, level) + + val reduced = pipeline.reduce match { + case None => working + case Some(reduce) => reduce(working, level, keyColumn) + } + + val simplified = pipeline.simplify match { + case None => reduced + case Some(simplifier) => + val simplify = udf { g: jts.Geometry => simplifier(g, level.layout) } + reduced.withColumn(geomColumn, simplify(col(geomColumn))) + } + + val vts = generateVectorTiles(simplified, level) saveVectorTiles(vts, zoom, pipeline.baseOutputURI) - prepared.withColumn(keyColumn, reduceKeys(col(keyColumn))) + + simplified.withColumn(keyColumn, reduceKeys(col(keyColumn))) } } + private case class SingleLayerEntry(key: SpatialKey, feature: VectorTileFeature[Geometry]) + private case class MultipleLayerEntry(key: SpatialKey, layer: String, feature: VectorTileFeature[Geometry]) + + private implicit def sleEncoder: Encoder[SingleLayerEntry] = Encoders.kryo[SingleLayerEntry] + private implicit def mleEncoder: Encoder[MultipleLayerEntry] = Encoders.kryo[MultipleLayerEntry] } diff --git a/core/src/main/scala/vectorpipe/encoders/GTEncoders.scala b/core/src/main/scala/vectorpipe/encoders/GTEncoders.scala new file mode 100644 index 00000000..09c3dcce --- /dev/null +++ b/core/src/main/scala/vectorpipe/encoders/GTEncoders.scala @@ -0,0 +1,22 @@ +package vectorpipe.encoders + +import geotrellis.vector._ +import geotrellis.vectortile._ +import org.apache.spark.sql.{Encoder, Encoders} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder + +object GTEncoders { + implicit def gtGeometryEncoder: Encoder[Geometry] = Encoders.kryo[Geometry] + implicit def gtPointEncoder: Encoder[Point] = ExpressionEncoder() + implicit def gtMultiPointEncoder: Encoder[MultiPoint] = ExpressionEncoder() + implicit def gtLineEncoder: Encoder[Line] = ExpressionEncoder() + implicit def gtMultiLineEncoder: Encoder[MultiLine] = ExpressionEncoder() + implicit def gtPolygonEncoder: Encoder[Polygon] = ExpressionEncoder() + implicit def gtMultiPolygonEncoder: Encoder[MultiPolygon] = ExpressionEncoder() + + implicit def gtFeatureEncoder[G <: Geometry, D](implicit ev1: Encoder[G], ev2: Encoder[D]): Encoder[Feature[G, D]] = Encoders.kryo[Feature[G, D]] + + implicit def gtVectorTileEncoder: Encoder[VectorTile] = Encoders.kryo[VectorTile] + //implicit def gtLayerEncoder: Encoder[Layer] = Encoders.javaSerialization[Layer] + //implicit def gtStrictLayerEncoder: Encoder[StrictLayer] = Encoders.kryo[StrictLayer] +} diff --git a/src/main/scala/vectorpipe/functions/osm/package.scala b/core/src/main/scala/vectorpipe/functions/osm/package.scala similarity index 100% rename from src/main/scala/vectorpipe/functions/osm/package.scala rename to core/src/main/scala/vectorpipe/functions/osm/package.scala diff --git a/src/main/scala/vectorpipe/functions/package.scala b/core/src/main/scala/vectorpipe/functions/package.scala similarity index 100% rename from src/main/scala/vectorpipe/functions/package.scala rename to core/src/main/scala/vectorpipe/functions/package.scala diff --git a/src/main/scala/vectorpipe/internal/package.scala b/core/src/main/scala/vectorpipe/internal/package.scala similarity index 99% rename from src/main/scala/vectorpipe/internal/package.scala rename to core/src/main/scala/vectorpipe/internal/package.scala index 61d7004b..e59d7b3c 100644 --- a/src/main/scala/vectorpipe/internal/package.scala +++ b/core/src/main/scala/vectorpipe/internal/package.scala @@ -182,7 +182,7 @@ package object internal { // when an element has been deleted, it doesn't include any tags; use a window function to retrieve the last tags // present and use those - history + frame .where('type === "relation") .repartition('id) .select( diff --git a/src/main/scala/vectorpipe/model/Actions.scala b/core/src/main/scala/vectorpipe/model/Actions.scala similarity index 100% rename from src/main/scala/vectorpipe/model/Actions.scala rename to core/src/main/scala/vectorpipe/model/Actions.scala diff --git a/src/main/scala/vectorpipe/model/AugmentedDiff.scala b/core/src/main/scala/vectorpipe/model/AugmentedDiff.scala similarity index 100% rename from src/main/scala/vectorpipe/model/AugmentedDiff.scala rename to core/src/main/scala/vectorpipe/model/AugmentedDiff.scala diff --git a/src/main/scala/vectorpipe/model/Change.scala b/core/src/main/scala/vectorpipe/model/Change.scala similarity index 100% rename from src/main/scala/vectorpipe/model/Change.scala rename to core/src/main/scala/vectorpipe/model/Change.scala diff --git a/src/main/scala/vectorpipe/model/Changeset.scala b/core/src/main/scala/vectorpipe/model/Changeset.scala similarity index 100% rename from src/main/scala/vectorpipe/model/Changeset.scala rename to core/src/main/scala/vectorpipe/model/Changeset.scala diff --git a/src/main/scala/vectorpipe/model/ChangesetComment.scala b/core/src/main/scala/vectorpipe/model/ChangesetComment.scala similarity index 100% rename from src/main/scala/vectorpipe/model/ChangesetComment.scala rename to core/src/main/scala/vectorpipe/model/ChangesetComment.scala diff --git a/src/main/scala/vectorpipe/model/ElementWithSequence.scala b/core/src/main/scala/vectorpipe/model/ElementWithSequence.scala similarity index 100% rename from src/main/scala/vectorpipe/model/ElementWithSequence.scala rename to core/src/main/scala/vectorpipe/model/ElementWithSequence.scala diff --git a/src/main/scala/vectorpipe/model/Member.scala b/core/src/main/scala/vectorpipe/model/Member.scala similarity index 100% rename from src/main/scala/vectorpipe/model/Member.scala rename to core/src/main/scala/vectorpipe/model/Member.scala diff --git a/src/main/scala/vectorpipe/model/Nd.scala b/core/src/main/scala/vectorpipe/model/Nd.scala similarity index 100% rename from src/main/scala/vectorpipe/model/Nd.scala rename to core/src/main/scala/vectorpipe/model/Nd.scala diff --git a/src/main/scala/vectorpipe/relations/MultiPolygons.scala b/core/src/main/scala/vectorpipe/relations/MultiPolygons.scala similarity index 100% rename from src/main/scala/vectorpipe/relations/MultiPolygons.scala rename to core/src/main/scala/vectorpipe/relations/MultiPolygons.scala diff --git a/src/main/scala/vectorpipe/relations/Routes.scala b/core/src/main/scala/vectorpipe/relations/Routes.scala similarity index 100% rename from src/main/scala/vectorpipe/relations/Routes.scala rename to core/src/main/scala/vectorpipe/relations/Routes.scala diff --git a/src/main/scala/vectorpipe/relations/package.scala b/core/src/main/scala/vectorpipe/relations/package.scala similarity index 100% rename from src/main/scala/vectorpipe/relations/package.scala rename to core/src/main/scala/vectorpipe/relations/package.scala diff --git a/src/main/scala/vectorpipe/relations/utils/PartialCoordinateSequence.scala b/core/src/main/scala/vectorpipe/relations/utils/PartialCoordinateSequence.scala similarity index 100% rename from src/main/scala/vectorpipe/relations/utils/PartialCoordinateSequence.scala rename to core/src/main/scala/vectorpipe/relations/utils/PartialCoordinateSequence.scala diff --git a/src/main/scala/vectorpipe/relations/utils/ReversedCoordinateSequence.scala b/core/src/main/scala/vectorpipe/relations/utils/ReversedCoordinateSequence.scala similarity index 100% rename from src/main/scala/vectorpipe/relations/utils/ReversedCoordinateSequence.scala rename to core/src/main/scala/vectorpipe/relations/utils/ReversedCoordinateSequence.scala diff --git a/src/main/scala/vectorpipe/relations/utils/VirtualCoordinateSequence.scala b/core/src/main/scala/vectorpipe/relations/utils/VirtualCoordinateSequence.scala similarity index 100% rename from src/main/scala/vectorpipe/relations/utils/VirtualCoordinateSequence.scala rename to core/src/main/scala/vectorpipe/relations/utils/VirtualCoordinateSequence.scala diff --git a/src/main/scala/vectorpipe/relations/utils/package.scala b/core/src/main/scala/vectorpipe/relations/utils/package.scala similarity index 100% rename from src/main/scala/vectorpipe/relations/utils/package.scala rename to core/src/main/scala/vectorpipe/relations/utils/package.scala diff --git a/src/main/scala/vectorpipe/sources/AugmentedDiffMicroBatchReader.scala b/core/src/main/scala/vectorpipe/sources/AugmentedDiffMicroBatchReader.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/AugmentedDiffMicroBatchReader.scala rename to core/src/main/scala/vectorpipe/sources/AugmentedDiffMicroBatchReader.scala diff --git a/src/main/scala/vectorpipe/sources/AugmentedDiffProvider.scala b/core/src/main/scala/vectorpipe/sources/AugmentedDiffProvider.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/AugmentedDiffProvider.scala rename to core/src/main/scala/vectorpipe/sources/AugmentedDiffProvider.scala diff --git a/src/main/scala/vectorpipe/sources/AugmentedDiffReader.scala b/core/src/main/scala/vectorpipe/sources/AugmentedDiffReader.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/AugmentedDiffReader.scala rename to core/src/main/scala/vectorpipe/sources/AugmentedDiffReader.scala diff --git a/src/main/scala/vectorpipe/sources/AugmentedDiffSource.scala b/core/src/main/scala/vectorpipe/sources/AugmentedDiffSource.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/AugmentedDiffSource.scala rename to core/src/main/scala/vectorpipe/sources/AugmentedDiffSource.scala diff --git a/src/main/scala/vectorpipe/sources/ChangeMicroBatchReader.scala b/core/src/main/scala/vectorpipe/sources/ChangeMicroBatchReader.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/ChangeMicroBatchReader.scala rename to core/src/main/scala/vectorpipe/sources/ChangeMicroBatchReader.scala diff --git a/src/main/scala/vectorpipe/sources/ChangeProvider.scala b/core/src/main/scala/vectorpipe/sources/ChangeProvider.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/ChangeProvider.scala rename to core/src/main/scala/vectorpipe/sources/ChangeProvider.scala diff --git a/src/main/scala/vectorpipe/sources/ChangeReader.scala b/core/src/main/scala/vectorpipe/sources/ChangeReader.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/ChangeReader.scala rename to core/src/main/scala/vectorpipe/sources/ChangeReader.scala diff --git a/src/main/scala/vectorpipe/sources/ChangeSource.scala b/core/src/main/scala/vectorpipe/sources/ChangeSource.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/ChangeSource.scala rename to core/src/main/scala/vectorpipe/sources/ChangeSource.scala diff --git a/src/main/scala/vectorpipe/sources/ChangesetMicroBatchReader.scala b/core/src/main/scala/vectorpipe/sources/ChangesetMicroBatchReader.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/ChangesetMicroBatchReader.scala rename to core/src/main/scala/vectorpipe/sources/ChangesetMicroBatchReader.scala diff --git a/src/main/scala/vectorpipe/sources/ChangesetProvider.scala b/core/src/main/scala/vectorpipe/sources/ChangesetProvider.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/ChangesetProvider.scala rename to core/src/main/scala/vectorpipe/sources/ChangesetProvider.scala diff --git a/src/main/scala/vectorpipe/sources/ChangesetReader.scala b/core/src/main/scala/vectorpipe/sources/ChangesetReader.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/ChangesetReader.scala rename to core/src/main/scala/vectorpipe/sources/ChangesetReader.scala diff --git a/src/main/scala/vectorpipe/sources/ChangesetSource.scala b/core/src/main/scala/vectorpipe/sources/ChangesetSource.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/ChangesetSource.scala rename to core/src/main/scala/vectorpipe/sources/ChangesetSource.scala diff --git a/src/main/scala/vectorpipe/sources/ReplicationReader.scala b/core/src/main/scala/vectorpipe/sources/ReplicationReader.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/ReplicationReader.scala rename to core/src/main/scala/vectorpipe/sources/ReplicationReader.scala diff --git a/src/main/scala/vectorpipe/sources/ReplicationStreamBatchReader.scala b/core/src/main/scala/vectorpipe/sources/ReplicationStreamBatchReader.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/ReplicationStreamBatchReader.scala rename to core/src/main/scala/vectorpipe/sources/ReplicationStreamBatchReader.scala diff --git a/src/main/scala/vectorpipe/sources/ReplicationStreamMicroBatchReader.scala b/core/src/main/scala/vectorpipe/sources/ReplicationStreamMicroBatchReader.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/ReplicationStreamMicroBatchReader.scala rename to core/src/main/scala/vectorpipe/sources/ReplicationStreamMicroBatchReader.scala diff --git a/src/main/scala/vectorpipe/sources/SequenceOffset.scala b/core/src/main/scala/vectorpipe/sources/SequenceOffset.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/SequenceOffset.scala rename to core/src/main/scala/vectorpipe/sources/SequenceOffset.scala diff --git a/src/main/scala/vectorpipe/sources/Source.scala b/core/src/main/scala/vectorpipe/sources/Source.scala similarity index 100% rename from src/main/scala/vectorpipe/sources/Source.scala rename to core/src/main/scala/vectorpipe/sources/Source.scala diff --git a/src/main/scala/vectorpipe/util/Auth.scala b/core/src/main/scala/vectorpipe/util/Auth.scala similarity index 100% rename from src/main/scala/vectorpipe/util/Auth.scala rename to core/src/main/scala/vectorpipe/util/Auth.scala diff --git a/src/main/scala/vectorpipe/util/DBUtils.scala b/core/src/main/scala/vectorpipe/util/DBUtils.scala similarity index 100% rename from src/main/scala/vectorpipe/util/DBUtils.scala rename to core/src/main/scala/vectorpipe/util/DBUtils.scala diff --git a/src/main/scala/vectorpipe/util/Geocode.scala b/core/src/main/scala/vectorpipe/util/Geocode.scala similarity index 100% rename from src/main/scala/vectorpipe/util/Geocode.scala rename to core/src/main/scala/vectorpipe/util/Geocode.scala diff --git a/src/main/scala/vectorpipe/util/Resource.scala b/core/src/main/scala/vectorpipe/util/Resource.scala similarity index 100% rename from src/main/scala/vectorpipe/util/Resource.scala rename to core/src/main/scala/vectorpipe/util/Resource.scala diff --git a/src/main/scala/vectorpipe/util/package.scala b/core/src/main/scala/vectorpipe/util/package.scala similarity index 100% rename from src/main/scala/vectorpipe/util/package.scala rename to core/src/main/scala/vectorpipe/util/package.scala diff --git a/src/main/scala/vectorpipe/vectortile/Clipping.scala b/core/src/main/scala/vectorpipe/vectortile/Clipping.scala similarity index 100% rename from src/main/scala/vectorpipe/vectortile/Clipping.scala rename to core/src/main/scala/vectorpipe/vectortile/Clipping.scala diff --git a/src/main/scala/vectorpipe/vectortile/Pipeline.scala b/core/src/main/scala/vectorpipe/vectortile/Pipeline.scala similarity index 94% rename from src/main/scala/vectorpipe/vectortile/Pipeline.scala rename to core/src/main/scala/vectorpipe/vectortile/Pipeline.scala index 9ba9401b..6d5dc616 100644 --- a/src/main/scala/vectorpipe/vectortile/Pipeline.scala +++ b/core/src/main/scala/vectorpipe/vectortile/Pipeline.scala @@ -97,7 +97,7 @@ trait Pipeline { * Array[SpatialKey] giving the list of keys that the * geometry interacts with */ - def reduce(input: DataFrame, layoutLevel: LayoutLevel, keyColumn: String): DataFrame = input + val reduce: Option[(DataFrame, LayoutLevel, String) => DataFrame] = None /* * Lower complexity of geometry while moving to less resolute zoom levels. @@ -111,7 +111,7 @@ trait Pipeline { * is a simplifier using JTS's topology-preserving simplifier available in * [[vectorpipe.vectortile.Simplify]]. */ - def simplify(g: jts.Geometry, layout: LayoutDefinition): jts.Geometry = g + val simplify: Option[(jts.Geometry, LayoutDefinition) => jts.Geometry] = None /** * Select geometries for display at a given zoom level. @@ -129,7 +129,7 @@ trait Pipeline { * String-typed column of the name contained in [[layerName]] to indicate * which layer a geometry belongs in. */ - def select(input: DataFrame, targetZoom: Int, keyColumn: String): DataFrame = input + val select: Option[(DataFrame, Int, String) => DataFrame] = None /** * Clip geometries prior to writing to vector tiles. @@ -140,7 +140,7 @@ trait Pipeline { * * Basic (non-no-op) clipping functions can be found in [[Clipping]]. */ - def clip(geom: jts.Geometry, key: SpatialKey, layoutLevel: LayoutLevel): jts.Geometry = geom + val clip: Option[(jts.Geometry, SpatialKey, LayoutLevel) => jts.Geometry] = None /** * Convert table rows to output features. diff --git a/src/main/scala/vectorpipe/vectortile/Simplify.scala b/core/src/main/scala/vectorpipe/vectortile/Simplify.scala similarity index 100% rename from src/main/scala/vectorpipe/vectortile/Simplify.scala rename to core/src/main/scala/vectorpipe/vectortile/Simplify.scala diff --git a/core/src/main/scala/vectorpipe/vectortile/export/SaveToHadoop.scala b/core/src/main/scala/vectorpipe/vectortile/export/SaveToHadoop.scala new file mode 100644 index 00000000..dc13e956 --- /dev/null +++ b/core/src/main/scala/vectorpipe/vectortile/export/SaveToHadoop.scala @@ -0,0 +1,119 @@ +/* + * Copyright 2016 Azavea + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package geotrellis.spark.io.hadoop + +import geotrellis.spark.render._ +import geotrellis.spark.SpatialKey + +import java.net.URI +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.sql._ + +import scala.collection.concurrent.TrieMap + +object SaveToHadoop { + /** Saves records from an iterator and returns them unchanged. + * + * @param recs Key, Value records to be saved + * @param keyToUri A function from K (a key) to Hadoop URI + * @param toBytes A function from record to array of bytes + * @param conf Hadoop Configuration to used to get FileSystem + */ + def saveIterator[K, V]( + recs: Iterator[(K, V)], + keyToUri: K => String, + conf: Configuration + )(toBytes: (K, V) => Array[Byte]): Iterator[(K, V)] = { + val fsCache = TrieMap.empty[String, FileSystem] + + for ( row @ (key, data) <- recs ) yield { + val path = keyToUri(key) + val uri = new URI(path) + val fs = fsCache.getOrElseUpdate( + uri.getScheme, + FileSystem.get(uri, conf)) + val out = fs.create(new Path(path)) + try { out.write(toBytes(key, data)) } + finally { out.close() } + row + } + } + + /** + * Sets up saving to Hadoop, but returns an RDD so that writes can + * be chained. + * + * @param keyToUri A function from K (a key) to a Hadoop URI + */ + def setup[K]( + dataset: Dataset[(K, Array[Byte])], + keyToUri: K => String + )(implicit ev: Encoder[(K, Array[Byte])]): Dataset[(K, Array[Byte])] = { + import dataset.sparkSession.implicits._ + dataset.mapPartitions { partition => + saveIterator(partition, keyToUri, new Configuration){ (k, v) => v } + } + } + + /** + * Sets up saving to Hadoop, but returns an RDD so that writes can + * be chained. + * + * @param keyToUri A function from K (a key) to a Hadoop URI + * @param toBytes A function from record to array of bytes + */ + def setup[K, V]( + dataset: Dataset[(K, V)], + keyToUri: K => String, + toBytes: (K, V) => Array[Byte] + )(implicit ev: Encoder[(K, V)]): Dataset[(K, V)] = { + import dataset.sparkSession.implicits._ + val conf = dataset.sparkSession.sparkContext.hadoopConfiguration + dataset.mapPartitions { partition => + saveIterator(partition, keyToUri, new Configuration)(toBytes) + } + } + + /** + * Saves to Hadoop FileSystem, returns an count of records saved. + * + * @param keyToUri A function from K (a key) to a Hadoop URI + */ + def apply[K]( + dataset: Dataset[(K, Array[Byte])], + keyToUri: K => String + )( implicit ev: Encoder[(K, Array[Byte])]): Long = { + import dataset.sparkSession.implicits._ + setup(dataset, keyToUri).count + } + + /** + * Saves to Hadoop FileSystem, returns an count of records saved. + * + * @param keyToUri A function from K (a key) to a Hadoop URI + * @param toBytes A function from record to array of bytes + */ + def apply[K, V]( + dataset: Dataset[(K, V)], + keyToUri: K => String, + toBytes: (K, V) => Array[Byte] + )(implicit ev: Encoder[(K, V)]): Long = { + import dataset.sparkSession.implicits._ + setup(dataset, keyToUri, toBytes).count + } +} diff --git a/core/src/main/scala/vectorpipe/vectortile/export/SaveToS3.scala b/core/src/main/scala/vectorpipe/vectortile/export/SaveToS3.scala new file mode 100644 index 00000000..a0784fdf --- /dev/null +++ b/core/src/main/scala/vectorpipe/vectortile/export/SaveToS3.scala @@ -0,0 +1,109 @@ +/* + * Copyright 2016 Azavea + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package vectorpipe.vectortile.export + +import geotrellis.spark.SpatialKey + +import cats.effect.{IO, Timer} +import cats.syntax.apply._ +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.retry.PredefinedRetryPolicies +import com.amazonaws.services.s3.AmazonS3ClientBuilder +import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest, PutObjectResult} +import com.amazonaws.services.s3.model.AmazonS3Exception +import org.apache.spark.sql.Dataset + +import scala.concurrent.ExecutionContext + +import java.io.ByteArrayInputStream +import java.util.concurrent.Executors +import java.net.URI + +object SaveToS3 { + final val defaultThreadCount = Runtime.getRuntime.availableProcessors + + private def defaultConfiguration = { + val config = new com.amazonaws.ClientConfiguration + config.setMaxConnections(128) + config.setMaxErrorRetry(16) + config.setConnectionTimeout(100000) + config.setSocketTimeout(100000) + config.setRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(32)) + config + } + + /** + * @param keyToUri A function that maps each key to full s3 uri + * @param rdd An RDD of K, Byte-Array pairs (where the byte-arrays contains image data) to send to S3 + * @param putObjectModifier Function that will be applied ot S3 PutObjectRequests, so that they can be modified (e.g. to change the ACL settings) + * @param s3Maker A function which returns an S3 Client (real or mock) into-which to save the data + * @param threads Number of threads dedicated for the IO + */ + def apply[K]( + dataset: Dataset[(K, Array[Byte])], + keyToUri: K => String, + putObjectModifier: PutObjectRequest => PutObjectRequest = { p => p }, + threads: Int = defaultThreadCount + ): Unit = { + val keyToPrefix: K => (String, String) = key => { + val uri = new URI(keyToUri(key)) + require(uri.getScheme == "s3", s"SaveToS3 only supports s3 scheme: $uri") + val bucket = uri.getAuthority + val prefix = uri.getPath.substring(1) // drop the leading / from the prefix + (bucket, prefix) + } + + dataset.foreachPartition { partition => + val s3Client = AmazonS3ClientBuilder.defaultClient() + + val requests: fs2.Stream[IO, PutObjectRequest] = + fs2.Stream.fromIterator[IO, PutObjectRequest]( + partition.map { case (key, bytes) => + val metadata = new ObjectMetadata() + metadata.setContentLength(bytes.length) + val is = new ByteArrayInputStream(bytes) + val (bucket, path) = keyToPrefix(key) + putObjectModifier(new PutObjectRequest(bucket, path, is, metadata)) + } + ) + + val pool = Executors.newFixedThreadPool(threads) + implicit val ec = ExecutionContext.fromExecutor(pool) + implicit val timer: Timer[IO] = IO.timer(ec) + implicit val cs = IO.contextShift(ec) + + import geotrellis.spark.util.TaskUtils._ + val write: PutObjectRequest => fs2.Stream[IO, PutObjectResult] = { request => + fs2.Stream eval IO.shift(ec) *> IO { + request.getInputStream.reset() // reset in case of retransmission to avoid 400 error + s3Client.putObject(request) + }.retryEBO { + case e: AmazonS3Exception if e.getStatusCode == 503 => true + case _ => false + } + } + + requests + .map(write) + .parJoin(threads) + .compile + .toVector + .unsafeRunSync() + pool.shutdown() + } + } +} diff --git a/src/main/scala/vectorpipe/vectortile/export/package.scala b/core/src/main/scala/vectorpipe/vectortile/export/package.scala similarity index 52% rename from src/main/scala/vectorpipe/vectortile/export/package.scala rename to core/src/main/scala/vectorpipe/vectortile/export/package.scala index e6d0c3a9..88fb92a3 100644 --- a/src/main/scala/vectorpipe/vectortile/export/package.scala +++ b/core/src/main/scala/vectorpipe/vectortile/export/package.scala @@ -1,5 +1,7 @@ package vectorpipe.vectortile +import vectorpipe.vectortile.export.SaveToS3 + import com.amazonaws.services.s3.model.CannedAccessControlList._ import geotrellis.spark.SpatialKey import geotrellis.spark.io.hadoop._ @@ -7,6 +9,7 @@ import geotrellis.spark.io.index.zcurve.Z2 import geotrellis.spark.io.s3._ import geotrellis.vectortile._ import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ import java.net.URI import java.io.ByteArrayOutputStream @@ -24,6 +27,17 @@ package object export { } } + def saveVectorTiles(vectorTiles: Dataset[(SpatialKey, Array[Byte])], zoom: Int, uri: URI): Unit = { + uri.getScheme match { + case "s3" => + val path = uri.getPath + val prefix = path.stripPrefix("/").stripSuffix("/") + saveToS3(vectorTiles, zoom, uri.getAuthority, prefix) + case _ => + saveHadoop(vectorTiles, zoom, uri) + } + } + private def saveToS3(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int, bucket: String, prefix: String) = { vectorTiles .mapValues { tile => @@ -55,10 +69,48 @@ package object export { }) } + private def saveToS3(vectorTiles: Dataset[(SpatialKey, Array[Byte])], zoom: Int, bucket: String, prefix: String) = { + import vectorTiles.sparkSession.implicits._ + + SaveToS3( + vectorTiles + .map { tup: (SpatialKey, Array[Byte]) => + val byteStream = new ByteArrayOutputStream() + + try { + val gzipStream = new GZIPOutputStream(byteStream) + try { + gzipStream.write(tup._2) + } finally { + gzipStream.close() + } + } finally { + byteStream.close() + } + + (tup._1, byteStream.toByteArray) + }, + { sk: SpatialKey => s"s3://${bucket}/${prefix}/${zoom}/${sk.col}/${sk.row}.mvt" }, + putObjectModifier = { o => + val md = o.getMetadata + md.setContentEncoding("gzip") + o.withMetadata(md).withCannedAcl(PublicRead) + } + ) + } + private def saveHadoop(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int, uri: URI) = { vectorTiles .mapValues(_.toBytes) .saveToHadoop({ sk: SpatialKey => s"${uri}/${zoom}/${sk.col}/${sk.row}.mvt" }) } + private def saveHadoop(vectorTiles: Dataset[(SpatialKey, Array[Byte])], zoom: Int, uri: URI) = { + import vectorTiles.sparkSession.implicits._ + SaveToHadoop( + vectorTiles.map { tup: (SpatialKey, Array[Byte]) => (tup._1, tup._2) }, + { sk: SpatialKey => s"${uri}/${zoom}/${sk.col}/${sk.row}.mvt" } + ) + } + } diff --git a/src/main/scala/vectorpipe/vectortile/package.scala b/core/src/main/scala/vectorpipe/vectortile/package.scala similarity index 68% rename from src/main/scala/vectorpipe/vectortile/package.scala rename to core/src/main/scala/vectorpipe/vectortile/package.scala index e2c77340..c2f24f85 100644 --- a/src/main/scala/vectorpipe/vectortile/package.scala +++ b/core/src/main/scala/vectorpipe/vectortile/package.scala @@ -7,23 +7,81 @@ import geotrellis.vector._ import geotrellis.vector.reproject._ import geotrellis.vectortile._ import org.apache.spark.sql._ +import org.apache.spark.sql.{Encoder, Encoders} import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.functions._ import org.locationtech.jts.{geom => jts} import scala.concurrent._ import scala.concurrent.duration._ +import scala.reflect.{classTag, ClassTag} import scala.util.{Try, Success, Failure} package object vectortile { - type VectorTileFeature[+G <: Geometry] = Feature[G, Map[String, Value]] - sealed trait LayerMultiplicity { val name: String } case class SingleLayer(val name: String) extends LayerMultiplicity case class LayerNamesInColumn(val name: String) extends LayerMultiplicity @transient lazy val logger = org.apache.log4j.Logger.getRootLogger + type VectorTileFeature[+G <: Geometry] = Feature[G, Map[String, Value]] + + def caseClassToVTFeature[T: ClassTag]: T => VectorTileFeature[Geometry] = { + // val accessors = typeOf[T].members.collect { + // case m: MethodSymbol if m.isCaseAccessor => m + // }.toList + // val jtsGeomType = typeOf[jts.Geometry] + // assert(accessors.exists(_.returnType <:< jtsGeomType), "Automatic case class conversion to Features requires a JTS Geometry type") + + val fields = classTag[T].runtimeClass.getDeclaredFields.filterNot(_.isSynthetic).toList + val jtsGeomType = classOf[jts.Geometry] + assert(fields.exists{ f => classOf[jts.Geometry].isAssignableFrom(f.getType) }, + "Automatic case class conversion to Features requires a JTS Geometry type") + + { t: T => + fields.foldLeft( (None: Option[Geometry], Map.empty[String, Value]) ) { (state, field) => + field.getType match { + case ty if jtsGeomType.isAssignableFrom(ty) => + field.setAccessible(true) + val g = Geometry(field.get(t).asInstanceOf[jts.Geometry]) + field.setAccessible(false) + (Some(g), state._2) + case ty if ty == classOf[String] => + field.setAccessible(true) + val v = field.get(t).asInstanceOf[String] + field.setAccessible(false) + (state._1, state._2 + (field.getName -> VString(v))) + case ty if ty == classOf[Int] => + field.setAccessible(true) + val v = field.getInt(t) + field.setAccessible(false) + (state._1, state._2 + (field.getName -> VInt64(v.toLong))) + case ty if ty == classOf[Long] => + field.setAccessible(true) + val v = field.getLong(t) + field.setAccessible(false) + (state._1, state._2 + (field.getName -> VInt64(v))) + case ty if ty == classOf[Float] => + field.setAccessible(true) + val v = field.getFloat(t) + field.setAccessible(false) + (state._1, state._2 + (field.getName -> VFloat(v))) + case ty if ty == classOf[Double] => + field.setAccessible(true) + val v = field.getDouble(t) + field.setAccessible(false) + (state._1, state._2 + (field.getName -> VDouble(v))) + case _ => + logger.warn(s"Dropped ${field.getName} of incompatible type ${field.getType} from vector tile feature") + state + } + } match { + case (None, m) => throw new IllegalStateException("JTS Geometry type not found") + case (Some(g), attrs) => Feature(g, attrs) + } + } + } + @transient lazy val st_reprojectGeom = udf { (g: jts.Geometry, srcProj: String, destProj: String) => val trans = Proj4Transform(CRS.fromString(srcProj), CRS.fromString(destProj)) val gt = Geometry(g) diff --git a/src/main/tut/index.md b/core/src/main/tut/index.md similarity index 100% rename from src/main/tut/index.md rename to core/src/main/tut/index.md diff --git a/src/main/tut/outputs.md b/core/src/main/tut/outputs.md similarity index 100% rename from src/main/tut/outputs.md rename to core/src/main/tut/outputs.md diff --git a/src/main/tut/sources.md b/core/src/main/tut/sources.md similarity index 100% rename from src/main/tut/sources.md rename to core/src/main/tut/sources.md diff --git a/src/main/tut/usage.md b/core/src/main/tut/usage.md similarity index 100% rename from src/main/tut/usage.md rename to core/src/main/tut/usage.md diff --git a/src/main/tut/usage/concepts.md b/core/src/main/tut/usage/concepts.md similarity index 100% rename from src/main/tut/usage/concepts.md rename to core/src/main/tut/usage/concepts.md diff --git a/src/main/tut/usage/osm.md b/core/src/main/tut/usage/osm.md similarity index 100% rename from src/main/tut/usage/osm.md rename to core/src/main/tut/usage/osm.md diff --git a/src/main/tut/usage/usage.md b/core/src/main/tut/usage/usage.md similarity index 100% rename from src/main/tut/usage/usage.md rename to core/src/main/tut/usage/usage.md diff --git a/src/test/resources/.gitignore b/core/src/test/resources/.gitignore similarity index 100% rename from src/test/resources/.gitignore rename to core/src/test/resources/.gitignore diff --git a/src/test/resources/isle-of-man-latest.osm.orc b/core/src/test/resources/isle-of-man-latest.osm.orc similarity index 100% rename from src/test/resources/isle-of-man-latest.osm.orc rename to core/src/test/resources/isle-of-man-latest.osm.orc diff --git a/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties similarity index 100% rename from src/test/resources/log4j.properties rename to core/src/test/resources/log4j.properties diff --git a/src/test/resources/relation-110564.orc b/core/src/test/resources/relation-110564.orc similarity index 100% rename from src/test/resources/relation-110564.orc rename to core/src/test/resources/relation-110564.orc diff --git a/src/test/resources/relation-110564.wkt b/core/src/test/resources/relation-110564.wkt similarity index 100% rename from src/test/resources/relation-110564.wkt rename to core/src/test/resources/relation-110564.wkt diff --git a/src/test/resources/relation-191199.orc b/core/src/test/resources/relation-191199.orc similarity index 100% rename from src/test/resources/relation-191199.orc rename to core/src/test/resources/relation-191199.orc diff --git a/src/test/resources/relation-191199.wkt b/core/src/test/resources/relation-191199.wkt similarity index 100% rename from src/test/resources/relation-191199.wkt rename to core/src/test/resources/relation-191199.wkt diff --git a/src/test/resources/relation-191204.orc b/core/src/test/resources/relation-191204.orc similarity index 100% rename from src/test/resources/relation-191204.orc rename to core/src/test/resources/relation-191204.orc diff --git a/src/test/resources/relation-191204.wkt b/core/src/test/resources/relation-191204.wkt similarity index 100% rename from src/test/resources/relation-191204.wkt rename to core/src/test/resources/relation-191204.wkt diff --git a/src/test/resources/relation-1949938.orc b/core/src/test/resources/relation-1949938.orc similarity index 100% rename from src/test/resources/relation-1949938.orc rename to core/src/test/resources/relation-1949938.orc diff --git a/src/test/resources/relation-1949938.wkt b/core/src/test/resources/relation-1949938.wkt similarity index 100% rename from src/test/resources/relation-1949938.wkt rename to core/src/test/resources/relation-1949938.wkt diff --git a/src/test/resources/relation-2554903.orc b/core/src/test/resources/relation-2554903.orc similarity index 100% rename from src/test/resources/relation-2554903.orc rename to core/src/test/resources/relation-2554903.orc diff --git a/src/test/resources/relation-2554903.wkt b/core/src/test/resources/relation-2554903.wkt similarity index 100% rename from src/test/resources/relation-2554903.wkt rename to core/src/test/resources/relation-2554903.wkt diff --git a/src/test/resources/relation-2580685.orc b/core/src/test/resources/relation-2580685.orc similarity index 100% rename from src/test/resources/relation-2580685.orc rename to core/src/test/resources/relation-2580685.orc diff --git a/src/test/resources/relation-2580685.wkt b/core/src/test/resources/relation-2580685.wkt similarity index 100% rename from src/test/resources/relation-2580685.wkt rename to core/src/test/resources/relation-2580685.wkt diff --git a/src/test/resources/relation-3080946.orc b/core/src/test/resources/relation-3080946.orc similarity index 100% rename from src/test/resources/relation-3080946.orc rename to core/src/test/resources/relation-3080946.orc diff --git a/src/test/resources/relation-3080946.wkt b/core/src/test/resources/relation-3080946.wkt similarity index 100% rename from src/test/resources/relation-3080946.wkt rename to core/src/test/resources/relation-3080946.wkt diff --git a/src/test/resources/relation-3105056.orc b/core/src/test/resources/relation-3105056.orc similarity index 100% rename from src/test/resources/relation-3105056.orc rename to core/src/test/resources/relation-3105056.orc diff --git a/src/test/resources/relation-3105056.wkt b/core/src/test/resources/relation-3105056.wkt similarity index 100% rename from src/test/resources/relation-3105056.wkt rename to core/src/test/resources/relation-3105056.wkt diff --git a/src/test/resources/relation-333501.orc b/core/src/test/resources/relation-333501.orc similarity index 100% rename from src/test/resources/relation-333501.orc rename to core/src/test/resources/relation-333501.orc diff --git a/src/test/resources/relation-333501.wkt b/core/src/test/resources/relation-333501.wkt similarity index 100% rename from src/test/resources/relation-333501.wkt rename to core/src/test/resources/relation-333501.wkt diff --git a/src/test/resources/relation-393502.orc b/core/src/test/resources/relation-393502.orc similarity index 100% rename from src/test/resources/relation-393502.orc rename to core/src/test/resources/relation-393502.orc diff --git a/src/test/resources/relation-393502.wkt b/core/src/test/resources/relation-393502.wkt similarity index 100% rename from src/test/resources/relation-393502.wkt rename to core/src/test/resources/relation-393502.wkt diff --git a/src/test/resources/relation-5448156.orc b/core/src/test/resources/relation-5448156.orc similarity index 100% rename from src/test/resources/relation-5448156.orc rename to core/src/test/resources/relation-5448156.orc diff --git a/src/test/resources/relation-5448156.wkt b/core/src/test/resources/relation-5448156.wkt similarity index 100% rename from src/test/resources/relation-5448156.wkt rename to core/src/test/resources/relation-5448156.wkt diff --git a/src/test/resources/relation-5448691.orc b/core/src/test/resources/relation-5448691.orc similarity index 100% rename from src/test/resources/relation-5448691.orc rename to core/src/test/resources/relation-5448691.orc diff --git a/src/test/resources/relation-5448691.wkt b/core/src/test/resources/relation-5448691.wkt similarity index 100% rename from src/test/resources/relation-5448691.wkt rename to core/src/test/resources/relation-5448691.wkt diff --git a/src/test/resources/relation-5612959.orc b/core/src/test/resources/relation-5612959.orc similarity index 100% rename from src/test/resources/relation-5612959.orc rename to core/src/test/resources/relation-5612959.orc diff --git a/src/test/resources/relation-5612959.wkt b/core/src/test/resources/relation-5612959.wkt similarity index 100% rename from src/test/resources/relation-5612959.wkt rename to core/src/test/resources/relation-5612959.wkt diff --git a/src/test/resources/relation-61315.orc b/core/src/test/resources/relation-61315.orc similarity index 100% rename from src/test/resources/relation-61315.orc rename to core/src/test/resources/relation-61315.orc diff --git a/src/test/resources/relation-61315.wkt b/core/src/test/resources/relation-61315.wkt similarity index 100% rename from src/test/resources/relation-61315.wkt rename to core/src/test/resources/relation-61315.wkt diff --git a/src/test/resources/relation-6710544.orc b/core/src/test/resources/relation-6710544.orc similarity index 100% rename from src/test/resources/relation-6710544.orc rename to core/src/test/resources/relation-6710544.orc diff --git a/src/test/resources/relation-6710544.wkt b/core/src/test/resources/relation-6710544.wkt similarity index 100% rename from src/test/resources/relation-6710544.wkt rename to core/src/test/resources/relation-6710544.wkt diff --git a/src/test/resources/view/cluster-view.html b/core/src/test/resources/view/cluster-view.html similarity index 100% rename from src/test/resources/view/cluster-view.html rename to core/src/test/resources/view/cluster-view.html diff --git a/src/test/resources/view/layer-test.html b/core/src/test/resources/view/layer-test.html similarity index 100% rename from src/test/resources/view/layer-test.html rename to core/src/test/resources/view/layer-test.html diff --git a/src/test/scala/vectorpipe/MultiPolygonRelationReconstructionSpec.scala b/core/src/test/scala/vectorpipe/MultiPolygonRelationReconstructionSpec.scala similarity index 100% rename from src/test/scala/vectorpipe/MultiPolygonRelationReconstructionSpec.scala rename to core/src/test/scala/vectorpipe/MultiPolygonRelationReconstructionSpec.scala diff --git a/src/test/scala/vectorpipe/ProcessOSMTest.scala b/core/src/test/scala/vectorpipe/ProcessOSMTest.scala similarity index 100% rename from src/test/scala/vectorpipe/ProcessOSMTest.scala rename to core/src/test/scala/vectorpipe/ProcessOSMTest.scala diff --git a/src/test/scala/vectorpipe/TestEnvironment.scala b/core/src/test/scala/vectorpipe/TestEnvironment.scala similarity index 100% rename from src/test/scala/vectorpipe/TestEnvironment.scala rename to core/src/test/scala/vectorpipe/TestEnvironment.scala diff --git a/src/test/scala/vectorpipe/functions/osm/FunctionSpec.scala b/core/src/test/scala/vectorpipe/functions/osm/FunctionSpec.scala similarity index 100% rename from src/test/scala/vectorpipe/functions/osm/FunctionSpec.scala rename to core/src/test/scala/vectorpipe/functions/osm/FunctionSpec.scala diff --git a/src/test/scala/vectorpipe/vectortile/LayerTestPipeline.scala b/core/src/test/scala/vectorpipe/vectortile/LayerTestPipeline.scala similarity index 68% rename from src/test/scala/vectorpipe/vectortile/LayerTestPipeline.scala rename to core/src/test/scala/vectorpipe/vectortile/LayerTestPipeline.scala index a0d1d6d1..6233edf8 100644 --- a/src/test/scala/vectorpipe/vectortile/LayerTestPipeline.scala +++ b/core/src/test/scala/vectorpipe/vectortile/LayerTestPipeline.scala @@ -12,7 +12,7 @@ import vectorpipe.vectortile._ case class LayerTestPipeline(geometryColumn: String, baseOutputURI: java.net.URI) extends Pipeline { val layerMultiplicity = LayerNamesInColumn("layers") - override def select(wayGeoms: DataFrame, targetZoom: Int, keyColumn: String): DataFrame = { + override val select: Option[(DataFrame, Int, String) => DataFrame] = Some { (wayGeoms: DataFrame, targetZoom: Int, keyColumn: String) => import wayGeoms.sparkSession.implicits._ wayGeoms @@ -20,6 +20,6 @@ case class LayerTestPipeline(geometryColumn: String, baseOutputURI: java.net.URI .where(functions.not(functions.isnull('layers))) } - override def clip(geom: jts.Geometry, key: geotrellis.spark.SpatialKey, layoutLevel: geotrellis.spark.tiling.LayoutLevel): jts.Geometry = - Clipping.byLayoutCell(geom, key, layoutLevel) + override val clip: Option[(jts.Geometry, geotrellis.spark.SpatialKey, geotrellis.spark.tiling.LayoutLevel) => jts.Geometry] = + Some(Clipping.byLayoutCell) } diff --git a/src/test/scala/vectorpipe/vectortile/PipelineSpec.scala b/core/src/test/scala/vectorpipe/vectortile/PipelineSpec.scala similarity index 95% rename from src/test/scala/vectorpipe/vectortile/PipelineSpec.scala rename to core/src/test/scala/vectorpipe/vectortile/PipelineSpec.scala index 3920ad73..02ec4ca6 100644 --- a/src/test/scala/vectorpipe/vectortile/PipelineSpec.scala +++ b/core/src/test/scala/vectorpipe/vectortile/PipelineSpec.scala @@ -19,7 +19,8 @@ class PipelineSpec extends FunSpec with TestEnvironment with Matchers { val nodeGeoms = nodes .filter(functions.not(isnull('lat))) .withColumn("geometry", st_makePoint('lon, 'lat)) - .drop("lat", "lon") + .withColumnRenamed("lon", "xw") + .withColumnRenamed("lat", "yw") .withColumn("weight", lit(1)) .cache diff --git a/src/test/scala/vectorpipe/vectortile/TestPipeline.scala b/core/src/test/scala/vectorpipe/vectortile/TestPipeline.scala similarity index 57% rename from src/test/scala/vectorpipe/vectortile/TestPipeline.scala rename to core/src/test/scala/vectorpipe/vectortile/TestPipeline.scala index 7a954e68..2e916fe5 100644 --- a/src/test/scala/vectorpipe/vectortile/TestPipeline.scala +++ b/core/src/test/scala/vectorpipe/vectortile/TestPipeline.scala @@ -22,11 +22,23 @@ object Bin { } case class TestPipeline(geometryColumn: String, baseOutputURI: java.net.URI, gridResolution: Int) extends Pipeline { - val weightedCentroid = new WeightedCentroid + //val weightedCentroid = new WeightedCentroid + + @transient lazy val locallogger = org.apache.log4j.Logger.getLogger(VectorPipe.getClass) + + def time[T](msg: String)(f: => T): T = { + val s = System.currentTimeMillis + val result = f + val e = System.currentTimeMillis + val t = "%,d".format(e - s) + locallogger.warn(s"${msg}: Completed in $t ms") + result + } val layerMultiplicity = SingleLayer("points") + val gf = new jts.GeometryFactory - override def reduce(input: DataFrame, layoutLevel: LayoutLevel, keyColumn: String): DataFrame = { + override val reduce: Option[(DataFrame, LayoutLevel, String) => DataFrame] = Some { (input, layoutLevel, keyColumn) => import input.sparkSession.implicits._ val layout = layoutLevel.layout @@ -37,15 +49,24 @@ case class TestPipeline(geometryColumn: String, baseOutputURI: java.net.URI, gri val c = pt.getCoordinate Bin(re.mapToGrid(c.x, c.y)) } + val st_makePoint = functions.udf { (x: Double, y: Double) => gf.createPoint(new jts.Coordinate(x, y)) } - val st_geomToPoint = functions.udf { g: jts.Geometry => g.asInstanceOf[jts.Point] } - - input.withColumn(keyColumn, explode(col(keyColumn))) + val binned = input.withColumn(keyColumn, explode(col(keyColumn))) .withColumn("bin", binOfTile(col(geometryColumn), col(keyColumn))) + time("Binned points into key cells")(binned.count) + + val summed = binned .groupBy(col(keyColumn), col("bin")) - .agg(sum('weight) as 'weight, weightedCentroid(st_geomToPoint(col(geometryColumn)), 'weight) as geometryColumn) + //.agg(sum('weight) as 'weight, weightedCentroid(st_geomToPoint(col(geometryColumn)), 'weight) as geometryColumn) + .agg(sum('weight) as 'weight, sum('xw) as 'xw, sum('yw) as 'yw) .drop('bin) + time("Summed weighted coords")(summed.count) + + val reduced = summed + .withColumn(geometryColumn, st_makePoint('xw / 'weight, 'yw / 'weight)) .withColumn(keyColumn, array(col(keyColumn))) + time("Converted to points and set keys")(reduced.count) + reduced } override def pack(row: Row, zoom: Int): VectorTileFeature[Point] = { diff --git a/src/test/scala/vectorpipe/vectortile/WeightedCentroid.scala b/core/src/test/scala/vectorpipe/vectortile/WeightedCentroid.scala similarity index 100% rename from src/test/scala/vectorpipe/vectortile/WeightedCentroid.scala rename to core/src/test/scala/vectorpipe/vectortile/WeightedCentroid.scala diff --git a/examples/build.sbt b/examples/build.sbt new file mode 100644 index 00000000..78317674 --- /dev/null +++ b/examples/build.sbt @@ -0,0 +1,11 @@ +import Dependencies._ + +name := "vectorpipe-examples" + +// libraryDependencies ++= Seq( +// "com.etsy" % "statsd-jvm-profiler" % "2.0.0" +// ) + +assemblyJarName in assembly := "vectorpipe-examples.jar" + +// javaAgents += "com.etsy" % "statsd-jvm-profiler" % "2.0.0" % "compile;test" diff --git a/examples/src/main/scala/vectorpipe/examples/BaselineMain.scala b/examples/src/main/scala/vectorpipe/examples/BaselineMain.scala new file mode 100644 index 00000000..c5936a55 --- /dev/null +++ b/examples/src/main/scala/vectorpipe/examples/BaselineMain.scala @@ -0,0 +1,93 @@ +package vectorpipe.examples + +import cats.implicits._ +import com.monovore.decline._ +import geotrellis.proj4.{LatLng, WebMercator, Proj4Transform, Transform} +import geotrellis.spark.SpatialKey +import geotrellis.spark.tiling.{LayoutDefinition, ZoomedLayoutScheme} +import geotrellis.vector.{Geometry => GTGeometry, Feature} +import geotrellis.vectortile.Value +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.sql._ +import org.apache.spark.sql.functions.{when, udf, not, isnull} +import org.locationtech.geomesa.spark.jts._ +import org.locationtech.jts.geom._ + +import vectorpipe._ +import vectorpipe.functions.osm._ +import vectorpipe.vectortile.{buildVectorTile, Clipping} +import vectorpipe.vectortile.export._ + +import java.net.URI +import java.nio.file.Path + +/* + spark-submit --class vectorpipe.examples.BaselineMain vectorpipe-examples.jar \ + --output s3://geotrellis-test/jpolchlopek/vectorpipe/vectortiles/layers \ + s3://geotrellis-test/jpolchlopek/vectorpipe/isle-of-man-latest.osm.orc +*/ + +case class Geom(geom: Geometry, layer: String) +object Geom { + val st_reproject = udf { g: Geometry => + val trans = Proj4Transform(LatLng, WebMercator) + GTGeometry(g).reproject(trans).jtsGeom + } +} + +case class KeyedGeom(key: SpatialKey, layer: String, geom: Geometry) +object KeyedGeom { + def fromGeom(layout: LayoutDefinition)(g: Geom): Seq[KeyedGeom] = { + layout.mapTransform.keysForGeometry(GTGeometry(g.geom.asInstanceOf[Geometry])).map{ k => KeyedGeom(k, g.layer, g.geom) }.toSeq + } +} + +object BaselineMain extends CommandApp( + name = "OSM-layers", + header = "Convert OSM ORC file into vector tile set containing roads and buildings", + main = { + val outputOpt = Opts.option[URI]("output", help = "Base URI for output.") + val inputOpt = Opts.argument[String]() + + (outputOpt, inputOpt).mapN { (output, input) => + implicit val spark: SparkSession = SparkSession.builder + .appName("Layer VT Generation") + .config("spark.ui.enabled", "true") + .config("spark.default.parallelism","8") + .config("spark.serializer", classOf[KryoSerializer].getName) + .config("spark.kryo.registrationRequired", "false") + .config("spark.kryoserializer.buffer.max", "500m") + .config("spark.sql.orc.impl", "native") + .getOrCreate() + .withJTS + + import spark.implicits._ + + val level = ZoomedLayoutScheme(WebMercator).levelForZoom(14) + val layout = level.layout + + val df = spark.read.orc(input) + val keyedGeoms = OSM + .toGeometry(df) + .withColumn("layer", when(isBuilding('tags), "buildings").when(isRoad('tags), "roads")) + .where(not(isnull('layer))) + // .count + .withColumn("geom", Geom.st_reproject('geom)) + .as[Geom] + .flatMap(KeyedGeom.fromGeom(layout)(_)) + val grouped = keyedGeoms.groupByKey(_.key) + val tiles = grouped.mapGroups{ (key, geoms) => + val clipped = geoms.map{ klg => KeyedGeom(key, klg.layer, Clipping.byLayoutCell(klg.geom, key, level)) } + val layers = clipped.toIterable.groupBy(_.layer) + val features = layers.mapValues{ _.map{ klg => Feature(GTGeometry(klg.geom), Map.empty[String, Value]) } } + key -> buildVectorTile(features, layout.mapTransform.keyToExtent(key), 4096, false).toBytes + } + + saveVectorTiles(tiles, 14, output) + // System.err.println(keyedGeoms) + + spark.stop + } + } + +) diff --git a/examples/src/main/scala/vectorpipe/examples/OSMLayerMain.scala b/examples/src/main/scala/vectorpipe/examples/OSMLayerMain.scala new file mode 100644 index 00000000..d82b198e --- /dev/null +++ b/examples/src/main/scala/vectorpipe/examples/OSMLayerMain.scala @@ -0,0 +1,51 @@ +package vectorpipe.examples + +import cats.implicits._ +import com.monovore.decline._ +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.sql._ +import org.locationtech.geomesa.spark.jts._ + +import vectorpipe._ + +import java.net.URI +import java.nio.file.Path + +/* + spark-submit --class vectorpipe.examples.OSMLayerMain vectorpipe-examples.jar \ + --output s3://geotrellis-test/jpolchlopek/vectorpipe/vectortiles/layers \ + s3://geotrellis-test/jpolchlopek/vectorpipe/isle-of-man-latest.osm.orc +*/ + +object OSMLayerMain extends CommandApp( + name = "OSM-layers", + header = "Convert OSM ORC file into vector tile set containing roads and buildings", + main = { + val outputOpt = Opts.option[URI]("output", help = "Base URI for output.") + val inputOpt = Opts.argument[String]() + + (outputOpt, inputOpt).mapN { (output, input) => + implicit val spark: SparkSession = SparkSession.builder + .appName("Layer VT Generation") + .config("spark.ui.enabled", "true") + .config("spark.default.parallelism","8") + .config("spark.serializer", classOf[KryoSerializer].getName) + .config("spark.kryo.registrationRequired", "false") + .config("spark.kryoserializer.buffer.max", "500m") + .config("spark.sql.orc.impl", "native") + .getOrCreate() + .withJTS + + import spark.implicits._ + + val df = spark.read.orc(input) + val wayGeoms = OSM.toGeometry(df) + val pipeline = OSMLayerPipeline("geom", output) + + VectorPipe(wayGeoms, pipeline, VectorPipe.Options.forZoom(14)) + + spark.stop + } + } + +) diff --git a/examples/src/main/scala/vectorpipe/examples/OSMLayerPipeline.scala b/examples/src/main/scala/vectorpipe/examples/OSMLayerPipeline.scala new file mode 100644 index 00000000..7de59725 --- /dev/null +++ b/examples/src/main/scala/vectorpipe/examples/OSMLayerPipeline.scala @@ -0,0 +1,25 @@ +package vectorpipe.examples + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions +import org.apache.spark.sql.functions.when +import org.locationtech.jts.{geom => jts} + +import vectorpipe._ +import vectorpipe.functions.osm._ +import vectorpipe.vectortile._ + +case class OSMLayerPipeline(geometryColumn: String, baseOutputURI: java.net.URI) extends Pipeline { + val layerMultiplicity = LayerNamesInColumn("layers") + + override val select: Option[(DataFrame, Int, String) => DataFrame] = Some { (wayGeoms: DataFrame, targetZoom: Int, keyColumn: String) => + import wayGeoms.sparkSession.implicits._ + + wayGeoms + .withColumn("layers", when(isBuilding('tags), "buildings").when(isRoad('tags), "roads")) + .where(functions.not(functions.isnull('layers))) + } + + override val clip: Option[(jts.Geometry, geotrellis.spark.SpatialKey, geotrellis.spark.tiling.LayoutLevel) => jts.Geometry] = + Some(Clipping.byLayoutCell) +} diff --git a/project/plugins.sbt b/project/plugins.sbt index 61fa9feb..1f961954 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -7,3 +7,6 @@ addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.1") addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.0") addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full) + +//addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.21") +//addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.5")