Skip to content

Commit 144f149

Browse files
authored
Merge pull request #878 from neo4j/arrow-flakyness
Fixes to enable diagnosis of the flaky `graph.construct` test using Arrow
2 parents 5e72391 + de12324 commit 144f149

File tree

3 files changed

+17
-9
lines changed

3 files changed

+17
-9
lines changed

graphdatascience/query_runner/arrow_graph_constructor.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import concurrent
4+
import logging
45
import math
56
import warnings
67
from concurrent.futures import ThreadPoolExecutor
@@ -33,6 +34,7 @@ def __init__(
3334
)
3435
self._chunk_size = chunk_size
3536
self._min_batch_size = chunk_size * 10
37+
self._logger = logging.getLogger()
3638

3739
def run(self, node_dfs: list[DataFrame], relationship_dfs: list[DataFrame]) -> None:
3840
try:
@@ -60,8 +62,11 @@ def run(self, node_dfs: list[DataFrame], relationship_dfs: list[DataFrame]) -> N
6062

6163
self._client.relationship_load_done(self._graph_name)
6264
except (Exception, KeyboardInterrupt) as e:
63-
self._client.abort(self._graph_name)
64-
65+
try:
66+
self._client.abort(self._graph_name)
67+
except Exception as abort_exception:
68+
if "No arrow process" not in str(abort_exception):
69+
self._logger.warning(f"error aborting graph creation: {abort_exception}")
6570
raise e
6671

6772
def _partition_dfs(self, dfs: list[DataFrame]) -> list[DataFrame]:

graphdatascience/query_runner/gds_arrow_client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,10 @@ def _send_action(self, action_type: str, meta_data: dict[str, Any]) -> dict[str,
640640
)
641641
def send_with_retry() -> dict[str, Any]:
642642
try:
643-
result = client.do_action(flight.Action(action_type, json.dumps(meta_data).encode("utf-8")))
643+
result = client.do_action(
644+
action=flight.Action(action_type, json.dumps(meta_data).encode("utf-8")),
645+
options=flight.FlightCallOptions(timeout=20.0),
646+
)
644647

645648
# Consume result fully to sanity check and avoid cancelled streams
646649
collected_result = list(result)

graphdatascience/tests/integration/test_graph_construct.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,9 @@ def test_imdb_graph_with_arrow(gds: GraphDataScience) -> None:
145145
G.drop()
146146

147147

148-
@pytest.mark.filterwarnings("ignore: GDS Enterprise users can use Apache Arrow")
149148
@pytest.mark.compatible_with(min_inclusive=ServerVersion(2, 3, 0))
150-
def test_lastfm_graph_without_arrow(gds_without_arrow: GraphDataScience) -> None:
151-
G = gds_without_arrow.graph.load_lastfm()
149+
def test_lastfm_graph_with_arrow(gds: GraphDataScience) -> None:
150+
G = gds.graph.load_lastfm()
152151

153152
try:
154153
assert G.node_count() == 19914
@@ -162,14 +161,14 @@ def test_lastfm_graph_without_arrow(gds_without_arrow: GraphDataScience) -> None
162161
assert set(G.relationship_properties("TAGGED")) == {"day", "month", "year", "tagID", "timestamp"}
163162
assert set(G.relationship_properties("LISTEN_TO")) == {"weight"}
164163
assert set(G.relationship_properties("IS_FRIEND")) == set()
165-
166164
finally:
167165
G.drop()
168166

169167

168+
@pytest.mark.filterwarnings("ignore: GDS Enterprise users can use Apache Arrow")
170169
@pytest.mark.compatible_with(min_inclusive=ServerVersion(2, 3, 0))
171-
def test_lastfm_graph_with_arrow(gds: GraphDataScience) -> None:
172-
G = gds.graph.load_lastfm()
170+
def test_lastfm_graph_without_arrow(gds_without_arrow: GraphDataScience) -> None:
171+
G = gds_without_arrow.graph.load_lastfm()
173172

174173
try:
175174
assert G.node_count() == 19914
@@ -183,6 +182,7 @@ def test_lastfm_graph_with_arrow(gds: GraphDataScience) -> None:
183182
assert set(G.relationship_properties("TAGGED")) == {"day", "month", "year", "tagID", "timestamp"}
184183
assert set(G.relationship_properties("LISTEN_TO")) == {"weight"}
185184
assert set(G.relationship_properties("IS_FRIEND")) == set()
185+
186186
finally:
187187
G.drop()
188188

0 commit comments

Comments
 (0)