Automatic Machine Learning at Scale: H2O AutoML

May 11,2018 | Engineering

For supervised machine learning tasks, AutoML can significantly speed up your time to deployment and tighten the feedback loop for feature experimentation. To make sure the tools perform reasonably well, we test H2O AutoML against a popular benchmark of commonly used machine learning algorithms using both a single virtual machine and a cluster of four such machines running Apache Spark on AWS ElasticMapReduce (EMR). Once you know how to use AutoML, you might want to think about where to leverage it so as to provide the most value. At the end of the post, we’ll give you some insight into how we think about these tools at IKASI. For now, here’s how to get up and running.

The Takeaway

Automatic Machine Learning (AutoML) tools allow you to benefit from the predictive power of stacked model ensembles with just a table of data and the name of the column you want to predict. With H2O.ai Sparkling Water, these tools have reached the point where they can handle building one-off models on any amount of data.  You can use AutoML to:

  1. Start with a good model as a baseline
  2. See how different kinds of models perform on your dataset
  3. Avoid common pitfalls of machine learning

how to: single machine

See Appendix: How to provision a VM in AWS EC2, or ssh into an existing VM running linux. When you have access to a VM, follow these instructions to install H2O with python:

Each step can be created as follows:

sudo python -m pip install --upgrade pip
sudo python -m pip install --upgrade requests future colorama scikit-learn tabulate
sudo python -m pip install -f https://h2o-release.s3.amazonaws.com/h2o/latest_stable_Py.html h2o

Now you can download the data and the python2 script we used to perform the benchmark.

wget https://s3.amazonaws.com/benchm-ml--main/train-10m.csv
wget https://s3.amazonaws.com/benchm-ml--main/test.csv
wget https://emr-h2o-automl-benchmark.s3.amazonaws.com/automl_run_test_local.py -O run_test.py

python run_test.py

how to: cluster

Create an EMR cluster using the AWS Management Console to make sure the default roles are created. Once that’s done you can terminate it. Add ssh access for your ip address to the inbound rules for the EMR master security group in AWS.

From your local machine, once your command-line aws configuration is set up:

wget https://emr-h2o-automl-benchmark.s3.amazonaws.com/automl_run_test_emr.py -O run_test_emr.py

This script will create a cluster, or add steps to an existing cluster. Edit run_test_emr.py before running it to set up your preferences and AWS account-specific information at the top of the file. It adds three steps for each modeling job, although the first step could be run as a bootstrap action instead once the modeling code is stable.

  1. Copy the modeling code (model_script.py) from S3 to the EMR cluster driver
  2. Use spark-submit to run the main step
    • a. Use spark.yarn.maxAppAttempts=1 to disable retries
    • b. Use ‘cluster’ deploy mode and ‘yarn’ master
    • c. model_script.py is in python2
  3. Copy the model results from the cluster’s hadoop filesystem to S3

Once you’ve set the appropriate variables at the top of run_test_emr.py, run the script:

python3 -m pip install --upgrade boto3
python3 run_test.py

 

example data

 

Month DayofMonth DayOfWeek DepTime UniqueCarrier Origin Dest Distance dep_delayed_15min
c-8 c-21 c-7 1934 AA ATL DFW 732 N
c-4 c-20 c-3 1548 US PIT MCO 834 N
c-9 c-2 c-5 1422 XE RDU CLE 416 N
c-11 c-25 c-6 1015 OO DEN MEM 872 N
c-10 c-7 c-6 1828 WN MDW OMA 423 Y

results

We trained H2O AutoML on a single virtual machine (r4.8xlarge) using ten million rows of data from the szilard benchmark. The data consists of eight input columns (six categorical, two numeric) and one output column (dep_delayed_15min). The categorical columns are of low cardinality, and there is no missing data. With a requested stopping time of six thousand seconds (one hour and forty minutes), H2O AutoML achieved an average Area Under the Curve (AUC) of 79.85 over three runs with its best-of-family ensemble. For comparison, on the benchmark the best linear model achieved an AUC of 71.1, the best Random Forest (RF), 77.8, and the best Gradient Boosting Machine (GBM), 78.7 in a similar amount of time.

