Skip to content

Commit b6be9ae

Browse files
committed
CustomerOrder push working KStream but Customer information is empty
1 parent 91ecf58 commit b6be9ae

File tree

2 files changed

+27
-5
lines changed

2 files changed

+27
-5
lines changed

src/main/java/com/kafkastream/events/EventsListener.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.streams.state.KeyValueStore;
2525
import org.apache.kafka.streams.state.StoreBuilder;
2626
import org.apache.kafka.streams.state.Stores;
27+
import org.springframework.web.client.RestTemplate;
2728

2829
import java.util.Collections;
2930
import java.util.HashMap;
@@ -89,10 +90,22 @@ public static void main(String[] args)
8990
System.out.println("orderKTable.value: " + order);
9091
CustomerOrder customerOrder = new CustomerOrder();
9192
customerOrder.setCustomerId(order.getCustomerId());
92-
customerOrder.setFirstName("");
93-
customerOrder.setLastName("");
94-
customerOrder.setEmail("");
95-
customerOrder.setPhone("");
93+
Customer customer=getCustomerInformation(order.getCustomerId());
94+
if(customer == null)
95+
{
96+
customerOrder.setFirstName("");
97+
customerOrder.setLastName("");
98+
customerOrder.setEmail("");
99+
customerOrder.setPhone("");
100+
}
101+
else
102+
{
103+
customerOrder.setFirstName(customer.getFirstName());
104+
customerOrder.setLastName(customer.getLastName());
105+
customerOrder.setEmail(customer.getEmail());
106+
customerOrder.setPhone(customer.getPhone());
107+
}
108+
96109
customerOrder.setOrderId(order.getOrderId());
97110
customerOrder.setOrderItemName(order.getOrderItemName());
98111
customerOrder.setOrderPlace(order.getOrderPlace());
@@ -135,6 +148,12 @@ public void run()
135148
System.exit(0);
136149
}
137150

151+
private static Customer getCustomerInformation(CharSequence customerId)
152+
{
153+
RestTemplate restTemplate=new RestTemplate();
154+
return restTemplate.getForObject("http://localhost:8095/store/customer/"+customerId,Customer.class);
155+
}
156+
138157

139158
private static <VT extends SpecificRecord> SpecificAvroSerde<VT> createSerde(String schemaRegistryUrl)
140159
{

src/main/java/com/kafkastream/web/kafkarest/StateStoreRestService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,10 @@ public void start() throws Exception
200200
jettyServer.start();
201201
}
202202

203-
public Customer getCustomerInformation(CharSequence customerId)
203+
@GET
204+
@Path("/customer/{customerId}")
205+
@Produces(MediaType.APPLICATION_JSON)
206+
public Customer getCustomerInformation(@PathParam("customerId") String customerId)
204207
{
205208
try
206209
{

0 commit comments

Comments
 (0)