Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[regression](compaction) Add case to test single replica compaction #27199

Merged
merged 3 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,20 @@ void Tablet::get_compaction_status(std::string* json_result) {
root.GetAllocator());
root.AddMember("last base status", base_compaction_status_value, root.GetAllocator());

TReplicaInfo replica_info;
std::string dummp_token;
rapidjson::Value fetch_addr;
if (tablet_meta()->tablet_schema()->enable_single_replica_compaction() &&
StorageEngine::instance()->get_peer_replica_info(tablet_id(), &replica_info,
&dummp_token)) {
std::string addr = replica_info.host + ":" + std::to_string(replica_info.brpc_port);
fetch_addr.SetString(addr.c_str(), addr.length(), root.GetAllocator());
} else {
// -1 means do compaction locally
fetch_addr.SetString("-1", root.GetAllocator());
}
root.AddMember("fetch from peer", fetch_addr, root.GetAllocator());

// print all rowsets' version as an array
rapidjson::Document versions_arr;
rapidjson::Document missing_versions_arr;
Expand Down
10 changes: 10 additions & 0 deletions regression-test/data/compaction/test_single_replica_compaction.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 b 100
2 b 100
3 b 100
5 a 100
6 a 100
7 a 100
8 a 100

Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_single_replica_compaction", "p2") {
def tableName = "test_single_replica_compaction"

def set_be_config = { key, value ->
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

for (String backend_id: backendId_to_backendIP.keySet()) {
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
}
}

boolean disableAutoCompaction = true
boolean has_update_be_config = false
try {
String backend_id;
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))

logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List

for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "disable_auto_compaction") {
disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
}
}
set_be_config.call("disable_auto_compaction", "true")
has_update_be_config = true

def triggerCompaction = { be_host, be_http_port, compact_type, tablet_id ->
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://${be_host}:${be_http_port}")
sb.append("/api/compaction/run?tablet_id=")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not employ single replica compaction. use table_id =

sb.append(tablet_id)
sb.append("&compact_type=${compact_type}")

String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Run compaction: code=" + code + ", out=" + out + ", disableAutoCompaction " + disableAutoCompaction + ", err=" + err)
if (!disableAutoCompaction) {
return "Success, " + out
}
assertEquals(code, 0)
return out
}

def waitForCompaction = { be_host, be_http_port, tablet_id ->
boolean running = true
do {
Thread.sleep(1000)
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://${be_host}:${be_http_port}")
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)

String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}

def getTabletStatus = { be_host, be_http_port, tablet_id ->
boolean running = true
Thread.sleep(1000)
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://${be_host}:${be_http_port}")
sb.append("/api/compaction/show?tablet_id=")
sb.append(tablet_id)

String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
out = process.getText()
logger.info("Get tablet status: code=" + code + ", out=" + out)
assertEquals(code, 0)
def tabletStatus = parseJson(out.trim())
return tabletStatus
}


sql """ DROP TABLE IF EXISTS ${tableName}; """
sql """
CREATE TABLE ${tableName} (
`id` int(11) NULL,
`name` varchar(255) NULL,
`score` int(11) NULL
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES ( "replication_num" = "3", "enable_single_replica_compaction" = "true" );
"""

String[][] tablets = sql """ show tablets from ${tableName}; """

// wait for update replica infos
// be.conf: update_replica_infos_interval_seconds
Thread.sleep(20000)

// find the master be for single replica compaction
Boolean found = false
String master_backend_id;
List<String> follower_backend_id = new ArrayList<>()
// The test table only has one bucket with 3 replicas,
// and `show tablets` will return 3 different replicas with the same tablet.
// So we can use the same tablet_id to get tablet/trigger compaction with different backends.
String tablet_id = tablets[0][0]
for (String[] tablet in tablets) {
String trigger_backend_id = tablet[2]
def tablet_status = getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id);
Xiaoccer marked this conversation as resolved.
Show resolved Hide resolved
def fetchFromPeerValue = tablet_status."fetch from peer"

if (found && fetchFromPeerValue.contains("-1")) {
logger.warn("multipe master");
assertTrue(false)
}
if (fetchFromPeerValue.contains("-1")) {
found = true
master_backend_id = trigger_backend_id
} else {
follower_backend_id.add(trigger_backend_id)
}
}

def checkCompactionResult = {
def master_tablet_status = getTabletStatus(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id);
def master_rowsets = master_tablet_status."rowsets"
assert master_rowsets instanceof List
logger.info("rowset size: " + master_rowsets.size())

for (String backend: follower_backend_id) {
def tablet_status = getTabletStatus(backendId_to_backendIP[backend], backendId_to_backendHttpPort[backend], tablet_id);
def rowsets = tablet_status."rowsets"
assert rowsets instanceof List
assertEquals(master_rowsets.size(), rowsets.size())
}
}

sql """ INSERT INTO ${tableName} VALUES (1, "a", 100); """
sql """ INSERT INTO ${tableName} VALUES (1, "b", 100); """
sql """ INSERT INTO ${tableName} VALUES (2, "a", 100); """
sql """ INSERT INTO ${tableName} VALUES (2, "b", 100); """
sql """ INSERT INTO ${tableName} VALUES (3, "a", 100); """
sql """ INSERT INTO ${tableName} VALUES (3, "b", 100); """

// trigger master be to do cum compaction
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id],
"cumulative", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id)

// trigger follower be to fetch compaction result
for (String id in follower_backend_id) {
assertTrue(triggerCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id],
"cumulative", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id)
}

// check rowsets
checkCompactionResult.call()

sql """ INSERT INTO ${tableName} VALUES (4, "a", 100); """
sql """ DELETE FROM ${tableName} WHERE id = 4; """
sql """ INSERT INTO ${tableName} VALUES (5, "a", 100); """
sql """ INSERT INTO ${tableName} VALUES (6, "a", 100); """
sql """ INSERT INTO ${tableName} VALUES (7, "a", 100); """
sql """ INSERT INTO ${tableName} VALUES (8, "a", 100); """

// trigger master be to do cum compaction with delete
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id],
"cumulative", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id)

// trigger follower be to fetch compaction result
for (String id in follower_backend_id) {
assertTrue(triggerCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id],
"cumulative", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id)
}

// check rowsets
checkCompactionResult.call()

// trigger master be to do base compaction
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id],
"base", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id)

// // trigger follower be to fetch compaction result
for (String id in follower_backend_id) {
assertTrue(triggerCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id],
"base", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id)
}

// check rowsets
checkCompactionResult.call()

qt_sql """
select * from ${tableName} order by id
"""

} finally {
if (has_update_be_config) {
set_be_config.call("disable_auto_compaction", disableAutoCompaction.toString())
}
}
}
Loading