On a cluster of four machines in Amazon Elastic MapReduce (EMR), the Sparkling Water version managed an AUC of 78.01, also averaged over three runs. When given four hours, Sparkling Water was able to achieve an AUC of 80.9 in a single run. The best result in the benchmark was an AUC of 81.2, achieved by training a GBM for more than nine hours. This is the power of stacked ensembles, models that combine the predictions of disparate machine learning models to do better than any one model by itself.

If your data has fewer than one hundred million rows, we’d recommend using single-machine H2O AutoML on a memory-optimized EC2 instance. Otherwise, or if you have existing Spark infrastructure, the Sparkling Water version still performs quite well, and there are other options available to those with more machine learning expertise. When using Spark, you may need to specify the data types manually. For the purposes of the benchmark, we had to convert string columns to enums using asfactor.

Single Machine (r4.8xlarge) – 32 virtual CPUs, 244 GiB memory, $2.128 per hour
max_runtime_secs: 6000 seconds (1 hr 40 mins)
actual runtime: 6946.8 seconds, 6859.2 seconds, 6868.7 seconds
Leaderboard-1, Leaderboard-2, Leaderboard-3
Best of Family AUC: 0.800351, 0.796376, 0.798908
Best of Family AUC sample mean: 0.7985, sample variance: 4.049e-06

Cluster – four r4.8xlarge workers, one m4.4xlarge master
max_runtime_secs: 6000  (1 hour 40 mins)
actual runtime: 6362.7 seconds, 6316.5 seconds. 6439.1 seconds
step runtime:  1 hour 51 min, 1 hour 50 min, 1 hour 52 min
Leaderboard-1, Leaderboard-2, Leaderboard-3
Best of Family AUC: 0.77937, 0.780583, 0.780254
Best of Family AUC sample mean: 0.7801, sample variance: 3.935e-07

Leaderboard-4
max_runtime_secs: 12000  (3 hour 20 mins)
actual runtime: 12646.8 seconds
step runtime:  3 hours 36 min
Best of Family AUC: 0.808793

Without access to the test set, we would have chosen StackedEnsemble_BestOfFamily or StackedEnsemble_AllModels, but sometimes they do slightly worse on the test set when compared to a particular GBM (Gradient Boosting Machine) or DRF (Distributed Random Forest). See this video by H2O for more on their implementations of GBMs and DRFs. The best-of-family ensemble model would include only the best models of each kind in the final ensemble. Extremely Random Trees (XRT), Neural Networks (DeepLearning), and Generalized Linear Models (GLM) are more examples of the kinds of model generated by H2O AutoML.

Conclusion

Until now, various implementations of the Random Forest algorithm have been the most user-friendly machine learning options. With the release of H2O AutoML, this is no longer the case. From an engineering standpoint, it is a huge advantage to be able to provide a training set, a test set (optional), and a column to predict, with no need to set any parameter other than how long you want the job to take. In addition, the output is considerably more helpful as a baseline than a linear model or any model alone. After feature engineering, machine learning is in practice about finding the best algorithm for the job. You need to test a panel of algorithms after each change to the input data, and H2O AutoML makes it possible at any scale.

At IKASI, we use AutoML as a sanity check, not a cure-all. It can’t find features for you, but it can tell you which ones are useful to your model. Don’t ignore the human context: define your objectives around your business, not just metrics like AUC. Finally, AutoML is a great way to avoid the pitfalls of Machine Learning. Data leakage for example, is commonly found when your model achieves perfect results on the test set. As usual, if it looks too good to be true, it probably is.

Michael Lauria

Developer, IKASI

appendix

Create a keypair (on a mac, using ssh-keygen -t rsa), perhaps choose ~/.ssh/emr_test_rsa for your private key location, and ~/.ssh/emr_test_rsa.pub for your public key location. Then use aws ec2 import-key-pair --key-name emr_test_rsa --public-key-material file://~/.ssh/emr_test_rsa.pub to import your key pair.

Create a launch template, replacing “subnet-1234a56b” with your own subnet:

