Skip to content

Commit d1acebc

Browse files
authored
Merge pull request #106 from DarthMax/arrow_export_multiple_properties
Arrow export multiple properties
2 parents f694b8b + 0dd74fd commit d1acebc

File tree

8 files changed

+187
-19
lines changed

8 files changed

+187
-19
lines changed

changelog/1.1.0.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33

44
## Breaking changes
5-
5+
66

77
## New features
88

@@ -21,7 +21,9 @@
2121

2222
## Improvements
2323

24-
* The functions `gds.graph.streamNodeProperty` and `gds.graph.streamRelationshipProperty` can leverage the Arrow Flight server of GDS to improve throughput.
24+
* The functions `gds.graph.streamNodeProperty`, `gds.graph.streamNodeProperties`, `gds.graph.streamRelationshipProperty` and `gds.graph.streamRelationshipProperties` can leverage the Arrow Flight server of GDS to improve throughput.
25+
* The calls to `gds.graph.streamNodeProperties` and `gds.graph.streamRelationshipProperties` can now optionally return one column per property instead of one row per property value.
26+
* By setting `separate_property_columns` to `True`, the return format will be `nodeId, property1, property2, ...` for node properties and `sourceNodeId, targetNodeId, relationshipType, property1, property2, ...` for relationship properties.
2527
* Improved error message of `gds.graph.get` to include currently targeted database if graph not found.
2628
* Added inline progress bar for project and algorithm procedures with adequate server logging support.
2729

graphdatascience/graph/graph_proc_runner.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,23 @@ def streamNodeProperties(
104104
G: Graph,
105105
node_properties: List[str],
106106
node_labels: Strings = ["*"],
107+
separate_property_columns: bool = False,
107108
**config: Any,
108109
) -> DataFrame:
109110
self._namespace += ".streamNodeProperties"
110111

111-
return self._handle_properties(G, node_properties, node_labels, config)
112+
result = self._handle_properties(G, node_properties, node_labels, config)
113+
114+
# new format was requested, but the query was run via Cypher
115+
if separate_property_columns and "propertyValue" in result.keys():
116+
return result.pivot_table("propertyValue", "nodeId", columns="nodeProperty").reset_index()
117+
# old format was requested but the query was run via Arrow
118+
elif not separate_property_columns and "propertyValue" not in result.keys():
119+
return result.melt(id_vars=["nodeId"]).rename(
120+
columns={"variable": "nodeProperty", "value": "propertyValue"}
121+
)
122+
123+
return result
112124

