-
-
Notifications
You must be signed in to change notification settings - Fork 71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Watch Live View . #123
Labels
enhancement
New feature or request
Comments
-- Live views are an experimental feature and have to be switched on for the session.
SET allow_experimental_live_view = 1;

-- Remove leftovers from a previous run (view first, then its source table).
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;

-- Source table plus a live view that exposes the sum of column `a`.
CREATE TABLE test.mt (a Int32) ENGINE = MergeTree ORDER BY tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;

-- Query the view while the table is still empty.
-- NOTE(review): LIMIT 0 presumably makes WATCH return the current result
-- without waiting for further updates; confirm against the ClickHouse docs.
WATCH test.lv LIMIT 0;

INSERT INTO test.mt VALUES (1), (2), (3);

-- Query the view again after the insert.
WATCH test.lv LIMIT 0;
|
Hi, thanks for the contribution; we will definitely look into this. |
I have been using this code for some time and it works for me without any problems. using System.IO;
using System.Net.Sockets;
using System.Text;
using SpanJson;
/// <summary>
/// Issues a <c>WATCH ... FORMAT JSONEachRowWithProgress</c> query against a server over a raw
/// TCP connection speaking HTTP, and raises events with the rows extracted from the response.
/// </summary>
/// <remarks>
/// Call <see cref="StartWatch"/> to open the connection. When the connection fails, a new
/// attempt is scheduled after a delay until <see cref="Dispose"/> is called.
/// </remarks>
public class LiveViewWatcher : IDisposable
{
    /// <summary>Size of the buffer used for each socket read.</summary>
    private const int ReadBufferSize = 1024 * 32;

    /// <summary>Pause, in milliseconds, after a read that returned no data.</summary>
    private const int IdlePollDelayMs = 2000;

    /// <summary>Pause, in milliseconds, before reconnecting after a failure.</summary>
    private const int ReconnectDelayMs = 20000;

    /// <summary>Prefix of a data line; the payload sits between this prefix and the final '}'.</summary>
    private const string RowPrefix = "{\"row\":";

    /// <summary>
    /// Creates a watcher. Equivalent to calling the constructor with the same arguments.
    /// </summary>
    /// <param name="watchName">Name of the view to watch; interpolated into the WATCH query.</param>
    /// <param name="ip_address">Server address; falls back to <c>MachineConstants.CLICKHOUSE_SERVER.IP</c> when null.</param>
    /// <param name="user">User name; falls back to <c>MachineConstants.CLICKHOUSE_SERVER.USER</c> when null.</param>
    /// <param name="pwd">Password; falls back to <c>MachineConstants.CLICKHOUSE_SERVER.PASSWORD</c> when null.</param>
    /// <param name="port">Server HTTP port.</param>
    /// <param name="sessionId">Initial session id; a new GUID is generated when null.</param>
    /// <param name="only_event">When true, <c>EVENTS</c> is appended to the WATCH query.</param>
    /// <returns>A new, not yet started watcher.</returns>
    public static LiveViewWatcher Watch(string watchName, string ip_address = null, string user = null, string pwd = null, int port = 8123, string sessionId = null, bool only_event = false) =>
        // Forward the caller's port instead of a hard-coded 8123.
        new LiveViewWatcher(watchName, ip_address, user, pwd, port, sessionId, only_event);

    /// <summary>
    /// Initializes a watcher; any connection argument left null is taken from
    /// <c>MachineConstants.CLICKHOUSE_SERVER</c>.
    /// </summary>
    /// <param name="watchName">Name of the view to watch; interpolated into the WATCH query.</param>
    /// <param name="ip_address">Server address, or null for the configured default.</param>
    /// <param name="user">User name, or null for the configured default.</param>
    /// <param name="pwd">Password, or null for the configured default.</param>
    /// <param name="port">Server HTTP port, or null for the configured default.</param>
    /// <param name="sessionId">Initial session id; a new GUID is generated when null.</param>
    /// <param name="only_event">When true, <c>EVENTS</c> is appended to the WATCH query.</param>
    public LiveViewWatcher(string watchName, string ip_address = null, string user = null, string pwd = null, int? port = null, string sessionId = null, bool only_event = false)
    {
        ip_address ??= MachineConstants.CLICKHOUSE_SERVER.IP;
        user ??= MachineConstants.CLICKHOUSE_SERVER.USER;
        pwd ??= MachineConstants.CLICKHOUSE_SERVER.PASSWORD;
        port ??= Convert.ToInt32(MachineConstants.CLICKHOUSE_SERVER.PORT);

        // NOTE(review): StartWatch replaces this value with a fresh GUID on every connect,
        // so a caller-supplied session id is never sent to the server. Confirm the intent.
        SessionId = sessionId ?? Guid.NewGuid().ToString();
        ServerIpAddres = ip_address;
        ServerPort = port.Value;
        Username = user;
        Password = pwd;
        WatchName = watchName;
        OnlyEvents = only_event;
    }

