Hadoop upgrades over the last few years meant long outages where the Big Data platform team would shut down the cluster, perform the upgrade, restart services, and then complete validation before notifying users it was OK to resume activity. This approach is a typical pattern for major upgrades even outside Target, and it reduces the complexity and risks associated with the upgrade. While this worked great for the platform team, it was not ideal for the hundreds of users and thousands of jobs that depended on the platform. That is why we decided to shake things up and go all in on rolling maintenance.

Cluster Details:

  1. Hadoop (Core) 2.7.1 to 2.7.3
  2. Mixed cluster workload running Hive, Tez, MR, Spark, Pig, HBase, Storm

Goal

Our March 2016 Hadoop upgrade was the turning point for rolling maintenance. With a large outage for that upgrade and monthly maintenance windows leading up to it, we decided to challenge ourselves with rolling maintenance to reach our uptime goals for core components. This would allow the platform team to deploy changes faster, reduce maintenance risk by not bundling changes together every month, and, more importantly, avoid impacting users with planned downtime. Drawing the line in the sand for rolling maintenance meant that it was time to get to work on our upgrade strategy.

Reviewing the Playbook

We started reviewing the upgrade process in November 2016 with the goal of upgrading our first admin cluster in December. The short turnaround time meant that we would leverage our cluster administration tool to handle the upgrade orchestration. The focus shifted towards understanding the order of events and evaluating the process for potential impacts. Running through the upgrades required a quick way to iterate through Hadoop cluster deployments and then tear them down to retest. This is where REDStack, our internal cloud provisioning tool for building out a secure Hadoop cluster with production-like configurations, came into play. After going through the rolling upgrade process a few times, we knew right away there were a few areas we needed to spend time on.

Modifying the Game Plan

The first adjustment we made was to minimize the HiveServer2 (HS2) port change disruption. Having HS2 restart under a new port and run that way for multiple days while the upgrade finished would have caused a major impact for a lot of applications. What we did was modify the stack upgrade XML file so we could pause the upgrade to shut down HS2 instances right before the HS2 binary upgrade and restart. We could then resume the upgrade process to start them back up under the new version on the same ports. This created a second grouping for Apache Hive components, bringing our HS2 impact down from days to a few minutes for a quick restart.

Hive Modifications:

    <group name="HIVE" title="Hive">
      <skippable>true</skippable>
      <service-check>false</service-check>
      <supports-auto-skip-failure>false</supports-auto-skip-failure>
      <service name="HIVE">
        <component>HIVE_METASTORE</component>
        <component>WEBHCAT_SERVER</component>
      </service>
    </group>

    <group name="HIVE" title="HiveServer2">
      <skippable>true</skippable>
      <service-check>true</service-check>
      <supports-auto-skip-failure>false</supports-auto-skip-failure>
      <service name="HIVE">
        <component>HIVE_SERVER</component>
      </service>
    </group>

Scouting the Opponent

With confidence increasing, the next item we needed to address was how end user jobs would react during the upgrade knowing we had hundreds of servers to roll through. This required us to dig into the distributed cache settings where we realized we had a couple options to set the MapReduce classpath:

  1. Include an archive that contains the MapReduce, YARN, HDFS, and Hadoop Common jars and their dependencies
  2. Include an archive that contains only the MapReduce jars and force the remaining dependencies to be sourced from the local filesystem on each node that has the Hadoop software installed

Going with the first option meant that you would see a reference to ‘$PWD/mr-framework/hadoop’ in your mapreduce.application.classpath. This referenced a path to the jars, and since the jars were readable by all, the localizer ran inside the address space of the YARN NodeManager to download them rather than in a separate (private/application-specific) process. To configure this, we modified the following properties:

  1. mapreduce.application.framework.path - Path to the MapReduce tarball, referenced by the version that is currently active on each node:

     /hadoop/apps/${version}/mapreduce/mapreduce.tar.gz#mr-framework

  2. mapreduce.application.classpath - MapReduce application classpath, which references the framework path above:

     $PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hadoop/${version}/hadoop/lib/hadoop-lzo-0.6.0.${version}.jar:/etc/hadoop/conf/secure
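Both properties rely on the ${version} placeholder being substituted with the version currently active on each node. The sketch below is purely illustrative (the `resolve` helper is hypothetical, not part of the platform) and shows what a node running 2.7.3 would end up referencing:

```python
def resolve(property_value, active_version):
    # Expand the ${version} placeholder the way a node's active version would
    return property_value.replace("${version}", active_version)

framework_path = "/hadoop/apps/${version}/mapreduce/mapreduce.tar.gz#mr-framework"
print(resolve(framework_path, "2.7.3"))
# /hadoop/apps/2.7.3/mapreduce/mapreduce.tar.gz#mr-framework
```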

After these two settings were in place, they were pushed out with a rolling restart to make sure the mapred-site.xml files were updated and ready for testing. During the Core Masters phase of the rolling upgrade, the MapReduce2/HistoryServer restart step triggered an upload of the tarball referenced above. The following output was observed, showing the HDFS upload:

2016-12-29 16:05:37,076 - Called copy_to_hdfs tarball: mapreduce
2016-12-29 16:05:37,076 - Default version is 2.7.1
2016-12-29 16:05:37,077 - Because this is a Stack Upgrade, will use version 2.7.3
2016-12-29 16:05:37,077 - Source file: /hadoop/2.7.3/mapreduce.tar.gz , Dest file in HDFS: /hadoop/2.7.3/mapreduce/mapreduce.tar.gz

To make sure our classpath was set up appropriately, we started a test job to validate it. From the ResourceManager UI, the ApplicationMaster logs were searched for ‘org.mortbay.log: Extract jar’. This resulted in an entry similar to the following:

2016-12-29 17:07:17,216 INFO [main] org.mortbay.log: Extract jar:file:/grid/0/hadoop/yarn/local/filecache/10/mapreduce.tar.gz/hadoop/share/hadoop/yarn/hadoop-yarn-common-2.7.1.jar!/webapps/mapreduce to /grid/0/hadoop/yarn/local/usercache/matt/appcache/application_1482271499284_0001/container_e06_1482271499284_0001_01_000001/tmp/Jetty_0_0_0_0_46279_mapreduce____.jzpsrd/webapp

During the rolling upgrade, our admin tool used the active version based on each node's upgrade status. From a timing standpoint, the old version held on until the last core service was upgraded, which happened to be Apache Oozie, and the client refresh then ran. After that, the active version became 2.7.3, allowing newly submitted jobs to run under the new version, while all existing jobs ran to completion using 2.7.1.

Final Practice

With our prep work completed and dozens of cloud cluster iterations for practice, we were ready to run through our admin cluster. The admin cluster upgrade went well but hit a stumbling point with the Hive Metastore during the restart. After identifying the issue, we realized we needed a fast way to compare configurations across our clusters to proactively catch differences that might cause more problems. We threw together a quick Python script to parse configurations from our admin tool’s API, compare values between clusters, and write out configurations that did not match.

Example API Code:

# Get current desired configurations from the source cluster
# (connection details such as source_cluster_host are defined elsewhere in the script)
def get_source_configurations():
    print("Gathering configuration files for source cluster...")
    try:
        source_cluster_url = "http://" + source_cluster_host + ":" + source_cluster_port \
                             + "/api/v1/clusters/" + source_cluster \
                             + "?fields=Clusters/desired_configs"
        response = requests.get(source_cluster_url, auth=HTTPBasicAuth(source_user_name, source_user_passwd))
        response.raise_for_status()
        return json.loads(response.text)
    except requests.exceptions.HTTPError as err:
        print(err)
        sys.exit(1)

# Create a dictionary of config file names/version tags for the source cluster
def get_source_version_details(json_data):
    print("Gathering version details for configuration files for source cluster...")
    source_desired_confs = {}
    for config_file in json_data['Clusters']['desired_configs']:
        source_desired_confs[config_file] = json_data['Clusters']['desired_configs'][config_file]['tag']
    return source_desired_confs
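With a name-to-tag dictionary built for each cluster, the comparison itself reduces to a small diff. The helper below is a hypothetical sketch of that step (not the actual script), assuming both clusters' desired configs have been flattened into dictionaries as above:

```python
def diff_desired_configs(source_confs, target_confs):
    """Return {config_file: (source_tag, target_tag)} for every mismatch."""
    mismatches = {}
    # Walk the union of config files so entries missing on either side show up too
    for name in set(source_confs) | set(target_confs):
        source_tag = source_confs.get(name)
        target_tag = target_confs.get(name)
        if source_tag != target_tag:
            mismatches[name] = (source_tag, target_tag)
    return mismatches

# Example with made-up tags: core-site differs, hdfs-site matches
print(diff_desired_configs(
    {"core-site": "version1", "hdfs-site": "version2"},
    {"core-site": "version3", "hdfs-site": "version2"}))
# {'core-site': ('version1', 'version3')}
```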

After running the configuration checks between the active and new cloud clusters, we identified a handful of properties that needed to be updated, helping us avoid issues on the remaining clusters. With the changes in place, we were ready for the stage cluster.

With dozens of hosts, this would be our first true test of the new upgrade process with end user jobs. The stage cluster upgrade avoided the issues we hit on the admin cluster, and hundreds of jobs ran successfully during the six-hour upgrade window. Even though the process went well, we still noticed an opportunity to reduce Apache Storm downtime. Stopping all running topologies added a 10-15 minute delay because we relied on killing them manually. To get Storm downtime under our goal of 10 minutes (a full restart was required due to packaging updates), we leveraged the Storm API to identify running topologies and then kill them.

Example Storm API Code:

# Get the list of running topologies from the Storm UI API
# (storm_ui_host and storm_ui_port are defined elsewhere in the script)
def get_topologies():
    print("Gathering running topologies...")
    try:
        storm_url = "http://" + storm_ui_host + ":" + storm_ui_port + "/api/v1/topology/summary"
        response = requests.get(storm_url)
        response.raise_for_status()
        return json.loads(response.text)
    except requests.exceptions.HTTPError as err:
        print(err)
        sys.exit(1)
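Killing the topologies then comes down to one more call per topology. The Storm UI exposes a kill endpoint (POST /api/v1/topology/:id/kill/:wait-time); the sketch below is illustrative rather than our exact script, and the host name and wait time are assumptions:

```python
import requests

def kill_url(base_url, topology_id, wait_secs=30):
    # Storm UI REST endpoint: POST /api/v1/topology/:id/kill/:wait-time
    return "%s/api/v1/topology/%s/kill/%d" % (base_url, topology_id, wait_secs)

def kill_all_topologies(base_url):
    # Ask the Storm UI for running topologies, then kill each one by id
    summary = requests.get(base_url + "/api/v1/topology/summary").json()
    for topology in summary.get("topologies", []):
        print("Killing %s..." % topology["id"])
        requests.post(kill_url(base_url, topology["id"]))

if __name__ == "__main__":
    # Hypothetical Storm UI endpoint, not our actual host
    kill_all_topologies("http://storm-ui.example.com:8744")
```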

Now that we could identify and kill topologies within a minute or so, we knew we were ready for the big game.

Slow Start to the First Half

The prior two cluster upgrades were good tests, but did not come close to the scale of our production cluster. With hundreds of nodes, tens of petabytes of raw HDFS storage, and complex user jobs across multiple components, we knew we were in for a challenge. Based on the admin and stage cluster upgrades, we put together estimates that had us completing the upgrade in around 44 hours. With the team assembled in a war room we were ready to start, or so we thought. The size of the upgrade caused issues with our admin tool generating the upgrade plan, resulting in a 5 hour delay while we recovered the database and fixed the initial glitch. By the time we got into the rolling restarts for slave components (HDFS DataNodes, YARN NodeManagers, HBase RegionServers), we were already behind. After watching the first handful of sequential restarts, we quickly realized our goal of 44 hours was looking a lot more like 143 hours, putting us days behind. On top of the delays, we also ran into a bug with DataNodes failing to restart. With a new DataNode service restart every five minutes, the risk of data loss if multiple nodes were down was the primary concern going into the night. To reduce the risk and restart failed nodes proactively, a quick bash script was put together and scheduled in cron to catch DataNodes that had failed.

Example Bash Script:

check_wait_restart() {
  echo "Checking Datanode..."
  PID=$(pgrep -u hdfs -f DataNode | head -1)
  if [ ! -z "${PID}" ]; then
    echo "Datanode is running."
    return 0
  fi

  echo "Datanode is stopped, sleeping..."
  sleep 120

  echo "Checking Datanode..."
  PID=$(pgrep -u hdfs -f DataNode | head -1)
  if [ ! -z "${PID}" ]; then
    echo "Datanode is running."
    return 0
  fi

  echo "Datanode is still down after 2 minutes, restarting..."
  export HADOOP_LIBEXEC_DIR=/usr/hadoop/2.7.3/hadoop/libexec
  /usr/hadoop/2.7.3/hadoop/sbin/hadoop-daemon.sh --config /usr/hadoop/2.7.3/hadoop/conf start datanode
}

Halftime Motivation

Running days behind, and with HDFS storage continuing to grow (a 10% jump in the first day), we knew we needed to speed things up. We identified two paths forward: the first relying on a patch from our vendor’s engineering team to get us through the slave restarts faster (a query optimization), and the second looking at Chef to help complete the restarts. Work started on both immediately, with the Target team jumping into the Chef work and knocking out templates for services and adding recipes to duplicate the admin tool logic. After a couple of hours, news came that an admin tool hotfix was ready; the end result dropped our slave restart time by 75% (60 seconds per node), putting us back on the original forecasts. Focus then shifted from the slave restarts to the client refreshes, which were also sequential tasks by node. We determined that we would manage the client refreshes outside the admin tool so we could complete them within hours and get back on schedule.

Second Half Comeback

Being days behind with the client refreshes coming up meant taking a risk to get back into the game. Shell scripts were developed to handle the logic of checking and creating configuration directories, gathering and deploying service files, and updating the active versions to cut over to the new version.

Example Bash Script:

# Verify that conf dirs are in the correct places for each service
for svc in hadoop zookeeper tez sqoop pig oozie hive hive-hcatalog hbase ; do

  # Create the conf directory for the new stack version if it is missing
  if [ ! -d "/etc/${svc}/${STACK_VERSION}/${CONF_VERSION}" ]; then
    echo "Creating ${STACK_VERSION} conf dir /etc/${svc}/${STACK_VERSION}/${CONF_VERSION}..."
    /usr/bin/conf-select create-conf-dir --package "${svc}" --stack-version "${STACK_VERSION}" --conf-version "${CONF_VERSION}" || :
  fi

  # Point the active conf symlink at the new version
  /usr/bin/conf-select set-conf-dir --package "${svc}" --stack-version "${STACK_VERSION}" --conf-version "${CONF_VERSION}" || :
done

After some quick sanity checks in our lower environment, we determined we were ready. The admin tool was paused and iterations of deploying client configurations began. The workaround went great: client refresh times were reduced by 95%, down to four hours from an updated estimate of 67 hours. Finalizing the upgrade afterwards was the whistle ending the marathon event.

Instant Classic

Completing the largest single production cluster rolling upgrade in the history of one of the largest Hadoop vendors with 99.99% uptime was a great team accomplishment (internal and external). The months of preparation that went into the upgrade and the game time decisions throughout paid off. Our final duration clocked in at just over 41.5 hours, hours ahead of our goal time and days ahead of the projected duration had no action been taken. During this time, we had thousands of jobs running, supporting multiple key initiatives and influencing business decisions. The engineering talent on the team was the ultimate reason for our success. The right mix of skills and personalities allowed us to react throughout the entire event. The other important lesson learned, mentioned above but deserving more focus, is that a rolling upgrade takes time. While HDFS is being upgraded, blocks of data will not be deleted until the upgrade is completed and HDFS is finalized. Make sure you have sufficient HDFS storage to avoid the risk of running out of space during your upgrade. With our major upgrade out of the way, we are excited to focus on the next chapter and continue to raise the bar.

About the Author

Matthew Sharp is a Principal Data Engineer on the Big Data team at Target. He has been focused on Hadoop and the Big Data ecosystem for the last 5+ years. Target’s Big Data team has full stack ownership for multiple open source technologies. The team manages hundreds of servers with tens of petabytes of data, using an advanced CI/CD pipeline to automate every change. Stay tuned for future team updates to share how we do Big Data at Target!