113125
def streamNodeProperty(
114126
self,
@@ -126,11 +138,25 @@ def streamRelationshipProperties(
126138
G: Graph,
127139
relationship_properties: List[str],
128140
relationship_types: Strings = ["*"],
141+
separate_property_columns: bool = False,
129142
**config: Any,
130143
) -> DataFrame:
131144
self._namespace += ".streamRelationshipProperties"
132145

133-
return self._handle_properties(G, relationship_properties, relationship_types, config)
146+
result = self._handle_properties(G, relationship_properties, relationship_types, config)
147+
148+
# new format was requested, but the query was run via Cypher
149+
if separate_property_columns and "propertyValue" in result.keys():
150+
return result.pivot_table(
151+
"propertyValue", ["sourceNodeId", "targetNodeId", "relationshipType"], columns="relationshipProperty"
152+
).reset_index()
153+
# old format was requested but the query was run via Arrow
154+
elif not separate_property_columns and "propertyValue" not in result.keys():
155+
return result.melt(id_vars=["sourceNodeId", "targetNodeId", "relationshipType"]).rename(
156+
columns={"variable": "relationshipProperty", "value": "propertyValue"}
157+
)
158+
159+
return result
134160

135161
def streamRelationshipProperty(
136162
self,

graphdatascience/query_runner/arrow_query_runner.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,15 @@ def run_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> Data
4747
return self._run_arrow_property_get(
4848
graph_name, "gds.graph.streamNodeProperty", {"node_property": property_name, "node_labels": node_labels}
4949
)
50+
elif "gds.graph.streamNodeProperties" in query:
51+
graph_name = params["graph_name"]
52+
property_names = params["properties"]
53+
node_labels = params["entities"]
54+
return self._run_arrow_property_get(
55+
graph_name,
56+
"gds.graph.streamNodeProperties",
57+
{"node_properties": property_names, "node_labels": node_labels},
58+
)
5059
elif "gds.graph.streamRelationshipProperty" in query:
5160
graph_name = params["graph_name"]
5261
property_name = params["properties"]
@@ -56,6 +65,15 @@ def run_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> Data
5665
"gds.graph.streamRelationshipProperty",
5766
{"relationship_property": property_name, "relationship_types": relationship_types},
5867
)
68+
elif "gds.graph.streamRelationshipProperties" in query:
69+
graph_name = params["graph_name"]
70+
property_names = params["properties"]
71+
relationship_types = params["entities"]
72+
return self._run_arrow_property_get(
73+
graph_name,
74+
"gds.graph.streamRelationshipProperties",
75+
{"relationship_properties": property_names, "relationship_types": relationship_types},
76+
)
5977

6078
return self._fallback_query_runner.run_query(query, params)
6179

graphdatascience/tests/integration/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")
1111

12-
AUTH = None
12+
AUTH = ("neo4j", "password")
1313
if os.environ.get("NEO4J_USER") is not None:
1414
AUTH = (
1515
os.environ.get("NEO4J_USER", "DUMMY"),

graphdatascience/tests/integration/test_graph_ops.py

Lines changed: 109 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ def run_around_tests(runner: Neo4jQueryRunner) -> Generator[None, None, None]:
1818
runner.run_query(
1919
"""
2020
CREATE
21-
(a: Node {x: 1}),
22-
(b: Node {x: 2}),
23-
(c: Node {x: 3}),
24-
(a)-[:REL {relX: 4}]->(b),
25-
(a)-[:REL {relX: 5}]->(c),
26-
(b)-[:REL {relX: 6}]->(c),
21+
(a: Node {x: 1, y: 2}),
22+
(b: Node {x: 2, y: 3}),
23+
(c: Node {x: 3, y: 4}),
24+
(a)-[:REL {relX: 4, relY: 5}]->(b),
25+
(a)-[:REL {relX: 5, relY: 6}]->(c),
26+
(b)-[:REL {relX: 6, relY: 7}]->(c),
2727
(b)-[:REL2]->(c)
2828
"""
2929
)
@@ -164,10 +164,52 @@ def test_graph_streamNodeProperty_without_arrow(gds_without_arrow: GraphDataScie
164164

165165

166166
def test_graph_streamNodeProperties(gds: GraphDataScience) -> None:
167-
G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": "x"}}, "*")
167+
G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*")
168168

169-
result = gds.graph.streamNodeProperties(G, ["x"], concurrency=2)
170-
assert {e for e in result["propertyValue"]} == {1, 2, 3}
169+
result = gds.graph.streamNodeProperties(G, ["x", "y"], concurrency=2)
170+
171+
assert list(result.keys()) == ["nodeId", "nodeProperty", "propertyValue"]
172+
173+
x_values = result[result.nodeProperty == "x"]
174+
assert {e for e in x_values["propertyValue"]} == {1, 2, 3}
175+
176+
y_values = result[result.nodeProperty == "y"]
177+
assert {e for e in y_values["propertyValue"]} == {2, 3, 4}
178+
179+
180+
def test_graph_streamNodeProperties_separate_property_columns(gds: GraphDataScience) -> None:
181+
G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*")
182+
183+
result = gds.graph.streamNodeProperties(G, ["x", "y"], separate_property_columns=True, concurrency=2)
184+
assert list(result.keys()) == ["nodeId", "x", "y"]
185+
assert {e for e in result["x"]} == {1, 2, 3}
186+
assert {e for e in result["y"]} == {2, 3, 4}
187+
188+
189+
def test_graph_streamNodeProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None:
190+
G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*")
191+
192+
result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], concurrency=2)
193+
194+
assert list(result.keys()) == ["nodeId", "nodeProperty", "propertyValue"]
195+
196+
x_values = result[result.nodeProperty == "x"]
197+
assert {e for e in x_values["propertyValue"]} == {1, 2, 3}
198+
199+
y_values = result[result.nodeProperty == "y"]
200+
assert {e for e in y_values["propertyValue"]} == {2, 3, 4}
201+
202+
203+
def test_graph_streamNodeProperties_without_arrow_separate_property_columns(
204+
gds_without_arrow: GraphDataScience,
205+
) -> None:
206+
G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*")
207+
208+
result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], separate_property_columns=True, concurrency=2)
209+
210+
assert list(result.keys()) == ["nodeId", "x", "y"]
211+
assert {e for e in result["x"]} == {1, 2, 3}
212+
assert {e for e in result["y"]} == {2, 3, 4}
171213

172214

173215
def test_graph_streamRelationshipProperty(gds: GraphDataScience) -> None:
@@ -185,10 +227,65 @@ def test_graph_streamRelationshipProperty_without_arrow(gds_without_arrow: Graph
185227

186228

187229
def test_graph_streamRelationshipProperties(gds: GraphDataScience) -> None:
188-
G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": "relX"}})
230+
G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}})
189231

