@@ -42,6 +42,32 @@ cdef class _OracleErrorInfo:
42
42
list batcherrors
43
43
44
44
45
+ @ cython.freelist (20 )
46
+ cdef class _PostProcessFn:
47
+ cdef:
48
+ object fn
49
+ bint convert_nulls
50
+ bint check_awaitable
51
+ uint32_t num_elements
52
+ list values
53
+
54
+ @staticmethod
55
+ cdef _PostProcessFn from_info(object fn, uint32_t num_elements,
56
+ list values, bint convert_nulls = False ,
57
+ bint check_awaitable = False ):
58
+ """
59
+ Create a post process function object and return it.
60
+ """
61
+ cdef _PostProcessFn fn_obj
62
+ fn_obj = _PostProcessFn.__new__ (_PostProcessFn)
63
+ fn_obj.fn = fn
64
+ fn_obj.convert_nulls = convert_nulls
65
+ fn_obj.check_awaitable = check_awaitable
66
+ fn_obj.num_elements = num_elements
67
+ fn_obj.values = values
68
+ return fn_obj
69
+
70
+
45
71
cdef class Message:
46
72
cdef:
47
73
BaseThinConnImpl conn_impl
@@ -736,6 +762,70 @@ cdef class MessageWithData(Message):
736
762
self .bit_vector = < const char_type* > self .bit_vector_buf.data.as_chars
737
763
memcpy(< void * > self .bit_vector, ptr, num_bytes)
738
764
765
+ cdef list _get_post_process_fns(self ):
766
+ """
767
+ Returns a list of functions that need to be run after the database
768
+ response has been completely received. These functions can be
769
+ internally defined (for wrapping implementation objects with user
770
+ facing objects) or user defined (out converters). This prevents
771
+ multiple executions of functions (reparsing of database responses for
772
+ older databases without the end of response indicator) or interference
773
+ with any ongoing database response. Returning a list allows this
774
+ process to be determined commonly across sync and async in order to
775
+ avoid duplicating code.
776
+ """
777
+ cdef:
778
+ OracleMetadata metadata
779
+ uint32_t num_elements
780
+ uint8_t ora_type_num
781
+ ThinVarImpl var_impl
782
+ _PostProcessFn fn
783
+ list fns = []
784
+ bint is_async
785
+ object cls
786
+ is_async = self .conn_impl._protocol._transport._is_async
787
+ if self .out_var_impls is not None :
788
+ for var_impl in self .out_var_impls:
789
+ if var_impl is None :
790
+ continue
791
+
792
+ # retain last raw value when not fetching Arrow (for handling
793
+ # duplicate rows)
794
+ if not self .cursor_impl.fetching_arrow:
795
+ var_impl._last_raw_value = \
796
+ var_impl._values[self .cursor_impl._last_row_index]
797
+
798
+ # determine the number of elements to process, if needed
799
+ if var_impl.is_array:
800
+ num_elements = var_impl.num_elements_in_array
801
+ else :
802
+ num_elements = self .row_index
803
+
804
+ # perform post conversion to user-facing objects, if applicable
805
+ if self .in_fetch:
806
+ metadata = var_impl._fetch_metadata
807
+ else :
808
+ metadata = var_impl.metadata
809
+ ora_type_num = metadata.dbtype._ora_type_num
810
+ if ora_type_num in (ORA_TYPE_NUM_CLOB,
811
+ ORA_TYPE_NUM_BLOB,
812
+ ORA_TYPE_NUM_BFILE):
813
+ cls = PY_TYPE_ASYNC_LOB if is_async else PY_TYPE_LOB
814
+ fn = _PostProcessFn.from_info(cls ._from_impl, num_elements,
815
+ var_impl._values)
816
+ fns.append(fn)
817
+
818
+ # perform post conversion via user out converter, if applicable
819
+ if var_impl.outconverter is None :
820
+ continue
821
+ fn = _PostProcessFn.from_info(var_impl.outconverter,
822
+ num_elements, var_impl._values,
823
+ var_impl.convert_nulls,
824
+ check_awaitable = True )
825
+ fns.append(fn)
826
+
827
+ return fns
828
+
739
829
cdef bint _is_duplicate_data(self , uint32_t column_num):
740
830
"""
741
831
Returns a boolean indicating if the given column contains data
@@ -1366,32 +1456,21 @@ cdef class MessageWithData(Message):
1366
1456
database round-trip.
1367
1457
"""
1368
1458
cdef:
1369
- uint32_t i, j, num_elements
1370
1459
object value, element_value
1371
- ThinVarImpl var_impl
1372
- if self .out_var_impls is None :
1373
- return 0
1374
- for var_impl in self .out_var_impls:
1375
- if var_impl is None or var_impl.outconverter is None :
1376
- continue
1377
- if not self .cursor_impl.fetching_arrow:
1378
- var_impl._last_raw_value = \
1379
- var_impl._values[self .cursor_impl._last_row_index]
1380
- if var_impl.is_array:
1381
- num_elements = var_impl.num_elements_in_array
1382
- else :
1383
- num_elements = self .row_index
1384
- for i in range (num_elements):
1385
- value = var_impl._values[i]
1386
- if value is None and not var_impl.convert_nulls:
1460
+ _PostProcessFn fn
1461
+ uint32_t i, j
1462
+ for fn in self ._get_post_process_fns():
1463
+ for i in range (fn.num_elements):
1464
+ value = fn.values[i]
1465
+ if value is None and not fn.convert_nulls:
1387
1466
continue
1388
1467
if isinstance (value, list ):
1389
1468
for j, element_value in enumerate (value):
1390
- if element_value is None :
1469
+ if element_value is None and not fn.convert_nulls :
1391
1470
continue
1392
- value[j] = var_impl.outconverter (element_value)
1471
+ value[j] = fn.fn (element_value)
1393
1472
else :
1394
- var_impl._values [i] = var_impl.outconverter (value)
1473
+ fn.values [i] = fn.fn (value)
1395
1474
1396
1475
async def postprocess_async(self ):
1397
1476
"""
@@ -1401,39 +1480,28 @@ cdef class MessageWithData(Message):
1401
1480
database round-trip.
1402
1481
"""
1403
1482
cdef:
1404
- object value, element_value, fn
1405
- uint32_t i, j, num_elements
1406
- ThinVarImpl var_impl
1407
- if self .out_var_impls is None :
1408
- return 0
1409
- for var_impl in self .out_var_impls:
1410
- if var_impl is None or var_impl.outconverter is None :
1411
- continue
1412
- if not self .cursor_impl.fetching_arrow:
1413
- var_impl._last_raw_value = \
1414
- var_impl._values[self .cursor_impl._last_row_index]
1415
- if var_impl.is_array:
1416
- num_elements = var_impl.num_elements_in_array
1417
- else :
1418
- num_elements = self .row_index
1419
- fn = var_impl.outconverter
1420
- for i in range (num_elements):
1421
- value = var_impl._values[i]
1422
- if value is None and not var_impl.convert_nulls:
1483
+ object value, element_value
1484
+ _PostProcessFn fn
1485
+ uint32_t i, j
1486
+ for fn in self ._get_post_process_fns():
1487
+ for i in range (fn.num_elements):
1488
+ value = fn.values[i]
1489
+ if value is None and not fn.convert_nulls:
1423
1490
continue
1424
1491
if isinstance (value, list ):
1425
1492
for j, element_value in enumerate (value):
1426
- if element_value is None :
1493
+ if element_value is None and not fn.convert_nulls :
1427
1494
continue
1428
- element_value = fn(element_value)
1429
- if inspect.isawaitable(element_value):
1495
+ element_value = fn.fn(element_value)
1496
+ if fn.check_awaitable \
1497
+ and inspect.isawaitable(element_value):
1430
1498
element_value = await element_value
1431
1499
value[j] = element_value
1432
1500
else :
1433
- value = fn(value)
1434
- if inspect.isawaitable(value):
1501
+ value = fn.fn (value)
1502
+ if fn.check_awaitable and inspect.isawaitable(value):
1435
1503
value = await value
1436
- var_impl._values [i] = value
1504
+ fn.values [i] = value
1437
1505
1438
1506
cdef int preprocess(self ) except - 1 :
1439
1507
cdef:
0 commit comments