Skip to content
This repository was archived by the owner on Jun 22, 2025. It is now read-only.

Cancelation and query duplication use #642

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions ClickHouse.Client.Tests/ADO/ConnectionCancellableTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
using ClickHouse.Client.ADO.Parameters;
using ClickHouse.Client.Utility;
using NUnit.Framework;

#if NET7_0_OR_GREATER

namespace ClickHouse.Client.Tests.ADO;

public class ConnectionCancellableTests : AbstractConnectionTestFixture
{
#region IHttpClientFactory
internal class TestException : Exception
{
public string Parameter { get; private set; }
public TestException(string parameter) : base()
{
Parameter = parameter;
}
}

internal class HttpClientFactoryFake : IHttpClientFactory
{
public HttpClient CreateClient(string name)
{
throw new TestException($"HttpClientFactoryFake:CreateClient: {name}");
}
}
#endregion

[Test]
public async Task ShouldCreateCancellableConnectionWithProvidedHttpClient()
{
using var httpClientHandler = new HttpClientHandler() { AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate };
using var httpClient = new HttpClient(httpClientHandler);
using var connection = new ClickHouseCancellableConnection(TestUtilities.GetConnectionStringBuilder().ToString(), httpClient);
await connection.OpenAsync();
ClassicAssert.IsNotEmpty(connection.ServerVersion);
}

[Test]
public void ShouldCreateCancellableConnectionWithProvidedHttpClientName()
{
using var httpClientHandler = new HttpClientHandler() { AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate };
using var httpClient = new HttpClient(httpClientHandler);
using var connection = new ClickHouseCancellableConnection(TestUtilities.GetConnectionStringBuilder().ToString(), new HttpClientFactoryFake(), "TestMe");
Assert.Catch<TestException>(() => connection.Open(), "HttpClientFactoryFake:CreateClient: TestMe");
}

[Test]
public void ShouldCreateCommandCancellable()
{
using var connection = new ClickHouseCancellableConnection();
var command1 = connection.CreateCommand();
Assert.That(command1.GetType(), Is.EqualTo(typeof(ClickHouseCancellableCommand)));
Assert.That(command1.ClickHouseConnection, Is.EqualTo(connection));
}

// see: https://github.com/linq2db/linq2db/discussions/4966
// see: https://github.com/DarkWanderer/ClickHouse.Client/issues/489
[Test]
[Explicit("In linq2db you may call the same command multiple times in different SQLs")]
public async Task MultiTimeCallOneCommandAndFallExceptionWhenHasQueryId()
{
string queryId = "MyQueryId123456";
var command = cancellableConnection.CreateCommand();
command.QueryId = queryId;

try
{
List<Task> tasks = new List<Task>(2);

command.CommandText = "SELECT sleep(2) FROM system.numbers LIMIT 100";
tasks.Add(command.ExecuteScalarAsync());

command.CommandText = "SELECT sleep(3) FROM system.numbers LIMIT 200"; // this is another query with the same DbCommand/QueryId
tasks.Add(command.ExecuteScalarAsync());

await Task.WhenAll(tasks);
}
catch (ClickHouseServerException ex) when (ex.ErrorCode == 216)
{
Assert.Fail("The query id is running.");
}
catch (Exception)
{
Assert.Fail("Query throw another exception");
}
}

// see: https://github.com/DarkWanderer/ClickHouse.Client/discussions/482
[Test]
[Explicit("Support Cancellation")]
public async Task SupportCancellation()
{
string queryId = "MyQueryId123456";
var command = cancellableConnection.CreateCommand();
command.CommandText = "SELECT *\r\nFROM (SELECT sleep(3), '0' as num FROM system.numbers LIMIT 100) t1\r\nINNER JOIN (SELECT sleep(3), '0' as num FROM system.numbers LIMIT 100) t2 on t1.num = t2.num";
command.QueryId = queryId;

var commandRunning = connection.CreateCommand();
commandRunning.CommandText = $"SELECT count(*) FROM system.processes where query_id like '{queryId}';";

CancellationTokenSource cts = new CancellationTokenSource();

async Task cancelAsync(CancellationTokenSource cancellationTokenSource)
{
await Task.Delay(1000);
cancellationTokenSource.Cancel();
}

Stopwatch sw = Stopwatch.StartNew();
try
{
_ = Task.Run(async () => await cancelAsync(cts));

await command.ExecuteScalarAsync(cts.Token);
sw.Stop();

if (sw.ElapsedMilliseconds > 5000)
Assert.Fail("The query was not cancelled in time");

Assert.Fail("The query did not throw an exception");
}
catch (OperationCanceledException)
{
// Expected exception as operation canceled

ulong num = (ulong)commandRunning.ExecuteScalar();
if (num > 0)
Assert.Fail("The query was not cancelled in time, it is still running on the server");
}
catch (Exception)
{
Assert.Fail("Query throw another exception");
}
sw.Stop();
}
}
#endif
2 changes: 2 additions & 0 deletions ClickHouse.Client.Tests/ADO/ConnectionTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Net.Http;
Expand Down
7 changes: 7 additions & 0 deletions ClickHouse.Client.Tests/AbstractConnectionTestFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,16 @@ public class AbstractConnectionTestFixture : IDisposable
{
protected readonly ClickHouseConnection connection;

#if NET7_0_OR_GREATER
protected readonly ClickHouseCancellableConnection cancellableConnection;
#endif

protected AbstractConnectionTestFixture()
{
connection = TestUtilities.GetTestClickHouseConnection();
#if NET7_0_OR_GREATER
cancellableConnection = TestUtilities.GetTestClickHouseCancellableConnection();
#endif
using var command = connection.CreateCommand();
command.CommandText = "CREATE DATABASE IF NOT EXISTS test;";
command.ExecuteScalar();
Expand Down
27 changes: 24 additions & 3 deletions ClickHouse.Client.Tests/TestUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,28 @@ public static async Task ExpectedFeaturesShouldMatchActualFeatures()
/// </summary>
/// <returns></returns>
public static ClickHouseConnection GetTestClickHouseConnection(bool compression = true, bool session = false, bool customDecimals = true)
{
ClickHouseConnectionStringBuilder builder = SetupConnectionStringBuilder(compression, session, customDecimals);
var connection = new ClickHouseConnection(builder.ConnectionString);
connection.Open();
return connection;
}

#if NET7_0_OR_GREATER
/// <summary>
/// Utility method to allow to redirect ClickHouse connections to different machine, in case of Windows development environment
/// </summary>
/// <returns></returns>
public static ClickHouseCancellableConnection GetTestClickHouseCancellableConnection(bool compression = true, bool session = false, bool customDecimals = true)
{
ClickHouseConnectionStringBuilder builder = SetupConnectionStringBuilder(compression, session, customDecimals);
var connection = new ClickHouseCancellableConnection(builder.ConnectionString);
connection.Open();
return connection;
}
#endif

private static ClickHouseConnectionStringBuilder SetupConnectionStringBuilder(bool compression, bool session, bool customDecimals)
{
var builder = GetConnectionStringBuilder();
builder.Compression = compression;
Expand All @@ -70,9 +92,8 @@ public static ClickHouseConnection GetTestClickHouseConnection(bool compression
{
builder["set_allow_experimental_dynamic_type"] = 1;
}
var connection = new ClickHouseConnection(builder.ConnectionString);
connection.Open();
return connection;

return builder;
}

public static ClickHouseConnectionStringBuilder GetConnectionStringBuilder()
Expand Down
Loading
Loading