1
2
3
export MY_SUBNET="subnet-1234a56b"; aws s3 cp s3://emr-h2o-automl-benchmark/launch-template-single-new.json - | sed "s/MY_SUBNET/$MY_SUBNET/" > launch_template.json

aws ec2 create-launch-template --launch-template-name automl_test --launch-template-data "$(cat launch_template.json | tr -d '[:space:]')"

 

Create an EC2 instance in AWS by choosing “Launch instance from template” from the console.

Choose the first version of the template, then scroll down and launch the instance. Once your instance launches, you can add your IP address to the security group Inbound Rules and use the public IP to ssh to your VM and install H2O. Select the instance in your console and you’ll find the public IP. Click the default security group to edit the inbound rules and add your IP address.

Next ssh to the VM using ssh -i "~/.ssh/emr_test_rsa" ec2-user@<your-public-ip-address> and replace the ip address with the one for your VM.

emr-spec.json.template

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
{
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "LogUri": "${EMR_LOG_URI}",
    "Instances": {
        "EmrManagedMasterSecurityGroup": "${EMR_MASTER_SG}",
        "KeepJobFlowAliveWhenNoSteps": true,
        "EmrManagedSlaveSecurityGroup": "${EMR_WORKER_SG}",
        "InstanceGroups": [
            {
                "InstanceType": "${EMR_MASTER_INSTANCE_COUNT}",
                "InstanceType": "${EMR_MASTER_INSTANCE_TYPE}",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "Name": "master_instance_group"
            },
            {
                "InstanceType": "${EMR_WORKER_INSTANCE_COUNT}",
                "InstanceType": "${EMR_WORKER_INSTANCE_TYPE}",
                "Market": "ON_DEMAND",
                "InstanceRole": "CORE",
                "Name": "core_instance_group"
            }
        ],
        "TerminationProtected": false,
        "Ec2SubnetId": "${EMR_EC2_SUBNET}",
        "Ec2KeyName": "${EMR_KEY_NAME}"
    },
    "BootstrapActions": [
        {
            "ScriptBootstrapAction": {
                "Path":
                    "s3://support.elasticmapreduce/spark/maximize-spark-default-config"
            },
            "Name": "Maximize Spark Default Config"
        },
        {
            "ScriptBootstrapAction": {
                "Path":
                    "s3://emr-h2o-automl-benchmark/bootstrap-action-download-sparkling"
            },
            "Name": "Get Sparkling Water"
        }
    ],
    "ReleaseLabel": "emr-5.12.0",
    "Steps": [
        {
            "HadoopJarStep": {
                "Args": ["state-pusher-script"],
                "Jar": "command-runner.jar"
            },
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "Name": "Setup Debugging"
        }
    ],
    "Applications": [
        {
            "Name": "Ganglia"
        },
        {
            "Name": "Spark"
        },
        {
            "Name": "Hadoop"
        }
    ],
    "Configurations": [
        {
            "Classification": "spark",
            "Properties": {
                "maximizeResourceAllocation": "true"
            }
        },
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.dynamicAllocation.enabled": "false",
                "spark.serializer":
                    "org.apache.spark.serializer.KryoSerializer",
                "spark.scheduler.minRegisteredResourcesRatio": "1"
            }
        }
    ],
    "ServiceRole": "EMR_DefaultRole",
    "VisibleToAllUsers": true,
    "Name": "Sparkling Water Test Cluster"
}
 automl_run_test_emr.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import json
import string
import boto3

### Start config
# Use these
S3_BUCKET = 'emr-h2o-automl-benchmark'
MODEL_SCRIPT_KEY = 'model_script.py'
EMR_SPEC_KEY = 'emr-spec.json.template'

# Possibly use these
AWS_REGION = 'us-west-2'
EMR_MASTER_INSTANCE_COUNT = 1
EMR_MASTER_INSTANCE_TYPE = 'm4.4xlarge'
EMR_WORKER_INSTANCE_COUNT = 4
EMR_WORKER_INSTANCE_TYPE = 'r4.8xlarge'