    private readonly string WatchName;
    private readonly bool OnlyEvents;

    /// <summary>
    /// Number of seconds without received data after which an empty read is treated as a dead connection.
    /// </summary>
    public int KeepAliveInterval { get; private set; } = 240;

    /// <summary>Handler receiving the rows deserialized into string dictionaries.</summary>
    public delegate void EvLiveViewResult(LiveViewWatcher watchObj, List<Dictionary<string, string>> data);

    /// <summary>Handler receiving the rows as a JSON array string.</summary>
    public delegate void EvLiveViewJsonResult(LiveViewWatcher watchObj, string data);

    /// <summary>Raised, after <see cref="OnLiveViewJsonResult"/>, with the deserialized rows.</summary>
    public event EvLiveViewResult OnLiveViewResult = null;

    /// <summary>Raised with the rows of one processed buffer as a JSON array string.</summary>
    public event EvLiveViewJsonResult OnLiveViewJsonResult = null;

    /// <summary>
    /// Extracts the <c>{"row":...}</c> lines from a raw response buffer, joins their payloads into a
    /// JSON array and raises the result events on a thread-pool thread. Buffers without rows are ignored.
    /// </summary>
    /// <param name="bytes">Raw bytes accumulated from the socket; decoded with <c>ToUTF8()</c>.</param>
    private void ProcessDataBuffer(byte[] bytes)
    {
        try
        {
            // Pieces of 6 characters or fewer between CRLFs are discarded (this is what removes the
            // short chunk-size lines) and the remaining pieces are glued back together.
            var content = string.Concat(
                bytes.ToUTF8()
                     .Split("\r\n", StringSplitOptions.RemoveEmptyEntries)
                     .Where(piece => piece.Length > 6));

            var rows = new List<string>();
            foreach (var line in content.Split("\n", StringSplitOptions.RemoveEmptyEntries))
            {
                // Only data lines carry a payload; progress lines and anything else are skipped.
                // The length check guarantees a non-empty payload between the prefix and the final '}'.
                if (!line.StartsWith(RowPrefix, StringComparison.Ordinal) || line.Length <= RowPrefix.Length + 1)
                {
                    continue;
                }

                rows.Add(line.Substring(RowPrefix.Length, line.Length - RowPrefix.Length - 1));
            }

            if (rows.Count == 0)
            {
                return;
            }

            var json = "[" + string.Join(",", rows) + "]";

            _ = Task.Run(() =>
            {
                // Handlers run outside the caller's try block, so failures are caught here;
                // otherwise they would end up as unobserved task exceptions.
                try
                {
                    OnLiveViewJsonResult?.Invoke(this, json);
                    OnLiveViewResult?.Invoke(this, JsonSerializer.Generic.Utf16.Deserialize<List<Dictionary<string, string>>>(json));
                }
                catch (Exception handlerError)
                {
                    Console.WriteLine(handlerError.StackTrace);
                }
            });
        }
        catch (Exception expMsg)
        {
            Console.WriteLine(expMsg.StackTrace);
        }
    }

    #region Socket Methods

    /// <summary>Server address used for the TCP connection and the Host header.</summary>
    public string ServerIpAddres { get; }

    /// <summary>Server HTTP port.</summary>
    public int ServerPort { get; } = 8123;

    /// <summary>Password sent in the query string.</summary>
    public string Password { get; }

    /// <summary>User name sent in the query string.</summary>
    public string Username { get; }

    /// <summary>Session id of the current connection; regenerated by every <see cref="StartWatch"/> call.</summary>
    public string SessionId { get; set; }

    /// <summary>True once <see cref="Dispose"/> has been called.</summary>
    public bool IsDisposed { get; private set; } = false;

    /// <summary>The connection currently in use, or null when there is none.</summary>
    public TcpClient socket;

