5
5
import com .falkordb .impl .Utils ;
6
6
import com .falkordb .impl .graph_cache .GraphCache ;
7
7
import com .falkordb .impl .resultset .ResultSetImpl ;
8
- import redis .clients .jedis .*;
9
- import redis .clients .jedis .commands .ProtocolCommand ;
8
+ import redis .clients .jedis .Builder ;
9
+ import redis .clients .jedis .BuilderFactory ;
10
+ import redis .clients .jedis .Client ;
11
+ import redis .clients .jedis .Pipeline ;
12
+ import redis .clients .jedis .Response ;
10
13
11
- import java .util .Arrays ;
12
14
import java .util .List ;
13
15
import java .util .Map ;
14
16
@@ -21,27 +23,22 @@ public class GraphPipelineImpl extends Pipeline implements com.falkordb.GraphPip
21
23
private GraphCache cache ;
22
24
private final String graphId ;
23
25
24
- public GraphPipelineImpl (Connection connection , Graph graph , GraphCache cache , String graphId ){
25
- super ( connection );
26
+ public GraphPipelineImpl (Client client , Graph graph , GraphCache cache , String graphId ){
27
+ super . setClient ( client );
26
28
this .graph = graph ;
27
29
this .cache = cache ;
28
30
this .graphId = graphId ;
29
31
}
30
32
31
- protected <T > Response <T > appendWithResponse (ProtocolCommand protocolCommand , List <Object > arguments , Builder <T > builder ) {
32
- CommandArguments commandArguments = new CommandArguments (protocolCommand );
33
- arguments .forEach (commandArguments ::add );
34
- return this .appendCommand (new CommandObject <>(commandArguments , builder ));
35
- }
36
-
37
33
/**
38
34
* Execute a Cypher query.
39
35
* @param query Cypher query
40
36
* @return a response which builds the result set with the query answer.
41
37
*/
42
38
@ Override
43
39
public Response <ResultSet > query (String query ) {
44
- return appendWithResponse (GraphCommand .QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING ), new Builder <ResultSet >() {
40
+ client .sendCommand (GraphCommand .QUERY , graphId , query , Utils .COMPACT_STRING );
41
+ return getResponse (new Builder <ResultSet >() {
45
42
@ SuppressWarnings ("unchecked" )
46
43
@ Override
47
44
public ResultSet build (Object o ) {
@@ -57,7 +54,8 @@ public ResultSet build(Object o) {
57
54
*/
58
55
@ Override
59
56
public Response <ResultSet > readOnlyQuery (String query ) {
60
- return appendWithResponse (GraphCommand .RO_QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING ), new Builder <ResultSet >() {
57
+ client .sendCommand (GraphCommand .RO_QUERY , graphId , query , Utils .COMPACT_STRING );
58
+ return getResponse (new Builder <ResultSet >() {
61
59
@ SuppressWarnings ("unchecked" )
62
60
@ Override
63
61
public ResultSet build (Object o ) {
@@ -76,8 +74,9 @@ public ResultSet build(Object o) {
76
74
*/
77
75
@ Override
78
76
public Response <ResultSet > query (String query , long timeout ) {
79
- return appendWithResponse (GraphCommand .QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
80
- Long .toString (timeout )), new Builder <ResultSet >() {
77
+ client .sendCommand (GraphCommand .QUERY , graphId , query , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
78
+ Long .toString (timeout ));
79
+ return getResponse (new Builder <ResultSet >() {
81
80
@ SuppressWarnings ("unchecked" )
82
81
@ Override
83
82
public ResultSet build (Object o ) {
@@ -96,8 +95,9 @@ public ResultSet build(Object o) {
96
95
*/
97
96
@ Override
98
97
public Response <ResultSet > readOnlyQuery (String query , long timeout ) {
99
- return appendWithResponse (GraphCommand .RO_QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
100
- Long .toString (timeout )), new Builder <ResultSet >() {
98
+ client .sendCommand (GraphCommand .RO_QUERY , graphId , query , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
99
+ Long .toString (timeout ));
100
+ return getResponse (new Builder <ResultSet >() {
101
101
@ SuppressWarnings ("unchecked" )
102
102
@ Override
103
103
public ResultSet build (Object o ) {
@@ -114,7 +114,9 @@ public ResultSet build(Object o) {
114
114
*/
115
115
@ Override
116
116
public Response <ResultSet > query (String query , Map <String , Object > params ) {
117
- return appendWithResponse (GraphCommand .QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING ), new Builder <ResultSet >() {
117
+ String preparedQuery = Utils .prepareQuery (query , params );
118
+ client .sendCommand (GraphCommand .QUERY , graphId , preparedQuery , Utils .COMPACT_STRING );
119
+ return getResponse (new Builder <ResultSet >() {
118
120
@ SuppressWarnings ("unchecked" )
119
121
@ Override
120
122
public ResultSet build (Object o ) {
@@ -131,7 +133,9 @@ public ResultSet build(Object o) {
131
133
*/
132
134
@ Override
133
135
public Response <ResultSet > readOnlyQuery (String query , Map <String , Object > params ) {
134
- return appendWithResponse (GraphCommand .RO_QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING ), new Builder <ResultSet >() {
136
+ String preparedQuery = Utils .prepareQuery (query , params );
137
+ client .sendCommand (GraphCommand .RO_QUERY , graphId , preparedQuery , Utils .COMPACT_STRING );
138
+ return getResponse (new Builder <ResultSet >() {
135
139
@ SuppressWarnings ("unchecked" )
136
140
@ Override
137
141
public ResultSet build (Object o ) {
@@ -152,8 +156,10 @@ public ResultSet build(Object o) {
152
156
*/
153
157
@ Override
154
158
public Response <ResultSet > query (String query , Map <String , Object > params , long timeout ) {
155
- return appendWithResponse (GraphCommand .QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
156
- Long .toString (timeout )), new Builder <ResultSet >() {
159
+ String preparedQuery = Utils .prepareQuery (query , params );
160
+ client .sendCommand (GraphCommand .QUERY , graphId , preparedQuery , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
161
+ Long .toString (timeout ));
162
+ return getResponse (new Builder <ResultSet >() {
157
163
@ SuppressWarnings ("unchecked" )
158
164
@ Override
159
165
public ResultSet build (Object o ) {
@@ -174,8 +180,11 @@ public ResultSet build(Object o) {
174
180
*/
175
181
@ Override
176
182
public Response <ResultSet > readOnlyQuery (String query , Map <String , Object > params , long timeout ) {
177
- return appendWithResponse (GraphCommand .RO_QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
178
- Long .toString (timeout )), new Builder <ResultSet >() {
183
+ String preparedQuery = Utils .prepareQuery (query , params );
184
+ client .sendCommand (GraphCommand .RO_QUERY , graphId , preparedQuery , Utils .COMPACT_STRING ,
185
+ Utils .TIMEOUT_STRING ,
186
+ Long .toString (timeout ));
187
+ return getResponse (new Builder <ResultSet >() {
179
188
@ SuppressWarnings ("unchecked" )
180
189
@ Override
181
190
public ResultSet build (Object o ) {
@@ -226,7 +235,8 @@ public Response<ResultSet> callProcedure(String procedure, List<String> args,
226
235
*/
227
236
@ Override
228
237
public Response <String > copyGraph (String destinationGraphId ) {
229
- return appendWithResponse (GraphCommand .COPY , Arrays .asList (graphId , destinationGraphId ), BuilderFactory .STRING );
238
+ client .sendCommand (GraphCommand .COPY , graphId , destinationGraphId );
239
+ return getResponse (BuilderFactory .STRING );
230
240
}
231
241
232
242
@@ -236,11 +246,10 @@ public Response<String> copyGraph(String destinationGraphId) {
236
246
*/
237
247
@ Override
238
248
public Response <String > deleteGraph (){
239
- try {
240
- return appendWithResponse (GraphCommand .DELETE , Arrays .asList (graphId ), BuilderFactory .STRING );
241
- } finally {
242
- cache .clear ();
243
- }
244
249
250
+ client .sendCommand (GraphCommand .DELETE , graphId );
251
+ Response <String > response = getResponse (BuilderFactory .STRING );
252
+ cache .clear ();
253
+ return response ;
245
254
}
246
255
}
0 commit comments