Skip to content

Commit 92a39c3

Browse files
Merge pull request #19 from Alvearie/release-3.2.0
Release 3.2.0
2 parents 031d380 + 820ded8 commit 92a39c3

File tree

7 files changed

+50
-31
lines changed

7 files changed

+50
-31
lines changed

.github/workflows/ci-workflow.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ jobs:
2828
uses: actions/setup-java@v2
2929
with:
3030
java-version: '8'
31-
distribution: 'adopt'
31+
distribution: 'temurin'
3232
cache: 'gradle'
3333

3434
- name: Set GitHub Tag environment variable

.github/workflows/code-scans.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ jobs:
2222
# full checkout for SonarCloud analysis
2323
fetch-depth: 0
2424

25-
- name: Set up Java 11
25+
- name: Set up Java jdk11
2626
uses: actions/setup-java@v2
2727
with:
2828
java-version: '11'
29-
distribution: 'adopt'
29+
distribution: 'temurin'
3030
cache: 'gradle'
3131

3232
- name: SonarCloud Scan

.github/workflows/pull_request.yml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ jobs:
1515
# run if the PR was opened or there was a comment containing '/pr_checks'
1616
if: (github.event_name == 'pull_request') || (github.event_name == 'pull_request_review' && contains(github.event.review.body, '/pr_checks'))
1717
runs-on: ubuntu-latest
18-
env:
19-
RESOURCE_GROUP: hri-dev1-wdc-kube
2018

2119
steps:
2220
- name: Set branch name environment variable
@@ -28,7 +26,7 @@ jobs:
2826
uses: actions/setup-java@v2
2927
with:
3028
java-version: '8'
31-
distribution: 'adopt-hotspot'
29+
distribution: 'temurin'
3230
cache: 'gradle'
3331

3432
- name: Gradle build

build.gradle

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ plugins {
2424
}
2525

2626
group = 'org.alvearie.hri.flink'
27-
version = '3.1-1.0.3' // Update version number in releases
27+
version = '3.2-1.1.0' // Update version number in releases
2828
description = """HRI Flink Pipeline Core Library"""
2929