    /// <summary>
    /// Connects to the server, sends the WATCH request and reads the response until the connection
    /// fails or the watcher is disposed. Each time a read ends with LF CR LF, the accumulated bytes
    /// are passed to <see cref="ProcessDataBuffer"/>.
    /// </summary>
    /// <remarks>
    /// On failure the error message is written to the console, the socket is released and, unless the
    /// watcher has been disposed, a new <see cref="StartWatch"/> call is started (not awaited) after a
    /// delay. The returned task therefore completes when the current connection attempt has ended.
    /// </remarks>
    /// <returns>A task that completes when this connection attempt has ended.</returns>
    public async Task StartWatch()
    {
        if (IsDisposed)
        {
            return;
        }

        try
        {
            SessionId = Guid.NewGuid().ToString();

            // Escape every caller-supplied value so reserved characters ('&', '#', space, ...)
            // cannot break the query string. Null values are sent as empty, as interpolation would.
            var user = Uri.EscapeDataString(Username ?? string.Empty);
            var password = Uri.EscapeDataString(Password ?? string.Empty);
            var session = Uri.EscapeDataString(SessionId);
            var view = Uri.EscapeDataString(WatchName ?? string.Empty);

            string url = $"http://{ServerIpAddres}:{ServerPort}/?user={user}&password={password}&session_id={session}&allow_experimental_live_view=1&live_view_heartbeat_interval=120&query=WATCH%20{view}{(OnlyEvents ? "%20EVENTS" : "")}%20FORMAT%20JSONEachRowWithProgress";
            var request = $"GET {url} HTTP/1.1\r\nHost: {ServerIpAddres}\r\nContent-Length: 0\r\nConnection: Keep-Alive\r\nKeep-Alive: timeout=3600, max=100\r\n\r\n";

            socket = new TcpClient(ServerIpAddres, ServerPort);
            socket.ReceiveBufferSize = 4096 * 2;
            socket.SendBufferSize = 4096 * 2;
            // NOTE(review): socket timeouts are documented as applying to synchronous calls only,
            // so they likely have no effect on the async reads below. Confirm before relying on them.
            socket.ReceiveTimeout = 30000;
            socket.SendTimeout = 30000;

            using var stream = socket.GetStream();
            await stream.WriteAsync(request.ToBytes());
            await stream.FlushAsync();

            using var pending = new MemoryStream();
            var readBuffer = new byte[ReadBufferSize];
            DateTime lastMessageTime = DateTime.Now;

            while (!IsDisposed)
            {
                int bytesRead = await stream.ReadAsync(readBuffer, 0, readBuffer.Length);
                if (bytesRead <= 0)
                {
                    if ((DateTime.Now - lastMessageTime).TotalSeconds > KeepAliveInterval)
                    {
                        throw new IOException($"[{WatchName}] [{DateTime.Now.ToMiniStr()}] [{lastMessageTime.ToMiniStr()}] http session is killed!");
                    }

                    // Zero-length write; throws if the stream is no longer writable.
                    await stream.WriteAsync(Array.Empty<byte>());
                    await Task.Delay(IdlePollDelayMs);
                    continue;
                }

                lastMessageTime = DateTime.Now;
                pending.Write(readBuffer, 0, bytesRead);

                // A read ending in LF CR LF is treated as the end of a complete message.
                bool endsMessage = bytesRead >= 3
                    && readBuffer[bytesRead - 3] == 10
                    && readBuffer[bytesRead - 2] == 13
                    && readBuffer[bytesRead - 1] == 10;
                if (endsMessage)
                {
                    // ToArray copies the data, so the stream can be reused for the next message.
                    ProcessDataBuffer(pending.ToArray());
                    pending.SetLength(0);
                }
            }
        }
        catch (Exception expMsg)
        {
            // Closing the socket from Dispose makes the pending read fail; that is expected.
            if (!IsDisposed)
            {
                Console.WriteLine(expMsg.Message);
            }
        }
        finally
        {
            // Always release the connection, including after Dispose.
            ClearSocket();
        }

        if (IsDisposed)
        {
            return;
        }

        await Task.Delay(ReconnectDelayMs);
        if (!IsDisposed)
        {
            _ = StartWatch();
        }
    }

    /// <summary>
    /// Closes the current connection, if any, and clears <see cref="socket"/>. Never throws.
    /// </summary>
    private void ClearSocket()
    {
        // Swap the field out atomically so concurrent callers cannot both close the same client.
        var client = System.Threading.Interlocked.Exchange(ref socket, null);
        if (client == null)
        {
            return;
        }

        try
        {
            client.Close();
        }
        catch (Exception closeError)
        {
            // Best effort: a failure while closing must not escape from Dispose or the read loop.
            Console.WriteLine(closeError.Message);
        }
    }

    #endregion

    /// <summary>
    /// Marks the watcher as disposed and closes the current connection, which ends the read loop
    /// and prevents further reconnects. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        IsDisposed = true;
        ClearSocket();
    }
}
DarkWanderer
added a commit
that referenced
this issue
Dec 30, 2022
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi.
I implemented code that opens an HTTP connection and monitors changes to a watched live view.
Could you add this code to your library and return the result as an object?
output result is :
The text was updated successfully, but these errors were encountered: