Skip to content

Commit

Permalink
[#53] scaledown should try a different pod if the current target is f…
Browse files Browse the repository at this point in the history
…ailing

- Now the drain.sh will try next pod if the first pod failed scaled down
- drain.sh logs are timestamped
- drain.sh logs will report time of scaledown and total message migrated.
  • Loading branch information
howardgao committed Oct 28, 2023
1 parent d5bb905 commit 5c0100e
Showing 1 changed file with 182 additions and 115 deletions.
297 changes: 182 additions & 115 deletions modules/activemq-artemis-launch/added/drain.sh
Original file line number Diff line number Diff line change
@@ -1,177 +1,244 @@
#!/bin/sh

export BROKER_HOST=`hostname -f`
function log() {
logtime=$(date)
echo "[$logtime]-[drain.sh] $1"
}

echo "[drain.sh] drainer container ip(from hostname) is $BROKER_HOST"
function get_total_messages_on_broker() {

instanceDir="${HOME}/${AMQ_NAME}"
log "get total messages on broker ${BROKER_HOST}"

ENDPOINT_NAME="${AMQ_NAME}-amq-headless"
RET_VALUE=$(curl -s -G -k http://${AMQ_USER}:${AMQ_PASSWORD}@${BROKER_HOST}:8161/console/jolokia/read/org.apache.activemq.artemis:broker=%22${AMQ_NAME}%22/AddressNames)

if [ "$HEADLESS_SVC_NAME" ]; then
ENDPOINT_NAME=$HEADLESS_SVC_NAME
fi
PYCMD=$(cat <<EOF
import sys, json
addrs = ''
value = json.load(sys.stdin)['value']
for addr in value:
addrs = addrs + ' ' + addr
print addrs
EOF
)

endpointsUrl="https://${KUBERNETES_SERVICE_HOST:-kubernetes.default.svc}:${KUBERNETES_SERVICE_PORT:-443}/api/v1/namespaces/${POD_NAMESPACE}/"
endpointsAuth="Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)"
all_addresses=$(echo "$RET_VALUE" | python2 -c "$PYCMD")
log "get back all addresses $all_addresses"
arr=($all_addresses)

TOTAL_MESSAGES_ON_BROKER=0
for address in "${arr[@]}"
do
log "checking on address ${address}"
M_COUNT=$(curl -s -G -k http://${AMQ_USER}:${AMQ_PASSWORD}@${BROKER_HOST}:8161/console/jolokia/read/org.apache.activemq.artemis:broker=%22${AMQ_NAME}%22,address=%22${address}%22,component=addresses/MessageCount)
value=$(echo $M_COUNT | python2 -c "import sys, json; print json.load(sys.stdin)['value']")

TOTAL_MESSAGES_ON_BROKER=$(($TOTAL_MESSAGES_ON_BROKER + $value))
if [[ $value > 0 ]]; then
log "There are $value messages on address $address."
fi
done
}

function waitForJolokia() {
while : ;
do
sleep 5
curl -s -o /dev/null -G -k http://${AMQ_USER}:${AMQ_PASSWORD}@${BROKER_HOST}:8161/console/jolokia
if [ $? -eq 0 ]; then
log "jolokia is ready"
break
fi
done
}

export BROKER_HOST=`hostname -f`

log "drainer container host is $BROKER_HOST"

instanceDir="${HOME}/${AMQ_NAME}"

ENDPOINT_NAME="${AMQ_NAME}-amq-headless"

if [ "$HEADLESS_SVC_NAME" ]; then
ENDPOINT_NAME=$HEADLESS_SVC_NAME
fi

endpointsUrl="https://${KUBERNETES_SERVICE_HOST:-kubernetes.default.svc}:${KUBERNETES_SERVICE_PORT:-443}/api/v1/namespaces/${POD_NAMESPACE}/"
endpointsAuth="Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)"

endpointsCode=$(curl -s -o /dev/null -w "%{http_code}" -G -k -H "${endpointsAuth}" ${endpointsUrl})
if [ $endpointsCode -ne 200 ]; then
echo "[drain.sh] Can't find endpoints with ips status <${endpointsCode}>"
log "can't find endpoints with ips status <${endpointsCode}>"
exit 1
fi

ENDPOINTS=$(curl -s -X GET -G -k -H "${endpointsAuth}" ${endpointsUrl}"endpoints/${ENDPOINT_NAME}")
echo "[drain.sh] $ENDPOINTS"

log "endpoints: $ENDPOINTS"

# we will find out a broker pod's fqdn name which is <pod-name>.<$HEADLESS_SVC_NAME>.<namespace>.svc.<domain-name>
# https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/
count=0
foundTarget="false"
n_endpoints=$(echo $ENDPOINTS | python2 -c "import sys, json; print len(json.load(sys.stdin)['subsets'][0]['addresses'])")
log "size of endpoints $n_endpoints"

source /opt/amq/bin/launch.sh nostart

count=-1
log "starting message migration loop"

# message migration loop
# it goes through the endpoints(pod) until the scale down is successful or all tried
while [ 1 ]; do

count=$(( count + 1 ))

if [ $count -eq $n_endpoints ]; then
log "tried all $n_endpoints endpoints, scaledown failed. Sleeping 300 seconds before exit."
sleep 300
exit 1
fi

log "attempting on endpoint: $count"

ip=$(echo $ENDPOINTS | python2 -c "import sys, json; print json.load(sys.stdin)['subsets'][0]['addresses'][${count}]['ip']")

if [ $? -ne 0 ]; then
echo "[drain.sh] Can't find ip to scale down to tried ${count} ips"
exit 1
log "can't find ip to scale down to tried ${count} ips"
continue
fi
echo "[drain.sh] got ip ${ip} broker ip is ${BROKER_HOST}"

log "got endpoint ip ${ip}"

podName=$(echo $ENDPOINTS | python2 -c "import sys, json; print json.load(sys.stdin)['subsets'][0]['addresses'][${count}]['targetRef']['name']")
if [ $? -ne 0 ]; then
echo "[drain.sh] Can't find pod name to scale down to tried ${count}"
exit 1
log "can't find pod name to scale down to tried ${count}"
continue
fi
echo "[drain.sh] got podName ${podName} broker ip is ${BROKER_HOST}"

log "got endpoint pod name ${podName}"
if [ "$podName" != "$BROKER_HOST" ]; then
# found an endpoint pod as a candidate for scaledown target
podNamespace=$(echo $ENDPOINTS | python2 -c "import sys, json; print json.load(sys.stdin)['subsets'][0]['addresses'][${count}]['targetRef']['namespace']")
if [ $? -ne 0 ]; then
echo "[drain.sh] Can't find pod namespace to scale down to tried ${count}"
exit 1
log "can't find pod namespace to scale down to tried ${count}"
continue
fi
foundTarget="true"
break
fi

count=$(( count + 1 ))
done
log "found an candidate target: $podName"

if [ "$foundTarget" == "false" ]; then
echo "[drain.sh] Can't find a target to scale down to"
exit 1
fi
IFSP=$IFS
IFS=
dnsNames=$(nslookup ${ip})

# get host name of target pod
IFSP=$IFS
IFS=
dnsNames=$(nslookup ${ip})
echo "[drain.sh] $dnsNames"

hostNamePrefix="${podName}.${HEADLESS_SVC_NAME}.${podNamespace}.svc."
echo "[drain.sh] searching hostname with prefix: $hostNamePrefix"

while read -r line
do
IFS=' ' read -ra ARRAY <<< "$line"
if [ ${#ARRAY[@]} -gt 0 ]; then
hostName=${ARRAY[-1]}
if [[ $hostName == ${hostNamePrefix}* ]]; then
# remove the last dot
case $hostName in *.) hostName=${hostName%"."};; esac
echo "[drain.sh] found hostname: $hostName"
break
log "looked up dns entries $dnsNames"

hostNamePrefix="${podName}.${HEADLESS_SVC_NAME}.${podNamespace}.svc."

log "searching hostname with prefix: $hostNamePrefix"

while read -r line
do
IFS=' ' read -ra ARRAY <<< "$line"
if [ ${#ARRAY[@]} -gt 0 ]; then
hostName=${ARRAY[-1]}
if [[ $hostName == ${hostNamePrefix}* ]]; then
# remove the last dot
case $hostName in *.) hostName=${hostName%"."};; esac
log "found target hostname: $hostName"
break
fi
fi
done <<< ${dnsNames}
IFS=$IFSP

if [ -z "$hostName" ]; then
log "can't find target host name"
continue
fi
fi
done <<< ${dnsNames}
IFS=$IFSP

if [ -z "$hostName" ]; then
echo "[drain.sh] Can't find target host name"
exit 1
fi
SCALE_TO_BROKER="${hostName}"
log "scale down target is: $SCALE_TO_BROKER"

source /opt/amq/bin/launch.sh nostart
# Add connector to the pod to scale down to
log "removing any existing scaledownconnector"
sed -i "s/<connector name=\"scaledownconnector\">.*//" ${instanceDir}/etc/broker.xml

SCALE_TO_BROKER="${hostName}"
echo "[drain.sh] scale down target is: $SCALE_TO_BROKER"
log "adding new connector"
connector="<connector name=\"scaledownconnector\">tcp:\/\/${SCALE_TO_BROKER}:61616<\/connector>"
sed -i "/<\/connectors>/ s/.*/${connector}\n&/" ${instanceDir}/etc/broker.xml

# Add connector to the pod to scale down to
connector="<connector name=\"scaledownconnector\">tcp:\/\/${SCALE_TO_BROKER}:61616<\/connector>"
sed -i "/<\/connectors>/ s/.*/${connector}\n&/" ${instanceDir}/etc/broker.xml
# Remove the acceptors
acceptor="<acceptor name=\"artemis\">tcp:\/\/${BROKER_HOST}:61616?protocols=CORE<\/acceptor>"
sed -i -ne "/<acceptors>/ {p; i $acceptor" -e ":a; n; /<\/acceptors>/ {p; b}; ba}; p" ${instanceDir}/etc/broker.xml

# Remove the acceptors
#sed -i -ne "/<acceptors>/ {p; " -e ":a; n; /<\/acceptors>/ {p; b}; ba}; p" ${instanceDir}/etc/broker.xml
acceptor="<acceptor name=\"artemis\">tcp:\/\/${BROKER_HOST}:61616?protocols=CORE<\/acceptor>"
sed -i -ne "/<acceptors>/ {p; i $acceptor" -e ":a; n; /<\/acceptors>/ {p; b}; ba}; p" ${instanceDir}/etc/broker.xml
# start the broker and issue the scaledown command to drain the messages.
log "launch the drainer broker"

#start the broker and issue the scaledown command to drain the messages.
${instanceDir}/bin/artemis-service start
${instanceDir}/bin/artemis-service start

tail -n 100 -f ${AMQ_NAME}/log/artemis.log &
#no longer log to file by default
#tail -n 100 -f ${AMQ_NAME}/log/artemis.log &

waitForJolokia
waitForJolokia

RET_CODE=`curl -G -k http://${AMQ_USER}:${AMQ_PASSWORD}@${BROKER_HOST}:8161/console/jolokia/exec/org.apache.activemq.artemis:broker=%22${AMQ_NAME}%22/scaleDown/scaledownconnector`
# calculate total messages
get_total_messages_on_broker
total_before_scaledown=$TOTAL_MESSAGES_ON_BROKER

HTTP_CODE=`echo $RET_CODE | python2 -c "import sys, json; print json.load(sys.stdin)['status']"`
log "initiating scaledown. There are $total_before_scaledown messages to be migrated"
mm_start=$(date +%s)

echo "[drain.sh] curl return code ${HTTP_CODE}"
RET_CODE=`curl -s -G -k http://${AMQ_USER}:${AMQ_PASSWORD}@${BROKER_HOST}:8161/console/jolokia/exec/org.apache.activemq.artemis:broker=%22${AMQ_NAME}%22/scaleDown/scaledownconnector`

if [ "${HTTP_CODE}" != "200" ]; then
echo "[drain.sh] scaleDown is not successful, response: $RET_CODE"
echo "[drain.sh] sleeping for 30 seconds to allow inspection before it restarts"
sleep 30
exit 1
fi
mm_end=$(date +%s)
mm_time=$(($mm_end - $mm_start))
log "scaledown finished. Time used: $mm_time"

#restart the broker to check messages
${instanceDir}/bin/artemis-service stop
if [ $? -ne 0 ]; then
echo "[drain.sh] force stopping the broker"
${instanceDir}/bin/artemis-service force-stop
fi
${instanceDir}/bin/artemis-service start
HTTP_CODE=`echo $RET_CODE | python2 -c "import sys, json; print json.load(sys.stdin)['status']"`

waitForJolokia
log "scaleDown return code ${HTTP_CODE}"

echo "[drain.sh] checking messages are all drained"
RET_VALUE=$(curl -G -k http://${AMQ_USER}:${AMQ_PASSWORD}@${BROKER_HOST}:8161/console/jolokia/read/org.apache.activemq.artemis:broker=%22${AMQ_NAME}%22/AddressNames)
if [ "${HTTP_CODE}" != "200" ]; then
log "scaleDown is not successful, response: $RET_CODE"
continue
fi

PYCMD=$(cat <<EOF
import sys, json
addrs = ''
value = json.load(sys.stdin)['value']
for addr in value:
addrs = addrs + ' ' + addr
print addrs
EOF
)
all_addresses=$(echo "$RET_VALUE" | python2 -c "$PYCMD")
arr=($all_addresses)
for address in "${arr[@]}"
do
echo "[drain.sh] checking on address ${address}"
M_COUNT=$(curl -G -k http://${AMQ_USER}:${AMQ_PASSWORD}@${BROKER_HOST}:8161/console/jolokia/read/org.apache.activemq.artemis:broker=%22${AMQ_NAME}%22,address=%22${address}%22,component=addresses/MessageCount)
value=$(echo $M_COUNT | python2 -c "import sys, json; print json.load(sys.stdin)['value']")
if [[ $value > 0 ]]; then
echo "[drain.sh] scaledown not complete. There are $value messages on address $address"
log "restart broker to check messages"
${instanceDir}/bin/artemis-service stop
exit 1
if [ $? -ne 0 ]; then
log "force stopping the broker"
${instanceDir}/bin/artemis-service force-stop
fi

${instanceDir}/bin/artemis-service start

waitForJolokia

log "checking messages are all drained"

scaleDownSuccessful="true"
get_total_messages_on_broker
total_after_scaledown=$TOTAL_MESSAGES_ON_BROKER

log "messages left after scaledown: $total_after_scaledown"

if [ $total_after_scaledown -ne 0 ]; then
scaleDownSuccessful="false"
fi
message_migrated=$(($total_before_scaledown - $total_after_scaledown))

log "stopping the broker"
${instanceDir}/bin/artemis-service stop
if [ $? -ne 0 ]; then
${instanceDir}/bin/artemis-service force-stop
fi

if [ $scaleDownSuccessful == "true" ]; then
log "scaledown is successful, total messages migrated: $message_migrated"
exit 0
fi
log "scaledown not successful, messages left: $total_after_scaledown"
fi
done
echo "[drain.sh] scaledown is successful"
${instanceDir}/bin/artemis-service stop
if [ $? -ne 0 ]; then
${instanceDir}/bin/artemis-service force-stop
fi
exit 0

#this shouldn't happen, return 1 to let operator retry
exit 1

0 comments on commit 5c0100e

Please sign in to comment.