3030
ext {
3131
javaVersion = '1.8'
32-
flinkVersion = '1.10.3'
32+
flinkVersion = '1.14.0'
3333
scalaBinaryVersion = '2.12'
3434
scalaVersion = '2.12.11'
3535
scalaTestVersion = '3.1.1'
@@ -121,12 +121,6 @@ ossIndexAudit {
121121
'32bccbbb-bd3f-44f3-a530-5827a93d74db', '2418a058-dfb7-4303-b1d1-ac5262d9e499', '1be5c4d8-4994-4dd6-92b1-ca53ac7e6a17', 'f984e75c-06bf-43dc-86db-3f8dfe82f430',
122122
'bd0f0dd9-1356-4dec-a8a3-edc777cc92d5', 'b70e447a-06a1-48b0-a02b-cd5b78862777',
123123

124-
//pkg:maven/org.apache.commons/commons-compress@1.18 (from org.apache.flink:flink-scala_2.12:1.10.3)
125-
//NOTE: these vulnerabilities cannot be eliminated UNTIL we upgrade our Flink version to Flink 1.14.0
126-
//Need to get Security Exception until that happens
127-
'68232267-bb25-4b04-8dec-caf7c11c7293', '69b8043a-3002-48fa-9762-8f6040d83de1', '4102317d-8250-465e-a46d-179d42792b14',
128-
'7a6a9dd2-67de-4e2a-b406-7aa4a4ce29cc', '8ea14e38-e6cc-48d9-bfe4-ec89f93596e7',
129-
130124
//pkg:maven/org.apache.logging.log4j/log4j-core@2.11.2 (from org.scala-sbt:zinc_2.12:1.3.5 - Used by gradle scala plugin only)
131125
'd3477f9c-032a-44a7-a5e1-02ae35e4737c',
132126

@@ -178,7 +172,7 @@ dependencies {
178172
// Dependencies that library users should include in their job
179173
// shadow jar
180174
// --------------------------------------------------------------
181-
implementation "org.alvearie.hri:hri-api-batch-notification:3.1-2.0.2"
175+
implementation "org.alvearie.hri:hri-api-batch-notification:3.2-2.0.3"
182176
implementation "org.apache.flink:flink-connector-kafka_${scalaBinaryVersion}:${flinkVersion}"
183177
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
184178
implementation "com.fasterxml.jackson.module:jackson-module-scala_${scalaBinaryVersion}:${jacksonVersion}"
@@ -193,7 +187,7 @@ dependencies {
193187
testImplementation "org.mockito:mockito-scala-scalatest_${scalaBinaryVersion}:1.14.4"
194188
testImplementation "org.apache.flink:flink-tests:${flinkVersion}:tests"
195189
testImplementation "org.apache.flink:flink-test-utils_${scalaBinaryVersion}:${flinkVersion}"
196-
testImplementation "org.apache.flink:flink-runtime_${scalaBinaryVersion}:${flinkVersion}:tests"
190+
testImplementation "org.apache.flink:flink-runtime:${flinkVersion}:tests"
197191
testImplementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}:tests"
198192

199193

src/main/scala/org/alvearie/hri/flink/core/MgmtApiSink.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class MgmtApiSink(val tenantId: String, val mgmtApiUrl: String, val mgmtClientId
4040
}
4141
}
4242

43-
override def invoke(record: NotificationRecord, context: Context[_]): Unit = {
43+
override def invoke(record: NotificationRecord, context: Context): Unit = {
4444
val batch = record.value
4545
log.info("Received Batch {} with status of {}", batch.getId: Any, batch.getStatus: Any)
4646
batch.getStatus match {

src/test/scala/org/alvearie/hri/flink/core/serialization/HriRecordDeserializerTest.scala

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class HriRecordDeserializerTest extends AnyFunSuite {
6060

6161
test("Null headers should be preserved") {
6262
val deserializer = new HriRecordDeserializer
63-
val consumerRecord = createConsumerRecord(null, FakeKey, FakeBody)
63+
val consumerRecord = createConsumerRecordWithoutHeaders(FakeKey, FakeBody)
6464

6565
val record = deserializer.deserialize(consumerRecord)
6666
val actualHeaders = record.headers
@@ -70,7 +70,7 @@ class HriRecordDeserializerTest extends AnyFunSuite {
7070
val actualPartition = record.partition
7171
val actualOffset = record.offset
7272

73-
actualHeaders should be(null)
73+
actualHeaders.toArray shouldBe empty
7474
actualKey should equal(ExpectedKey)
7575
actualBody should equal(ExpectedBody)
7676
actualTopic should equal(TopicName)
@@ -80,7 +80,7 @@ class HriRecordDeserializerTest extends AnyFunSuite {
8080

8181
test("Null key should be preserved") {
8282
val deserializer = new HriRecordDeserializer
83-
val consumerRecord = createConsumerRecord(null, null, FakeBody)
83+
val consumerRecord = createConsumerRecordWithoutHeaders(null, FakeBody)
8484

8585
val record = deserializer.deserialize(consumerRecord)
8686
val actualHeaders = record.headers
@@ -90,7 +90,7 @@ class HriRecordDeserializerTest extends AnyFunSuite {
9090
val actualPartition = record.partition
9191
val actualOffset = record.offset
9292

93-
actualHeaders should be(null)
93+
actualHeaders.toArray shouldBe empty
9494
actualKey should be(null)
9595
actualBody should equal(ExpectedBody)
9696
actualTopic should equal(TopicName)
@@ -100,7 +100,7 @@ class HriRecordDeserializerTest extends AnyFunSuite {
100100

101101
test("Null body should be preserved") {
102102
val deserializer = new HriRecordDeserializer
103-
val consumerRecord = createConsumerRecord(null, null, null)
103+
val consumerRecord = createConsumerRecordWithoutHeaders(null, null)
104104

105105
val record = deserializer.deserialize(consumerRecord)
106106
val actualHeaders = record.headers
@@ -110,7 +110,7 @@ class HriRecordDeserializerTest extends AnyFunSuite {
110110
val actualPartition = record.partition
111111
val actualOffset = record.offset
112112

113-
actualHeaders should be(null)
113+
actualHeaders.toArray shouldBe empty
114114
actualKey should be(null)
115115
actualBody should be(null)
116116
actualTopic should equal(TopicName)
@@ -170,4 +170,18 @@ class HriRecordDeserializerTest extends AnyFunSuite {
170170
)
171171
}
172172

173+
def createConsumerRecordWithoutHeaders(key: Array[Byte], value: Array[Byte]): ConsumerRecord[Array[Byte], Array[Byte]] = {
174+
new ConsumerRecord[Array[Byte], Array[Byte]](
175+
TopicName,
176+
Partition,
177+
Offset,
178+
ConsumerRecord.NO_TIMESTAMP,
179+
TimestampType.NO_TIMESTAMP_TYPE,
180+
ConsumerRecord.NULL_CHECKSUM.toLong,
181+
ConsumerRecord.NULL_SIZE,
182+
ConsumerRecord.NULL_SIZE,
183+
key,
184+
value,
185+
)
186+
}
173187
}

src/test/scala/org/alvearie/hri/flink/core/serialization/NotificationDeserializerTest.scala

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,42 +67,42 @@ class NotificationDeserializerTest extends AnyFunSuite {
6767

6868
test("Null headers should be preserved") {
6969
val deserializer = new NotificationDeserializer()
70-
val consumerRecord = createConsumerRecord(null, FakeKey, FakeNotification)
70+
val consumerRecord = createConsumerRecordWithoutHeaders(FakeKey, FakeNotification)
7171

7272
val record = deserializer.deserialize(consumerRecord)
7373
val actualHeaders = record.headers
7474
val actualKey = new String(record.key, StandardCharsets.UTF_8)
7575
val actualNotification = record.value
7676

77-
actualHeaders should be(null)
77+
actualHeaders.toArray shouldBe empty
7878
actualKey should equal(ExpectedKey)
7979
actualNotification should equal(ExpectedNotification)
8080
}
8181

8282
test("Null key should be preserved") {
8383
val deserializer = new NotificationDeserializer()
84-
val consumerRecord = createConsumerRecord(null, null, FakeNotification)
84+
val consumerRecord = createConsumerRecordWithoutHeaders(null, FakeNotification)
8585

8686
val record = deserializer.deserialize(consumerRecord)
8787
val actualHeaders = record.headers
8888
val actualKey = record.key
8989
val actualNotification = record.value
9090

91-
actualHeaders should be(null)
91+
actualHeaders.toArray shouldBe empty
9292
actualKey should be(null)
9393
actualNotification should equal(ExpectedNotification)
9494
}
9595

9696
test("Null body should be preserved") {
9797
val deserializer = new NotificationDeserializer()
98-
val consumerRecord = createConsumerRecord(null, null, null)
98+
val consumerRecord = createConsumerRecordWithoutHeaders(null, null)
9999

100100
val record = deserializer.deserialize(consumerRecord)
101101
val actualHeaders = record.headers
102102
val actualKey = record.key
103103
val actualNotification = record.value
104104

105-
actualHeaders should be(null)
105+
actualHeaders.toArray shouldBe empty
106106
actualKey should be(null)
107107
actualNotification should equal(null)
108108
}
@@ -138,7 +138,6 @@ class NotificationDeserializerTest extends AnyFunSuite {
138138
headers
139139
}
140140

141-
142141
def createConsumerRecord(headers: Headers, key: Array[Byte], value: Array[Byte]): ConsumerRecord[Array[Byte], Array[Byte]] = {
143142
new ConsumerRecord[Array[Byte], Array[Byte]](
144143
TopicName,
@@ -155,6 +154,21 @@ class NotificationDeserializerTest extends AnyFunSuite {
155154
)
156155
}
157156

157+
def createConsumerRecordWithoutHeaders(key: Array[Byte], value: Array[Byte]): ConsumerRecord[Array[Byte], Array[Byte]] = {
158+
new ConsumerRecord[Array[Byte], Array[Byte]](
159+
TopicName,
160+
Partition,
161+
Offset,
162+
ConsumerRecord.NO_TIMESTAMP,
163+
TimestampType.NO_TIMESTAMP_TYPE,
164+
ConsumerRecord.NULL_CHECKSUM.toLong,
165+
ConsumerRecord.NULL_SIZE,
166+
ConsumerRecord.NULL_SIZE,
167+
key,
168+
value,
169+
)
170+
}
171+
158172
def createNotification(): BatchNotification = {
159173
new BatchNotification()
160174
.withId("id")
@@ -170,5 +184,4 @@ class NotificationDeserializerTest extends AnyFunSuite {
170184
.withInvalidThreshold(10)
171185
.withFailureMessage("")
172186
}
173-
174187
}

0 commit comments

Comments
 (0)