Skip to content

Commit 57ca711

Browse files
authored
2.8.0 fixes (#53)
* Update README * Add missing JSON_REQUIRES * Deprecate legacy avro * Update version to 2.8.0-2 * Always archive results in github actions * Explicitly import serializers * Temp debug imports * Add googleapis requirement * Upgrade packages during the pip install * Upgrade github workflow Module referencing is missing if not force installed. Verify later. * Revert "Temp debug imports" This reverts commit ba739c7.
1 parent 2e4075b commit 57ca711

File tree

12 files changed

+34
-191
lines changed

12 files changed

+34
-191
lines changed

.github/workflows/main.yml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@ jobs:
2727

2828
- name: Spin up kafka
2929
run: cd examples && docker compose up -d && cd ..
30+
- name: Upgrade pip, setuptools, and wheel
31+
run: python3 -m pip install --upgrade pip setuptools wheel
3032
- name: Install python requirements
31-
run: pip install .[all]
33+
run: pip install --force-reinstall .[all]
34+
- name: Check for broken dependencies
35+
run: pip check
3236
- name: Wait for services
3337
run: while [ -n "$(docker container ls -a | grep starting)" ]; do sleep 2; done;
3438
- name: Docker inspect
@@ -37,8 +41,8 @@ jobs:
3741
run: python3 --version
3842
- name: Execute tests
3943
run: python3 -m robot -d ./docs examples/
40-
continue-on-error: true
4144
- name: Archive test log
45+
if: ${{ always() }}
4246
uses: actions/upload-artifact@v4
4347
with:
4448
name: log.html

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,9 @@ pip install robotframework-confluentkafkalibrary
2424

2525
Extra packages:
2626
* [avro] = ['fastavro >= 1.3.2', 'avro >= 1.11.1']
27-
* [legacyavro] = ['fastavro >= 1.3.2', 'avro-python3 >= 1.10.1']
2827
* [json] = ['jsonschema >= 3.2.0']
2928
* [protobuf] = ['protobuf >= 4.22.0']
30-
29+
* [schemaregistry] = ['httpx>=0.26', 'cachetools >= 5.5.0', 'attrs >= 24.3.0']
3130
To install all dependencies use `[all]` extension like:
3231

3332
```

examples/schema/avro/KeySchema.avsc

Lines changed: 0 additions & 11 deletions
This file was deleted.

examples/schema/avro/ValueSchema.avsc

Lines changed: 0 additions & 18 deletions
This file was deleted.

examples/test_avro.robot

Lines changed: 6 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -4,50 +4,10 @@ Library Collections
44
Library String
55

66
Suite Setup Starting Test
7-
Suite Teardown Stop Thread
87

98

109
*** Test Cases ***
11-
Avro Producer With Schemas As String Argument
12-
[Setup] Clear Messages From Thread ${MAIN_THREAD}
13-
${value_schema}= Set Variable {"namespace": "example.avro","type": "record","name": "User","fields": [{"name": "name","type": "string"},{"name": "number","type": ["int","null"]}]}
14-
${key_schema}= Set Variable {"namespace": "example.avro","type": "record","name": "User","fields": [{"name": "name","type": "string"}]}
15-
${producer_id}= Create Producer schema_registry_url=http://127.0.0.1:8081
16-
... value_schema=${value_schema} key_schema=${key_schema}
17-
${value}= Create Dictionary name=Robot number=${10}
18-
Produce group_id=${producer_id} topic=avro_testing1 partition=${0} value=${value} key=${KEY_FOR_SCHEMA}
19-
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${producer_id}
20-
Sleep 1s
21-
22-
${consumer_group_id}= Create Consumer auto_offset_reset=earliest schema_registry_url=http://127.0.0.1:8081
23-
Subscribe Topic group_id=${consumer_group_id} topics=avro_testing1
24-
${messages}= Poll group_id=${consumer_group_id}
25-
Should Be Equal ${TEST_DATA} ${messages}
26-
${thread_messages}= Get Messages From Thread ${MAIN_THREAD}
27-
Should Be Equal ${TEST_DATA} ${thread_messages}
28-
[Teardown] Basic Teardown ${consumer_group_id}
29-
30-
Avro Producer With Path To Schemas
31-
[Setup] Clear Messages From Thread ${MAIN_THREAD}
32-
${value_schema_file_path}= Set Variable examples/schema/avro/ValueSchema.avsc
33-
${key_schema_file_path}= Set Variable examples/schema/avro/KeySchema.avsc
34-
${producer_id}= Create Producer schema_registry_url=http://127.0.0.1:8081
35-
... value_schema=${value_schema_file_path} key_schema=${key_schema_file_path}
36-
${value}= Create Dictionary name=Robot number=${10}
37-
Produce group_id=${producer_id} topic=avro_testing2 partition=${0} value=${value} key=${KEY_FOR_SCHEMA}
38-
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${producer_id}
39-
Sleep 1s
40-
41-
${consumer_group_id}= Create Consumer auto_offset_reset=earliest schema_registry_url=http://127.0.0.1:8081
42-
Subscribe Topic group_id=${consumer_group_id} topics=avro_testing2
43-
${messages}= Poll group_id=${consumer_group_id}
44-
Should Be Equal ${TEST_DATA} ${messages}
45-
${thread_messages}= Get Messages From Thread ${MAIN_THREAD}
46-
Should Be Equal ${TEST_DATA} ${thread_messages}
47-
[Teardown] Basic Teardown ${consumer_group_id}
48-
4910
Avro Producer Consumer With Serializers
50-
[Setup] Clear Messages From Thread ${MAIN_THREAD}
5111
${schema_registry_conf}= Create Dictionary url=http://127.0.0.1:8081
5212
${schema_registry_client}= Get Schema Registry Client ${schema_registry_conf}
5313
${schema_str}= Set Variable {"namespace": "example.avro","type": "record","name": "User","fields": [{"name": "name","type": "string"},{"name": "number","type": ["int","null"]}]}
@@ -56,16 +16,16 @@ Avro Producer Consumer With Serializers
5616
${string_serializer}= Get String Serializer
5717
${string_deserializer}= Get String Deserializer
5818

59-
${producer_id}= Create Producer key_serializer=${string_serializer} value_serializer=${avro_serializer} legacy=${False}
19+
${producer_id}= Create Producer key_serializer=${string_serializer} value_serializer=${avro_serializer} serializing=${True}
6020
${value}= Create Dictionary name=Robot number=${10}
61-
Produce group_id=${producer_id} topic=avro_testing3 partition=${0} value=${value} key=568a68fd-2785-44cc-8997-1295c3755d28
21+
Produce group_id=${producer_id} topic=avro_testing1 partition=${0} value=${value} key=${KEY}
6222
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${producer_id}
6323

64-
${consumer_group_id}= Create Consumer auto_offset_reset=latest key_deserializer=${string_deserializer} value_deserializer=${avro_deserializer} legacy=${False}
65-
Subscribe Topic group_id=${consumer_group_id} topics=avro_testing3
24+
${consumer_group_id}= Create Consumer auto_offset_reset=latest key_deserializer=${string_deserializer} value_deserializer=${avro_deserializer} deserializing=${True}
25+
Subscribe Topic group_id=${consumer_group_id} topics=avro_testing1
6626
Poll group_id=${consumer_group_id} # Dummy poll when using offset reset 'latest'
6727

68-
Produce group_id=${producer_id} topic=avro_testing3 value=${value} partition=${0} key=568a68fd-2785-44cc-8997-1295c3755d28
28+
Produce group_id=${producer_id} topic=avro_testing1 value=${value} partition=${0} key=${KEY}
6929
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${producer_id}
7030
${messages}= Poll group_id=${consumer_group_id}
7131
Should Be Equal ${messages} ${TEST_DATA}
@@ -74,28 +34,13 @@ Avro Producer Consumer With Serializers
7434

7535
*** Keywords ***
7636
Starting Test
77-
Set Suite Variable @{TEST_TOPIC} avro_testing1 avro_testing2 avro_testing3
37+
Set Suite Variable @{TEST_TOPIC} avro_testing1
7838
Set Suite Variable ${KEY} 568a68fd-2785-44cc-8997-1295c3755d28
79-
Set Suite Variable &{KEY_FOR_SCHEMA} name=testkey
8039

8140
${value}= Create Dictionary name=Robot number=${10}
8241
${data}= Create List ${value}
8342
Set Suite Variable ${TEST_DATA} ${data}
8443

85-
# Rewrite to support topic creation via AdminClient
86-
${value_schema}= Set Variable {"namespace": "example.avro","type": "record","name": "User","fields": [{"name": "name","type": "string"},{"name": "number","type": ["int","null"]}]}
87-
${key_schema}= Set Variable {"namespace": "example.avro","type": "record","name": "User","fields": [{"name": "name","type": "string"}]}
88-
${producer_id}= Create Producer schema_registry_url=http://127.0.0.1:8081
89-
... value_schema=${value_schema} key_schema=${key_schema}
90-
91-
Produce group_id=${producer_id} topic=avro_testing1 partition=${0} value=${value} key=${KEY_FOR_SCHEMA}
92-
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${producer_id}
93-
Produce group_id=${producer_id} topic=avro_testing2 partition=${0} value=${value} key=${KEY_FOR_SCHEMA}
94-
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${producer_id}
95-
96-
${thread}= Start Consumer Threaded topics=${TEST_TOPIC} schema_registry_url=http://127.0.0.1:8081 auto_offset_reset=latest
97-
Set Suite Variable ${MAIN_THREAD} ${thread}
98-
9944
All Messages Are Delivered
10045
[Arguments] ${producer_id}
10146
${count}= Flush ${producer_id}
@@ -109,8 +54,4 @@ Basic Teardown
10954
${groups}= Create List ${group_id}
11055
${admin_client_id}= Create Admin Client
11156
${resp}= Delete Groups ${admin_client_id} group_ids=${groups}
112-
Log ${resp}
113-
114-
Stop Thread
115-
${resp}= Stop Consumer Threaded ${MAIN_THREAD}
11657
Log ${resp}

examples/test_protobuf.robot

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,9 @@ Protobuf Producer With Serializer
1010
${protobuf_serializer}= Get Protobuf Serializer ${msg_type} ${schema_registry_client}
1111
${protobuf_deserializer}= Get Protobuf Deserializer ${msg_type}
1212
${string_serializer}= Get String Serializer
13-
${string_deserializer}= Get String Deserializer
1413

15-
# On the Producer when legacy is set to True the value must be converted into a binary string
16-
${producer_id}= Create Producer key_serializer=${string_serializer} value_serializer=${protobuf_serializer} legacy=${True}
17-
${value}= Create User Robot 10 ${True}
14+
${producer_id}= Create Producer key_serializer=${string_serializer} value_serializer=${protobuf_serializer} serializing=${True}
15+
${value}= Create User Robot 10
1816
Produce group_id=${producer_id} topic=protobuf_testing1 key=bd232464-e3d3-425d-93b7-5789dc7273c1 value=${value}
1917
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${producer_id}
2018

@@ -27,12 +25,12 @@ Protobuf Producer Consumer With Serializer
2725
${string_serializer}= Get String Serializer
2826
${string_deserializer}= Get String Deserializer
2927

30-
${producer_id}= Create Producer key_serializer=${string_serializer} value_serializer=${protobuf_serializer} legacy=${False}
28+
${producer_id}= Create Producer key_serializer=${string_serializer} value_serializer=${protobuf_serializer} serializing=${True}
3129
${value}= Create User Robot 10
3230
Produce group_id=${producer_id} topic=protobuf_testing2 key=f01df0c6-ec0b-49e9-835f-d766a9e8036f value=${value}
3331
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${producer_id}
3432

35-
${consumer_group_id}= Create Consumer auto_offset_reset=earliest key_deserializer=${string_deserializer} value_deserializer=${protobuf_deserializer} legacy=${False}
33+
${consumer_group_id}= Create Consumer auto_offset_reset=earliest key_deserializer=${string_deserializer} value_deserializer=${protobuf_deserializer} deserializing=${True}
3634
Subscribe Topic group_id=${consumer_group_id} topics=protobuf_testing2
3735
${messages}= Poll group_id=${consumer_group_id}
3836
Length Should Be ${messages} 1

setup.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@
1111
"""[1:-1]
1212

1313
AVRO_REQUIRES = ['fastavro >= 1.3.2', 'avro >= 1.11.1']
14-
LEGACYAVRO_REQUIRES = ['fastavro >= 1.3.2', 'avro-python3 >= 1.10.1']
15-
JSON_REQUIRES = ['jsonschema >= 3.2.0']
16-
PROTO_REQUIRES = ['protobuf >= 4.22.0']
14+
JSON_REQUIRES = ['jsonschema >= 3.2.0', 'pyrsistent >= 0.20.0']
15+
PROTO_REQUIRES = ['protobuf >= 4.22.0', 'googleapis-common-protos >= 1.66.0']
1716
SCHEMA_REGISTRY_REQUIRES = ['httpx>=0.26', 'cachetools >= 5.5.0', 'attrs >= 24.3.0']
18-
ALL = AVRO_REQUIRES + LEGACYAVRO_REQUIRES + JSON_REQUIRES + PROTO_REQUIRES + SCHEMA_REGISTRY_REQUIRES
17+
ALL = AVRO_REQUIRES + JSON_REQUIRES + PROTO_REQUIRES + SCHEMA_REGISTRY_REQUIRES
1918
setup(name = 'robotframework-confluentkafkalibrary',
2019
version = VERSION,
2120
description = 'Confluent Kafka library for Robot Framework',
@@ -40,7 +39,6 @@
4039
extras_require={
4140
'all': ALL,
4241
'avro': AVRO_REQUIRES,
43-
'legacyavro': LEGACYAVRO_REQUIRES,
4442
'json': JSON_REQUIRES,
4543
'protobuf': PROTO_REQUIRES,
4644
'schemaregistry': SCHEMA_REGISTRY_REQUIRES,

src/ConfluentKafkaLibrary/__init__.py

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class ConfluentKafkaLibrary(*IMPORTS):
3232
3333
*Basic Consumer with predefined group_id*
3434
35-
| ${group_id}= | `Create Consumer` | group_id=mygroup | # if group_id is not defined uuid4() is gemerated |
35+
| ${group_id}= | `Create Consumer` | group_id=mygroup | # if group_id is not defined uuid4() is generated |
3636
| `Subscribe Topic` | group_id=${group_id} | topics=test_topic |
3737
| ${result}= | `Poll` | group_id=${group_id} | max_records=5 |
3838
| `Log` | ${result} |
@@ -58,20 +58,6 @@ class ConfluentKafkaLibrary(*IMPORTS):
5858
| ${json} | Convert String to JSON | ${messages}[0] |
5959
| ${jsonValue} | Get value from JSON | ${json} | $.key |
6060
61-
*Run Avro Consumer over HTTPS and threaded:*
62-
63-
| ${thread}= | `Start Consumer Threaded` |
64-
| | ... topics=test_topic |
65-
| | ... schema_registry_url=https://localhost:8081 |
66-
| | ... auto_offset_reset=earliest | # We want to get all messages |
67-
| | ... security.protocol=ssl |
68-
| | ... schema.registry.ssl.ca.location=/home/user/cert/caproxy.pem |
69-
| | ... ssl.ca.location=/home/user/cert/caproxy.pem |
70-
| | ... ssl.certificate.location=/home/user/cert/kafka-client-cert.pem |
71-
| | ... ssl.key.location=/home/user/cert/kafka-client-key.pem |
72-
| `Log` | `Execute commands which should push some data to topic` |
73-
| ${messages}= | `Get Messages From Thread` | ${thread} |
74-
7561
"""
7662

7763
ROBOT_LIBRARY_VERSION = VERSION

src/ConfluentKafkaLibrary/consumer.py

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
try:
88
from confluent_kafka.avro.serializer import SerializerError
9-
from confluent_kafka.avro import AvroConsumer
109
except ImportError:
1110
pass
1211

@@ -86,11 +85,10 @@ def create_consumer(
8685
port="9092",
8786
enable_auto_commit=True,
8887
auto_offset_reset="latest",
89-
schema_registry_url=None,
9088
auto_create_topics=True,
9189
key_deserializer=None,
9290
value_deserializer=None,
93-
legacy=True,
91+
deserializing=False,
9492
**kwargs
9593
):
9694
"""Create Kafka Consumer and returns its `group_id` as string.
@@ -110,13 +108,10 @@ def create_consumer(
110108
other value will raise the exception. Default: `latest`.
111109
- ``enable_auto_commit`` (bool): If true the consumer's offset will be
112110
periodically committed in the background. Default: `True`.
113-
- ``schema_registry_url`` (str): *required* for Avro Consumer.
114-
Full URL to avro schema endpoint.
115111
- ``auto_create_topics`` (bool): Consumers no longer trigger auto creation of topics,
116112
will be removed in future release. Default: `True`.
117-
- ``legacy`` (bool): Activate SerializingConsumer if 'False' else
118-
AvroConsumer (legacy) is used. Will be removed when confluent-kafka will deprecate this.
119-
Default: `True`.
113+
- ``deserializing`` (bool): Activates DeserializingConsumer with deserialization capabilities.
114+
Default: `False`.
120115
121116
Note:
122117
Configuration parameters are described in more detail at
@@ -125,16 +120,7 @@ def create_consumer(
125120
if group_id is None:
126121
group_id = str(uuid.uuid4())
127122

128-
if schema_registry_url and legacy:
129-
consumer = AvroConsumer({
130-
'bootstrap.servers': '{}:{}'.format(server, port),
131-
'group.id': group_id,
132-
'enable.auto.commit': enable_auto_commit,
133-
'allow.auto.create.topics': auto_create_topics,
134-
'auto.offset.reset': auto_offset_reset,
135-
'schema.registry.url': schema_registry_url,
136-
**kwargs})
137-
elif not legacy:
123+
if deserializing:
138124
consumer = DeserializingConsumer({
139125
'bootstrap.servers': '{}:{}'.format(server, port),
140126
'group.id': group_id,

0 commit comments

Comments
 (0)