Skip to content

Commit

Permalink
Update Mailgun adapter to work with the new API (close #665)
Browse files Browse the repository at this point in the history
Previously it was impossible to use Mailgun adapter.
Mailgun adapter has now been updated to work with `application/json` webhooks.
These move all the parameters (with a similar schema) from form-encoded into
json object fields. The code has been simplified to reflect on the change.

Related to [PE-3529].
  • Loading branch information
peel authored and lmath committed Aug 15, 2022
1 parent 6d7646f commit 3723964
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 226 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ class MailgunAdapterSpec extends Specification with CatsIO {
"enrichWith" should {
"enrich with MailgunAdapter" in {
val body =
"domain=sandbox57070072075d4cfd9008d4332108734c.mailgun.org&my_var_1=Mailgun+Variable+%231&my-var-2=awesome&message-headers=%5B%5B%22Received%22%2C+%22by+luna.mailgun.net+with+SMTP+mgrt+8734663311733%3B+Fri%2C+03+May+2013+18%3A26%3A27+%2B0000%22%5D%2C+%5B%22Content-Type%22%2C+%5B%22multipart%2Falternative%22%2C+%7B%22boundary%22%3A+%22eb663d73ae0a4d6c9153cc0aec8b7520%22%7D%5D%5D%2C+%5B%22Mime-Version%22%2C+%221.0%22%5D%2C+%5B%22Subject%22%2C+%22Test+deliver+webhook%22%5D%2C+%5B%22From%22%2C+%22Bob+%3Cbob%40sandbox57070072075d4cfd9008d4332108734c.mailgun.org%3E%22%5D%2C+%5B%22To%22%2C+%22Alice+%3Calice%40example.com%3E%22%5D%2C+%5B%22Message-Id%22%2C+%22%3C20130503182626.18666.16540%40sandbox57070072075d4cfd9008d4332108734c.mailgun.org%3E%22%5D%2C+%5B%22X-Mailgun-Variables%22%2C+%22%7B%5C%22my_var_1%5C%22%3A+%5C%22Mailgun+Variable+%231%5C%22%2C+%5C%22my-var-2%5C%22%3A+%5C%22awesome%5C%22%7D%22%5D%2C+%5B%22Date%22%2C+%22Fri%2C+03+May+2013+18%3A26%3A27+%2B0000%22%5D%2C+%5B%22Sender%22%2C+%22bob%40sandbox57070072075d4cfd9008d4332108734c.mailgun.org%22%5D%5D&Message-Id=%3C20130503182626.18666.16540%40sandbox57070072075d4cfd9008d4332108734c.mailgun.org%3E&recipient=alice%40example.com&event=delivered&timestamp=1510161827&token=cd87f5a30002794e37aa49e67fb46990e578b1e9197773d817&signature=c902ff9e3dea54c2dbe1871f9041653292ea9689d3d2b2d2ecfa996f025b9669&body-plain="
"""{"signature":{"token":"090e1ce3702378c8121f3765a8efe0ffb97c4e2ca2adda6729","timestamp":"1657907833","signature":"496413b238ab6affce021b850d3fc72c4832fb04f1aed6d4a2fda7d08e6e95a6"},"event-data":{"id":"CPgfbmQMTCKtHW6uIWtuVe","timestamp":1521472262.908181,"log-level":"info","event":"delivered","delivery-status":{"tls":true,"mx-host":"smtp-in.example.com","code":250,"description":"","session-seconds":0.4331989288330078,"utf8":true,"attempt-no":1,"message":"OK","certificate-verified":true},"flags":{"is-routed":false,"is-authenticated":true,"is-system-test":false,"is-test-mode":false},"envelope":{"transport":"smtp","sender":"[email protected]","sending-ip":"209.61.154.250","targets":"[email protected]"},"message":{"headers":{"to":"Alice <[email protected]>","message-id":"20130503182626.18666.16540@sandbox22aee81b13674403a5335202df94f7e7.mailgun.org","from":"Bob <[email protected]>","subject":"Test delivered webhook"},"attachments":[],"size":111},"recipient":"[email protected]","recipient-domain":"example.com","storage":{"url":"https://se.api.mailgun.net/v3/domains/sandbox22aee81b13674403a5335202df94f7e7.mailgun.org/messages/message_key","key":"message_key"},"campaigns":[],"tags":["my_tag_1","my_tag_2"],"user-variables":{"my_var_1":"Mailgun Variable #1","my-var-2":"awesome"}}}"""
val input = BlackBoxTesting.buildCollectorPayload(
path = "/com.mailgun/v1",
body = body.some,
contentType = "application/x-www-form-urlencoded".some
contentType = "application/json".some
)
val expected = Map(
"v_tracker" -> "com.mailgun-v1",
Expand All @@ -39,7 +39,7 @@ class MailgunAdapterSpec extends Specification with CatsIO {
"event_format" -> "jsonschema",
"event_version" -> "1-0-0",
"event" -> "unstruct",
"unstruct_event" -> json"""{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.mailgun/message_delivered/jsonschema/1-0-0","data":{"recipient":"[email protected]","timestamp":"2017-11-08T17:23:47.000Z","domain":"sandbox57070072075d4cfd9008d4332108734c.mailgun.org","signature":"c902ff9e3dea54c2dbe1871f9041653292ea9689d3d2b2d2ecfa996f025b9669","messageHeaders":"[[\"Received\", \"by luna.mailgun.net with SMTP mgrt 8734663311733; Fri, 03 May 2013 18:26:27 +0000\"], [\"Content-Type\", [\"multipart/alternative\", {\"boundary\": \"eb663d73ae0a4d6c9153cc0aec8b7520\"}]], [\"Mime-Version\", \"1.0\"], [\"Subject\", \"Test deliver webhook\"], [\"From\", \"Bob <[email protected]>\"], [\"To\", \"Alice <[email protected]>\"], [\"Message-Id\", \"<20130503182626.18666.16540@sandbox57070072075d4cfd9008d4332108734c.mailgun.org>\"], [\"X-Mailgun-Variables\", \"{\\\"my_var_1\\\": \\\"Mailgun Variable #1\\\", \\\"my-var-2\\\": \\\"awesome\\\"}\"], [\"Date\", \"Fri, 03 May 2013 18:26:27 +0000\"], [\"Sender\", \"[email protected]\"]]","myVar1":"Mailgun Variable #1","token":"cd87f5a30002794e37aa49e67fb46990e578b1e9197773d817","messageId":"<20130503182626.18666.16540@sandbox57070072075d4cfd9008d4332108734c.mailgun.org>","myVar2":"awesome"}}}""".noSpaces
"unstruct_event" -> json"""{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0", "data":{ "schema":"iglu:com.mailgun/message_delivered/jsonschema/1-0-0", "data":{"recipient":"[email protected]","deliveryStatus":{"certificateVerified":true,"sessionSeconds":0.4331989288330078,"description":"","mxHost":"smtp-in.example.com","tls":true,"code":250,"attemptNo":1,"utf8":true,"message":"OK"},"timestamp":"2022-07-15T17:57:13.000Z","flags":{"isRouted":false,"isAuthenticated":true,"isSystemTest":false,"isTestMode":false},"tags":["my_tag_1","my_tag_2"],"signature":"496413b238ab6affce021b850d3fc72c4832fb04f1aed6d4a2fda7d08e6e95a6","id":"CPgfbmQMTCKtHW6uIWtuVe","recipientDomain":"example.com","userVariables":{"myVar1":"Mailgun Variable #1","myVar2":"awesome"},"token":"090e1ce3702378c8121f3765a8efe0ffb97c4e2ca2adda6729","message":{"headers":{"to":"Alice <[email protected]>","messageId":"20130503182626.18666.16540@sandbox22aee81b13674403a5335202df94f7e7.mailgun.org","from":"Bob <[email protected]>","subject":"Test delivered webhook"},"attachments":[],"size":111},"storage":{"url":"https://se.api.mailgun.net/v3/domains/sandbox22aee81b13674403a5335202df94f7e7.mailgun.org/messages/message_key","key":"message_key"},"campaigns":[],"envelope":{"transport":"smtp","sender":"[email protected]","sendingIp":"209.61.154.250","targets":"[email protected]"},"logLevel":"info"}}}""".noSpaces
)
BlackBoxTesting.runTest(input, expected)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ package com.snowplowanalytics.snowplow.enrich.common
package adapters
package registry

import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._
import scala.util.{Try, Success => TS, Failure => TF}

import cats.Monad
import cats.data.NonEmptyList
import cats.effect.Clock
Expand All @@ -31,8 +25,7 @@ import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.snowplow.badrows._
import io.circe._
import io.circe.syntax._
import org.apache.http.client.utils.URLEncodedUtils
import io.circe.parser._

import loaders.CollectorPayload
import utils.{HttpClient, JsonUtils => JU}
Expand All @@ -48,8 +41,7 @@ object MailgunAdapter extends Adapter {
private val TrackerVersion = "com.mailgun-v1"

// Expected content type for a request body
private val ContentTypes = List("application/x-www-form-urlencoded", "multipart/form-data")
private val ContentTypesStr = ContentTypes.mkString(", ")
private val ContentType = "application/json"

private val Vendor = "com.mailgun"
private val Format = "jsonschema"
Expand All @@ -73,7 +65,10 @@ object MailgunAdapter extends Adapter {
* @param client The Iglu client used for schema lookup and validation
* @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings
*/
override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](payload: CollectorPayload, client: Client[F, Json]): F[Adapted] =
override def toRawEvents[F[_]: Monad: RegistryLookup: Clock: HttpClient](
payload: CollectorPayload,
client: Client[F, Json]
): F[Adapted] =
(payload.body, payload.contentType) match {
case (None, _) =>
val failure = FailureDetails.AdapterFailure.InputData(
Expand All @@ -90,45 +85,38 @@ object MailgunAdapter extends Adapter {
)
Monad[F].pure(failure.invalidNel)
case (_, None) =>
val msg = s"no content type: expected one of $ContentTypesStr"
val msg = s"no content type: expected $ContentType"
Monad[F].pure(
FailureDetails.AdapterFailure.InputData("contentType", none, msg).invalidNel
)
case (_, Some(ct)) if !ContentTypes.exists(ct.startsWith) =>
val msg = s"expected one of $ContentTypesStr"
case (_, Some(ct)) if ct != ContentType =>
val msg = s"expected $ContentType"
Monad[F].pure(
FailureDetails.AdapterFailure
.InputData("contentType", ct.some, msg)
.invalidNel
)
case (Some(body), Some(ct)) =>
case (Some(body), Some(_)) =>
val _ = client
val params = toMap(payload.querystring)
Try {
getBoundary(ct)
.map(parseMultipartForm(body, _))
.getOrElse(
toMap(
URLEncodedUtils.parse(URI.create("http://localhost/?" + body), UTF_8).asScala.toList
)
.collect { case (k, Some(v)) => (k, v) }
)
} match {
case TF(e) =>
parse(body) match {
case Left(e) =>
val msg = s"could not parse body: ${JU.stripInstanceEtc(e.getMessage).orNull}"
Monad[F].pure(
FailureDetails.AdapterFailure
.InputData("body", body.some, msg)
.invalidNel
)
case TS(bodyMap) =>
case Right(bodyJson) =>
Monad[F].pure(
bodyMap
.get("event")
bodyJson.hcursor
.downField("event-data")
.downField("event")
.focus
.map { eventType =>
(for {
schemaUri <- lookupSchema(eventType.some, EventSchemaMap)
event <- payloadBodyToEvent(bodyMap)
schemaUri <- lookupSchema(eventType.asString, EventSchemaMap)
event <- payloadBodyToEvent(bodyJson)
mEvent <- mutateMailgunEvent(event)
} yield NonEmptyList.one(
RawEvent(
Expand All @@ -139,7 +127,7 @@ object MailgunAdapter extends Adapter {
schemaUri,
cleanupJsonEventValues(
mEvent,
("event", eventType).some,
eventType.asString.map(("event", _)),
List("timestamp")
),
"srv"
Expand Down Expand Up @@ -188,53 +176,13 @@ object MailgunAdapter extends Adapter {
}
}

private val boundaryRegex =
"""multipart/form-data.*?boundary=(?:")?([\S ]{0,69})(?: )*(?:")?$""".r

/**
* Returns the boundary parameter for a message of media type multipart/form-data
* (https://www.ietf.org/rfc/rfc2616.txt and https://www.ietf.org/rfc/rfc2046.txt)
* @param contentType Header field of the form
* "multipart/form-data; boundary=353d603f-eede-4b49-97ac-724fbc54ea3c"
* @return boundary Option[String]
* Converts payload into an event
* @param body Webhook Json request body
*/
private def getBoundary(contentType: String): Option[String] =
contentType match {
case boundaryRegex(boundaryString) => Some(boundaryString)
case _ => None
}

/**
* Rudimentary parsing the form fields of a multipart/form-data into a Map[String, String]
* other fields will be discarded
* (see https://www.ietf.org/rfc/rfc1867.txt and https://www.ietf.org/rfc/rfc2046.txt).
* This parser will only take into account part headers of content-disposition type form-data
* and only the parameter name e.g.
* Content-Disposition: form-data; anything="notllokingintothis"; name="key"
*
* value
* @param body The body of the message
* @param boundary String that separates the body parts
* @return a map of the form fields and their values (other fields are dropped)
*/
private def parseMultipartForm(body: String, boundary: String): Map[String, String] =
body
.split(s"--$boundary")
.flatMap({
case formDataRegex(k, v) => Some((k, v))
case _ => None
})
.toMap

private val formDataRegex =
"""(?sm).*Content-Disposition:\s*form-data\s*;[ \S\t]*?name="([^"]+)"[ \S\t]*$.*?(?<=^[ \t\S]*$)^\s*(.*?)(?:\s*)\z""".r

/**
* Converts a querystring payload into an event
* @param bodyMap The converted map from the querystring
*/
private def payloadBodyToEvent(bodyMap: Map[String, String]): Either[FailureDetails.AdapterFailure, Json] =
(bodyMap.get("timestamp"), bodyMap.get("token"), bodyMap.get("signature")) match {
private def payloadBodyToEvent(body: Json): Either[FailureDetails.AdapterFailure, Json] = {
val bodyMap = body.hcursor.downField("signature")
(bodyMap.downField("timestamp").focus, bodyMap.downField("token").focus, bodyMap.downField("signature").focus) match {
case (None, _, _) =>
FailureDetails.AdapterFailure
.InputData("timestamp", none, "missing 'timestamp'")
Expand All @@ -247,6 +195,18 @@ object MailgunAdapter extends Adapter {
FailureDetails.AdapterFailure
.InputData("signature", none, "missing 'signature'")
.asLeft
case (Some(_), Some(_), Some(_)) => bodyMap.asJson.asRight
case (Some(timestamp), Some(_), Some(_)) =>
body.hcursor
.downField("event-data")
.downField("timestamp")
.withFocus(_ => timestamp)
.up
.focus
.flatMap(json => bodyMap.focus.map(_.deepMerge(json)))
.toRight(
FailureDetails.AdapterFailure
.InputData("event-data", none, "missing 'event-data'")
)
}
}
}
Loading

0 comments on commit 3723964

Please sign in to comment.