190-
result = gds.graph.streamRelationshipProperties(G, ["relX"], concurrency=2)
191-
assert {e for e in result["propertyValue"]} == {4, 5, 6}
232+
result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2)
233+
234+
assert list(result.keys()) == [
235+
"sourceNodeId",
236+
"targetNodeId",
237+
"relationshipType",
238+
"relationshipProperty",
239+
"propertyValue",
240+
]
241+
242+
x_values = result[result.relationshipProperty == "relX"]
243+
assert {e for e in x_values["propertyValue"]} == {4, 5, 6}
244+
y_values = result[result.relationshipProperty == "relY"]
245+
assert {e for e in y_values["propertyValue"]} == {5, 6, 7}
246+
247+
248+
def test_graph_streamRelationshipProperties_separate_property_columns(gds: GraphDataScience) -> None:
249+
G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}})
250+
251+
result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], separate_property_columns=True, concurrency=2)
252+
253+
assert list(result.keys()) == ["sourceNodeId", "targetNodeId", "relationshipType", "relX", "relY"]
254+
assert {e for e in result["relX"]} == {4, 5, 6}
255+
assert {e for e in result["relY"]} == {5, 6, 7}
256+
257+
258+
def test_graph_streamRelationshipProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None:
259+
G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}})
260+
261+
result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2)
262+
263+
assert list(result.keys()) == [
264+
"sourceNodeId",
265+
"targetNodeId",
266+
"relationshipType",
267+
"relationshipProperty",
268+
"propertyValue",
269+
]
270+
271+
x_values = result[result.relationshipProperty == "relX"]
272+
assert {e for e in x_values["propertyValue"]} == {4, 5, 6}
273+
y_values = result[result.relationshipProperty == "relY"]
274+
assert {e for e in y_values["propertyValue"]} == {5, 6, 7}
275+
276+
277+
def test_graph_streamRelationshipProperties_without_arrow_separate_property_columns(
278+
gds_without_arrow: GraphDataScience,
279+
) -> None:
280+
G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}})
281+
282+
result = gds_without_arrow.graph.streamRelationshipProperties(
283+
G, ["relX", "relY"], separate_property_columns=True, concurrency=2
284+
)
285+
286+
assert list(result.keys()) == ["sourceNodeId", "targetNodeId", "relationshipType", "relX", "relY"]
287+
assert {e for e in result["relX"]} == {4, 5, 6}
288+
assert {e for e in result["relY"]} == {5, 6, 7}
192289

193290

194291
def test_graph_writeNodeProperties(gds: GraphDataScience) -> None:

graphdatascience/tests/unit/conftest.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
class CollectingQueryRunner(QueryRunner):
1717
def __init__(self, server_version: Union[str, ServerVersion]) -> None:
18+
self._mock_result: Optional[DataFrame] = None
1819
self.queries: List[str] = []
1920
self.params: List[Dict[str, Any]] = []
2021
self.server_version = server_version
@@ -27,7 +28,11 @@ def run_query(self, query: str, params: Optional[Dict[str, Any]] = None) -> Data
2728
self.params.append(params)
2829

2930
# This "mock" lets us initialize the GDS object without issues.
30-
return pandas.DataFrame([{"version": str(self.server_version)}])
31+
return (
32+
self._mock_result
33+
if self._mock_result is not None
34+
else pandas.DataFrame([{"version": str(self.server_version)}])
35+
)
3136

3237
def last_query(self) -> str:
3338
return self.queries[-1]
@@ -44,6 +49,9 @@ def database(self) -> str:
4449
def create_graph_constructor(self, _: str, __: int) -> GraphConstructor:
4550
raise NotImplementedError
4651

52+
def set__mock_result(self, result: DataFrame) -> None:
53+
self._mock_result = result
54+
4755

4856
@pytest.fixture
4957
def runner(server_version: ServerVersion) -> CollectingQueryRunner:

graphdatascience/tests/unit/test_graph_ops.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import pandas
12
import pytest
23

34
from .conftest import CollectingQueryRunner
@@ -139,6 +140,8 @@ def test_graph_streamNodeProperty(runner: CollectingQueryRunner, gds: GraphDataS
139140
def test_graph_streamNodeProperties(runner: CollectingQueryRunner, gds: GraphDataScience) -> None:
140141
G, _ = gds.graph.project("g", "*", "*")
141142

143+
runner.set__mock_result(pandas.DataFrame([{"nodeId": 0, "dummyProp": 2}]))
144+
142145
gds.graph.streamNodeProperties(G, ["dummyProp"], concurrency=2)
143146
assert runner.last_query() == "CALL gds.graph.streamNodeProperties($graph_name, $properties, $entities, $config)"
144147
assert runner.last_params() == {
@@ -187,6 +190,20 @@ def test_graph_streamRelationshipProperty(runner: CollectingQueryRunner, gds: Gr
187190
def test_graph_streamRelationshipProperties(runner: CollectingQueryRunner, gds: GraphDataScience) -> None:
188191
G, _ = gds.graph.project("g", "*", "*")
189192

193+
result_df = pandas.DataFrame(
194+
[
195+
{
196+
"sourceNodeId": 0,
197+
"targetNodeId": 1,
198+
"relationshipType": "REL",
199+
"relationshipProperty": "dummyProp",
200+
"propertyValue": 2,
201+
}
202+
]
203+
)
204+
205+
runner.set__mock_result(result_df)
206+
190207
gds.graph.streamRelationshipProperties(G, ["dummyProp"], concurrency=2)
191208
assert (
192209
runner.last_query()

requirements/dev.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@ pandas-stubs
77
pytest
88
pytest-annotate
99
tox
10-
types-setuptools
10+
types-setuptools

0 commit comments

Comments
 (0)