#!/usr/bin/env python2.5
# -*- python -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import ConfigParser
from hadoop.ec2.commands import *
from hadoop.ec2.cluster import Cluster
from hadoop.ec2.storage import create_formatted_snapshot
from hadoop.ec2.storage import Storage
from hadoop.ec2.util import merge_config_with_options
from hadoop.ec2.util import xstr
import logging
from optparse import OptionParser
from optparse import make_option
import os
import subprocess
import sys
version_file = os.path.join(sys.path[0], "VERSION.txt")
VERSION = open(version_file, "r").read().strip()
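
# Option groups shared by the sub-commands below. Per the usage text in
# parse_options, values may also come from .hadoop-ec2/ec2-clusters.cfg, with
# command-line options taking precedence over the configuration file.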
LAUNCH_OPTIONS = [
  make_option("-a", "--ami", metavar="AMI",
    help="The AMI ID of the image to launch."),
  # Beware: --env can only be specified on the command line,
  # not in the .cfg file because we don't handle array types
  # well. TODO(philip): A better approach here?
  make_option("-e", "--env", metavar="ENV", action="append",
    help="An environment variable to pass to instances. (May be specified multiple times.)"),
  make_option("-f", "--user-data-file", metavar="DATA-FILE",
    help="The file containing user data to be made available to instances."),
  make_option("-k", "--key-name", metavar="KEY-PAIR",
    help="The key pair to use when launching instances."),
  make_option("-p", "--user-packages", metavar="PACKAGES",
    help="A space-separated list of packages to install on instances on start up."),
  make_option("-t", "--instance-type", metavar="TYPE",
    help="The type of instance to be launched. One of m1.small, m1.large, m1.xlarge, c1.medium, or c1.xlarge."),
  make_option("-z", "--availability-zone", metavar="ZONE",
    help="The availability zone to run the instances in."),
  make_option("--auto-shutdown", metavar="TIMEOUT_MINUTES",
    help="The time in minutes after launch when an instance will be automatically shut down."),
  make_option("--client-cidr", metavar="CIDR", action="append",
    help="The CIDR of the client, which is used to allow access through the firewall to the master node. (May be specified multiple times.)")
]
PLACEMENT_OPTIONS = [
  make_option("-z", "--availability-zone", metavar="ZONE",
    help="The availability zone to run the instances in."),
]
FORCE_OPTIONS = [
  make_option("--force", action="store_true", default=False,
    help="Do not ask for confirmation."),
]
SSH_OPTIONS = [
  make_option("--ssh-options", metavar="SSH-OPTIONS",
    help="SSH options to use."),
]
def print_usage():
  print """Usage: hadoop-ec2 COMMAND [OPTIONS]
where COMMAND and [OPTIONS] may be one of:
  list [CLUSTER]                      list all running Hadoop EC2 clusters
                                        or instances in CLUSTER
  launch-master CLUSTER               launch or find a master in CLUSTER
  launch-slaves CLUSTER NUM_SLAVES    launch NUM_SLAVES slaves in CLUSTER
  launch-cluster CLUSTER NUM_SLAVES   launch a master and NUM_SLAVES slaves
                                        in CLUSTER
  create-formatted-snapshot CLUSTER   create an empty, formatted snapshot of
    SIZE                                size SIZE GiB
  list-storage CLUSTER                list storage volumes for CLUSTER
  create-storage CLUSTER ROLE         create volumes for NUM_INSTANCES instances
    NUM_INSTANCES SPEC_FILE             in ROLE for CLUSTER, using SPEC_FILE
  attach-storage ROLE                 attach storage volumes for ROLE to CLUSTER
  login CLUSTER                       log in to the master in CLUSTER over SSH
  proxy CLUSTER                       start a SOCKS proxy on localhost into the CLUSTER
  push CLUSTER FILE                   scp FILE to the master in CLUSTER
  exec CLUSTER CMD                    execute CMD on the master in CLUSTER
  terminate-cluster CLUSTER           terminate all instances in CLUSTER
  delete-cluster CLUSTER              delete the group information for CLUSTER
  delete-storage CLUSTER              delete all storage volumes for CLUSTER
  update-slaves-file CLUSTER          update the slaves file on the CLUSTER master
Use hadoop-ec2 COMMAND --help to see additional options for specific commands."""

def parse_options(command, option_list=[], extra_arguments=(), unbounded_args=False):
  """
  Parse the arguments to command using the given option list.
  If unbounded_args is true then there must be at least as many extra arguments
  as specified by extra_arguments (the first argument is always CLUSTER).
  Otherwise there must be exactly the same number of arguments as extra_arguments.
  """
  expected_arguments = ["CLUSTER",]
  expected_arguments.extend(extra_arguments)
  usage = """%%prog %s [options] %s

Options may also be specified in a configuration file called .hadoop-ec2/ec2-clusters.cfg
located in the user's home directory. Options specified on the command line take
precedence over any in the configuration file.""" % (command, " ".join(expected_arguments))
  parser = OptionParser(usage=usage, version="%%prog %s" % VERSION, option_list=option_list)
  parser.disable_interspersed_args()
  (options, args) = parser.parse_args(sys.argv[2:])
  if unbounded_args:
    if len(args) < len(expected_arguments):
      parser.error("incorrect number of arguments")
  elif len(args) != len(expected_arguments):
    parser.error("incorrect number of arguments")
  cluster_name = args[0]
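  # A local ec2-clusters.cfg is read first, then the one in the user's home
  # directory; with ConfigParser, files read later override earlier ones for
  # the same option.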
  config_filename = os.path.join(os.environ['HOME'], '.hadoop-ec2/ec2-clusters.cfg')
  config = ConfigParser.ConfigParser()
  read_files = config.read(['ec2-clusters.cfg', config_filename])
  logging.debug("Read %d configuration files: %s" % \
    (len(read_files), ", ".join(read_files)))
  opt = merge_config_with_options(cluster_name, config, vars(options))
  logging.debug("Options: %s" % str(opt))
  return (opt, args, Cluster(cluster_name))

def _prompt(prompt):
  """ Returns true if user responds "yes" to prompt. """
  return raw_input("%s [yes or no]: " % prompt) == "yes"
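
# Command dispatch: the first positional argument names the sub-command; each
# branch parses its own options with parse_options and delegates to the
# cluster, storage and command helpers imported above.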
if __name__ == "__main__":
  # Use HADOOP_EC2_LOGGING_LEVEL=DEBUG to enable debugging output.
  logging.basicConfig(level=getattr(logging, os.getenv("HADOOP_EC2_LOGGING_LEVEL", "INFO")))

  if len(sys.argv) < 2:
    print_usage()
    sys.exit(1)

  command = sys.argv[1]
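  # 'list' with no cluster name shows every running cluster; with a name it
  # lists that cluster's instances (list here is expected to come from the
  # wildcard import of hadoop.ec2.commands, shadowing the builtin).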
  if command == 'list':
    if len(sys.argv) == 2:
      list_all()
    else:
      list(sys.argv[2])
  elif command == 'launch-master':
    (opt, args, cluster) = parse_options(command, LAUNCH_OPTIONS)
    # TODO(tom): check that required args are present
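    # Per the usage text, this either launches a new master or finds an
    # existing one in CLUSTER before attaching storage and waiting for Hadoop.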
    launch_master(cluster, opt.get('ami'), opt.get('key_name'), opt.get('user_data_file'),
      opt.get('instance_type'), opt.get('availability_zone'), opt.get('user_packages'),
      opt.get('auto_shutdown'), opt.get('env'), opt.get('client_cidr'))
    attach_storage(cluster, (MASTER,))
    wait_for_hadoop(cluster, 0)
    print_master_url(cluster)
  elif command == 'launch-slaves':
    (opt, args, cluster) = parse_options(command, LAUNCH_OPTIONS, ("NUM_SLAVES",))
    number_of_slaves = int(args[1])
    launch_slaves(cluster, number_of_slaves, opt.get('user_data_file'),
      opt.get('user_packages'), opt.get('auto_shutdown'), opt.get('env'))
    attach_storage(cluster, (SLAVE,))
    print_master_url(cluster)
  elif command == 'launch-cluster':
    (opt, args, cluster) = parse_options(command, LAUNCH_OPTIONS, ("NUM_SLAVES",))
    number_of_slaves = int(args[1])
    launch_master(cluster, opt.get('ami'), opt.get('key_name'), opt.get('user_data_file'),
      opt.get('instance_type'), opt.get('availability_zone'), opt.get('user_packages'),
      opt.get('auto_shutdown'), opt.get('env'), opt.get('client_cidr'))
    launch_slaves(cluster, number_of_slaves, opt.get('user_data_file'),
      opt.get('user_packages'), opt.get('auto_shutdown'), opt.get('env'))
    attach_storage(cluster, ROLES)
    wait_for_hadoop(cluster, number_of_slaves)
    print_master_url(cluster)
  elif command == 'login':
    (opt, args, cluster) = parse_options(command, SSH_OPTIONS)
    instances = cluster.check_running(MASTER, 1)
    if not instances:
      sys.exit(1)
    subprocess.call('ssh %s root@%s' % (xstr(opt.get('ssh_options')), instances[0].public_dns_name), shell=True)
  elif command == 'proxy':
    (opt, args, cluster) = parse_options(command, SSH_OPTIONS)
    instances = cluster.check_running(MASTER, 1)
    if not instances:
      sys.exit(1)
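    # SSH flags for the SOCKS proxy: -N runs no remote command and -D 6666
    # opens a dynamic (SOCKS) forward on local port 6666; the timeout,
    # keep-alive and ControlPath options keep the background connection
    # well-behaved.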
    options = " ".join((
      '-o "ConnectTimeout=10"',
      '-o "ServerAliveInterval=60"',
      '-o "ControlPath=none"',
      '-N -D 6666'))
    process = subprocess.Popen('ssh %s %s root@%s' %
      (xstr(opt.get('ssh_options')), options, instances[0].public_dns_name),
      stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
      shell=True)
    print """export HADOOP_EC2_PROXY_PID=%s;
echo Proxy pid %s;""" % (process.pid, process.pid)
  elif command == 'push':
    (opt, args, cluster) = parse_options(command, SSH_OPTIONS, ("FILE",))
    instances = cluster.check_running(MASTER, 1)
    if not instances:
      sys.exit(1)
    subprocess.call('scp %s -r %s root@%s:' % (xstr(opt.get('ssh_options')), args[1], instances[0].public_dns_name), shell=True)
  elif command == 'exec':
    (opt, args, cluster) = parse_options(command, SSH_OPTIONS, ("CMD",), True)
    instances = cluster.check_running(MASTER, 1)
    if not instances:
      sys.exit(1)
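    # unbounded_args: everything after CLUSTER is joined back into a single
    # command string and run on the master over SSH.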
    subprocess.call("ssh %s root@%s '%s'" % (xstr(opt.get('ssh_options')), instances[0].public_dns_name, " ".join(args[1:])), shell=True)
  elif command == 'terminate-cluster':
    (opt, args, cluster) = parse_options(command, FORCE_OPTIONS)
    cluster.print_status(ROLES)
    if not opt["force"] and not _prompt("Terminate all instances?"):
      print "Not terminating cluster."
    else:
      print "Terminating cluster"
      cluster.terminate()
  elif command == 'delete-cluster':
    (opt, args, cluster) = parse_options(command)
    cluster.delete_groups(ROLES)
  elif command == 'create-formatted-snapshot':
    (opt, args, cluster) = parse_options(command, extra_arguments=("SIZE",))
    size = int(args[1])
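    # The formatting is done from a scratch instance launched from the AMI
    # below (hence the key pair and SSH options passed along).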
    AMI_UBUNTU_INTREPID_X86 = 'ami-ec48af85'  # want a general AMI to do formatting
    create_formatted_snapshot(cluster, size, opt.get('availability_zone'),
      AMI_UBUNTU_INTREPID_X86, opt.get('key_name'), xstr(opt.get('ssh_options')))
  elif command == 'list-storage':
    (opt, args, cluster) = parse_options(command)
    storage = Storage(cluster)
    storage.print_status(ROLES)
  elif command == 'create-storage':
    (opt, args, cluster) = parse_options(command, PLACEMENT_OPTIONS, ("ROLE", "NUM_INSTANCES", "SPEC_FILE"))
    storage = Storage(cluster)
    role = args[1]
    number_of_instances = int(args[2])
    spec_file = args[3]
    storage.create(role, number_of_instances, opt.get('availability_zone'), spec_file)
    storage.print_status(ROLES)
  elif command == 'attach-storage':
    (opt, args, cluster) = parse_options(command, extra_arguments=("ROLE",))
    storage = Storage(cluster)
    role = args[1]
    storage.attach(role, cluster.get_instances_in_role(role, 'running'))
    storage.print_status(ROLES)
  elif command == 'delete-storage':
    (opt, args, cluster) = parse_options(command, FORCE_OPTIONS)
    storage = Storage(cluster)
    storage.print_status(ROLES)
    if not opt["force"] and not _prompt("Delete all storage volumes? THIS WILL PERMANENTLY DELETE ALL DATA"):
      print "Not deleting storage volumes."
    else:
      print "Deleting storage"
      for role in ROLES:
        storage.delete(role)
  elif command == 'update-slaves-file':
    (opt, args, cluster) = parse_options(command, SSH_OPTIONS)
    ssh_options = xstr(opt.get('ssh_options'))
    instances = cluster.check_running(MASTER, 1)
    if not instances:
      sys.exit(1)
    master = instances[0]
    slaves = cluster.get_instances_in_role(SLAVE, 'running')
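    # Write the slave hostnames to a local 'slaves' file and copy it into the
    # master's Hadoop configuration directory.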
    with open('slaves', 'w') as f:
      for slave in slaves:
        f.write(slave.dns_name + "\n")
    subprocess.call('scp %s -r %s root@%s:/etc/hadoop/conf' % (ssh_options, 'slaves', master.public_dns_name), shell=True)
    # Copy private key
    private_key = opt.get('private_key')
    subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % (ssh_options, private_key, master.public_dns_name), shell=True)
    for slave in slaves:
      subprocess.call('scp %s -r %s root@%s:/root/.ssh/id_rsa' % (ssh_options, private_key, slave.public_dns_name), shell=True)
  else:
    print "Unrecognized command '%s'" % command
    print_usage()