replaced akka actors by akka streams. based on stanikol's fork: https…
dportabella committed Apr 12, 2018
1 parent 529f0ff commit fb56426
Showing 10 changed files with 299 additions and 231 deletions.
20 changes: 8 additions & 12 deletions README.md
@@ -1,14 +1,14 @@
# BatchGeocodingInScalaUsingGoogleAPI

This is a simple Scala program for parsing a list of addresses using google maps api.
This is a simple program for parsing a list of addresses using the Google Maps API.

The main function for parsing a single address is implemented in `src/main/scala/AddressParser.scala`:
- It builds the Google Maps query URL for the requested address
- It uses Play Json to parse the JSON response
- It extracts the data we need and builds an `Address` case class
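As a rough illustration of the first step, here is a minimal sketch of building the geocoding request URL. The endpoint and parameter names follow the public Google Maps Geocoding API; the `geocodeUrl` helper itself is made up for this sketch and is not the real code in `AddressParser.scala`:

```scala
import java.net.URLEncoder

// Hypothetical helper: builds the Google Maps Geocoding API request URL
// for one free-form address string.
def geocodeUrl(address: String, apiKey: String): String = {
  val encoded = URLEncoder.encode(address, "UTF-8")
  s"https://maps.googleapis.com/maps/api/geocode/json?address=$encoded&key=$apiKey"
}

println(geocodeUrl("1600 Amphitheatre Parkway, Mountain View, CA", "MY_KEY"))
```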

`BatchParsderCmd` uses akka in order to query and parse a list of addresses asynchronously.
It queries the addresses from a database, and it uses three actors: GoogleGeocoder, AddressParserActor, and DB to save the results.
`BatchParserCmd` reads the addresses from a database, queries each address with the Google Maps API, parses the response, and saves the result to the DB.
It is implemented in Scala with akka streams and slick for performance.
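Conceptually the stream composes three stages: fetch pending addresses, query Google and parse, save. Here is a plain synchronous sketch of that same dataflow; the names are illustrative stand-ins, not the real akka-streams code, which wires these stages as a back-pressured `Source` -> flow -> `Sink`:

```scala
// Illustrative stand-ins for the pipeline stages. In the real program the
// DB sink back-pressures the Google queries via akka streams.
case class GoogleResponse(id: Int, body: String)

// Stubbed query: the real stage performs an HTTP request to Google.
def queryGoogle(id: Int, address: String): GoogleResponse =
  GoogleResponse(id, s"""{"status": "OK", "query": "$address"}""")

// Stubbed parse: the real stage extracts an Address from the JSON body.
def parse(r: GoogleResponse): (Int, Boolean) =
  (r.id, r.body.contains("\"status\": \"OK\""))

val pendingAddresses = List(1 -> "1 Main St", 2 -> "2 High St")
val results = pendingAddresses
  .map { case (id, addr) => queryGoogle(id, addr) }
  .map(parse)
```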


# Requirements
@@ -215,26 +215,22 @@ Usage: BatchParserCmd [options]
--op <value> where value = googleQueryAndParse, googleQueryOnly or parseOnly
--maxEntries <value>
--maxGoogleAPIOpenRequests <value>
--maxGoogleAPIRequestsPerSecond <value>
--maxGoogleAPIFatalErrors <value>
--googleApiKey <value>
--dbUrl <value>
--tableName <value>
--version
$ sbt "runMain BatchParserCmd --op=googleQueryAndParse --maxEntries=20 --maxGoogleAPIOpenRequests=10 --maxGoogleAPIFatalErrors=5 --googleApiKey="$googleApiKey" --dbUrl="$dbUrl" --tableName=addresses"
$ sbt "runMain BatchParserCmd --op=googleQueryAndParse --maxEntries=20 --maxGoogleAPIRequestsPerSecond=10 --maxGoogleAPIFatalErrors=5 --googleApiKey="$googleApiKey" --dbUrl="$dbUrl" --tableName=addresses"
```
`maxGoogleQueries` is the max number of google queries to do. It's best to try with a small number first.
`maxEntries` is the max number of google queries to do. It's best to try with a small number first.
The program will also stop if the daily quota of the Google API is exceeded (2,500 requests per day for the free account).
You can then execute the same command the following day, and it will resume the process (it will not re-query addresses it has already downloaded).
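The resume behaviour follows from selecting only rows whose Google response column is still empty, so a second run skips completed work. A sketch of that filter; the `Row` shape here is illustrative, the real selection is a SQL query against the addresses table:

```scala
// Illustrative row shape: googleResponse is None until the address
// has been queried and the raw response stored.
case class Row(id: Int, address: String, googleResponse: Option[String])

val rows = List(
  Row(1, "1 Main St", Some("""{"status":"OK"}""")), // already queried
  Row(2, "2 High St", None)                         // still pending
)

// Only pending rows are sent to the Google API on the next run.
val pending = rows.filter(_.googleResponse.isEmpty)
```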

This version of the program fails with akka showing "dead letters" if there are too many addresses.
A workaround is to call this command before running sbt: `export SBT_OPTS="-Xmx2G -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=2G -Xss2M -Duser.timezone=GMT"`.
The better option is to use the akka stream version of this program, which uses back-pressure. See the git branch of this project.
The program queries Google in parallel. The bigger `maxGoogleAPIRequestsPerSecond`, the faster all addresses are queried. Google has a rate limit and a daily limit, so try with small numbers first.
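With a per-second cap, the stream spaces requests out over time (akka streams provides `Source.throttle` for this). The arithmetic is simple; shown here as a plain sketch:

```scala
// At N requests per second, consecutive requests must be spaced at
// least 1000 / N milliseconds apart.
def minSpacingMs(requestsPerSecond: Int): Int = 1000 / requestsPerSecond
```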

The program queries google in parallel. The bigger `maxOpenRequests`, the faster to query all addresses. Google has a rate limit, so try before with smalls numbers. Not more than 32.

The program will stop after `maxFatalErrors`. Set a small number.
The program will stop after `maxGoogleAPIFatalErrors` fatal errors from the Google API. Set it to a small number.


### Query the results
15 changes: 12 additions & 3 deletions build.sbt
@@ -4,19 +4,28 @@ version := "1.0"

scalaVersion := "2.12.2"

lazy val akkaHttpVersion = "10.0.9"
lazy val akkaVersion = "2.5.11"
lazy val akkaHttpVersion = "10.0.11"
lazy val playVersion = "2.6.2"

libraryDependencies ++= Seq(
"com.github.scopt" %% "scopt" % "3.5.0",
"com.typesafe.play" %% "play-json" % playVersion,
"com.typesafe.play" %% "anorm" % "2.5.3",
"com.typesafe.slick" %% "slick" % "3.2.3",
"com.typesafe.slick" %% "slick-hikaricp" % "3.2.3",
"mysql" % "mysql-connector-java" % "5.1.40",
"org.postgresql" % "postgresql" % "42.1.3",
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
"org.scalaj" %% "scalaj-http" % "2.3.0",
"org.apache.sis.core" % "sis-referencing" % "0.7",
"com.github.nikita-volkov" % "sext" % "0.2.4",
"com.univocity" % "univocity-parsers" % "2.5.8",
"com.ibm.icu" % "icu4j" % "59.1"
"com.ibm.icu" % "icu4j" % "59.1",
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
"com.lightbend.akka" %% "akka-stream-alpakka-slick" % "0.18",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"com.typesafe.scala-logging" %% "scala-logging" % "3.7.2",
"org.slf4j" % "slf4j-nop" % "1.6.4"
)
17 changes: 15 additions & 2 deletions src/main/resources/application.conf
@@ -1,7 +1,20 @@
slick-database{
profile = "slick.jdbc.MySQLProfile$"
db {
dataSourceClass = "slick.jdbc.DriverDataSource"
properties = {
driver = "com.mysql.jdbc.Driver"
}
}
}

# http://doc.akka.io/docs/akka-http/current/java/http/configuration.html

akka.http.client.connecting-timeout = 11 s

akka {
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
loglevel = "INFO"
stdout-loglevel = "INFO"

# The maximum number of open requests accepted into the pool across all
# materializations of any of its client flows.
16 changes: 16 additions & 0 deletions src/main/resources/logback.xml
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>*** \(%logger{30}\)%green(%X{debugId}) %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>

<logger name="BatchParserCmd" level="info" />
<logger name="GoogleGeocoder" level="info" />
<logger name="DB" level="info" />
<logger name="scala.slick" level="info" />
</configuration>
24 changes: 0 additions & 24 deletions src/main/scala/AddressParserActor.scala

This file was deleted.

117 changes: 74 additions & 43 deletions src/main/scala/BatchParserCmd.scala
@@ -1,13 +1,22 @@
import Model._
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Materializer}
import akka.{Done, NotUsed}
import com.typesafe.scalalogging.Logger

import scala.io.StdIn
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.Try

object BatchParserCmd {
private val log = Logger("BatchParserCmd")

case class Config(
op: String = "",
maxEntries: Int = 100,
maxGoogleAPIOpenRequests: Int = 10,
maxGoogleAPIFatalErrors: Int = 5,
op: String = "",
maxEntries: Int = 100,
maxGoogleAPIRequestsPerSecond: Int = 10,
maxGoogleAPIFatalErrors: Int = 5,
googleApiKey: String = "",
dbUrl: String = "",
tableName: String = ""
@@ -22,8 +31,8 @@ object BatchParserCmd {
opt[Int]("maxEntries").required.action((x, c) =>
c.copy(maxEntries = x)).text("maxEntries")

opt[Int]("maxGoogleAPIOpenRequests").optional.action((x, c) =>
c.copy(maxGoogleAPIOpenRequests = x)).text("maxGoogleAPIOpenRequests")
opt[Int]("maxGoogleAPIRequestsPerSecond").optional.action((x, c) =>
c.copy(maxGoogleAPIRequestsPerSecond = x)).text("maxGoogleAPIRequestsPerSecond")

opt[Int]("maxGoogleAPIFatalErrors").optional.action((x, c) =>
c.copy(maxGoogleAPIFatalErrors = x)).text("maxGoogleAPIFatalErrors")
@@ -43,53 +52,75 @@
def main(args: Array[String]) {
parser.parse(args, Config()) match {
case Some(config) =>
println("+++ config: " + config)
log.info("+++ config: " + config)

require(config.op == "googleQueryAndParse" || config.op == "googleQueryOnly" || config.op == "parseOnly")

val system: ActorSystem = ActorSystem("System")
try {
if (config.op == "googleQueryAndParse" || config.op == "googleQueryOnly") {
val parseAddress = config.op == "googleQueryAndParse"
googleQueryAndParse(system, config.maxEntries, config.googleApiKey, config.maxGoogleAPIOpenRequests, config.maxGoogleAPIFatalErrors, parseAddress, config.dbUrl, config.tableName)
} else {
parseOnly(system, config.maxEntries, config.dbUrl, config.tableName)
}
println(">>> Press ENTER to exit <<<")
StdIn.readLine()
} finally {
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
implicit val executionContext: ExecutionContextExecutor = system.dispatcher

val db = new DB(config.dbUrl, config.tableName)

def terminateSystem() = {
log.info("terminating actor system")
Http().shutdownAllConnectionPools()
system.terminate()
}

if (config.op == "googleQueryAndParse" || config.op == "googleQueryOnly") {
val parseAddress = config.op == "googleQueryAndParse"
googleQueryAndParse(parseAddress, config.maxEntries, config.googleApiKey, config.maxGoogleAPIRequestsPerSecond, config.maxGoogleAPIFatalErrors, db, terminateSystem)
} else {
parseOnly(system, config.maxEntries, db, terminateSystem)
}
case None => sys.exit(1)
}
}

def googleQueryAndParse(system: ActorSystem, maxEntries: Int, googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, parseAddress: Boolean, dbUrl: String, tableName: String) {
val conn = Utils.getDbConnection(dbUrl)
val unformattedAddresses: List[(Int, String)] = try {
DB.getAddressesWithEmptyGoogleResponseFromDatabase(tableName, maxEntries)(conn)
} finally { conn.close() }

println(s"num unformattedAddresses to query: ${unformattedAddresses.length}")

val db = system.actorOf(DB.props(dbUrl, tableName), "DB")
val addressParser = system.actorOf(AddressParserActor.props(db), "AddressParser")
val googleGeocoder = system.actorOf(GoogleGeocoder.props(googleApiKey, maxOpenRequests: Int, maxFatalErrors: Int, db, addressParser, parseAddress), "GoogleAPI")

unformattedAddresses.foreach { case (id, unformattedAddress) => googleGeocoder ! GoogleGeocoder.GeoCode(id, unformattedAddress) }
def googleQueryAndParse(
parseAddress: Boolean,
maxEntries: Int,
googleApiKey: String,
maxGoogleAPIRequestsPerSecond: Int,
maxGoogleAPIFatalErrors: Int,
db: DB,
finishedFn: () => Unit
)
(implicit actorSystem: ActorSystem, materializer: Materializer, executionContext: ExecutionContext) {

val googleApiResponse: Source[GoogleResponse, NotUsed] =
GoogleGeocoder.flow(db, googleApiKey, maxGoogleAPIRequestsPerSecond, maxGoogleAPIFatalErrors, maxEntries)

if (!parseAddress) {
googleApiResponse
.runWith(db.saveGoogleResponse())
.foreach(_ => finishedFn())
} else {
googleApiResponse
.map(parse)
.runWith(db.saveAddressParsingResultSink())
.recover { case t: Throwable => log.error("googleQueryAndParse error", t); Done }
.foreach(_ => finishedFn())
}
}

def parseOnly(system: ActorSystem, maxEntries: Int, dbUrl: String, tableName: String) {
val conn = Utils.getDbConnection(dbUrl)
val googleResponses: List[(Int, String)] = try {
DB.getUnparsedGoogleResponsesFromDatabase(tableName, maxEntries)(conn)
} finally { conn.close() }

println(s"num googleResponses: ${googleResponses.length}")

val db = system.actorOf(DB.props(dbUrl, tableName), "DB")
val addressParser = system.actorOf(AddressParserActor.props(db), "AddressParser")
def parseOnly(
system: ActorSystem,
maxEntries: Int,
db: DB,
finishedFn: () => Unit
)
(implicit actorSystem: ActorSystem, materializer: Materializer, executionContext: ExecutionContext) {
db.getUnparsedGoogleResponsesFromDatabase(maxEntries)
.map(parse)
.runWith(db.saveAddressParsingResultSink())
.recover { case t: Throwable => log.error("parseOnly error", t); Done }
.foreach(_ => finishedFn())
}

googleResponses.foreach { case (id, googleResponse) => addressParser ! AddressParserActor.ParseAddress(id, googleResponse) }
def parse(googleResponse: GoogleResponse): AddressParsingResult = {
val result = Try(AddressParser.parseAddressFromJsonResponse(googleResponse.googleResponse))
AddressParsingResult(googleResponse, result)
}
}