# Change these
EMR_LOG_URI = 's3://emr-h2o-automl-benchmark/emr-logs/'
EMR_KEY_NAME = 'emr_test_rsa'
EMR_EC2_SUBNET = 'subnet-56c1661f'
EMR_MASTER_SG = 'sg-1275526a'
EMR_WORKER_SG = 'sg-1375526b'
### End config

s3 = boto3.client('s3', region_name=AWS_REGION)
emr = boto3.client('emr', region_name=AWS_REGION)


def get_emr_spec():
    tmp_key = '/tmp/' + EMR_SPEC_KEY
    s3.download_file(S3_BUCKET, EMR_SPEC_KEY, tmp_key)
    with open(tmp_key, 'r') as f:
        emr_spec_template = string.Template(f.read())
        json_spec = emr_spec_template.substitute(
            dict(
                EMR_LOG_URI=EMR_LOG_URI,
                EMR_KEY_NAME=EMR_KEY_NAME,
                EMR_EC2_SUBNET=EMR_EC2_SUBNET,
                EMR_MASTER_SG=EMR_MASTER_SG,
                EMR_MASTER_INSTANCE_COUNT=EMR_MASTER_INSTANCE_COUNT,
                EMR_MASTER_INSTANCE_TYPE=EMR_MASTER_INSTANCE_TYPE,
                EMR_WORKER_SG=EMR_WORKER_SG,
                EMR_WORKER_INSTANCE_COUNT=EMR_WORKER_INSTANCE_COUNT,
                EMR_WORKER_INSTANCE_TYPE=EMR_WORKER_INSTANCE_TYPE))
        emr_spec = json.loads(json_spec)
    return emr_spec


def create_emr():
    spec = get_emr_spec()
    resp = emr.run_job_flow(**spec)
    return resp['JobFlowId']


def get_spark_script_step(step_name, script_path):
    return {
        'Name': step_name,
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', script_path, '/home/hadoop/main.py']
        }
    }


def get_main_step(step_name, train_data_path, test_data_path, target_column,
                  timeout):
    return {
        'Name': step_name,
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar':
            'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode',
                'cluster',
                '--master',
                'yarn',
                '--packages',
                'ai.h2o:sparkling-water-core_2.11:2.2.10',
                '--conf',
                'spark.yarn.maxAppAttempts=1',
                '/home/hadoop/main.py',
                train_data_path,
                test_data_path,
                target_column,
                timeout,
            ]
        }
    }


def get_s3_copy_step(step_name, output_path):
    return {
        'Name': step_name,
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args':
            ['s3-dist-cp', '--src', 'hdfs:///model', '--dest', output_path]
        }
    }


def add_steps(cluster_id, steps):
    resp = emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=steps)
    print(resp)


def get_best_cluster(cluster_ids, pending_threshold):
    if not cluster_ids:
        return None
    cluster_info = []
    for cluster_id in cluster_ids:
        pending_steps = emr.list_steps(
            ClusterId=cluster_id, StepStates=['PENDING'])['Steps']
        cluster_info.append((cluster_id, len(pending_steps)))
    best_cluster_id, pending_count = min(cluster_info, key=lambda x: x[1])
    print([best_cluster_id, pending_count])
    return None if pending_count > pending_threshold else best_cluster_id


def get_or_create_emr():
    resp = emr.list_clusters(ClusterStates=['RUNNING', 'WAITING'])
    clusters = resp['Clusters']
    print(clusters)
    cluster_ids = [
        cluster['Id'] for cluster in clusters
        if cluster['Name'] == 'Sparkling Water Test Cluster'
    ]
    cluster_id = get_best_cluster(cluster_ids, pending_threshold=4)
    if cluster_id is None:
        cluster_id = create_emr()
    return cluster_id


def run_job(timeout, train_data_path, test_data_path, output_path,
            target_column, script_path):
    cluster_id = get_or_create_emr()
    steps = [
        get_spark_script_step('Get Spark Script', script_path),
        get_main_step('Run AutoML', train_data_path, test_data_path,
                      target_column, timeout),
        get_s3_copy_step('Copy Model', output_path)
    ]
    add_steps(cluster_id, steps)


