Commit 654dad9

Add prefer_async param to TAPService constructor to control what mode search uses
1 parent 447cade commit 654dad9

5 files changed: +222 -5 lines changed

CHANGES.rst

Lines changed: 3 additions & 0 deletions
@@ -8,6 +8,9 @@ Enhancements and Fixes

 - Fix job result handling to prioritize standard URL structure [#684]

+- Add prefer_async param to TAPService constructor to control what mode search uses [#686]
+
+
 Deprecations and Removals
 -------------------------


docs/dal/index.rst

Lines changed: 58 additions & 1 deletion
@@ -222,6 +222,16 @@ other channels. Most commonly, you will discover them in the VO
 registry (cf. :ref:`pyvo.registry<pyvo-registry>`).

 To perform a query using ADQL, the ``search()`` method is used.
+For services that work better with asynchronous queries, you can configure
+the service to prefer async execution by default:
+
+.. doctest-remote-data::
+
+    >>> tap_service = vo.dal.TAPService("http://dc.g-vo.org/tap", prefer_async=True)
+
+This will make the ``search()`` method use asynchronous execution automatically,
+while still allowing you to force synchronous execution when needed.
+
 TAPService instances have several methods to inspect the metadata
 of the service - in particular, what tables with what columns are
 available - discussed below.
@@ -250,9 +260,43 @@ robust of long-running queries. It also supports queuing queries,
 which allows service operators to be a lot more generous with
 resource limits.

-To specify the query mode, you can use either ``run_sync()`` for
+There are a few ways to control the query mode:
+
+**Explicit method calls:**
+To explicitly specify the query mode, you can use either ``run_sync()`` for
 synchronous query or ``run_async()`` for asynchronous query.

+**Service-level preference:**
+Alternatively, you can configure whether you want the service to prefer async execution
+by passing the ``prefer_async`` parameter to the service constructor.
+Then the ``search()`` method will use async execution by default, but
+you can still force sync execution when needed by passing the
+``force_sync`` parameter to ``search()``.
+
+.. doctest-remote-data::
+
+    >>> # Service configured to prefer async
+    >>> service = vo.dal.TAPService("http://dc.g-vo.org/tap", prefer_async=True)
+    >>> result = service.search(ex_query) # Uses async automatically
+    >>> # Force sync execution when needed
+    >>> result = service.search(ex_query, force_sync=True) # Uses sync
+
+**Default behavior:**
+By default, ``search()`` will use synchronous execution:
+
+.. doctest-remote-data::
+
+    >>> # Default service (prefer_async=False)
+    >>> service = vo.dal.TAPService("http://dc.g-vo.org/tap")
+    >>> result = service.search(ex_query) # Always uses sync
+
+**Running as AsyncTAPJobs:**
+The methods above (``search()`` with ``prefer_async=True`` and ``run_async()``)
+handle job lifecycle automatically. However, you may also want to work with
+job objects directly, which can give more control over the query execution
+and job information (for example some TAP services may display progress
+information in the job output).
+
 .. doctest-remote-data::

     >>> job = tap_service.submit_job(ex_query)
@@ -336,6 +380,19 @@ With ``run_async()`` you basically submit an asynchronous query and
 return its result. It is like running ``submit_job()`` first and then
 run the query manually.

+Choosing the right execution mode
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+**Use synchronous queries when:**
+- Running quick queries (typically < 1 minute)
+- The service has efficient sync implementation
+- Query results are relatively small
+
+**Use asynchronous queries when:**
+- Running long-running queries where query might hit sync timeout limits
+- Working with services that have more robust async execution
+- You want access to (UWS) jobs you have run and metadata or results for any of your async jobs
+
 Query limit
 ^^^^^^^^^^^
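
Note on docs/dal/index.rst: the mode selection described in this file reduces to a two-flag decision. The following is a minimal sketch, not pyvo code. `choose_mode` is a hypothetical helper that only mirrors the condition `self._prefer_async and not force_sync` from the `search()` method added in pyvo/dal/tap.py, and it makes no network requests.

```python
from itertools import product


def choose_mode(prefer_async: bool, force_sync: bool = False) -> str:
    """Hypothetical helper mirroring the condition used by search() in this commit."""
    # Async is chosen only when the service prefers it and the caller
    # did not force synchronous execution.
    return "async" if prefer_async and not force_sync else "sync"


# Enumerate all four flag combinations.
decision_table = {
    (prefer_async, force_sync): choose_mode(prefer_async, force_sync)
    for prefer_async, force_sync in product([False, True], repeat=2)
}

for (prefer_async, force_sync), mode in decision_table.items():
    print(f"prefer_async={prefer_async!s:<5} force_sync={force_sync!s:<5} -> {mode}")
```

Only one of the four combinations selects async execution, which is why a service constructed without `prefer_async` behaves as before.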

docs/io/uws.rst

Lines changed: 1 addition & 1 deletion
@@ -382,7 +382,7 @@ While the examples above focus on individual jobs, you can also parse job lists:
     >>> # Iterate through jobs (each is a JobSummary object)
     >>> for job in jobs: # doctest: +ELLIPSIS
     ...     print(f"Job {job.jobid}: {job.phase}")
-    Job ...: COMPLETED
+    Job ...: ...


 Error Handling
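
Note on docs/io/uws.rst: the commit does not state why the expected output was loosened. One plausible reading is that a job list can end with a job in a phase other than COMPLETED. The sketch below uses only the standard-library `doctest` module to compare the old and new expected lines against a made-up two-job list. The job ids and phases are assumptions for illustration, and `count_failures` is a hypothetical helper.

```python
import doctest


def count_failures(expected_output: str) -> int:
    """Run a two-job loop as a doctest against the given expected output."""
    source = (
        '>>> jobs = [("a1", "COMPLETED"), ("b2", "EXECUTING")]\n'
        '>>> for jobid, phase in jobs:\n'
        '...     print(f"Job {jobid}: {phase}")\n'
        + expected_output + "\n"
    )
    test = doctest.DocTestParser().get_doctest(source, {}, "job_list", None, 0)
    runner = doctest.DocTestRunner(verbose=False, optionflags=doctest.ELLIPSIS)
    # Swallow the failure report; only the counts matter here.
    results = runner.run(test, out=lambda text: None)
    return results.failed


old_failures = count_failures("Job ...: COMPLETED")
new_failures = count_failures("Job ...: ...")
print(old_failures, new_failures)
```

With ELLIPSIS enabled, `...` may span line breaks, so the single line `Job ...: ...` matches the whole multi-line output, while the old pattern requires the output to end in `: COMPLETED`.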

pyvo/dal/tap.py

Lines changed: 59 additions & 3 deletions
@@ -107,7 +107,8 @@ class TAPService(DALService, AvailabilityMixin, CapabilityMixin):
     _tables = None
     _examples = None

-    def __init__(self, baseurl, *, capability_description=None, session=None):
+    def __init__(self, baseurl, *, capability_description=None,
+                 session=None, prefer_async=False):
         """
         instantiate a Table Access Protocol service

@@ -117,6 +118,12 @@ def __init__(self, baseurl, *, capability_description=None, session=None):
             the base URL that should be used for forming queries to the service.
         session : object
             optional session to use for network requests
+        capability_description : str, optional
+            a capability description URL to use instead of the service
+            capabilities.
+        prefer_async : bool, optional
+            Whether to prefer async queries over sync queries. (Default:
+            false)
         """
         try:
             super().__init__(baseurl, session=session, capability_description=capability_description)
@@ -131,6 +138,8 @@ def __init__(self, baseurl, *, capability_description=None, session=None):
             raise DALServiceError(f"Cannot find TAP service at '"
                                   f"{baseurl}'.\n\n{str(e)}") from None

+        self._prefer_async = prefer_async
+
     def get_tap_capability(self):
         """
         returns the (first) TAP capability of this service.
@@ -281,8 +290,55 @@ def run_sync(
             query, language=language, maxrec=maxrec, uploads=uploads,
             **keywords).execute()

-    # alias for service discovery
-    search = run_sync
+    def search(self, query, *, language="ADQL", maxrec=None, uploads=None,
+               force_sync=False, **keywords):
+        """
+        submit a TAP query and return its result.
+        Checks the prefer_async setting and runs the query synchronously or
+        asynchronously depending on the setting.
+
+        Parameters
+        ----------
+        query : str
+            The query string / parameters
+        language : str
+            specifies the query language, default ADQL.
+            useful for services which allow to use the backend query language.
+        maxrec : int
+            the maximum records to return. defaults to the service default
+        uploads : dict
+            a mapping from table names to file like objects containing a votable
+        force_sync : bool
+            if True, forces synchronous execution regardless of prefer_async setting
+        **keywords
+            additional keyword arguments passed to the underlying query method
+
+        Returns
+        -------
+        TAPResults
+            the query result
+
+        Raises
+        ------
+        DALServiceError
+            for errors connecting to or communicating with the service
+        DALQueryError
+            for errors either in the input query syntax or
+            other user errors detected by the service
+        DALFormatError
+            for errors parsing the VOTable response
+
+        See Also
+        --------
+        TAPResults
+        AsyncTAPJob
+        """
+        if self._prefer_async and not force_sync:
+            return self.run_async(query, language=language, maxrec=maxrec,
+                                  uploads=uploads, **keywords)
+        else:
+            return self.run_sync(query, language=language, maxrec=maxrec,
+                                 uploads=uploads, **keywords)

     def run_async(
             self, query, *, language="ADQL", maxrec=None, uploads=None,
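
Note on pyvo/dal/tap.py: replacing the class-level alias `search = run_sync` with a real method has a side effect that follows from plain Python semantics and may matter to subclasses. The sketch below uses stand-in classes, not pyvo itself. `AliasService`, `MethodService` and their subclasses are hypothetical names, and their methods return strings instead of running queries.

```python
class AliasService:
    """Stand-in for the old layout: search is a class-level alias."""

    def run_sync(self, query):
        return f"base sync: {query}"

    # The alias captures this exact function object at class creation.
    search = run_sync


class MethodService:
    """Stand-in for the new layout: search dispatches through self."""

    def __init__(self, prefer_async=False):
        self._prefer_async = prefer_async

    def run_sync(self, query):
        return f"base sync: {query}"

    def run_async(self, query):
        return f"base async: {query}"

    def search(self, query, *, force_sync=False):
        if self._prefer_async and not force_sync:
            return self.run_async(query)
        return self.run_sync(query)


class CustomAlias(AliasService):
    def run_sync(self, query):
        return f"custom sync: {query}"


class CustomMethod(MethodService):
    def run_sync(self, query):
        return f"custom sync: {query}"


alias_result = CustomAlias().search("SELECT 1")
method_result = CustomMethod().search("SELECT 1")
print(alias_result)
print(method_result)
```

In the alias layout, a subclass that overrides only `run_sync()` leaves `search()` bound to the base implementation. In the method layout, `search()` looks up `run_sync` on the instance and so picks up the override. Whether any pyvo subclass relies on either behavior is not stated in this commit.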

pyvo/dal/tests/test_tap.py

Lines changed: 101 additions & 0 deletions
@@ -1099,6 +1099,107 @@ def test_get_udf(self, tapservice):
         func = tapservice.get_tap_capability().get_adql().get_udf("IVO_hasword") # case insensitive!
         assert func.form == "ivo_hasword(haystack TEXT, needle TEXT) -> INTEGER"

+    def test_prefer_async_default_false(self):
+        service = TAPService('http://example.com/tap')
+        assert service._prefer_async is False
+
+    def test_prefer_async_parameter(self):
+        service = TAPService('http://example.com/tap', prefer_async=True)
+        assert service._prefer_async is True
+
+    @pytest.mark.usefixtures('sync_fixture')
+    @pytest.mark.filterwarnings("ignore::astropy.io.votable.exceptions.W27")
+    @pytest.mark.filterwarnings("ignore::astropy.io.votable.exceptions.W48")
+    @pytest.mark.filterwarnings("ignore::astropy.io.votable.exceptions.W06")
+    def test_search_default_behavior_unchanged(self):
+        service = TAPService('http://example.com/tap')
+
+        from unittest.mock import patch
+        with patch.object(service, 'run_sync') as mock_sync, \
+                patch.object(service, 'run_async') as mock_async:
+            mock_sync.return_value = "sync_result"
+            mock_async.return_value = "async_result"
+            result = service.search("SELECT * FROM ivoa.obscore")
+            mock_sync.assert_called_once_with(
+                "SELECT * FROM ivoa.obscore",
+                language="ADQL",
+                maxrec=None,
+                uploads=None
+            )
+            mock_async.assert_not_called()
+            assert result == "sync_result"
+
+    @pytest.mark.usefixtures('async_fixture')
+    @pytest.mark.filterwarnings("ignore::astropy.io.votable.exceptions.W27")
+    @pytest.mark.filterwarnings("ignore::astropy.io.votable.exceptions.W48")
+    @pytest.mark.filterwarnings("ignore::astropy.io.votable.exceptions.W06")
+    def test_search_prefer_async_true(self):
+        service = TAPService('http://example.com/tap', prefer_async=True)
+
+        from unittest.mock import patch
+        with patch.object(service, 'run_sync') as mock_sync, \
+                patch.object(service, 'run_async') as mock_async:
+            mock_sync.return_value = "sync_result"
+            mock_async.return_value = "async_result"
+            result = service.search("SELECT * FROM ivoa.obscore")
+            mock_async.assert_called_once_with(
+                "SELECT * FROM ivoa.obscore",
+                language="ADQL",
+                maxrec=None,
+                uploads=None
+            )
+            mock_sync.assert_not_called()
+            assert result == "async_result"
+
+    @pytest.mark.usefixtures('sync_fixture')
+    @pytest.mark.filterwarnings("ignore::astropy.io.votable.exceptions.W27")
+    @pytest.mark.filterwarnings("ignore::astropy.io.votable.exceptions.W48")
+    @pytest.mark.filterwarnings("ignore::astropy.io.votable.exceptions.W06")
+    def test_search_force_sync_overrides_prefer_async(self):
+        service = TAPService('http://example.com/tap', prefer_async=True)
+
+        from unittest.mock import patch
+        with patch.object(service, 'run_sync') as mock_sync, \
+                patch.object(service, 'run_async') as mock_async:
+
+            mock_sync.return_value = "sync_result"
+            mock_async.return_value = "async_result"
+            result = service.search("SELECT * FROM ivoa.obscore", force_sync=True)
+            mock_sync.assert_called_once_with(
+                "SELECT * FROM ivoa.obscore",
+                language="ADQL",
+                maxrec=None,
+                uploads=None
+            )
+            mock_async.assert_not_called()
+            assert result == "sync_result"
+
+    @pytest.mark.usefixtures('async_fixture')
+    @pytest.mark.filterwarnings("ignore::astropy.io.votable.exceptions.W27")
+    @pytest.mark.filterwarnings("ignore::astropy.io.votable.exceptions.W48")
+    @pytest.mark.filterwarnings("ignore::astropy.io.votable.exceptions.W06")
+    def test_search_parameters_passed_through(self):
+        service = TAPService('http://example.com/tap', prefer_async=True)
+
+        uploads = {'test_table': 'test_data'}
+        from unittest.mock import patch
+        with patch.object(service, 'run_async') as mock_async:
+            mock_async.return_value = "async_result"
+            service.search(
+                "SELECT * FROM ivoa.obscore",
+                language="SQL",
+                maxrec=100,
+                uploads=uploads,
+                extra_param="test_value"
+            )
+            mock_async.assert_called_once_with(
+                "SELECT * FROM ivoa.obscore",
+                language="SQL",
+                maxrec=100,
+                uploads=uploads,
+                extra_param="test_value"
+            )
+

 def test_get_endpoint_candidates():
     # Directly instantiate the TAPService with a known base URL
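
Note on pyvo/dal/tests/test_tap.py: the three dispatch tests share the same mock scaffolding and differ only in the flags and in which method should run. One way to condense them is a case table, sketched below. This is an illustration under assumptions, not a proposed patch. `StandInService` is a hypothetical class that copies only the dispatch condition, so the sketch runs without pyvo or its fixtures.

```python
from unittest.mock import patch


class StandInService:
    """Hypothetical stand-in with the same dispatch as the new search()."""

    def __init__(self, prefer_async=False):
        self._prefer_async = prefer_async

    def run_sync(self, query, **keywords):
        raise RuntimeError("replaced by a mock in the check below")

    def run_async(self, query, **keywords):
        raise RuntimeError("replaced by a mock in the check below")

    def search(self, query, *, force_sync=False, **keywords):
        if self._prefer_async and not force_sync:
            return self.run_async(query, **keywords)
        return self.run_sync(query, **keywords)


# (prefer_async, force_sync, name of the method expected to run)
CASES = [
    (False, False, "run_sync"),
    (False, True, "run_sync"),
    (True, False, "run_async"),
    (True, True, "run_sync"),
]


def check_dispatch(prefer_async, force_sync, expected):
    service = StandInService(prefer_async=prefer_async)
    other = "run_async" if expected == "run_sync" else "run_sync"
    with patch.object(service, expected) as called, \
            patch.object(service, other) as skipped:
        called.return_value = "result"
        result = service.search("SELECT 1", force_sync=force_sync, maxrec=5)
        # Keyword arguments other than force_sync are forwarded unchanged.
        called.assert_called_once_with("SELECT 1", maxrec=5)
        skipped.assert_not_called()
    return result


results = [check_dispatch(*case) for case in CASES]
print(results)
```

In a pytest suite the same table could feed `pytest.mark.parametrize`, with the `filterwarnings` marks applied once to the parametrized test.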
