Skip to content

Commit 537ab81

Browse files
committed
Fixed all bugs
1 parent b6be9ae commit 537ab81

File tree

6 files changed

+46
-12
lines changed

6 files changed

+46
-12
lines changed

src/main/java/com/kafkastream/constants/KafkaConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ public class KafkaConstants
55
//Kafka Streams Configuration
66
public static String SCHEMA_REGISTRY_URL = "http://localhost:8081";
77
public static String APPLICATION_ID_CONFIG = "cqrs-streams";
8+
public static String APPLICATION_ID_CONFIG2 = "cqrs-streams2";
89
public static String APPLICATION_SERVER_CONFIG = "localhost:8095";
910
public static String BOOTSTRAP_SERVERS_CONFIG = "localhost:9092";
1011
public static String COMMIT_INTERVAL_MS_CONFIG = "2000";

src/main/java/com/kafkastream/events/EventsListener.java renamed to src/main/java/com/kafkastream/events/services/EventsListener.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
package com.kafkastream.events;
1+
package com.kafkastream.events.services;
22

33
import com.kafkastream.constants.KafkaConstants;
4+
import com.kafkastream.dto.CustomerDto;
45
import com.kafkastream.model.Customer;
56
import com.kafkastream.model.CustomerOrder;
67
import com.kafkastream.model.Greetings;
@@ -24,6 +25,8 @@
2425
import org.apache.kafka.streams.state.KeyValueStore;
2526
import org.apache.kafka.streams.state.StoreBuilder;
2627
import org.apache.kafka.streams.state.Stores;
28+
import org.springframework.http.HttpMethod;
29+
import org.springframework.http.ResponseEntity;
2730
import org.springframework.web.client.RestTemplate;
2831

2932
import java.util.Collections;
@@ -44,7 +47,7 @@ public class EventsListener
4447
private static void setUp()
4548
{
4649
properties = new Properties();
47-
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, KafkaConstants.APPLICATION_ID_CONFIG);
50+
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, KafkaConstants.APPLICATION_ID_CONFIG2);
4851
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS_CONFIG);
4952
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, KafkaConstants.APPLICATION_SERVER_CONFIG);
5053
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, KafkaConstants.COMMIT_INTERVAL_MS_CONFIG);
@@ -59,9 +62,12 @@ private static void setUp()
5962

