Skip to content

Commit

Permalink
Merge pull request #1758 from d4ilys/master
Browse files Browse the repository at this point in the history
优化QuestDb BulkCopy 并重命名ClickHouse QuestDb BulkCopy 扩展方法名
  • Loading branch information
2881099 authored Mar 24, 2024
2 parents 472991f + a3363fd commit 833c76f
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 61 deletions.
10 changes: 5 additions & 5 deletions FreeSql.Tests/FreeSql.Tests/ClickHouse/ClickHouseTest3.cs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public async Task UriStringIsTooLongTest()
//单个插入报错
await _fsql.Insert(t).ExecuteAffrowsAsync();

// await _fsql.Insert(t).ExecuteBulkCopyAsync();
// await _fsql.Insert(t).ExecuteQuestBulkCopyAsync();
}


Expand All @@ -304,9 +304,9 @@ public async Task TestBulkCopySingle()
//单个插入报错
await _fsql.Insert(t).ExecuteAffrowsAsync();

await _fsql.Insert(t).ExecuteBulkCopyAsync();
await _fsql.Insert(t).ExecuteClickHouseBulkCopyAsync();

_fsql.Insert(t).ExecuteBulkCopy();
_fsql.Insert(t).ExecuteClickHouseBulkCopy();
}

/// <summary>
Expand All @@ -333,9 +333,9 @@ public async Task TestBulkCopyMany()
await _fsql.Insert(t).ExecuteAffrowsAsync();

//BulkCopy不会报错
await _fsql.Insert(t).ExecuteBulkCopyAsync();
await _fsql.Insert(t).ExecuteClickHouseBulkCopyAsync();

_fsql.Insert(t).ExecuteBulkCopy();
_fsql.Insert(t).ExecuteClickHouseBulkCopy();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ public async Task TestSqlBulkCopy()
NameUpdate = "NameUpdate"
});
}
var result = await restFsql.Insert(list).ExecuteBulkCopyAsync();

var result = await restFsql.Insert(list).ExecuteQuestDbBulkCopyAsync();
Assert.True(result > 0);
}

Expand Down
79 changes: 79 additions & 0 deletions FreeSql.Tests/FreeSql.Tests/QuestDb/QuestDbIssue/QuestDbIssue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Xunit;

namespace FreeSql.Tests.QuestDb.QuestDbIssue
{
public class QuestDbIssue : QuestDbTest
{
[Fact]
public void Issue1757()
{
restFsql.CodeFirst.SyncStructure<Test0111>();
var count = fsql.Insert(new List<Test0111>()
{
new()
{
CreateTime = DateTime.Now,
CustomId = 3, Name = "test333",
Price = 3,
Value = 3
}
}).ExecuteQuestDbBulkCopy();

Assert.True(count > 0);

var list = fsql.Select<Test0111>().ToList();
}


[Fact]
public void Issue1757Many()
{
restFsql.CodeFirst.SyncStructure<Test0111>();
var count = fsql.Insert(new List<Test0111>()
{
new()
{
CreateTime = DateTime.Now,
CustomId = 4, Name = "test444",
Price = 4,
Value = 4
},
new()
{
CreateTime = DateTime.Now,
CustomId = 5, Name = "test555",
Price = 5,
Value = 5
},
new()
{
CreateTime = DateTime.Now,
CustomId = 6, Name = "test666",
Price = 6,
Value = 6
}
}).ExecuteQuestDbBulkCopy();

Assert.True(count > 0);

var list = fsql.Select<Test0111>().ToList();
}
}

public class Test0111
{
public long Id { get; set; }

public string Name { get; set; }
public decimal Price { get; set; }
public DateTime CreateTime { get; set; }
public long CustomId { get; set; }

public double Value { get; set; }
}
}
10 changes: 7 additions & 3 deletions FreeSql.Tests/FreeSql.Tests/QuestDb/QuestDbTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,27 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Xunit;

