@@ -16,11 +16,18 @@ namespace ClickHouse.Client.Copy;
16
16
17
17
public class ClickHouseBulkCopy : IDisposable
18
18
{
19
- private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new ( ) ;
19
+ private static readonly RecyclableMemoryStreamManager CommonMemoryStreamManager = new ( new RecyclableMemoryStreamManager . Options
20
+ {
21
+ MaximumLargePoolFreeBytes = 512 * 1024 * 1024 ,
22
+ MaximumSmallPoolFreeBytes = 128 * 1024 * 1024 ,
23
+ BlockSize = 256 * 1024 ,
24
+ } ) ;
25
+
20
26
private readonly ClickHouseConnection connection ;
21
27
private readonly BatchSerializer batchSerializer ;
22
28
private readonly RowBinaryFormat rowBinaryFormat ;
23
29
private readonly bool ownsConnection ;
30
+ private readonly RecyclableMemoryStreamManager memoryStreamManager ;
24
31
private long rowsWritten ;
25
32
private ( string [ ] names , ClickHouseType [ ] types ) columnNamesAndTypes ;
26
33
@@ -85,12 +92,13 @@ public long RowsWritten
85
92
}
86
93
}
87
94
88
- private async Task < ( string [ ] names , ClickHouseType [ ] types ) > LoadNamesAndTypesAsync ( string destinationTableName , IReadOnlyCollection < string > columns = null )
95
+ /// <summary>
96
+ /// Gets RecyclableMemoryStreamManager used to create recyclable streams.
97
+ /// </summary>
98
+ public RecyclableMemoryStreamManager MemoryStreamManager
89
99
{
90
- using var reader = ( ClickHouseDataReader ) await connection . ExecuteReaderAsync ( $ "SELECT { GetColumnsExpression ( columns ) } FROM { DestinationTableName } WHERE 1=0") . ConfigureAwait ( false ) ;
91
- var types = reader . GetClickHouseColumnTypes ( ) ;
92
- var names = reader . GetColumnNames ( ) . Select ( c => c . EncloseColumnName ( ) ) . ToArray ( ) ;
93
- return ( names , types ) ;
100
+ get { return memoryStreamManager ?? CommonMemoryStreamManager ; }
101
+ init { memoryStreamManager = value ; }
94
102
}
95
103
96
104
/// <summary>
@@ -172,11 +180,19 @@ public async Task WriteToServerAsync(IEnumerable<object[]> rows, CancellationTok
172
180
await Task . WhenAll ( tasks ) . ConfigureAwait ( false ) ;
173
181
}
174
182
183
+ private async Task < ( string [ ] names , ClickHouseType [ ] types ) > LoadNamesAndTypesAsync ( string destinationTableName , IReadOnlyCollection < string > columns = null )
184
+ {
185
+ using var reader = ( ClickHouseDataReader ) await connection . ExecuteReaderAsync ( $ "SELECT { GetColumnsExpression ( columns ) } FROM { DestinationTableName } WHERE 1=0") . ConfigureAwait ( false ) ;
186
+ var types = reader . GetClickHouseColumnTypes ( ) ;
187
+ var names = reader . GetColumnNames ( ) . Select ( c => c . EncloseColumnName ( ) ) . ToArray ( ) ;
188
+ return ( names , types ) ;
189
+ }
190
+
175
191
private async Task SendBatchAsync ( Batch batch , CancellationToken token )
176
192
{
177
193
using ( batch ) // Dispose object regardless whether sending succeeds
178
194
{
179
- using var stream = MemoryStreamManager . GetStream ( nameof ( SendBatchAsync ) ) ;
195
+ using var stream = MemoryStreamManager . GetStream ( nameof ( SendBatchAsync ) , 128 * 1024 ) ;
180
196
// Async serialization
181
197
await Task . Run ( ( ) => batchSerializer . Serialize ( batch , stream ) , token ) . ConfigureAwait ( false ) ;
182
198
// Seek to beginning as after writing it's at end
0 commit comments