6063
public static void main(String[] args)
6164
{
65+
//StateStoreService stateStoreService=new StateStoreService();
66+
6267
//Setup StreamsBuilder
6368
setUp();
6469

70+
6571
SpecificAvroSerde<Customer> customerSerde = createSerde(KafkaConstants.SCHEMA_REGISTRY_URL);
6672
SpecificAvroSerde<Order> orderSerde = createSerde(KafkaConstants.SCHEMA_REGISTRY_URL);
6773
SpecificAvroSerde<Greetings> greetingsSerde = createSerde(KafkaConstants.SCHEMA_REGISTRY_URL);
@@ -90,7 +96,7 @@ public static void main(String[] args)
9096
System.out.println("orderKTable.value: " + order);
9197
CustomerOrder customerOrder = new CustomerOrder();
9298
customerOrder.setCustomerId(order.getCustomerId());
93-
Customer customer=getCustomerInformation(order.getCustomerId());
99+
CustomerDto customer=getCustomerInformation(order.getCustomerId());
94100
if(customer == null)
95101
{
96102
customerOrder.setFirstName("");
@@ -134,7 +140,6 @@ public static void main(String[] args)
134140
e.printStackTrace();
135141
}
136142

137-
138143
//Close Runtime
139144
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook")
140145
{
@@ -148,10 +153,16 @@ public void run()
148153
System.exit(0);
149154
}
150155

151-
private static Customer getCustomerInformation(CharSequence customerId)
156+
private static CustomerDto getCustomerInformation(CharSequence customerId)
152157
{
153158
RestTemplate restTemplate=new RestTemplate();
154-
return restTemplate.getForObject("http://localhost:8095/store/customer/"+customerId,Customer.class);
159+
if(customerId!=null)
160+
{
161+
ResponseEntity<CustomerDto> customerResponseEntity= restTemplate.exchange("http://" +KafkaConstants.REST_PROXY_HOST+":"+KafkaConstants.REST_PROXY_PORT+ "store/customer/"+customerId, HttpMethod.GET,null, CustomerDto.class);
162+
return customerResponseEntity.getBody();
163+
164+
}
165+
return null;
155166
}
156167

157168

src/main/java/com/kafkastream/events/EventsSender.java renamed to src/main/java/com/kafkastream/events/services/EventsSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.kafkastream.events;
1+
package com.kafkastream.events.services;
22

33
import com.kafkastream.constants.KafkaConstants;
44
import com.kafkastream.model.Customer;

src/main/java/com/kafkastream/web/EventsController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.kafkastream.web;
22

3-
import com.kafkastream.events.EventsSender;
3+
import com.kafkastream.events.services.EventsSender;
44
import com.kafkastream.model.Customer;
55
import com.kafkastream.model.Greetings;
66
import com.kafkastream.model.Order;

src/main/java/com/kafkastream/web/kafkarest/StateStoreRestService.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ private static <T> T waitUntilStoreIsQueryable(final String storeName, final Que
7070
@GET
7171
@Path("/customer-order/{customerId}")
7272
@Produces(MediaType.APPLICATION_JSON)
73-
public List<CustomerOrderDTO> getCustomerOrders(@PathParam("customerId") String customerId) throws InterruptedException
73+
public List<CustomerOrderDTO> getCustomerOrder(@PathParam("customerId") String customerId) throws InterruptedException
7474
{
75-
System.out.println("Inside getCustomerOrders()");
75+
System.out.println("Inside getCustomerOrder()");
7676
List<CustomerOrderDTO> customerOrderList = new ArrayList<>();
7777
ReadOnlyKeyValueStore<String, CustomerOrder> customerOrdersStore = waitUntilStoreIsQueryable(KafkaConstants.CUSTOMER_ORDER_STORE_NAME, QueryableStoreTypes.keyValueStore(), streams);
7878

@@ -203,7 +203,29 @@ public void start() throws Exception
203203
@GET
204204
@Path("/customer/{customerId}")
205205
@Produces(MediaType.APPLICATION_JSON)
206-
public Customer getCustomerInformation(@PathParam("customerId") String customerId)
206+
public CustomerDto getCustomerInformation(@PathParam("customerId") String customerId)
207+
{
208+
try
209+
{
210+
ReadOnlyKeyValueStore<String, Customer> customersStore = waitUntilStoreIsQueryable(KafkaConstants.CUSTOMER_STORE_NAME, QueryableStoreTypes.keyValueStore(), streams);
211+
KeyValueIterator<String, Customer> keyValueIterator = customersStore.all();
212+
while (keyValueIterator.hasNext())
213+
{
214+
Customer customer=keyValueIterator.next().value;
215+
if(customer.getCustomerId().toString().equals(customerId))
216+
return new CustomerDto(customer.getCustomerId().toString(),customer.getFirstName().toString(),customer.getLastName().toString(),
217+
customer.getEmail().toString(),customer.getPhone().toString());
218+
}
219+
}
220+
catch (Exception e)
221+
{
222+
e.printStackTrace();
223+
}
224+
return null;
225+
}
226+
227+
228+
public Customer getCustomer(String customerId)
207229
{
208230
try
209231
{

src/test/java/com/kafkastream/TestProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.kafkastream;
22

3-
import com.kafkastream.events.EventsSender;
3+
import com.kafkastream.events.services.EventsSender;
44
import com.kafkastream.model.Customer;
55
import com.kafkastream.model.Order;
66
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

0 commit comments

Comments
 (0)