namespace FreeSql.Tests.QuestDb
{
public class QuestDbTest
{
public static IFreeSql fsql = new FreeSql.FreeSqlBuilder()
.UseConnectionString(FreeSql.DataType.QuestDb,
@"host=192.168.0.36;port=8812;username=admin;password=quest;database=qdb;ServerCompatibilityMode=NoTypeLoading;")
@"host=192.168.1.114;port=8812;username=admin;password=quest;database=qdb;ServerCompatibilityMode=NoTypeLoading;")
.UseMonitorCommand(cmd => Debug.WriteLine($"Sql:{cmd.CommandText}")) //监听SQL语句
.UseNoneCommandParameter(true)
.Build();

public static IFreeSql restFsql = new FreeSql.FreeSqlBuilder()
.UseConnectionString(FreeSql.DataType.QuestDb,
@"host=192.168.0.36;port=8812;username=admin;password=quest;database=qdb;ServerCompatibilityMode=NoTypeLoading;")
@"host=192.168.1.114;port=8812;username=admin;password=quest;database=qdb;ServerCompatibilityMode=NoTypeLoading;")
.UseMonitorCommand(cmd => Debug.WriteLine($"Sql:{cmd.CommandText}")) //监听SQL语句
.UseQuestDbRestAPI("192.168.0.36:9001", "admin", "ushahL(aer2r")
.UseQuestDbRestAPI("192.168.1.114:9000")
.Build();

}


}
6 changes: 3 additions & 3 deletions Providers/FreeSql.Provider.ClickHouse/ClickHouseExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static ISelect<T> Sample<T>(this ISelect<T> that, decimal k, int n, decim
/// <typeparam name="T"></typeparam>
/// <param name="that"></param>
/// <returns></returns>
public static async Task<int> ExecuteBulkCopyAsync<T>(this IInsert<T> that) where T : class
public static async Task<int> ExecuteClickHouseBulkCopyAsync<T>(this IInsert<T> that) where T : class
{
try
{
Expand All @@ -87,8 +87,8 @@ public static async Task<int> ExecuteBulkCopyAsync<T>(this IInsert<T> that) wher
/// <typeparam name="T"></typeparam>
/// <param name="insert"></param>
/// <returns></returns>
public static int ExecuteBulkCopy<T>(this IInsert<T> insert) where T : class
public static int ExecuteClickHouseBulkCopy<T>(this IInsert<T> insert) where T : class
{
return ExecuteBulkCopyAsync(insert).ConfigureAwait(false).GetAwaiter().GetResult();
return ExecuteClickHouseBulkCopyAsync(insert).ConfigureAwait(false).GetAwaiter().GetResult();
}
}
96 changes: 47 additions & 49 deletions Providers/FreeSql.Provider.QuestDb/QuestDbGlobalExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ public static partial class QuestDbGlobalExtensions
/// <param name="args"></param>
/// <returns></returns>
public static string FormatQuestDb(this string that, params object[] args) =>
_QuestDbAdo.Addslashes(that, args);
_questDbAdo.Addslashes(that, args);

static QuestDbAdo _QuestDbAdo = new QuestDbAdo();
private static readonly QuestDbAdo _questDbAdo = new QuestDbAdo();

public static FreeSqlBuilder UseQuestDbRestAPI(this FreeSqlBuilder buider, string host, string username = "",
string password = "") => RestAPIExtension.UseQuestDbRestAPI(buider, host, username, password);
public static FreeSqlBuilder UseQuestDbRestAPI(this FreeSqlBuilder build, string host, string username = "",
string password = "") => RestAPIExtension.UseQuestDbRestAPI(build, host, username, password);

/// <summary>
/// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。
Expand All @@ -53,6 +53,7 @@ public static ISelect<T1> LatestOn<T1, TKey>(this ISelect<T1> select, Expression
LatestOnExtension.InternelImpl(timestamp, partition);
return select;
}

/// <summary>
/// 对于多个时间序列存储在同一个表中的场景,根据时间戳检索给定键或键组合的最新项。
/// </summary>
Expand Down Expand Up @@ -113,13 +114,15 @@ public static ISelect<T1, T2, T3, T4> LatestOn<T1, T2, T3, T4, TKey>(this ISelec
/// <param name="unit">单位</param>
/// <param name="alignToCalendar">对准日历</param>
/// <returns></returns>
public static ISelect<T> SampleBy<T>(this ISelect<T> select, double time, SampleUnit unit, bool alignToCalendar = false)
public static ISelect<T> SampleBy<T>(this ISelect<T> select, double time, SampleUnit unit,
bool alignToCalendar = false)
{
SampleByExtension.IsExistence.Value = true;
var samoleByTemple = $"{Environment.NewLine}SAMPLE BY {{0}}{{1}} {{2}}";
string alignToCalendarTemple = "";
if (alignToCalendar) alignToCalendarTemple = "ALIGN TO CALENDAR ";
SampleByExtension.SamoleByString.Value = string.Format(samoleByTemple, time.ToString(), (char)unit, alignToCalendarTemple);
SampleByExtension.SamoleByString.Value =
string.Format(samoleByTemple, time.ToString(), (char)unit, alignToCalendarTemple);
return select;
}

Expand Down Expand Up @@ -156,23 +159,25 @@ private static List<string> SplitByLine(string text)
/// <param name="that"></param>
/// <param name="dateFormat">导入时,时间格式 默认:yyyy/M/d H:mm:ss</param>
/// <returns></returns>
public static async Task<int> ExecuteBulkCopyAsync<T>(this IInsert<T> that,string dateFormat = "yyyy/M/d H:mm:ss") where T : class
public static async Task<int> ExecuteQuestDbBulkCopyAsync<T>(this IInsert<T> that,
string dateFormat = "yyyy/M/d H:mm:ss") where T : class
{
//思路:通过提供的RestAPI imp,实现快速复制
if (string.IsNullOrWhiteSpace(RestAPIExtension.BaseUrl))
{
throw new Exception("BulkCopy功能需要启用RestAPI,启用方式:new FreeSqlBuilder().UseQuestDbRestAPI(\"localhost:9000\", \"username\", \"password\")");
throw new Exception(
"BulkCopy功能需要启用RestAPI,启用方式:new FreeSqlBuilder().UseQuestDbRestAPI(\"localhost:9000\", \"username\", \"password\")");
}

var result = 0;
var fileName = $"{Guid.NewGuid()}.csv";
var filePath = Path.Combine(AppContext.BaseDirectory, fileName);

try
{
var client = QuestDbContainer.GetService<IHttpClientFactory>().CreateClient();
var boundary = "---------------" + DateTime.Now.Ticks.ToString("x");
var list = new List<Hashtable>();
var insert = that as QuestDbInsert<T>;
var name = insert.InternalTableRuleInvoke(); //获取表名
var name = insert.InternalTableRuleInvoke(); //获取表名
insert.InternalOrm.DbFirst.GetTableByName(name).Columns.ForEach(d =>
{
if (d.DbTypeText == "TIMESTAMP")
Expand All @@ -181,7 +186,7 @@ public static async Task<int> ExecuteBulkCopyAsync<T>(this IInsert<T> that,strin
{
{ "name", d.Name },
{ "type", d.DbTypeText },
{ "pattern", dateFormat}
{ "pattern", dateFormat }
});
}
else
Expand All @@ -194,34 +199,37 @@ public static async Task<int> ExecuteBulkCopyAsync<T>(this IInsert<T> that,strin
}
});
var schema = JsonConvert.SerializeObject(list);
//写入CSV文件
using (var writer = new StreamWriter(filePath))
using (var csv = new CsvWriter(writer, CultureInfo.CurrentCulture))
using (MemoryStream stream = new MemoryStream())
{
await csv.WriteRecordsAsync(insert._source);
}
//写入CSV文件
using (var writer = new StreamWriter(stream))
using (var csv = new CsvWriter(writer, CultureInfo.CurrentCulture))
{
await csv.WriteRecordsAsync(insert._source);
}

var httpContent = new MultipartFormDataContent(boundary);
if (!string.IsNullOrWhiteSpace(RestAPIExtension.authorization))
client.DefaultRequestHeaders.Add("Authorization", RestAPIExtension.authorization);
httpContent.Add(new StringContent(schema), "schema");
httpContent.Add(new ByteArrayContent(File.ReadAllBytes(filePath)), "data");
//boundary带双引号 可能导致服务器错误情况
httpContent.Headers.Remove("Content-Type");
httpContent.Headers.TryAddWithoutValidation("Content-Type",
"multipart/form-data; boundary=" + boundary);
var httpResponseMessage =
await client.PostAsync($"{RestAPIExtension.BaseUrl}/imp?name={name}", httpContent);
var readAsStringAsync = await httpResponseMessage.Content.ReadAsStringAsync();
var splitByLine = SplitByLine(readAsStringAsync);
foreach (var s in splitByLine)
{
if (s.Contains("Rows"))
var httpContent = new MultipartFormDataContent(boundary);
if (!string.IsNullOrWhiteSpace(RestAPIExtension.authorization))
client.DefaultRequestHeaders.Add("Authorization", RestAPIExtension.authorization);
httpContent.Add(new StringContent(schema), "schema");
httpContent.Add(new ByteArrayContent(stream.ToArray()), "data");
//boundary带双引号 可能导致服务器错误情况
httpContent.Headers.Remove("Content-Type");
httpContent.Headers.TryAddWithoutValidation("Content-Type",
"multipart/form-data; boundary=" + boundary);
var httpResponseMessage =
await client.PostAsync($"{RestAPIExtension.BaseUrl}/imp?name={name}", httpContent);
var readAsStringAsync = await httpResponseMessage.Content.ReadAsStringAsync();
var splitByLine = SplitByLine(readAsStringAsync);
foreach (var s in splitByLine)
{
var strings = s.Split('|');
if (strings[1].Trim() == "Rows imported")
if (s.Contains("Rows"))
{
result = Convert.ToInt32(strings[2].Trim());
var strings = s.Split('|');
if (strings[1].Trim() == "Rows imported")
{
result = Convert.ToInt32(strings[2].Trim());
}
}
}
}
Expand All @@ -230,17 +238,6 @@ public static async Task<int> ExecuteBulkCopyAsync<T>(this IInsert<T> that,strin
{
throw e;
}
finally
{
try
{
File.Delete(filePath);
}
catch
{
// ignored
}
}

return result;
}
Expand All @@ -252,9 +249,9 @@ public static async Task<int> ExecuteBulkCopyAsync<T>(this IInsert<T> that,strin
/// <param name="insert"></param>
/// <param name="dateFormat">导入时,时间格式 默认:yyyy/M/d H:mm:ss</param>
/// <returns></returns>
public static int ExecuteBulkCopy<T>(this IInsert<T> insert,string dateFormat = "yyyy/M/d H:mm:ss") where T : class
public static int ExecuteQuestDbBulkCopy<T>(this IInsert<T> insert, string dateFormat = "yyyy/M/d H:mm:ss") where T : class
{
return ExecuteBulkCopyAsync(insert,dateFormat).ConfigureAwait(false).GetAwaiter().GetResult();
return ExecuteQuestDbBulkCopyAsync(insert, dateFormat).ConfigureAwait(false).GetAwaiter().GetResult();
}
}

Expand Down Expand Up @@ -344,6 +341,7 @@ internal static FreeSqlBuilder UseQuestDbRestAPI(FreeSqlBuilder buider, string h
var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{username}:{password}"));
authorization = $"Basic {base64}";
}

//RestApi需要无参数
buider.UseNoneCommandParameter(true);
return buider;
Expand Down

0 comments on commit 833c76f

Please sign in to comment.