Into ReaderT-verse
To keep this article relatively short, let's narrow the scope of the problem.
The Program
I'll start with a simple program that should give you déjà vu.
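import com.amazonaws.services.s3.AmazonS3ClientBuilder

val sourceBucketName = "source-bucket"
val fileName = "fileA"
val targetBucketName = "target-bucket"
val awsClient = AmazonS3ClientBuilder.standard().build()
def pureBusinessProcess(content: String): String = ???
def program = {
  // getObjectAsString returns the body as a String; getObject would return an S3Object
  val content = awsClient.getObjectAsString(sourceBucketName, fileName)
  val result = pureBusinessProcess(content)
  // upload the processed result under the same key
  awsClient.putObject(targetBucketName, fileName, result)
}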
It's a pretty simple program:
- fetch a file from S3 bucket A
- process the content according to what business you have
- put the result into a file in bucket B
The Problem
Such a program looks clean and readable, but an implementation like this is never good enough for production because it lacks:
- Error handling
- Testability
- Logging
Once we add those three things, the program is no longer clean or readable:
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

val awsClient = AmazonS3ClientBuilder.standard().build()
def pureBusinessProcess(content: String): String = ???
val logger = org.log4s.getLogger
// the client is now a parameter, so the body must use `client`, not the global awsClient
def program(client: AmazonS3): Unit =
  try {
    val content = try {
      client.getObjectAsString(sourceBucketName, fileName)
    } catch {
      case e: Throwable =>
        logger.error("error fetching file from S3")
        throw e
    }
    val result = pureBusinessProcess(content)
    try {
      client.putObject(targetBucketName, fileName, result)
    } catch {
      case e: Throwable =>
        logger.error("error putting file to S3")
        throw e
    }
  } catch {
    case e: Throwable => logger.error(s"error processing $fileName from $sourceBucketName: ${e.getMessage}")
  }
So we factor the effectful client: AmazonS3 out as a parameter, which improves testability a little. However, this is still absolute crap: the core process of the program is completely lost in an ocean of error handling and logging.
A better version with ReaderT
With ReaderT, we get a better version of:
- DI (dependency injection)
- Error handling, thanks to MonadError
Instead of passing client as a parameter, we change the return type of program to ReaderT[IO, AmazonS3, Unit]:
import cats.data.{Kleisli, ReaderT}
import cats.effect.IO

def program: ReaderT[IO, AmazonS3, Unit] = for {
  // the lambda's parameter type is spelled out because it cannot be inferred here
  content <- Kleisli((client: AmazonS3) =>
    IO(client.getObjectAsString(sourceBucketName, fileName))
      .onError { case e: Throwable => IO(logger.error(s"error fetching file from S3: ${e.getMessage}")) })
  result = pureBusinessProcess(content)
  _ <- Kleisli((client: AmazonS3) =>
    IO(client.putObject(targetBucketName, fileName, result))
      .onError { case e: Throwable => IO(logger.error(s"error putting file to S3: ${e.getMessage}")) })
} yield ()
OK, it's a little better, though I don't think it's very readable.
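If ReaderT is new to you, it may help to see how little is inside it. The sketch below is a hand-rolled stand-in, not the cats implementation: Reader, Store and MemoryStore are hypothetical names, and scala.util.Try plays the role of IO so that nothing beyond the standard library is needed.

```scala
import scala.util.Try

object ReaderSketch {
  // A stand-in for ReaderT (Kleisli): nothing more than a wrapped function Env => Try[A].
  final case class Reader[Env, A](run: Env => Try[A]) {
    def map[B](f: A => B): Reader[Env, B] =
      Reader(env => run(env).map(f))

    // flatMap hands the SAME environment to both steps; that is the dependency injection.
    def flatMap[B](f: A => Reader[Env, B]): Reader[Env, B] =
      Reader(env => run(env).flatMap(a => f(a).run(env)))
  }

  // Hypothetical dependency, standing in for the S3 client.
  trait Store {
    def get(key: String): String
    def put(key: String, content: String): Unit
  }

  def fetch(key: String): Reader[Store, String] =
    Reader(store => Try(store.get(key)))

  def save(key: String, content: String): Reader[Store, Unit] =
    Reader(store => Try(store.put(key, content)))

  // No Store appears in the body; it is supplied once, when `run` is called.
  val copyUpperCased: Reader[Store, String] = for {
    content <- fetch("fileA")
    result = content.toUpperCase
    _ <- save("fileA.out", result)
  } yield result

  // An in-memory Store used only to demonstrate the sketch.
  final class MemoryStore(initial: Map[String, String]) extends Store {
    var files: Map[String, String] = initial
    def get(key: String): String = files(key)
    def put(key: String, content: String): Unit = {
      files = files.updated(key, content)
    }
  }
}
```

Calling ReaderSketch.copyUpperCased.run(new ReaderSketch.MemoryStore(Map("fileA" -> "hello"))) yields Success("HELLO") and leaves the result stored under fileA.out. The store is only mentioned at that final run call, which is the whole point of the return-type change.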
Now for the best feature of functional programming: a ReaderT is a pure value, so you can move that "not so readable" part anywhere and give it a reasonable name.
import com.amazonaws.services.s3.model.PutObjectResult

val fetchContent: ReaderT[IO, AmazonS3, String] = Kleisli((client: AmazonS3) =>
  IO(client.getObjectAsString(sourceBucketName, fileName))
    .onError { case e: Throwable => IO(logger.error(s"error fetching file from S3: ${e.getMessage}")) })
def putContent(content: String): ReaderT[IO, AmazonS3, PutObjectResult] = Kleisli((client: AmazonS3) =>
  IO(client.putObject(targetBucketName, fileName, content))
    .onError { case e: Throwable => IO(logger.error(s"error putting file to S3: ${e.getMessage}")) })
// yield the put result so the value matches the declared return type
def program: ReaderT[IO, AmazonS3, PutObjectResult] = for {
  content <- fetchContent
  result = pureBusinessProcess(content)
  resp <- putContent(result)
} yield resp
Fine, that reads better, but testing won't be much different: I still need to mock the client and stub the two S3 calls.
val fakeClient = mock[AmazonS3]
val res = mock[PutObjectResult]
fakeClient.getObjectAsString(sourceBucketName, fileName) returns "some content"
// assumes pureBusinessProcess turns "some content" into "processed"
fakeClient.putObject(targetBucketName, fileName, "processed") returns res
program.run(fakeClient).unsafeRunSync() must_== res
Not bad, but not good either. We can avoid mocking and stubbing by abstracting one more layer, just like the 3 Layer Scala Cake.
Layer 2
trait Interpreter[F[_]] {
  def getObject(bucketName: String, fileName: String): F[String]
  def putObject(bucketName: String, fileName: String, content: String): F[PutObjectResult]
}
Layer 3
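def program: ReaderT[IO, Interpreter[IO], PutObjectResult] = for {
  // the parameter type is written out because it cannot be inferred in this position
  content <- Kleisli((i: Interpreter[IO]) => i.getObject(sourceBucketName, fileName))
  result = pureBusinessProcess(content)
  resp <- Kleisli((i: Interpreter[IO]) => i.putObject(targetBucketName, fileName, result))
} yield resp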
A Kleisli in front of the getObject call barely sacrifices readability, and the benefit is that we can swap Layer 2 for an arbitrary interpreter in tests:
val res = mock[PutObjectResult]
program.run(new Interpreter[IO] {
  def getObject(bucketName: String, fileName: String) = IO("some content")
  // the trait returns F[PutObjectResult], so the result has to be wrapped in IO
  def putObject(bucketName: String, fileName: String, content: String) = IO(res)
}).unsafeRunSync() must_== res
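One way to make the swap concrete is an in-memory interpreter that records writes, so a test can assert on what was actually stored instead of on mock interactions. This is only a sketch under assumptions: PutResult is a hypothetical stand-in for PutObjectResult, Try stands in for IO, and program is written as a plain function of the interpreter, which is the function a ReaderT would wrap.

```scala
import scala.collection.mutable
import scala.util.Try

object InterpreterSketch {
  // Hypothetical stand-in for the SDK's PutObjectResult.
  final case class PutResult(bucket: String, key: String)

  // Same shape as the Layer 2 trait, with the result type swapped for PutResult.
  trait Interpreter[F[_]] {
    def getObject(bucketName: String, fileName: String): F[String]
    def putObject(bucketName: String, fileName: String, content: String): F[PutResult]
  }

  // Test interpreter: objects live in a mutable map keyed by (bucket, file).
  final class InMemory(seed: Map[(String, String), String]) extends Interpreter[Try] {
    val objects: mutable.Map[(String, String), String] = mutable.Map(seed.toSeq: _*)

    // A missing key throws inside Try, so it surfaces as a Failure.
    def getObject(bucketName: String, fileName: String): Try[String] =
      Try(objects((bucketName, fileName)))

    def putObject(bucketName: String, fileName: String, content: String): Try[PutResult] =
      Try {
        objects((bucketName, fileName)) = content
        PutResult(bucketName, fileName)
      }
  }

  // The program as a function of the interpreter; `process` is a placeholder
  // for pureBusinessProcess.
  def program(process: String => String)(i: Interpreter[Try]): Try[PutResult] =
    for {
      content <- i.getObject("source-bucket", "fileA")
      resp <- i.putObject("target-bucket", "fileA", process(content))
    } yield resp
}
```

A test then seeds an InMemory instance with the source object, runs program against it, and inspects objects afterwards. Compared with the anonymous interpreter above, this also checks that the processed content reached the target bucket.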
ReaderT-verse
Finally, we just need to refactor a bit and it's production ready:
- Readable
- Testable, by swapping out Layer 2
- Better error handling, thanks to MonadError
- Logging
- Extensible, same as Tagless Final
- Composable, same as Tagless Final
// For this refactor, the Layer 2 trait gains one more method: def getEnv: F[Env]
def program: ReaderT[IO, Interpreter[IO], PutObjectResult] = for {
  env <- Kleisli((i: Interpreter[IO]) => i.getEnv)
  content <- Kleisli((i: Interpreter[IO]) =>
    i.getObject(env.source, env.fileName)
      .onError { case e: Throwable => IO(logger.error(s"error fetching file from S3: ${e.getMessage}")) })
  result = pureBusinessProcess(content)
  resp <- Kleisli((i: Interpreter[IO]) =>
    i.putObject(env.target, env.fileName, result)
      .onError { case e: Throwable => IO(logger.error(s"error putting file to S3: ${e.getMessage}")) })
} yield resp
case class Env(source: String, target: String, fileName: String)
val interpreter = new Interpreter[IO] {
  lazy val awsClient = AmazonS3ClientBuilder.standard().build()
  def getEnv = IO(Env("sourceBucket", "targetBucket", "fileA"))
  // getObjectAsString returns the body as a String, matching F[String] in the trait
  def getObject(bucketName: String, fileName: String) =
    IO(awsClient.getObjectAsString(bucketName, fileName))
  def putObject(bucketName: String, fileName: String, content: String) =
    IO(awsClient.putObject(bucketName, fileName, content))
}
program.run(interpreter).unsafeRunSync()
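A note on the onError calls above: they do not swallow the failure. The handler runs for its side effect and the original error keeps propagating, which is why no explicit throw e is needed any more. Here is a rough model of that contract using scala.util.Try instead of IO; onError, Log and fetch are hypothetical helpers written for this illustration, not the cats API.

```scala
import scala.util.{Failure, Try}

object OnErrorSketch {
  // If `fa` failed and the handler matches, run the handler for its side effect,
  // then return the original failure unchanged. Successes pass straight through.
  def onError[A](fa: Try[A])(handler: PartialFunction[Throwable, Unit]): Try[A] =
    fa match {
      case Failure(e) if handler.isDefinedAt(e) =>
        handler(e)
        fa
      case _ => fa
    }

  // Collects log lines so the behaviour can be inspected without a logging library.
  final class Log {
    var lines: Vector[String] = Vector.empty
    def error(msg: String): Unit = {
      lines = lines :+ msg
    }
  }

  // `read` stands in for the S3 call.
  def fetch(log: Log)(read: () => String): Try[String] =
    onError(Try(read())) {
      case e => log.error(s"error fetching file from S3: ${e.getMessage}")
    }
}
```

With a failing read, fetch returns the same Failure and the log holds one line; with a successful read the log stays untouched. That is the behaviour the nested try/catch blocks at the start of the article had to spell out by hand.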