diff --git a/ClickHouse.Client.Tests/ADO/ConnectionCancellableTests.cs b/ClickHouse.Client.Tests/ADO/ConnectionCancellableTests.cs new file mode 100644 index 00000000..9ddbc7a3 --- /dev/null +++ b/ClickHouse.Client.Tests/ADO/ConnectionCancellableTests.cs @@ -0,0 +1,150 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Diagnostics; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using ClickHouse.Client.ADO; +using ClickHouse.Client.ADO.Parameters; +using ClickHouse.Client.Utility; +using NUnit.Framework; + +#if NET7_0_OR_GREATER + +namespace ClickHouse.Client.Tests.ADO; + +public class ConnectionCancellableTests : AbstractConnectionTestFixture +{ + #region IHttpClientFactory + internal class TestException : Exception + { + public string Parameter { get; private set; } + public TestException(string parameter) : base() + { + Parameter = parameter; + } + } + + internal class HttpClientFactoryFake : IHttpClientFactory + { + public HttpClient CreateClient(string name) + { + throw new TestException($"HttpClientFactoryFake:CreateClient: {name}"); + } + } + #endregion + + [Test] + public async Task ShouldCreateCancellableConnectionWithProvidedHttpClient() + { + using var httpClientHandler = new HttpClientHandler() { AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate }; + using var httpClient = new HttpClient(httpClientHandler); + using var connection = new ClickHouseCancellableConnection(TestUtilities.GetConnectionStringBuilder().ToString(), httpClient); + await connection.OpenAsync(); + ClassicAssert.IsNotEmpty(connection.ServerVersion); + } + + [Test] + public void ShouldCreateCancellableConnectionWithProvidedHttpClientName() + { + using var httpClientHandler = new HttpClientHandler() { AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate }; + using var httpClient = new HttpClient(httpClientHandler); + using var connection = new ClickHouseCancellableConnection(TestUtilities.GetConnectionStringBuilder().ToString(), new HttpClientFactoryFake(), "TestMe"); + Assert.Catch(() => connection.Open(), "HttpClientFactoryFake:CreateClient: TestMe"); + } + + [Test] + public void ShouldCreateCommandCancellable() + { + using var connection = new ClickHouseCancellableConnection(); + var command1 = connection.CreateCommand(); + Assert.That(command1.GetType(), Is.EqualTo(typeof(ClickHouseCancellableCommand))); + Assert.That(command1.ClickHouseConnection, Is.EqualTo(connection)); + } + + // see: https://github.com/linq2db/linq2db/discussions/4966 + // see: https://github.com/DarkWanderer/ClickHouse.Client/issues/489 + [Test] + [Explicit("In linq2db you may call the same command multiple times in different SQLs")] + public async Task MultiTimeCallOneCommandAndFallExceptionWhenHasQueryId() + { + string queryId = "MyQueryId123456"; + var command = cancellableConnection.CreateCommand(); + command.QueryId = queryId; + + try + { + List tasks = new List(2); + + command.CommandText = "SELECT sleep(2) FROM system.numbers LIMIT 100"; + tasks.Add(command.ExecuteScalarAsync()); + + command.CommandText = "SELECT sleep(3) FROM system.numbers LIMIT 200"; // this is another query with the same DbCommand/QueryId + tasks.Add(command.ExecuteScalarAsync()); + + await Task.WhenAll(tasks); + } + catch (ClickHouseServerException ex) when (ex.ErrorCode == 216) + { + Assert.Fail("The query id is running."); + } + catch (Exception) + { + Assert.Fail("Query throw another exception"); + } + } + + // see: https://github.com/DarkWanderer/ClickHouse.Client/discussions/482 + [Test] + [Explicit("Support Cancellation")] + public async Task SupportCancellation() + { + string queryId = "MyQueryId123456"; + var command = cancellableConnection.CreateCommand(); + command.CommandText = "SELECT *\r\nFROM (SELECT sleep(3), '0' as num FROM system.numbers LIMIT 100) t1\r\nINNER JOIN (SELECT sleep(3), '0' as num FROM system.numbers LIMIT 100) t2 on t1.num = t2.num"; + command.QueryId = queryId; + + var commandRunning = connection.CreateCommand(); + commandRunning.CommandText = $"SELECT count(*) FROM system.processes where query_id like '{queryId}';"; + + CancellationTokenSource cts = new CancellationTokenSource(); + + async Task cancelAsync(CancellationTokenSource cancellationTokenSource) + { + await Task.Delay(1000); + cancellationTokenSource.Cancel(); + } + + Stopwatch sw = Stopwatch.StartNew(); + try + { + _ = Task.Run(async () => await cancelAsync(cts)); + + await command.ExecuteScalarAsync(cts.Token); + sw.Stop(); + + if (sw.ElapsedMilliseconds > 5000) + Assert.Fail("The query was not cancelled in time"); + + Assert.Fail("The query did not throw an exception"); + } + catch (OperationCanceledException) + { + // Expected exception as operation canceled + + ulong num = (ulong)commandRunning.ExecuteScalar(); + if (num > 0) + Assert.Fail("The query was not cancelled in time, it is still running on the server"); + } + catch (Exception) + { + Assert.Fail("Query throw another exception"); + } + sw.Stop(); + } +} +#endif diff --git a/ClickHouse.Client.Tests/ADO/ConnectionTests.cs b/ClickHouse.Client.Tests/ADO/ConnectionTests.cs index 77c44a61..8aa47341 100644 --- a/ClickHouse.Client.Tests/ADO/ConnectionTests.cs +++ b/ClickHouse.Client.Tests/ADO/ConnectionTests.cs @@ -1,5 +1,7 @@ using System; +using System.Collections.Generic; using System.Data; +using System.Diagnostics; using System.Linq; using System.Net; using System.Net.Http; diff --git a/ClickHouse.Client.Tests/AbstractConnectionTestFixture.cs b/ClickHouse.Client.Tests/AbstractConnectionTestFixture.cs index f1a5fe18..aa632f4c 100644 --- a/ClickHouse.Client.Tests/AbstractConnectionTestFixture.cs +++ b/ClickHouse.Client.Tests/AbstractConnectionTestFixture.cs @@ -10,9 +10,16 @@ public class AbstractConnectionTestFixture : IDisposable { protected readonly ClickHouseConnection connection; +#if NET7_0_OR_GREATER + protected readonly ClickHouseCancellableConnection cancellableConnection; +#endif + protected AbstractConnectionTestFixture() { connection = TestUtilities.GetTestClickHouseConnection(); +#if NET7_0_OR_GREATER + cancellableConnection = TestUtilities.GetTestClickHouseCancellableConnection(); +#endif using var command = connection.CreateCommand(); command.CommandText = "CREATE DATABASE IF NOT EXISTS test;"; command.ExecuteScalar(); diff --git a/ClickHouse.Client.Tests/TestUtilities.cs b/ClickHouse.Client.Tests/TestUtilities.cs index ea2d6a32..d4c54cfe 100644 --- a/ClickHouse.Client.Tests/TestUtilities.cs +++ b/ClickHouse.Client.Tests/TestUtilities.cs @@ -45,6 +45,28 @@ public static async Task ExpectedFeaturesShouldMatchActualFeatures() /// /// public static ClickHouseConnection GetTestClickHouseConnection(bool compression = true, bool session = false, bool customDecimals = true) + { + ClickHouseConnectionStringBuilder builder = SetupConnectionStringBuilder(compression, session, customDecimals); + var connection = new ClickHouseConnection(builder.ConnectionString); + connection.Open(); + return connection; + } + +#if NET7_0_OR_GREATER + /// + /// Utility method to allow to redirect ClickHouse connections to different machine, in case of Windows development environment + /// + /// + public static ClickHouseCancellableConnection GetTestClickHouseCancellableConnection(bool compression = true, bool session = false, bool customDecimals = true) + { + ClickHouseConnectionStringBuilder builder = SetupConnectionStringBuilder(compression, session, customDecimals); + var connection = new ClickHouseCancellableConnection(builder.ConnectionString); + connection.Open(); + return connection; + } +#endif + + private static ClickHouseConnectionStringBuilder SetupConnectionStringBuilder(bool compression, bool session, bool customDecimals) { var builder = GetConnectionStringBuilder(); builder.Compression = compression; @@ -70,9 +92,8 @@ public static ClickHouseConnection GetTestClickHouseConnection(bool compression { builder["set_allow_experimental_dynamic_type"] = 1; } - var connection = new ClickHouseConnection(builder.ConnectionString); - connection.Open(); - return connection; + + return builder; } public static ClickHouseConnectionStringBuilder GetConnectionStringBuilder() diff --git a/ClickHouse.Client/ADO/ClickHouseCancellableCommand.cs b/ClickHouse.Client/ADO/ClickHouseCancellableCommand.cs new file mode 100644 index 00000000..53f9bb75 --- /dev/null +++ b/ClickHouse.Client/ADO/ClickHouseCancellableCommand.cs @@ -0,0 +1,184 @@ +using System; +using System.Data; +using System.Data.Common; +using System.Runtime.ExceptionServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using ClickHouse.Client.ADO.Parameters; +using ClickHouse.Client.ADO.Readers; +using ClickHouse.Client.Formats; + +#if NET7_0_OR_GREATER + +namespace ClickHouse.Client.ADO; + +public class ClickHouseCancellableCommand : ClickHouseCommand +{ + public ClickHouseCancellableConnection ClickHouseConnection => (ClickHouseCancellableConnection)DbConnection; + + internal ClickHouseParameterCollection ClickHouseParameters => (ClickHouseParameterCollection)DbParameterCollection; + + public ClickHouseCancellableCommand() + : base() + { + } + + public ClickHouseCancellableCommand(ClickHouseConnection connection) + : base(connection) + { + } + + private async Task CancelQuery(string queryId) + { + if (string.IsNullOrEmpty(queryId)) return; + + System.Diagnostics.Trace.WriteLine($"QueryId '{queryId}' is canceld."); + + using ClickHouseCommand command = ClickHouseConnection.CreateCommand(); + command.CommandText = $"KILL QUERY WHERE query_id = '{queryId}'"; + int response = await command.ExecuteNonQueryAsync().ConfigureAwait(false); + } + +#pragma warning disable CA2215 // Dispose methods should call base class dispose + protected override void Dispose(bool disposing) +#pragma warning restore CA2215 // Dispose methods should call base class dispose + { + if (disposing) + { + // Dispose token source with delay + _ = Task.Run(async () => + { + await Task.Delay(1000).ConfigureAwait(false); + + base.Dispose(disposing); + }); + } + } + + public override Task ExecuteNonQueryAsync(CancellationToken cancellationToken) => ExecuteNonQueryAsync(new ClickHouseCancellableCommandRunner(), cancellationToken); + + public virtual async Task ExecuteNonQueryAsync(ClickHouseCancellableCommandRunner runner, CancellationToken cancellationToken) + { + if (ClickHouseConnection == null) + throw new InvalidOperationException("Connection is not set"); + + try + { + using var lcts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken); + using var response = await runner.PostSqlQueryAsync(this, CommandText, lcts.Token).ConfigureAwait(false); + using var reader = new ExtendedBinaryReader(await response.Content.ReadAsStreamAsync(lcts.Token).ConfigureAwait(false)); + + return reader.PeekChar() != -1 ? reader.Read7BitEncodedInt() : 0; + } + catch (OperationCanceledException ex) + { + try + { + await CancelQuery(runner.QueryId).ConfigureAwait(false); + } + catch + { + } + + ExceptionDispatchInfo.Capture(ex).Throw(); + } + catch (Exception ex) + { + ExceptionDispatchInfo.Capture(ex).Throw(); + } + return -1; // no here + } + + /// + /// Allows to return raw result from a query (with custom FORMAT) + /// + /// Cancellation token + /// ClickHouseRawResult object containing response stream + public override Task ExecuteRawResultAsync(CancellationToken cancellationToken) => ExecuteRawResultAsync(new ClickHouseCancellableCommandRunner(), cancellationToken); + + public virtual async Task ExecuteRawResultAsync(ClickHouseCancellableCommandRunner runner, CancellationToken cancellationToken) + { + if (ClickHouseConnection == null) + throw new InvalidOperationException("Connection is not set"); + + try + { + using var lcts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken); + var response = await runner.PostSqlQueryAsync(this, CommandText, lcts.Token).ConfigureAwait(false); + return new ClickHouseRawResult(response); + } + catch (OperationCanceledException ex) + { + try + { + await CancelQuery(runner.QueryId).ConfigureAwait(false); + } + catch + { + } + + ExceptionDispatchInfo.Capture(ex).Throw(); + } + catch (Exception ex) + { + ExceptionDispatchInfo.Capture(ex).Throw(); + } + return null; // no here + } + + public override Task ExecuteScalarAsync(CancellationToken cancellationToken) => ExecuteScalarAsync(new ClickHouseCancellableCommandRunner(), cancellationToken); + + public virtual async Task ExecuteScalarAsync(ClickHouseCancellableCommandRunner runner, CancellationToken cancellationToken) + { + using var lcts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken); + using var reader = await ExecuteDbDataReaderAsync(runner, CommandBehavior.Default, lcts.Token).ConfigureAwait(false); + return reader.Read() ? reader.GetValue(0) : null; + } + + protected override Task ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken) => + ExecuteDbDataReaderAsync(new ClickHouseCancellableCommandRunner(), behavior, cancellationToken); + + protected virtual async Task ExecuteDbDataReaderAsync(ClickHouseCancellableCommandRunner runner, CommandBehavior behavior, CancellationToken cancellationToken) + { + if (ClickHouseConnection == null) + throw new InvalidOperationException("Connection is not set"); + + try + { + using var lcts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, cancellationToken); + var sqlBuilder = new StringBuilder(CommandText); + switch (behavior) + { + case CommandBehavior.SingleRow: + sqlBuilder.Append(" LIMIT 1"); + break; + case CommandBehavior.SchemaOnly: + sqlBuilder.Append(" LIMIT 0"); + break; + default: + break; + } + var result = await runner.PostSqlQueryAsync(this, sqlBuilder.ToString(), lcts.Token).ConfigureAwait(false); + return ClickHouseDataReader.FromHttpResponse(result, ClickHouseConnection.TypeSettings); + } + catch (OperationCanceledException ex) + { + try + { + await CancelQuery(runner.QueryId).ConfigureAwait(false); + } + catch + { + } + + ExceptionDispatchInfo.Capture(ex).Throw(); + } + catch (Exception ex) + { + ExceptionDispatchInfo.Capture(ex).Throw(); + } + return null; // no here + } +} +#endif diff --git a/ClickHouse.Client/ADO/ClickHouseCancellableCommandRunner.cs b/ClickHouse.Client/ADO/ClickHouseCancellableCommandRunner.cs new file mode 100644 index 00000000..544a3d51 --- /dev/null +++ b/ClickHouse.Client/ADO/ClickHouseCancellableCommandRunner.cs @@ -0,0 +1,154 @@ +using System; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using ClickHouse.Client.ADO.Parameters; +using ClickHouse.Client.Diagnostic; +using ClickHouse.Client.Formats; +using ClickHouse.Client.Json; +using ClickHouse.Client.Utility; + +#if NET7_0_OR_GREATER + +namespace ClickHouse.Client.ADO; + +public class ClickHouseCancellableCommandRunner +{ + private string queryId; + + public ClickHouseCancellableCommandRunner() + { + queryId = Guid.NewGuid().ToString("N"); + } + + /// + /// Gets QueryId associated with command + /// + public string QueryId => queryId; + + public QueryStats QueryStats { get; private set; } + + public async Task PostSqlQueryAsync(ClickHouseCancellableCommand command, string sqlQuery, CancellationToken token) + { + if (command.ClickHouseConnection == null) + throw new InvalidOperationException("Connection not set"); + using var activity = command.ClickHouseConnection.StartActivity("PostSqlQueryAsync"); + + var uriBuilder = command.ClickHouseConnection.CreateUriBuilder(); + await command.ClickHouseConnection.EnsureOpenAsync().ConfigureAwait(false); // Preserve old behavior + + uriBuilder.QueryId = QueryId; + uriBuilder.CommandQueryStringParameters = command.CustomSettings; + + using var postMessage = command.ClickHouseConnection.UseFormDataParameters + ? BuildHttpRequestMessageWithFormData( + command: command, + sqlQuery: sqlQuery, + uriBuilder: uriBuilder) + : BuildHttpRequestMessageWithQueryParams( + command: command, + sqlQuery: sqlQuery, + uriBuilder: uriBuilder); + + activity.SetQuery(sqlQuery); + + var response = await command.ClickHouseConnection.HttpClient + .SendAsync(postMessage, HttpCompletionOption.ResponseHeadersRead, token) + .ConfigureAwait(false); + + QueryStats = ExtractQueryStats(response); + activity.SetQueryStats(QueryStats); + return await ClickHouseConnection.HandleError(response, sqlQuery, activity).ConfigureAwait(false); + } + + private static HttpRequestMessage BuildHttpRequestMessageWithQueryParams(ClickHouseCancellableCommand command, string sqlQuery, ClickHouseUriBuilder uriBuilder) + { + if (command.ClickHouseParameters != null) + { + sqlQuery = command.ClickHouseParameters.ReplacePlaceholders(sqlQuery); + foreach (ClickHouseDbParameter parameter in command.ClickHouseParameters) + { + uriBuilder.AddSqlQueryParameter( + parameter.ParameterName, + HttpParameterFormatter.Format(parameter, command.ClickHouseConnection.TypeSettings)); + } + } + + var uri = uriBuilder.ToString(); + + var postMessage = new HttpRequestMessage(HttpMethod.Post, uri); + + command.ClickHouseConnection.AddDefaultHttpHeaders(postMessage.Headers); + HttpContent content = new StringContent(sqlQuery); + content.Headers.ContentType = new MediaTypeHeaderValue("text/sql"); + if (command.ClickHouseConnection.UseCompression) + { + content = new CompressedContent(content, DecompressionMethods.GZip); + } + + postMessage.Content = content; + + return postMessage; + } + + private static HttpRequestMessage BuildHttpRequestMessageWithFormData(ClickHouseCancellableCommand command, string sqlQuery, ClickHouseUriBuilder uriBuilder) + { + var content = new MultipartFormDataContent(); + + if (command.ClickHouseParameters != null) + { + sqlQuery = command.ClickHouseParameters.ReplacePlaceholders(sqlQuery); + + foreach (ClickHouseDbParameter parameter in command.ClickHouseParameters) + { + content.Add( + content: new StringContent(HttpParameterFormatter.Format(parameter, command.ClickHouseConnection.TypeSettings)), + name: $"param_{parameter.ParameterName}"); + } + } + + content.Add( + content: new StringContent(sqlQuery), + name: "query"); + + var uri = uriBuilder.ToString(); + + var postMessage = new HttpRequestMessage(HttpMethod.Post, uri); + + command.ClickHouseConnection.AddDefaultHttpHeaders(postMessage.Headers); + + postMessage.Content = content; + + return postMessage; + } + + private static readonly JsonSerializerOptions SummarySerializerOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = new SnakeCaseNamingPolicy(), + NumberHandling = System.Text.Json.Serialization.JsonNumberHandling.AllowReadingFromString, + }; + + private static QueryStats ExtractQueryStats(HttpResponseMessage response) + { + try + { + const string summaryHeader = "X-ClickHouse-Summary"; + if (response.Headers.Contains(summaryHeader)) + { + var value = response.Headers.GetValues(summaryHeader).FirstOrDefault(); + var jsonDoc = JsonDocument.Parse(value); + return JsonSerializer.Deserialize(value, SummarySerializerOptions); + } + } + catch + { + } + return null; + } +} +#endif diff --git a/ClickHouse.Client/ADO/ClickHouseCancellableConnection.cs b/ClickHouse.Client/ADO/ClickHouseCancellableConnection.cs new file mode 100644 index 00000000..acca3877 --- /dev/null +++ b/ClickHouse.Client/ADO/ClickHouseCancellableConnection.cs @@ -0,0 +1,26 @@ +using System.Data.Common; +using System.Net.Http; + +#if NET7_0_OR_GREATER + +namespace ClickHouse.Client.ADO; + +public class ClickHouseCancellableConnection : ClickHouseConnection +{ + public ClickHouseCancellableConnection() + : base() { } + + public ClickHouseCancellableConnection(string connectionString) + : base(connectionString) { } + + public ClickHouseCancellableConnection(string connectionString, HttpClient httpClient) + : base(connectionString, httpClient) { } + + public ClickHouseCancellableConnection(string connectionString, IHttpClientFactory httpClientFactory, string httpClientName = "") + : base(connectionString, httpClientFactory, httpClientName) { } + + public new ClickHouseCancellableCommand CreateCommand() => new ClickHouseCancellableCommand(this); + + protected override DbCommand CreateDbCommand() => CreateCommand(); +} +#endif diff --git a/ClickHouse.Client/ADO/ClickHouseCancellableConnectionFactory.cs b/ClickHouse.Client/ADO/ClickHouseCancellableConnectionFactory.cs new file mode 100644 index 00000000..570e8b12 --- /dev/null +++ b/ClickHouse.Client/ADO/ClickHouseCancellableConnectionFactory.cs @@ -0,0 +1,24 @@ +using System.Data.Common; +using ClickHouse.Client.ADO.Adapters; +using ClickHouse.Client.ADO.Parameters; + +namespace ClickHouse.Client.ADO; + +#if NET7_0_OR_GREATER +public class ClickHouseCancellableConnectionFactory : DbProviderFactory +{ + public static ClickHouseCancellableConnectionFactory Instance => new(); + + public override DbConnection CreateConnection() => new ClickHouseCancellableConnection(); + + public override DbDataAdapter CreateDataAdapter() => new ClickHouseDataAdapter(); + + public override DbConnectionStringBuilder CreateConnectionStringBuilder() => new ClickHouseConnectionStringBuilder(); + + public override DbParameter CreateParameter() => new ClickHouseDbParameter(); + + public override DbCommand CreateCommand() => new ClickHouseCancellableCommand(); + + public override DbDataSource CreateDataSource(string connectionString) => new ClickHouseDataSource(connectionString); +} +#endif diff --git a/ClickHouse.Client/ADO/ClickHouseCommand.cs b/ClickHouse.Client/ADO/ClickHouseCommand.cs index 4f6181d2..18852b51 100644 --- a/ClickHouse.Client/ADO/ClickHouseCommand.cs +++ b/ClickHouse.Client/ADO/ClickHouseCommand.cs @@ -45,6 +45,8 @@ public ClickHouseCommand(ClickHouseConnection connection) public override UpdateRowSource UpdatedRowSource { get; set; } + public CancellationToken CancellationToken => cts.Token; + /// /// Gets or sets QueryId associated with command /// After query execution, will be set by value provided by server @@ -95,7 +97,7 @@ public override async Task ExecuteNonQueryAsync(CancellationToken cancellat /// /// Cancellation token /// ClickHouseRawResult object containing response stream - public async Task ExecuteRawResultAsync(CancellationToken cancellationToken) + public virtual async Task ExecuteRawResultAsync(CancellationToken cancellationToken) { if (connection == null) throw new InvalidOperationException("Connection is not set");