Skip to content

Commit 22862bd

Browse files
Add v3.0-1.0.2
1 parent 8fc04bd commit 22862bd

File tree

12 files changed

+146
-14
lines changed

12 files changed

+146
-14
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ plugins {
1313
}
1414

1515
group = 'org.alvearie.hri.flink'
16-
version = '2.1-1.0.1'
16+
version = '3.0-1.0.2'
1717
description = """HRI Flink Pipeline Core Library"""
1818

1919
ext {
@@ -92,7 +92,7 @@ dependencies {
9292
// Dependencies that library users should include in their job
9393
// shadow jar
9494
// --------------------------------------------------------------
95-
implementation "org.alvearie.hri:hri-api-batch-notification:2.1-2.0.1"
95+
implementation "org.alvearie.hri:hri-api-batch-notification:3.0-2.0.1"
9696
implementation "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}"
9797
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
9898
implementation "com.fasterxml.jackson.module:jackson-module-scala_${scalaBinaryVersion}:${jacksonVersion}"

src/main/scala/org/alvearie/hri/api/MgmtClient.scala

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@ import com.fasterxml.jackson.databind.json.JsonMapper
1818
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
1919
import com.fasterxml.jackson.module.scala.DefaultScalaModule
2020
import org.alvearie.hri.flink.core.serialization.NotificationDeserializer
21+
import org.apache.http.ssl.SSLContexts
22+
2123
import java.util.Base64
2224
import java.util.ArrayList
2325
import org.slf4j.LoggerFactory
2426

27+
import java.io.{File, FileNotFoundException}
2528
import scala.util.{Failure, Success, Try}
2629

2730
class MgmtClient(val baseUri: String, val clientId: String, val clientSecret: String, val audience: String, val oauthServiceBaseUrl: String,
28-
val httpClient: CloseableHttpClient = HttpClients.createDefault()) extends Serializable with BatchLookup {
31+
val httpClient: CloseableHttpClient = MgmtClient.createHttpClient()) extends Serializable with BatchLookup {
2932
private val log = LoggerFactory.getLogger(this.getClass)
3033
log.info("Creating HRI MgmtClient for {}", baseUri)
3134

@@ -127,11 +130,11 @@ class MgmtClient(val baseUri: String, val clientId: String, val clientSecret: St
127130
response = httpClient.execute(request)
128131
response.getStatusLine.getStatusCode match {
129132
case HttpStatus.SC_OK =>
130-
log.info("MgmtApi action call successful")
133+
log.info("HRI MgmtApi action call successful")
131134
Success(entityMapper(response.getEntity))
132135
case status =>
133136
val msg = status.intValue + ": " + EntityUtils.toString(response.getEntity)
134-
log.info("MgmtApi action call failed: {}", msg)
137+
log.info("HRI MgmtApi action call failed: {}", msg)
135138
Failure(new RequestException(msg, status))
136139
}
137140
} catch {
@@ -159,4 +162,44 @@ object MgmtClient {
159162
val hriConsumerScope = "hri_consumer"
160163
val accessTokenField = "access_token"
161164
val audienceField = "audience"
165+
166+
val trustStoreEnv = "HRI_TRUSTSTORE"
167+
val trustStorePasswordEnv = "HRI_TRUSTSTORE_PASSWORD"
168+
169+
/**
170+
* If 'HRI_TRUSTSTORE' and 'HRI_TRUSTSTORE_PASSWORD' are set, constructs an Http client using the specified trust
171+
* store. If not set, creates a default Http client.
172+
* If unable to load the trust store or create the client, an Exception is thrown.
173+
* @return a Http client
174+
*/
175+
def createHttpClient() : CloseableHttpClient = {
176+
val log = LoggerFactory.getLogger(this.getClass)
177+
val trustStorePath = System.getenv(trustStoreEnv)
178+
val password = System.getenv(trustStorePasswordEnv)
179+
180+
if( trustStorePath == null || trustStorePath.isEmpty ) {
181+
log.info("HRI_TRUSTSTORE is not set, so creating default Http client")
182+
return HttpClients.createDefault()
183+
} else if ( password == null || password.isEmpty ) {
184+
val msg = trustStoreEnv + " is set, but " + trustStorePasswordEnv + " is not. Both must be empty or set."
185+
log.error(msg)
186+
throw new IllegalArgumentException(msg)
187+
}
188+
log.info("Creating Http client with trust store {}", trustStorePath)
189+
190+
val trustStoreFile = new File(trustStorePath);
191+
if (!trustStoreFile.exists() || !trustStoreFile.isFile) {
192+
val msg = "Not found or not a file: " + trustStoreFile.getPath
193+
log.error(msg);
194+
throw new FileNotFoundException(msg);
195+
}
196+
197+
val sslContext = SSLContexts.custom
198+
.loadTrustMaterial(trustStoreFile, password.toCharArray)
199+
.build
200+
201+
return HttpClients.custom
202+
.setSSLContext(sslContext)
203+
.build
204+
}
162205
}

src/main/scala/org/alvearie/hri/flink/core/BaseValidationJob.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ class BaseValidationJob(
272272
// used for functional testing of the Validation Jobs without the HRI Management API.
273273
def getRecordCountSink(props: Properties): SinkFunction[NotificationRecord] = {
274274
if (useMgmtApi) {
275-
log.info("Creating MgmtApiSink({}) for Tracker output", mgmtApiUrl)
275+
log.info("Creating HRI MgmtApiSink({}) for Tracker output", mgmtApiUrl)
276276
return new MgmtApiSink(tenantId, mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl)
277277
} else {
278278
log.info("Creating KafkaProducer({}) for Tracker output", notificationTopic)

src/main/scala/org/alvearie/hri/flink/core/MgmtApiSink.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,14 @@ class MgmtApiSink(val tenantId: String, val mgmtApiUrl: String, val mgmtClientId
3131
@transient lazy val mgmtClient: MgmtClient = createMgmtClient()
3232

3333
// This enables overriding for testing
34-
def createMgmtClient(): MgmtClient = new MgmtClient(mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl)
34+
def createMgmtClient(): MgmtClient = {
35+
try {
36+
new MgmtClient(mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl)
37+
} catch {
38+
case ex: Throwable =>
39+
throw new FlinkException(ex) // can't recover from this
40+
}
41+
}
3542

3643
override def invoke(record: NotificationRecord, context: Context[_]): Unit = {
3744
val batch = record.value

src/main/scala/org/alvearie/hri/flink/core/ValidationProcessFunction.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class ValidationProcessFunction(
6262
def createMgmtClient(): BatchLookup = new MgmtClient(mgmtApiUrl, mgmtClientId, mgmtClientSecret, mgmtClientAudience, oauthServiceBaseUrl)
6363

6464
/**
65-
* constructor for testing purposes without a MgmtApiClient
65+
* constructor for testing purposes without a HRI MgmtApiClient
6666
*/
6767
def this(notificationDescriptor: MapStateDescriptor[String, BatchNotification],
6868
invalidOutputTag: OutputTag[InvalidRecord],
@@ -204,7 +204,7 @@ class ValidationProcessFunction(
204204
}
205205
}
206206
case Failure(e) =>
207-
log.error("unexpected exception from mgmtClient", e)
207+
log.error("unexpected exception from HRI mgmtClient", e)
208208
throw new FlinkException(e)
209209
}
210210
}

src/test/resources/truststore.jks

3.82 KB
Binary file not shown.

src/test/scala/org/alvearie/hri/api/MgmtClientTest.scala

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ package org.alvearie.hri.api
99
import java.time.temporal.ChronoUnit
1010
import java.time.{OffsetDateTime, ZoneOffset}
1111
import java.util.Base64
12-
1312
import org.apache.http.{Header, HttpStatus, HttpVersion, ProtocolVersion}
1413
import org.apache.http.message.BasicHttpResponse
1514
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost, HttpPut, HttpUriRequest}
@@ -19,11 +18,13 @@ import org.apache.http.util.EntityUtils
1918
import com.fasterxml.jackson.databind.DeserializationFeature
2019
import com.fasterxml.jackson.databind.json.JsonMapper
2120
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
21+
import org.alvearie.hri.flink.core.TestHelper
2222
import org.mockito.ArgumentMatcher
2323
import org.mockito.scalatest.MockitoSugar
2424
import org.scalatest.funsuite.AnyFunSuite
2525
import org.scalatest.matchers.should.Matchers._
2626

27+
import java.io.{FileNotFoundException, IOException}
2728
import scala.util.{Failure, Success}
2829

2930
class MgmtClientTest extends AnyFunSuite with MockitoSugar{
@@ -41,6 +42,9 @@ class MgmtClientTest extends AnyFunSuite with MockitoSugar{
4142
private val audience = "myAudience"
4243
private val expectedTokenRequestParams = Array("grant_type=client_credentials", "scope=", s"${MgmtClient.hriInternalScope}", s"${MgmtClient.hriConsumerScope}", s"tenant_$tenantId", s"audience=$audience")
4344

45+
private val trustStorePath = "src/test/resources/truststore.jks"
46+
private val trustStorePassword = "test_password"
47+
4448
class RequestMatcherPut(uri: String, bodyElements: Seq[String]) extends ArgumentMatcher[HttpUriRequest] {
4549
override def matches(request: HttpUriRequest): Boolean = {
4650
if (request.getMethod != "PUT") return false
@@ -362,6 +366,61 @@ class MgmtClientTest extends AnyFunSuite with MockitoSugar{
362366
}
363367
}
364368

369+
test("It should return a custom Http client when environment variables are set") {
370+
TestHelper.setEnv(MgmtClient.trustStoreEnv, trustStorePath)
371+
TestHelper.setEnv(MgmtClient.trustStorePasswordEnv, trustStorePassword)
372+
373+
val client = MgmtClient.createHttpClient()
374+
assert(client != null)
375+
// there isn't a way to inspect the client's configuration to ensure the trust store was added
376+
377+
/**
378+
* This is for manual testing against an actual instance running on Kubernetes
379+
* 1. Copy the Kubernetes ca.crt from any pod at /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
380+
* 2. Import it into src/test/resources/k-truststore.jks with
381+
keytool -import -file ca.crt -keystore src/test/resources/k-truststore.jks -storepass test_password -alias kubernetes_ca
382+
* 3. Edit /etc/hosts and append `127.0.0.1 hri-mgmt-api`
383+
* 4. Then uncomment the lines below and run the test
384+
*/
385+
//val response = client.execute(new HttpGet("https://hri-mgmt-api:1323/hri/healthcheck"))
386+
//System.out.println(response.toString)
387+
388+
// reset environment
389+
TestHelper.removeEnv(MgmtClient.trustStoreEnv)
390+
TestHelper.removeEnv(MgmtClient.trustStorePasswordEnv)
391+
}
392+
393+
test("It should throw an IllegalArgumentException when the trust store password variable is not set") {
394+
TestHelper.setEnv(MgmtClient.trustStoreEnv, trustStorePath)
395+
396+
assertThrows[IllegalArgumentException](MgmtClient.createHttpClient())
397+
398+
// reset environment
399+
TestHelper.removeEnv(MgmtClient.trustStoreEnv)
400+
}
401+
402+
test("It should throw an IOException when the trust store path is wrong") {
403+
TestHelper.setEnv(MgmtClient.trustStoreEnv, "bad/path/to/truststore.jks")
404+
TestHelper.setEnv(MgmtClient.trustStorePasswordEnv, trustStorePassword)
405+
406+
assertThrows[FileNotFoundException](MgmtClient.createHttpClient())
407+
408+
// reset environment
409+
TestHelper.removeEnv(MgmtClient.trustStoreEnv)
410+
TestHelper.removeEnv(MgmtClient.trustStorePasswordEnv)
411+
}
412+
413+
test("It should throw an IOException when the trust store password variable is wrong") {
414+
TestHelper.setEnv(MgmtClient.trustStoreEnv, trustStorePath)
415+
TestHelper.setEnv(MgmtClient.trustStorePasswordEnv, "wrong_password")
416+
417+
assertThrows[IOException](MgmtClient.createHttpClient())
418+
419+
// reset environment
420+
TestHelper.removeEnv(MgmtClient.trustStoreEnv)
421+
TestHelper.removeEnv(MgmtClient.trustStorePasswordEnv)
422+
}
423+
365424
}
366425

367426
private class FakeHttpResponse(ver: ProtocolVersion, code: Integer, reason: String) extends BasicHttpResponse(ver, code, reason) with CloseableHttpResponse {

src/test/scala/org/alvearie/hri/flink/core/MgmtApiSinkTest.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package org.alvearie.hri.flink.core
88

99
import java.time.OffsetDateTime
1010
import java.util.concurrent.TimeUnit
11-
1211
import org.alvearie.hri.api.BatchNotification.Status
1312
import org.alvearie.hri.api.{BatchNotification, MgmtClient, RequestException}
1413
import org.alvearie.hri.flink.core.serialization.NotificationRecord
@@ -167,6 +166,15 @@ class MgmtApiSinkTest extends AnyFunSuite with MockitoSugar{
167166
exMsg should equal("Call to HRI Management API failed: Unauthorized.")
168167
}
169168

169+
test("it should throw a FlinkException when environment variables are incorrect") {
170+
TestHelper.setEnv(MgmtClient.trustStoreEnv, "bad/path/to/truststore.jks")
171+
172+
val sink = new MgmtApiSink("tenant", "https://hri-mgmt-api", "client_id", "password", "audience", "https://oauth")
173+
assertThrows[FlinkException](sink.createMgmtClient())
174+
175+
TestHelper.removeEnv(MgmtClient.trustStoreEnv)
176+
}
177+
170178
def createTestBatchNotification(status: BatchNotification.Status, actualRecordCount: Int, invalidRecordCount: Int, failMsg: String): BatchNotification = {
171179
new BatchNotification()
172180
.withId(batchId)

src/test/scala/org/alvearie/hri/flink/core/TestHelper.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,20 @@ object TestHelper {
8888
buffer.getLong
8989
}
9090

91+
// This sets an OS environment variable using reflection, which isn't normally allowed
92+
def setEnv(name: String, value: String): Unit = {
93+
val env = System.getenv
94+
val field = env.getClass.getDeclaredField("m")
95+
field.setAccessible(true)
96+
field.get(env).asInstanceOf[java.util.Map[String,String]].put(name, value)
97+
}
98+
99+
// This removes an OS environment variable using reflection, which isn't normally allowed
100+
def removeEnv(name: String): Unit = {
101+
val env = System.getenv
102+
val field = env.getClass.getDeclaredField("m")
103+
field.setAccessible(true)
104+
field.get(env).asInstanceOf[java.util.Map[String,String]].remove(name)
105+
}
106+
91107
}

src/test/scala/org/alvearie/hri/flink/core/ValidationProcessFunctionTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ class ValidationProcessFunctionTest extends AnyFunSuite with MockitoSugar {
157157
verify(mockClient, times(1)).getBatchId(tenantId, DefaultTestBatchId)
158158
}
159159

160-
test("processElement should call getBatch and unexpected error should be thrown from mgmtApi") {
160+
test("processElement should call getBatch and unexpected error should be thrown from HRI mgmtApi") {
161161
val mockClient = mock[MgmtClient]
162162

163163
val validator = new TestValidationProcessFunction(

0 commit comments

Comments
 (0)