def main():
    timeout_seconds = '6000'
    train_file = 'train-10m'
    train_data_path = 's3://benchm-ml--main/{}.csv'.format(train_file)
    test_data_path = 's3://benchm-ml--main/test.csv'
    target_column = 'dep_delayed_15min'
    output_path = 's3://{}/output/test-{}-{}'.format(
        S3_BUCKET, timeout_seconds, train_file)
    script_path = 's3://{}/{}'.format(S3_BUCKET, MODEL_SCRIPT_KEY)
    run_job(
        timeout=timeout_seconds,
        train_data_path=train_data_path,
        test_data_path=test_data_path,
        output_path=output_path,
        target_column=target_column,
        script_path=script_path)


if __name__ == '__main__':
    main()
model.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
from pyspark.sql import SparkSession
import h2o
from h2o.automl import H2OAutoML
from pysparkling import *
import traceback
import timeit
import sys

spark = SparkSession.builder.appName('SparklingWaterApp').getOrCreate()
spark_conf = spark.sparkContext.getConf().getAll()
for item in spark_conf:
    print item
hc = H2OContext.getOrCreate(spark)
print hc

print sys.argv
if len(sys.argv) == 5 and all(x.startswith('s3://') for x in sys.argv[1:3]):
    s3_path_train = sys.argv[1]
    s3_path_test = sys.argv[2]
    target_column = sys.argv[3]
    timeout = int(sys.argv[4])
else:
    h2o.cluster().shutdown(prompt=False)
    spark.stop()
    raise Exception('Path in s3:// must be specified')

print 'ATTEMPT DF LOAD'
start_time = timeit.default_timer()
train_df = spark.read.load(
    s3_path_train,
    format='com.databricks.spark.csv',
    header='true',
    delimiter=',',
    inferSchema='true')
test_df = spark.read.load(
    s3_path_test,
    format='com.databricks.spark.csv',
    header='true',
    delimiter=',',
    inferSchema='true')
elapsed = timeit.default_timer() - start_time
print 'FINISH DF LOAD'
print 'Time taken: %f seconds' % elapsed

# H2O
h2o_train_df = hc.as_h2o_frame(train_df)
h2o_test_df = hc.as_h2o_frame(test_df)

# Identify predictors and response
x = h2o_train_df.columns
y = target_column

print h2o_train_df.types
print h2o_test_df.types

print 'ATTEMPT H2O MODEL BUILD'
start_time = timeit.default_timer()

# Transform all string columns to categoricals, or you could specify columns as in
# https://github.com/h2oai/sparkling-water/blob/master/py/examples/scripts/ChicagoCrimeDemo.py
for col in h2o_train_df.types:
    if h2o_train_df.types[col] == 'string':
        h2o_train_df[col] = h2o_train_df[col].asfactor()
        h2o_test_df[col] = h2o_test_df[col].asfactor()

print h2o_train_df.types
print h2o_test_df.types

# For binary classification, response should be a factor
h2o_train_df[y] = h2o_train_df[y].asfactor()
h2o_test_df[y] = h2o_test_df[y].asfactor()

aml = H2OAutoML(max_runtime_secs=timeout)
aml.train(x=x, y=y, training_frame=h2o_train_df, leaderboard_frame=h2o_test_df)

elapsed = timeit.default_timer() - start_time
print 'FINISH H2O MODEL BUILD'
print 'Time taken: %f seconds' % elapsed

model_dir = 'hdfs:///model'
model_path = h2o.save_model(model=aml.leader, path=model_dir, force=True)
print model_path

h2o.export_file(
    frame=aml.leaderboard,
    path=model_dir + '/leaderboard',
    force=True,
    parts=1)

try:
    mojo_path = aml.leader.save_mojo(path=model_path + '.mojo', force=True)
except Exception:
    print 'Export to MOJO not supported. Exception Handled:'
    print '------'
    traceback.print_exc()
    print '------'
else:
    print mojo_path

details_path = aml.leader.save_model_details(
    path=model_path + '_details', force=True)
print details_path

h2o.cluster().shutdown(prompt=False)
spark.stop()

0 Comments

Submit a Comment

Your email address will not be published. Required fields are marked *

Share This