diff --git a/kafka/README.md b/kafka/README.md
new file mode 100644
index 0000000..f21ce6f
--- /dev/null
+++ b/kafka/README.md
@@ -0,0 +1,58 @@

This tool is used to collect information from a Kafka cluster to aid in problem diagnosis or review.

Note:
* This won't work on Kafka versions before 2.x
* The user is requested to change the path values in `node_collector.sh` before running any of the scripts; by default the script uses the Kafka configuration file locations, data directory location, and other settings of a default **Apache Kafka** setup.
* The term "VM" in the environment prompt of `cluster_collector.sh` means the cluster nodes are hosts/VMs reachable directly over ssh, as opposed to Docker containers.

# Design info:
There are two scripts used in the instacollector tool for Kafka.

1. `node_collector.sh`: supposed to be executed on each Kafka node. It executes Linux and Kafka commands and copies configuration and log files required for cluster health checks. The user needs to modify the `KAFKA_HOME` path inside the script as per their configuration; the default value used is:
```
KAFKA_HOME=/opt/kafka
```
2. `cluster_collector.sh`: to be executed on a machine connected to the Kafka cluster, e.g. a user laptop with a running docker or a running VM. It executes node_collector.sh on each Kafka node using ssh.
The cluster_collector.sh requires the following user inputs:
```
 * Enter your kafka environment (vm/docker) :
 * [If VM]
    * Enter username for login on Kafka cluster nodes (Press Enter for default admin) :
    * Enter Identity file path: (the ssh key file on your local machine which is used to connect to the VMs)
 * [If docker]
    * Enter docker home directory:
 * Enter path of the command config file: (kafka command-config file location on the kafka brokers)
 * Enter file containing ip addresses/host/container names of Kafka cluster nodes: (the hosts file on your local machine)
```
*******************
# Execution settings:
The `cluster_collector.sh` connects to the cluster nodes using the provided ssh key (identity) file.

If the ssh key has a passphrase enabled, please use the `ssh-agent` & `ssh-add` commands to load the key before running the `cluster_collector.sh` script.

If another method is required for `ssh`, the user is requested to change the script as applicable.

Alternatively, `node_collector.sh` can also be executed on individual nodes if `cluster_collector.sh` is not useful in any case.


Below are the Kafka & Zookeeper related files which will be copied from different nodes:
```
Kafka Broker Files                  | Zookeeper Files
************************************|***********************
server.properties                   | zookeeper.properties
server.log                          | zoo.cfg
kafkaServer.out                     | log4j.properties
kafka-authorizer.log                | zoo.log
controller.log                      | zookeeper_jaas.conf
state-change.log                    | zookeeper.out
kafka_server_jaas.conf              |
kafka-topics.sh output              |
kafka-broker-api-versions.sh output |
kafka-consumer-groups.sh output     |
```

