1
1
package org .apache .eventmesh .protocol .a2a ;
2
2
3
- import org .apache .eventmesh .common .protocol .ProtocolAdaptor ;
4
3
import org .apache .eventmesh .common .protocol .ProtocolTransportObject ;
4
+ import org .apache .eventmesh .protocol .api .ProtocolAdaptor ;
5
+ import org .apache .eventmesh .protocol .api .exception .ProtocolHandleException ;
5
6
import org .apache .eventmesh .common .protocol .grpc .cloudevents .CloudEvent ;
6
7
import org .apache .eventmesh .common .protocol .grpc .cloudevents .CloudEvent .CloudEventAttributeValue ;
7
8
import org .apache .eventmesh .common .protocol .http .HttpMessage ;
17
18
18
19
import java .nio .charset .StandardCharsets ;
19
20
import java .util .HashMap ;
21
+ import java .util .List ;
20
22
import java .util .Map ;
21
23
import java .util .UUID ;
22
24
25
+ import lombok .extern .slf4j .Slf4j ;
26
+
23
27
/**
24
- * A2A (Agent-to-Agent) Protocol Adaptor
28
+ * Enhanced A2A (Agent-to-Agent) Protocol Adaptor
25
29
* Handles conversion between A2A protocol messages and EventMesh internal formats
26
30
*/
31
+ @ Slf4j
27
32
public class A2AProtocolAdaptor implements ProtocolAdaptor <ProtocolTransportObject > {
28
33
29
- public static final String PROTOCOL_TYPE = "A2A" ;
30
- public static final String PROTOCOL_VERSION = "1.0" ;
34
+ private static final String PROTOCOL_TYPE = "A2A" ;
35
+ private static final String PROTOCOL_VERSION = "1.0" ;
36
+ private volatile boolean initialized = false ;
37
+
38
+
39
+ @ Override
40
+ public void initialize () {
41
+ if (!initialized ) {
42
+ log .info ("Initializing A2A Protocol Adaptor v{}" , PROTOCOL_VERSION );
43
+ initialized = true ;
44
+ }
45
+ }
46
+
47
+ @ Override
48
+ public void destroy () {
49
+ if (initialized ) {
50
+ log .info ("Destroying A2A Protocol Adaptor" );
51
+ initialized = false ;
52
+ }
53
+ }
54
+
55
+ @ Override
56
+ public String getProtocolType () {
57
+ return PROTOCOL_TYPE ;
58
+ }
31
59
32
60
@ Override
33
- public CloudEvent toCloudEvent (ProtocolTransportObject protocolTransportObject ) {
61
+ public String getVersion () {
62
+ return PROTOCOL_VERSION ;
63
+ }
64
+
65
+ @ Override
66
+ public int getPriority () {
67
+ return 80 ; // High priority for A2A protocol
68
+ }
69
+
70
+ @ Override
71
+ public boolean supportsBatchProcessing () {
72
+ return true ;
73
+ }
74
+
75
+ @ Override
76
+ public java .util .Set <String > getCapabilities () {
77
+ return java .util .Set .of ("agent-communication" , "workflow-orchestration" , "state-sync" );
78
+ }
79
+
80
+ @ Override
81
+ public boolean isValid (ProtocolTransportObject protocol ) {
82
+ if (protocol == null ) {
83
+ return false ;
84
+ }
85
+
86
+ // Check if it's an A2A message
87
+ try {
88
+ if (protocol instanceof RequestMessage ) {
89
+ String content = protocol .toString ();
90
+ return content .contains ("\" protocol\" :\" A2A\" " ) ||
91
+ content .contains ("A2A" ) ||
92
+ content .contains ("agent" );
93
+ }
94
+ return false ;
95
+ } catch (Exception e ) {
96
+ return false ;
97
+ }
98
+ }
99
+
100
+ @ Override
101
+ public CloudEvent toCloudEvent (ProtocolTransportObject protocolTransportObject )
102
+ throws ProtocolHandleException {
34
103
if (protocolTransportObject instanceof RequestMessage ) {
35
104
return toCloudEvent ((RequestMessage ) protocolTransportObject );
36
105
} else if (protocolTransportObject instanceof ResponseMessage ) {
37
106
return toCloudEvent ((ResponseMessage ) protocolTransportObject );
38
107
}
39
- throw new IllegalArgumentException ("Unsupported protocol transport object type" );
108
+ throw new ProtocolHandleException ("Unsupported protocol transport object type: " +
109
+ protocolTransportObject .getClass ().getName ());
40
110
}
41
111
42
112
@ Override
43
- public ProtocolTransportObject fromCloudEvent (CloudEvent cloudEvent ) {
113
+ public java .util .List <CloudEvent > toBatchCloudEvent (ProtocolTransportObject protocol )
114
+ throws ProtocolHandleException {
115
+ // For A2A protocol, we can process multiple agent messages in batch
116
+ if (protocol instanceof RequestMessage ) {
117
+ try {
118
+ String body = protocol .toString ();
119
+ if (body .contains ("batchMessages" )) {
120
+ // Parse batch A2A messages
121
+ java .util .List <A2AMessage > batchMessages = parseBatchA2AMessages (body );
122
+ return batchMessages .stream ()
123
+ .map (this ::convertA2AMessageToCloudEvent )
124
+ .collect (java .util .stream .Collectors .toList ());
125
+ }
126
+ } catch (Exception e ) {
127
+ throw new ProtocolHandleException ("Failed to process batch A2A messages" , e );
128
+ }
129
+ }
130
+
131
+ // Fall back to single message processing
132
+ return java .util .List .of (toCloudEvent (protocol ));
133
+ }
134
+
135
+ @ Override
136
+ public ProtocolTransportObject fromCloudEvent (CloudEvent cloudEvent )
137
+ throws ProtocolHandleException {
44
138
String protocolType = cloudEvent .getAttributesMap ().get ("protocol" ).getCeString ();
45
139
if (!PROTOCOL_TYPE .equals (protocolType )) {
46
- throw new IllegalArgumentException ("Unsupported protocol type: " + protocolType );
140
+ throw new ProtocolHandleException ("Unsupported protocol type: " + protocolType );
47
141
}
48
142
49
143
String messageType = cloudEvent .getAttributesMap ().get ("messageType" ).getCeString ();
@@ -79,7 +173,7 @@ private CloudEvent toCloudEvent(RequestMessage requestMessage) {
79
173
80
174
return cloudEventBuilder .build ();
81
175
} catch (Exception e ) {
82
- throw new RuntimeException ("Failed to convert RequestMessage to CloudEvent" , e );
176
+ throw new ProtocolHandleException ("Failed to convert RequestMessage to CloudEvent" , e );
83
177
}
84
178
}
85
179
@@ -102,7 +196,7 @@ private CloudEvent toCloudEvent(ResponseMessage responseMessage) {
102
196
103
197
return cloudEventBuilder .build ();
104
198
} catch (Exception e ) {
105
- throw new RuntimeException ("Failed to convert ResponseMessage to CloudEvent" , e );
199
+ throw new ProtocolHandleException ("Failed to convert ResponseMessage to CloudEvent" , e );
106
200
}
107
201
}
108
202
@@ -111,7 +205,7 @@ private A2AMessage parseA2AMessage(CloudEvent cloudEvent) {
111
205
String data = new String (cloudEvent .getData ().toByteArray (), StandardCharsets .UTF_8 );
112
206
return JsonUtils .parseObject (data , A2AMessage .class );
113
207
} catch (Exception e ) {
114
- throw new RuntimeException ("Failed to parse A2A message from CloudEvent" , e );
208
+ throw new ProtocolHandleException ("Failed to parse A2A message from CloudEvent" , e );
115
209
}
116
210
}
117
211
@@ -133,7 +227,73 @@ private ProtocolTransportObject createHttpMessage(A2AMessage a2aMessage) {
133
227
RequestMessage requestMessage = new RequestMessage (header , body );
134
228
return requestMessage ;
135
229
} catch (Exception e ) {
136
- throw new RuntimeException ("Failed to create HTTP message from A2A message" , e );
230
+ throw new ProtocolHandleException ("Failed to create HTTP message from A2A message" , e );
231
+ }
232
+ }
233
+
234
+ /**
235
+ * Parse batch A2A messages from request body.
236
+ */
237
+ private List <A2AMessage > parseBatchA2AMessages (String body ) throws ProtocolHandleException {
238
+ try {
239
+ // Parse JSON to extract batch messages
240
+ Map <String , Object > bodyMap = JsonUtils .parseTypeReferenceObject (body ,
241
+ new com .fasterxml .jackson .core .type .TypeReference <Map <String , Object >>() {});
242
+
243
+ if (bodyMap .containsKey ("batchMessages" )) {
244
+ @ SuppressWarnings ("unchecked" )
245
+ List <Map <String , Object >> messagesData = (List <Map <String , Object >>) bodyMap .get ("batchMessages" );
246
+
247
+ return messagesData .stream ()
248
+ .map (data -> JsonUtils .mapToObject (data , A2AMessage .class ))
249
+ .collect (java .util .stream .Collectors .toList ());
250
+ }
251
+
252
+ return List .of ();
253
+ } catch (Exception e ) {
254
+ throw new ProtocolHandleException ("Failed to parse batch A2A messages" , e );
255
+ }
256
+ }
257
+
258
+ /**
259
+ * Convert A2A message to CloudEvent.
260
+ */
261
+ private CloudEvent convertA2AMessageToCloudEvent (A2AMessage a2aMessage ) {
262
+ try {
263
+ CloudEvent .Builder cloudEventBuilder = CloudEvent .newBuilder ()
264
+ .setId (a2aMessage .getMessageId () != null ? a2aMessage .getMessageId () : UUID .randomUUID ().toString ())
265
+ .setSource ("eventmesh-a2a" )
266
+ .setSpecVersion ("1.0" )
267
+ .setType ("org.apache.eventmesh.protocol.a2a." + a2aMessage .getMessageType ().toLowerCase ())
268
+ .setData (com .google .protobuf .ByteString .copyFromUtf8 (JsonUtils .toJSONString (a2aMessage )))
269
+ .putAttributes ("protocol" ,
270
+ CloudEvent .CloudEventAttributeValue .newBuilder ().setCeString (PROTOCOL_TYPE ).build ())
271
+ .putAttributes ("version" ,
272
+ CloudEvent .CloudEventAttributeValue .newBuilder ().setCeString (PROTOCOL_VERSION ).build ())
273
+ .putAttributes ("messageType" ,
274
+ CloudEvent .CloudEventAttributeValue .newBuilder ().setCeString (a2aMessage .getMessageType ()).build ());
275
+
276
+ if (a2aMessage .getSourceAgent () != null ) {
277
+ cloudEventBuilder .putAttributes ("sourceAgent" ,
278
+ CloudEvent .CloudEventAttributeValue .newBuilder ()
279
+ .setCeString (a2aMessage .getSourceAgent ().getAgentId ()).build ());
280
+ }
281
+
282
+ if (a2aMessage .getTargetAgent () != null ) {
283
+ cloudEventBuilder .putAttributes ("targetAgent" ,
284
+ CloudEvent .CloudEventAttributeValue .newBuilder ()
285
+ .setCeString (a2aMessage .getTargetAgent ().getAgentId ()).build ());
286
+ }
287
+
288
+ if (a2aMessage .getMetadata () != null && a2aMessage .getMetadata ().getCorrelationId () != null ) {
289
+ cloudEventBuilder .putAttributes ("correlationId" ,
290
+ CloudEvent .CloudEventAttributeValue .newBuilder ()
291
+ .setCeString (a2aMessage .getMetadata ().getCorrelationId ()).build ());
292
+ }
293
+
294
+ return cloudEventBuilder .build ();
295
+ } catch (Exception e ) {
296
+ throw new RuntimeException ("Failed to convert A2A message to CloudEvent" , e );
137
297
}
138
298
}
139
299
0 commit comments