5
5
import com .falkordb .impl .Utils ;
6
6
import com .falkordb .impl .graph_cache .GraphCache ;
7
7
import com .falkordb .impl .resultset .ResultSetImpl ;
8
- import redis .clients .jedis .Builder ;
9
- import redis .clients .jedis .BuilderFactory ;
10
- import redis .clients .jedis .Client ;
11
- import redis .clients .jedis .Pipeline ;
12
- import redis .clients .jedis .Response ;
8
+ import redis .clients .jedis .*;
9
+ import redis .clients .jedis .commands .ProtocolCommand ;
13
10
11
+ import java .util .Arrays ;
14
12
import java .util .List ;
15
13
import java .util .Map ;
16
14
@@ -23,22 +21,27 @@ public class GraphPipelineImpl extends Pipeline implements com.falkordb.GraphPip
23
21
private GraphCache cache ;
24
22
private final String graphId ;
25
23
26
- public GraphPipelineImpl (Client client , Graph graph , GraphCache cache , String graphId ){
27
- super . setClient ( client );
24
+ public GraphPipelineImpl (Connection connection , Graph graph , GraphCache cache , String graphId ){
25
+ super ( connection );
28
26
this .graph = graph ;
29
27
this .cache = cache ;
30
28
this .graphId = graphId ;
31
29
}
32
30
31
+ protected <T > Response <T > appendWithResponse (ProtocolCommand protocolCommand , List <Object > arguments , Builder <T > builder ) {
32
+ CommandArguments commandArguments = new CommandArguments (protocolCommand );
33
+ arguments .forEach (commandArguments ::add );
34
+ return this .appendCommand (new CommandObject <>(commandArguments , builder ));
35
+ }
36
+
33
37
/**
34
38
* Execute a Cypher query.
35
39
* @param query Cypher query
36
40
* @return a response which builds the result set with the query answer.
37
41
*/
38
42
@ Override
39
43
public Response <ResultSet > query (String query ) {
40
- client .sendCommand (GraphCommand .QUERY , graphId , query , Utils .COMPACT_STRING );
41
- return getResponse (new Builder <ResultSet >() {
44
+ return appendWithResponse (GraphCommand .QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING ), new Builder <ResultSet >() {
42
45
@ SuppressWarnings ("unchecked" )
43
46
@ Override
44
47
public ResultSet build (Object o ) {
@@ -54,8 +57,7 @@ public ResultSet build(Object o) {
54
57
*/
55
58
@ Override
56
59
public Response <ResultSet > readOnlyQuery (String query ) {
57
- client .sendCommand (GraphCommand .RO_QUERY , graphId , query , Utils .COMPACT_STRING );
58
- return getResponse (new Builder <ResultSet >() {
60
+ return appendWithResponse (GraphCommand .RO_QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING ), new Builder <ResultSet >() {
59
61
@ SuppressWarnings ("unchecked" )
60
62
@ Override
61
63
public ResultSet build (Object o ) {
@@ -74,9 +76,8 @@ public ResultSet build(Object o) {
74
76
*/
75
77
@ Override
76
78
public Response <ResultSet > query (String query , long timeout ) {
77
- client .sendCommand (GraphCommand .QUERY , graphId , query , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
78
- Long .toString (timeout ));
79
- return getResponse (new Builder <ResultSet >() {
79
+ return appendWithResponse (GraphCommand .QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
80
+ Long .toString (timeout )), new Builder <ResultSet >() {
80
81
@ SuppressWarnings ("unchecked" )
81
82
@ Override
82
83
public ResultSet build (Object o ) {
@@ -95,9 +96,8 @@ public ResultSet build(Object o) {
95
96
*/
96
97
@ Override
97
98
public Response <ResultSet > readOnlyQuery (String query , long timeout ) {
98
- client .sendCommand (GraphCommand .RO_QUERY , graphId , query , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
99
- Long .toString (timeout ));
100
- return getResponse (new Builder <ResultSet >() {
99
+ return appendWithResponse (GraphCommand .RO_QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
100
+ Long .toString (timeout )), new Builder <ResultSet >() {
101
101
@ SuppressWarnings ("unchecked" )
102
102
@ Override
103
103
public ResultSet build (Object o ) {
@@ -114,9 +114,7 @@ public ResultSet build(Object o) {
114
114
*/
115
115
@ Override
116
116
public Response <ResultSet > query (String query , Map <String , Object > params ) {
117
- String preparedQuery = Utils .prepareQuery (query , params );
118
- client .sendCommand (GraphCommand .QUERY , graphId , preparedQuery , Utils .COMPACT_STRING );
119
- return getResponse (new Builder <ResultSet >() {
117
+ return appendWithResponse (GraphCommand .QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING ), new Builder <ResultSet >() {
120
118
@ SuppressWarnings ("unchecked" )
121
119
@ Override
122
120
public ResultSet build (Object o ) {
@@ -133,9 +131,7 @@ public ResultSet build(Object o) {
133
131
*/
134
132
@ Override
135
133
public Response <ResultSet > readOnlyQuery (String query , Map <String , Object > params ) {
136
- String preparedQuery = Utils .prepareQuery (query , params );
137
- client .sendCommand (GraphCommand .RO_QUERY , graphId , preparedQuery , Utils .COMPACT_STRING );
138
- return getResponse (new Builder <ResultSet >() {
134
+ return appendWithResponse (GraphCommand .RO_QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING ), new Builder <ResultSet >() {
139
135
@ SuppressWarnings ("unchecked" )
140
136
@ Override
141
137
public ResultSet build (Object o ) {
@@ -156,10 +152,8 @@ public ResultSet build(Object o) {
156
152
*/
157
153
@ Override
158
154
public Response <ResultSet > query (String query , Map <String , Object > params , long timeout ) {
159
- String preparedQuery = Utils .prepareQuery (query , params );
160
- client .sendCommand (GraphCommand .QUERY , graphId , preparedQuery , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
161
- Long .toString (timeout ));
162
- return getResponse (new Builder <ResultSet >() {
155
+ return appendWithResponse (GraphCommand .QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
156
+ Long .toString (timeout )), new Builder <ResultSet >() {
163
157
@ SuppressWarnings ("unchecked" )
164
158
@ Override
165
159
public ResultSet build (Object o ) {
@@ -180,11 +174,8 @@ public ResultSet build(Object o) {
180
174
*/
181
175
@ Override
182
176
public Response <ResultSet > readOnlyQuery (String query , Map <String , Object > params , long timeout ) {
183
- String preparedQuery = Utils .prepareQuery (query , params );
184
- client .sendCommand (GraphCommand .RO_QUERY , graphId , preparedQuery , Utils .COMPACT_STRING ,
185
- Utils .TIMEOUT_STRING ,
186
- Long .toString (timeout ));
187
- return getResponse (new Builder <ResultSet >() {
177
+ return appendWithResponse (GraphCommand .RO_QUERY , Arrays .asList (graphId , query , Utils .COMPACT_STRING , Utils .TIMEOUT_STRING ,
178
+ Long .toString (timeout )), new Builder <ResultSet >() {
188
179
@ SuppressWarnings ("unchecked" )
189
180
@ Override
190
181
public ResultSet build (Object o ) {
@@ -235,8 +226,7 @@ public Response<ResultSet> callProcedure(String procedure, List<String> args,
235
226
*/
236
227
@ Override
237
228
public Response <String > copyGraph (String destinationGraphId ) {
238
- client .sendCommand (GraphCommand .COPY , graphId , destinationGraphId );
239
- return getResponse (BuilderFactory .STRING );
229
+ return appendWithResponse (GraphCommand .COPY , Arrays .asList (graphId , destinationGraphId ), BuilderFactory .STRING );
240
230
}
241
231
242
232
@@ -246,10 +236,11 @@ public Response<String> copyGraph(String destinationGraphId) {
246
236
*/
247
237
@ Override
248
238
public Response <String > deleteGraph (){
239
+ try {
240
+ return appendWithResponse (GraphCommand .DELETE , Arrays .asList (graphId ), BuilderFactory .STRING );
241
+ } finally {
242
+ cache .clear ();
243
+ }
249
244
250
- client .sendCommand (GraphCommand .DELETE , graphId );
251
- Response <String > response = getResponse (BuilderFactory .STRING );
252
- cache .clear ();
253
- return response ;
254
245
}
255
246
}
0 commit comments