**Note:** The scripts should be executed on a bash shell.
+ +Please see https://www.instaclustr.com/support/documentation/announcements/instaclustr-open-source-project-status/ for Instaclustr support status of this project diff --git a/kafka/cluster_collector.sh b/kafka/cluster_collector.sh new file mode 100755 index 0000000..d862ab0 --- /dev/null +++ b/kafka/cluster_collector.sh @@ -0,0 +1,159 @@ +#!/bin/bash + +##******************************************************************************************************************** +##******************************************************************************************************************** +## The purpose of this tool is to extract kafka & zookeeper related configuration and log files for troubleshooting. +## Following are the list of files that are extracted. Please note that not all files exists in an environment. +## All properties with the word "password" in it are replaced with "***" +#=============================================================# +# kafka files and the path variables where they are expected +# BROKER_CONFIG_PATH +# server.properties +# BROKER_LOG_PATH +# server.log +# kafkaServer.out +# kafka-authorizer.log +# controller.log +# state-change.log +# BROKER_JAAS_CONFIG +# kafka_server_jaas.conf +# ZOOKEEPER_CONFIG +# zookeeper.properties +# zoo.cfg +# log4j.properties +# ZOOKEEPER_LOG_PATH +# zoo.log +# ZOOKEEPER_JAAS_CONFIG +# zookeeper_jaas.conf +# ZOOKEEPER_LOG_PATH +# zookeeper.out +# BROKER_BIN_PATH +# kafka-topics/.sh +# kafka-topics/.sh +# kafka-broker-api-versions/.sh +# kafka-consumer-groups/.sh +#=============================================================# +## +## In addition to the files above the script also extract the following OS related information - +## 1. file system & directory size +## 2. io stats +## 3. file descriptors +## 4. cpu & memory +## 5. contents of the hosts file +## 6. 
output of kafka-topics.sh topic describe +## +##******************************************************************************************************************** +##******************************************************************************************************************** +## Last Modification Date : 10/29/2021 +## Description : Script functionality enhanced to add information related to iostat, df, file descriptor +## cpu & memory info +##******************************************************************************************************************** +##******************************************************************************************************************** + +clear + +#GLOBAL VARIABLES +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +INFO_DIR=/tmp/InstaCollection_$(date +%Y%m%d%H%M) + +#Collect environment info (VM/docker) +read -p "Enter your kafka environment (vm/docker) :" kenv + +if [[ "${kenv}" == "vm" ]]; then + #Collect user info. + read -p "Enter username for login on Kafka cluster nodes (Press Enter for default admin) :" user + [ -z "${user}" ] && user='admin' + + #user='rahulchakrabarty' + + read -p "Enter Identity file path:" id_file + if [[ ! -f ${id_file} || ! -s ${id_file} ]]; then + echo "$id_file File not found!" + exit 1 + fi + + #id_file='/Users/rahulchakrabarty-instaclustr/.ssh/rahulchakrabarty-instaclustr' +elif [[ "${kenv}" == "docker" ]]; then + read -p "Enter docker home directory :" docker_home + + if [ -z "$docker_home" ]; then + echo "Docker home directory cannot be empty" + exit 1 + fi +else + echo "Invalid value for environment" + exit 1 +fi + +read -p "Enter path of the command config file:" config_file + +read -p "Enter file containing ip addresses/host/container names of Kafka cluster nodes:" peers_file +if [[ ! -f ${peers_file} || ! -s ${peers_file} ]]; then + echo "$peers_file File not found!" 
+ exit 1 +fi + +#peers_file='./hosts' + +echo "environment $kenv" + +#Execute the node_collector on each node or container +if [ "$kenv" == "vm" ]; then + while read peer + do + if [[ -z "$peer" ]]; then + break + fi + ssh -i $id_file $user@$peer "bash -s" < node_collector.sh $peer $config_file & + done < "$peers_file" +else + while read peer + do + if [[ -z "$peer" ]]; then + break + fi + echo "Copying file node_collector.sh to container" + docker cp ./node_collector.sh $peer:$docker_home/ + docker exec $peer /bin/bash -c "sh $docker_home/node_collector.sh $peer $config_file" & + done < "$peers_file" +fi + +#waiting for all node_collectors to complete +wait + +mkdir $INFO_DIR + +#copy the data from each node/container + +if [ "$kenv" == "vm" ]; then + while read peer + do + if [[ -z "$peer" ]]; then + break + fi + mkdir $INFO_DIR/$peer + scp -i $id_file $user@$peer:/tmp/InstaCollection.tar.gz $INFO_DIR/$peer/InstaCollection_$peer.tar.gz & + + done < "$peers_file" +else + while read peer + do + if [[ -z "$peer" ]]; then + break + fi + mkdir $INFO_DIR/$peer + docker cp $peer:/tmp/InstaCollection.tar.gz $INFO_DIR/$peer/InstaCollection_$peer.tar.gz & + + done < "$peers_file" + +fi + +#waiting for all scp to complete +wait + +#compress the info directory +result_file=/tmp/InstaCollection_$(date +%Y%m%d%H%M).tar.gz +tar -zcf $result_file -C $INFO_DIR . +rm -r $INFO_DIR + +echo "Process complete. 
File generated : " $result_file diff --git a/kafka/node_collector.sh b/kafka/node_collector.sh new file mode 100755 index 0000000..b5e1285 --- /dev/null +++ b/kafka/node_collector.sh @@ -0,0 +1,331 @@ +#!/bin/bash +#!TODO: implement https://google.github.io/styleguide/shellguide.html +#==========================# +# -- Collection Options -- # +#==========================# +# -- Paths +KAFKA_HOME=/opt/kafka +BROKER_BIN_PATH=${KAFKA_HOME}/bin +BROKER_CONFIG_PATH=${KAFKA_HOME}/config +BROKER_LOG_PATH=${KAFKA_HOME}/logs +BROKER_DATA_PATHS=${KAFKA_HOME}/kafka-logs +ZOOKEEPER_CONFIG=${KAFKA_HOME}/config +ZOOKEEPER_LOG_PATH=${KAFKA_HOME}/logs + +# -- Ports +KAFKA_CLIENT_PORT=9092 +ZOOKEEPER_CLIENT_PORT=2181 + +# -- GC Logging +GC_LOGGING_ENABLED=yes +GC_LOG_PATH=${KAFKA_HOME}/logs + +# -- Arguments from CLI +unset ip +unset command_config +unset debug +while [[ $# -gt 0 ]]; do + key="$1" + case $key in + + # IP + -ip | --ip) + ip="$2" + echo "$ip : main : Using ip=$ip" + shift + shift + ;; + + # Command Config + -c | --command-config) + command_config="$2" + shift + shift + ;; + + # Debug + -d | --debug) + debug=true + shift + ;; + + # unknown option + *) + if [ -z "$ip" ]; then + ip=$(hostname -I | tail -n -1 | tr -d [:blank:]) + fi + echo "$ip : main : WARN : ignoring unknown argument '$1'" + shift + ;; + esac +done + +# -- Calculated Defaults +if [ -z "$ip" ]; then + ip=$(hostname -I | tail -n -1 | tr -d [:blank:]) +fi + +if [ -z "$debug" ]; then + debug=false +fi + +if [ -z "$ZOOKEEPER_CONFIG" ]; then + ZOOKEEPER_CONFIG="$BROKER_CONFIG_PATH" +fi + +if [ -z "$ZOOKEEPER_CONFIG" ]; then + ZOOKEEPER_CONFIG="$BROKER_CONFIG_PATH" +fi + +if [ ! -z "$command_config" ] && [ ! -f "$command_config" ]; then + echo "$ip : main : FATAL : specified command config not found: $command_config" + exit +elif [ ! 
-z "$command_config" ]; then + echo "$ip : main : using command-config=$command_config" +fi + +# -- Output Paths +data_dir=/tmp/DataCollection_${ip} +data_file=$data_dir/disk.info +io_stats_file=$data_dir/io_stat.info +cpu_info=$data_dir/cpu.info +mem_info=$data_dir/mem.info +missing_files=$data_dir/missing_files.info +data_dir=/tmp/DataCollection_${ip} +data_file=$data_dir/disk.info +io_stats_file=$data_dir/io_stat.info +cpu_info=$data_dir/cpu.info +mem_info=$data_dir/mem.info +missing_files=$data_dir/missing_files.info +file_descriptor=$data_dir/file_descriptor.info +hosts_info=$data_dir/hosts_file.info +output_tar=/tmp/InstaCollection.tar.gz + + +#==========================# +# -- Functions -- # +#==========================# +copy_config_files() { + echo "$ip : ${FUNCNAME[0]} : Copying files" + local config_files=( + "$BROKER_CONFIG_PATH/server.properties" + "$BROKER_CONFIG_PATH/log4j.properties" + "$BROKER_LOG_PATH/server.log" + "$BROKER_LOG_PATH/kafkaServer.out" + "$BROKER_LOG_PATH/kafka-authorizer.log" + "$BROKER_LOG_PATH/controller.log" + "$BROKER_LOG_PATH/state-change.log" + "$BROKER_JAAS_CONFIG/kafka_server_jaas.conf" + "$ZOOKEEPER_CONFIG/zookeeper.properties" + "$ZOOKEEPER_CONFIG/zoo.cfg" + "$ZOOKEEPER_CONFIG/log4j.properties" + "$ZOOKEEPER_LOG_PATH/zoo.log" + "$ZOOKEEPER_JAAS_CONFIG/zookeeper_jaas.conf" + "$ZOOKEEPER_LOG_PATH/zookeeper.out" + ) + if [ "$GC_LOGGING_ENABLED" == "yes" ]; then + config_files+=("$GC_LOG_PATH/kafkaServer-gc.log" "$GC_LOG_PATH/zookeeper-gc.log") + fi + + for i in "${config_files[@]}"; do + # - Check if the file exists and copy + if [[ -f "$i" ]]; then + cp -nr $i* -t $data_dir + + # - Redact passwords + #!TODO: move redactions to a list & for loop + #!TODO: account for server.properties* for redactions + if [[ "$i" == *"server.properties"* ]]; then + redact_passwords "$data_dir/server.properties" + elif [[ "$i" == *"kafka_server_jaas.conf"* ]]; then + redact_passwords "$data_dir/kafka_server_jaas.conf" + elif [[ "$i" == 
*"zookeeper_jaas.conf"* ]]; then + redact_passwords "$data_dir/zookeeper_jaas.conf" + elif [[ "$i" == *"server.properties"* ]]; then + redact_passwords "$data_dir/server.properties" + fi + else + if [ "$debug" = true ]; then + echo "$ip : ${FUNCNAME[0]} : DEBUG : File $i not found" + fi + echo "$ip : ${FUNCNAME[0]} : File $i not found" >>$missing_files + fi + done + echo "$ip : ${FUNCNAME[0]} : Done copying files" +} + +redact_passwords() { + local input_file=$1 + echo "$ip : ${FUNCNAME[0]} : Redacting passwords from $input_file" + sed -i.bak -e 's: *password.*$:password ****:g' $input_file + rm $input_file.bak +} + +get_size_info() { + # - collects size of data directories + echo "$ip : ${FUNCNAME[0]} : Executing disk space commands" + local commands=("df -h" "du -h") + local paths=($(echo "$BROKER_DATA_PATHS" | tr ',' '\n')) + + if [ -d "$BROKER_DATA_PATHS" ]; then + for i in "${commands[@]}"; do + for j in "${paths[@]}"; do + + # check if the path exists + if [ -d "${paths[@]}" ]; then + echo "" >>$data_file + k=$(echo $i $j) + echo "$k" >>$data_file + eval $k >>$data_file + else + echo "$ip : ${FUNCNAME[0]} : ERROR : PATHNOTFOUND : ${paths[@]}" + fi + done + done + else + echo "$ip : ${FUNCNAME[0]} : ERROR: Directory does not exist: $BROKER_DATA_PATHS" + fi + echo "$ip : ${FUNCNAME[0]} : Done executing disk space commands" +} + +get_io_stats() { + if ! [ -x "$(command -v iostat)" ]; then + echo "$ip : ${FUNCNAME[0]} : Executable not found - iostat" + else + # - Collects iostat for 60 sec. 
please change according to requirement + echo "$ip : ${FUNCNAME[0]} : Executing iostat command" + eval timeout -sHUP 60s iostat -x -m -t -y -z 30 $io_stats_file + echo "$ip : ${FUNCNAME[0]} : Done executing iostat command" + fi +} + +get_file_descriptor() { + echo "$ip : ${FUNCNAME[0]}: Getting file descriptor count" + eval ulimit -n $file_descriptor + echo "$ip : ${FUNCNAME[0]}: Done getting file descriptor count" +} + +get_hosts() { + echo "$ip : ${FUNCNAME[0]} : Getting hosts info" + if [[ -f "/etc/hosts" ]]; then + echo "$ip : ${FUNCNAME[0]} : Getting contents of hosts file" + eval cat /etc/hosts $hosts_info + echo "$ip : ${FUNCNAME[0]} : Done getting contents of hosts file" + else + echo "$ip : ${FUNCNAME[0]} : ERROR: FILENOTFOUND /etc/hosts" + fi +} + +get_cpu_memory() { + echo "$ip : ${FUNCNAME[0]} : Getting CPU & Memory info" + if [[ -f "/proc/cpuinfo" ]]; then + echo "$ip : ${FUNCNAME[0]} : Executing cpuinfo command" + eval cat /proc/cpuinfo $cpu_info + echo "$ip : ${FUNCNAME[0]} : Done getting CPU info" + else + echo "$ip : ${FUNCNAME[0]} : ERROR : FILENOTFOUND /proc/cpuinfo" + fi + + if [[ -f "/proc/meminfo" ]]; then + echo "$ip : ${FUNCNAME[0]} : Executing cpuinfo command" + eval cat /proc/meminfo $mem_info + echo "$ip : ${FUNCNAME[0]} : Done getting memory info" + else + echo "$ip : ${FUNCNAME[0]} : ERROR : FILENOTFOUND /proc/meminfo" + fi +} + +get_kafka_cli_info() { + echo "$ip : ${FUNCNAME[0]} : Executing kafka CLI commands " + # - List of commands & filenames to save output + #!TODO: make .sh vs bin less messy + #!TODO: account for version skew with --zookeeper, etc + + local commands=( + "$BROKER_BIN_PATH/kafka-topics.sh --version" + "$BROKER_BIN_PATH/kafka-topics.sh --bootstrap-server $ip:$KAFKA_CLIENT_PORT --describe" + "$BROKER_BIN_PATH/kafka-broker-api-versions.sh --bootstrap-server $ip:$KAFKA_CLIENT_PORT" + "$BROKER_BIN_PATH/kafka-consumer-groups.sh --bootstrap-server $ip:$KAFKA_CLIENT_PORT --describe --all-groups --verbose" + ) + local 
filenames=( + "kafka-versions-sh" + "kafka-topics-describe-sh" + "kafka-api-versions-sh" + "consumer-groups-sh" + ) + + local commands_bin=( + "$BROKER_BIN_PATH/kafka-topics --version" + "$BROKER_BIN_PATH/kafka-topics --bootstrap-server $ip:$KAFKA_CLIENT_PORT --describe" + "$BROKER_BIN_PATH/kafka-broker-api-versions --bootstrap-server $ip:$KAFKA_CLIENT_PORT" + "$BROKER_BIN_PATH/kafka-consumer-groups --bootstrap-server $ip:$KAFKA_CLIENT_PORT --describe --all-groups --verbose" + ) + local filenames_bin=( + "kafka-versions-bin" + "kafka-topics-describe-bin" + "kafka-api-versions-bin" + "consumer-groups-bin" + ) + if [ -f "$BROKER_BIN_PATH/kafka-topics" ]; then + commands=$commands_bin + filenames=$filenames_bin + fi + + arrlen=${#commands[@]} + arrlen="$((arrlen - 1))" + for ((i = 0; i <= ${arrlen}; i++)); do + fname=${commands[i]} + fname=${fname%% *} + + thiscmd=${commands[i]} + if [[ -f "$command_config" ]]; then + thiscmd+="--command-config $command_config" + fi + + if [[ -f "${fname}" ]]; then + local cmd_file=$data_dir/${filenames[i]}.info + if [ "$debug" = true ]; then + echo "$ip : ${FUNCNAME[0]} : DEBUG: Will execute: [$thiscmd]" + fi + echo "" >>$cmd_file + eval $thiscmd >>$cmd_file + else + echo "$ip : ${FUNCNAME[0]} : ERROR : FILENOTFOUND ${fname}" + fi + done + echo "$ip : ${FUNCNAME[0]} : Done executing kafka CLI commands " +} + + +#==========================# +# -- Collect Data -- # +#==========================# +echo "$ip : main : Creating local directory for data collection: $data_dir" +# - rename already exsisting directory +mv $data_dir $data_dir_$(date +%Y%m%d%H%M) 2>/dev/null +mkdir $data_dir + +# - start execution +get_io_stats & +copy_config_files & +get_size_info & +get_kafka_cli_info & +get_cpu_memory & +get_file_descriptor & +get_hosts & + +echo "$ip : main : Waiting for collection to complete" +wait +echo "$ip : main : Collection complete" + +# -- Report missing files +if [ "$debug" = true ]; then + "$ip : main : ********************** 
Missing files *********************" + cat $data_dir/missing_files.info + echo "$ip : main : **********************************************************" +fi + +# - Compress the info directory +echo "$ip : main : Compressing results to $output_tar" +tar -zcf $output_tar -C $data_dir . +echo "$ip : main : Process Complete."