forked from apache/storm
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstorm
More file actions
executable file
·412 lines (349 loc) · 14 KB
/
Copy pathstorm
File metadata and controls
executable file
·412 lines (349 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
#!/usr/bin/python
import os
import sys
import random
import subprocess as sub
import getopt
import re
def identity(x):
    """Return *x* unchanged (the no-op classpath normaliser used off Cygwin)."""
    return x
def cygpath(x):
    """Run ``cygpath -wp`` on *x* and return the first line of its output.

    Used as the classpath normaliser when ``sys.platform`` is "cygwin".
    """
    command = ["cygpath", "-wp", x]
    # universal_newlines=True makes communicate() return text (not bytes on
    # Python 3) with "\r\n" normalised to "\n", so the split below is safe.
    p = sub.Popen(command, stdout=sub.PIPE, universal_newlines=True)
    output, _ = p.communicate()
    return output.split("\n")[0]
# The classpath string built by get_classpath() is passed through this hook:
# cygpath() when running under Cygwin, otherwise left untouched.
normclasspath = cygpath if sys.platform == "cygwin" else identity

# Per-user configuration directory.
CONF_DIR = os.path.expanduser("~/.storm")
# The resolved path of this script with its last two components removed.
STORM_DIR = "/".join(os.path.realpath(__file__).split("/")[:-2])
# Values collected from "-c" flags on the command line (see parse_config).
CONFIG_OPTS = []
# Value of the "--config" flag; empty when the flag was not given.
CONFFILE = ""
def get_config_opts():
    """Build the ``-Dstorm.options=...`` JVM argument from CONFIG_OPTS.

    The collected options are comma-joined and every space is replaced by
    the literal four-character marker ``%%%%``.
    """
    joined = ','.join(CONFIG_OPTS)
    return "-Dstorm.options=" + joined.replace(' ', "%%%%")
# Refuse to run unless a RELEASE file sits at the top of STORM_DIR.
if not os.path.exists(STORM_DIR + "/RELEASE"):
    for message in (
        "******************************************",
        "The storm client can only be run from within a release. You appear to be trying to run the client from a checkout of Storm's source code.",
        "\nYou can download a Storm release at https://github.com/nathanmarz/storm/downloads",
        "******************************************",
    ):
        print(message)
    sys.exit(1)
def get_jars_full(adir):
    """Return ``adir + "/" + name`` for every entry of *adir* ending in ".jar".

    Entries come back in ``os.listdir`` order; subdirectories are not searched.
    """
    return [adir + "/" + entry
            for entry in os.listdir(adir)
            if entry.endswith(".jar")]
def get_classpath(extrajars):
    """Return the ':'-joined classpath, passed through ``normclasspath``.

    Order: jars directly in STORM_DIR, jars in STORM_DIR/lib, then the
    caller-supplied *extrajars* entries.
    """
    entries = (get_jars_full(STORM_DIR)
               + get_jars_full(STORM_DIR + "/lib")
               + list(extrajars))
    return normclasspath(":".join(entries))
def confvalue(name, extrapaths):
    """Look up config key *name* by running backtype.storm.command.config_value.

    *extrapaths* are appended to the classpath of the spawned JVM.  The
    command's stdout is scanned for the first line whose first
    space-separated token is ``VALUE:``; the rest of that line is returned.
    Returns "" when no such line is found.
    """
    command = [
        "java", "-client", get_config_opts(),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrapaths),
        "backtype.storm.command.config_value", name,
    ]
    # universal_newlines=True yields text on Python 3 as well and folds
    # "\r\n" into "\n", so a value never comes back with a trailing "\r".
    p = sub.Popen(command, stdout=sub.PIPE, universal_newlines=True)
    output, _ = p.communicate()
    for line in output.split("\n"):
        tokens = line.split(" ")
        if tokens[0] == "VALUE:":
            return " ".join(tokens[1:])
    return ""
def print_localconfvalue(name):
    """Syntax: [storm localconfvalue conf-name]
    Prints out the value for conf-name in the local Storm configs.
    The local Storm configs are the ones in ~/.storm/storm.yaml merged
    in with the configs in defaults.yaml.
    """
    # The docstring above is the text shown by "storm help localconfvalue".
    value = confvalue(name, [CONF_DIR])
    print(name + ": " + value)
def print_remoteconfvalue(name):
    """Syntax: [storm remoteconfvalue conf-name]
    Prints out the value for conf-name in the cluster's Storm configs.
    The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml
    merged in with the configs in defaults.yaml.
    This command must be run on a cluster machine.
    """
    # The docstring above is the text shown by "storm help remoteconfvalue".
    value = confvalue(name, [STORM_DIR + "/conf"])
    print(name + ": " + value)
def parse_args(string):
    r"""Takes a string of whitespace-separated tokens and parses it into a list.
    Whitespace inside tokens may be quoted with single quotes, double quotes or
    backslash (similar to command-line arguments in bash).
    Quote characters of one kind nested inside the other kind are kept literally.
    >>> parse_args(r'''"a a" 'b b' c\ c "d'd" 'e"e' 'f\'f' "g\"g" "i""i" 'j''j' k" "k l' l' mm n\\n''')
    ['a a', 'b b', 'c c', "d'd", 'e"e', "f'f", 'g"g', 'ii', 'jj', 'k k', 'l l', 'mm', r'n\n']
    >>> parse_args(r'''"a'b'c" 'd"e"f' x''')
    ["a'b'c", 'd"e"f', 'x']
    """
    # A token is a run of: plain characters, "double-quoted" segments,
    # 'single-quoted' segments, or backslash-escaped characters.
    re_split = re.compile(r'''((?:
        [^\s"'\\] |
        "(?: [^"\\] | \\.)*" |
        '(?: [^'\\] | \\.)*' |
        \\.
    )+)''', re.VERBOSE)
    # One quoted segment (either kind) or one bare backslash escape.
    re_piece = re.compile(r'''"((?:[^"\\]|\\.)*)"|'((?:[^'\\]|\\.)*)'|\\(.)''')
    re_escape = re.compile(r'\\(.)')

    def unquote(match):
        double, single, escaped = match.groups()
        if escaped is not None:
            return escaped
        # An empty quoted segment matches as '' (not None), so test identity.
        quoted = double if double is not None else single
        return re_escape.sub(lambda m: m.group(1), quoted)

    # split() with a capturing group alternates separators and tokens;
    # the tokens sit at the odd indices.
    tokens = re_split.split(string)[1::2]
    # Unquote in a single left-to-right pass so that a quote character nested
    # inside the other quote type is never mistaken for a delimiter.
    return [re_piece.sub(unquote, token) for token in tokens]
def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    """Assemble a ``java`` command line for *klass* and run it.

    klass     -- main class to launch.
    jvmtype   -- JVM mode flag placed right after ``java``.
    jvmopts   -- extra JVM options inserted before the class name.
    extrajars -- additional classpath entries.
    args      -- arguments passed to the class.
    fork      -- when true, spawn java as a child and wait for it;
                 otherwise replace the current process with java.

    The list defaults are only read, never mutated, by this function.
    """
    java_cmd = ["java", jvmtype, get_config_opts()]
    java_cmd += [
        "-Dstorm.home=" + STORM_DIR,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ]
    java_cmd = java_cmd + jvmopts + [klass] + list(args)
    print("Running: " + " ".join(java_cmd))
    if not fork:
        # replaces the current process and never returns
        os.execvp("java", java_cmd)
    else:
        os.spawnvp(os.P_WAIT, "java", java_cmd)
def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]
    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    # The docstring above is the text shown by "storm help jar".
    classpath_extras = [jarfile, CONF_DIR, STORM_DIR + "/bin"]
    submitter_opts = ["-Dstorm.jar=" + jarfile]
    exec_storm_class(
        klass,
        jvmtype="-client",
        jvmopts=submitter_opts,
        extrajars=classpath_extras,
        args=args)
def kill(*args):
    """Syntax: [storm kill topology-name [-w wait-time-secs]]
    Kills the topology with the name topology-name. Storm will
    first deactivate the topology's spouts for the duration of
    the topology's message timeout to allow all messages currently
    being processed to finish processing. Storm will then shutdown
    the workers and clean up their state. You can override the length
    of time Storm waits between deactivation and shutdown with the -w flag.
    """
    # The docstring above is the text shown by "storm help kill".
    classpath_extras = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class(
        "backtype.storm.command.kill_topology",
        jvmtype="-client",
        extrajars=classpath_extras,
        args=args)
def activate(*args):
    """Syntax: [storm activate topology-name]
    Activates the specified topology's spouts.
    """
    # The docstring above is the text shown by "storm help activate".
    classpath_extras = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class(
        "backtype.storm.command.activate",
        jvmtype="-client",
        extrajars=classpath_extras,
        args=args)
def listtopos(*args):
    """Syntax: [storm list]
    List the running topologies and their statuses.
    """
    # The docstring above is the text shown by "storm help list".
    classpath_extras = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class(
        "backtype.storm.command.list",
        jvmtype="-client",
        extrajars=classpath_extras,
        args=args)
def deactivate(*args):
    """Syntax: [storm deactivate topology-name]
    Deactivates the specified topology's spouts.
    """
    # The docstring above is the text shown by "storm help deactivate".
    classpath_extras = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class(
        "backtype.storm.command.deactivate",
        jvmtype="-client",
        extrajars=classpath_extras,
        args=args)
def rebalance(*args):
    """Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*]
    Sometimes you may wish to spread out where the workers for a topology
    are running. For example, let's say you have a 10 node cluster running
    4 workers per node, and then let's say you add another 10 nodes to
    the cluster. You may wish to have Storm spread out the workers for the
    running topology so that each node runs 2 workers. One way to do this
    is to kill the topology and resubmit it, but Storm provides a "rebalance"
    command that provides an easier way to do this.
    Rebalance will first deactivate the topology for the duration of the
    message timeout (overridable with the -w flag) and then redistribute
    the workers evenly around the cluster. The topology will then return to
    its previous state of activation (so a deactivated topology will still
    be deactivated and an activated topology will go back to being activated).
    The rebalance command can also be used to change the parallelism of a running topology.
    Use the -n and -e switches to change the number of workers or number of executors of a component
    respectively.
    """
    # The docstring above is the text shown by "storm help rebalance".
    classpath_extras = [CONF_DIR, STORM_DIR + "/bin"]
    exec_storm_class(
        "backtype.storm.command.rebalance",
        jvmtype="-client",
        extrajars=classpath_extras,
        args=args)
def shell(resourcesdir, command, *args):
    # Packs resourcesdir into a randomly named temporary jar in the current
    # directory, runs backtype.storm.command.shell_submission with
    # [jar, command] + args in a forked JVM, then deletes the jar.
    # (Kept as comments rather than a docstring so that the output of
    # "storm help shell" is unchanged.)
    tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
    # Pass an argument list instead of a shell string: paths containing
    # spaces or shell metacharacters are handed to `jar` verbatim.
    sub.call(["jar", "cf", tmpjarpath, resourcesdir])
    runnerargs = [tmpjarpath, command]
    runnerargs.extend(args)
    try:
        exec_storm_class(
            "backtype.storm.command.shell_submission",
            args=runnerargs,
            jvmtype="-client",
            extrajars=[CONF_DIR],
            fork=True)
    finally:
        # Always clean up the temporary jar, even if the submission raised.
        if os.path.exists(tmpjarpath):
            os.remove(tmpjarpath)
def repl():
    """Syntax: [storm repl]
    Opens up a Clojure REPL with the storm jars and configuration
    on the classpath. Useful for debugging.
    """
    # The docstring above is the text shown by "storm help repl".
    exec_storm_class(
        "clojure.lang.Repl",
        jvmtype="-client",
        extrajars=[STORM_DIR + "/conf"])
def nimbus(klass="backtype.storm.daemon.nimbus"):
    """Syntax: [storm nimbus]
    Launches the nimbus daemon. This command should be run under
    supervision with a tool like daemontools or monit.
    See Setting up a Storm cluster for more information.
    (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
    """
    # The docstring above is the text shown by "storm help nimbus".
    conf_paths = [STORM_DIR + "/conf"]
    # Options configured under nimbus.childopts come first, then the logging setup.
    child_opts = parse_args(confvalue("nimbus.childopts", conf_paths))
    logging_opts = [
        "-Dlogfile.name=nimbus.log",
        "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
    ]
    exec_storm_class(
        klass,
        jvmtype="-server",
        jvmopts=child_opts + logging_opts,
        extrajars=conf_paths)
def supervisor(klass="backtype.storm.daemon.supervisor"):
    """Syntax: [storm supervisor]
    Launches the supervisor daemon. This command should be run
    under supervision with a tool like daemontools or monit.
    See Setting up a Storm cluster for more information.
    (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
    """
    # The docstring above is the text shown by "storm help supervisor".
    conf_paths = [STORM_DIR + "/conf"]
    # Options configured under supervisor.childopts come first, then the logging setup.
    child_opts = parse_args(confvalue("supervisor.childopts", conf_paths))
    logging_opts = [
        "-Dlogfile.name=supervisor.log",
        "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
    ]
    exec_storm_class(
        klass,
        jvmtype="-server",
        jvmopts=child_opts + logging_opts,
        extrajars=conf_paths)
def ui():
    """Syntax: [storm ui]
    Launches the UI daemon. The UI provides a web interface for a Storm
    cluster and shows detailed stats about running topologies. This command
    should be run under supervision with a tool like daemontools or monit.
    See Setting up a Storm cluster for more information.
    (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
    """
    # The docstring above is the text shown by "storm help ui".
    conf_dir = STORM_DIR + "/conf"
    # Options configured under ui.childopts come first, then the logging setup.
    child_opts = parse_args(confvalue("ui.childopts", [conf_dir]))
    logging_opts = [
        "-Dlogfile.name=ui.log",
        "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
    ]
    # Unlike the other daemons, STORM_DIR itself is also put on the classpath.
    exec_storm_class(
        "backtype.storm.ui.core",
        jvmtype="-server",
        extrajars=[STORM_DIR, conf_dir],
        jvmopts=child_opts + logging_opts)
def drpc():
    """Syntax: [storm drpc]
    Launches a DRPC daemon. This command should be run under supervision
    with a tool like daemontools or monit.
    See Distributed RPC for more information.
    (https://github.com/nathanmarz/storm/wiki/Distributed-RPC)
    """
    # The docstring above is the text shown by "storm help drpc".
    # The heap size is fixed here rather than read from the config.
    daemon_opts = ["-Xmx768m", "-Dlogfile.name=drpc.log"]
    daemon_opts.append(
        "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml")
    exec_storm_class(
        "backtype.storm.daemon.drpc",
        jvmtype="-server",
        extrajars=[STORM_DIR + "/conf"],
        jvmopts=daemon_opts)
def dev_zookeeper():
    """Syntax: [storm dev-zookeeper]
    Launches a fresh Zookeeper server using "dev.zookeeper.path" as its local dir and
    "storm.zookeeper.port" as its port. This is only intended for development/testing, the
    Zookeeper instance launched is not configured to be used in production.
    """
    # The docstring above is the text shown by "storm help dev-zookeeper".
    cppaths = [STORM_DIR + "/conf"]
    exec_storm_class(
        "backtype.storm.command.dev_zookeeper",
        jvmtype="-server",
        extrajars=cppaths)
def version():
    """Syntax: [storm version]
    Prints the version number of this Storm release.
    """
    # The docstring above is the text shown by "storm help version".
    releasefile = STORM_DIR + "/RELEASE"
    if os.path.exists(releasefile):
        # Only the first line of RELEASE is printed; `with` closes the
        # handle deterministically instead of leaking it.
        with open(releasefile) as f:
            print(f.readline().strip())
    else:
        print("Unknown")
def print_classpath():
    """Syntax: [storm classpath]
    Prints the classpath used by the storm client when running commands.
    """
    # The docstring above is the text shown by "storm help classpath".
    classpath = get_classpath([])
    print(classpath)
def print_commands():
    """Print all client commands and link to documentation"""
    command_names = sorted(COMMANDS.keys())
    # Each call prints one pre-built string. A Python 2 print statement puts
    # no separator after an item ending in "\t" and a single space after the
    # other items, which the concatenation / " ".join below reproduce.
    print("Commands:\n\t" + "\n\t".join(command_names))
    print(" ".join(["\nHelp:", "\n\thelp", "\n\thelp <command>"]))
    print("\nDocumentation for the storm client can be found at https://github.com/nathanmarz/storm/wiki/Command-line-client\n")
    print("Configs can be overridden using one or more -c flags, e.g. \"storm list -c nimbus.host=nimbus.mycompany.com\"\n")
def print_usage(command=None):
    """Print one help message or list of available commands"""
    # With no command, list everything; otherwise print that command's
    # docstring (or a placeholder when the function has none).
    if command is None:
        print_commands()
    elif command in COMMANDS:
        print(COMMANDS[command].__doc__ or
              "No documentation provided for <%s>" % command)
    else:
        print("<%s> is not a valid command" % command)
def unknown_command(*args):
    """Fallback handler: report the unrecognised command line, then list commands."""
    attempted = ' '.join(sys.argv[1:])
    print("Unknown command: [storm %s]" % attempted)
    print_usage()
# Dispatch table: command name typed after "storm" -> handler function.
COMMANDS = {
    "jar": jar,
    "kill": kill,
    "shell": shell,
    "nimbus": nimbus,
    "ui": ui,
    "drpc": drpc,
    "supervisor": supervisor,
    "localconfvalue": print_localconfvalue,
    "remoteconfvalue": print_remoteconfvalue,
    "repl": repl,
    "classpath": print_classpath,
    "activate": activate,
    "deactivate": deactivate,
    "rebalance": rebalance,
    "help": print_usage,
    "list": listtopos,
    "dev-zookeeper": dev_zookeeper,
    "version": version,
}
def parse_config(config_list):
    """Append every entry of *config_list* to the module-level CONFIG_OPTS."""
    # CONFIG_OPTS is mutated in place, so no `global` declaration is required.
    CONFIG_OPTS.extend(config_list)
def parse_config_opts(args):
    """Separate ``-c`` / ``--config`` options from the remaining arguments.

    Each ``-c VALUE`` contributes VALUE to the returned config list; each
    ``--config FILE`` stores FILE in the module-level CONFFILE (the last one
    wins).  Every other token is kept, in order, in the returned argument
    list.  *args* itself is not modified.

    Returns a ``(config_list, args_list)`` tuple.  Exits with an error
    message when ``-c`` or ``--config`` is the final token and therefore
    has no value.
    """
    global CONFFILE
    config_list = []
    args_list = []
    tokens = iter(args)
    for token in tokens:
        if token not in ("-c", "--config"):
            args_list.append(token)
            continue
        try:
            # The option's value is simply the next token, whatever it is.
            value = next(tokens)
        except StopIteration:
            sys.exit("storm: option %s requires a value" % token)
        if token == "-c":
            config_list.append(value)
        else:
            CONFFILE = value
    return config_list, args_list
def main():
    """Entry point: strip config options from argv and dispatch the command.

    Prints the usage text and exits with status -1 when no command is given,
    including the case where the command line holds only -c/--config options.
    Unrecognised command names are routed to ``unknown_command``.
    """
    if len(sys.argv) <= 1:
        print_usage()
        sys.exit(-1)
    config_list, args = parse_config_opts(sys.argv[1:])
    parse_config(config_list)
    if not args:
        # Only options were supplied, so there is no command to dispatch.
        print_usage()
        sys.exit(-1)
    command, command_args = args[0], args[1:]
    handler = COMMANDS.get(command, unknown_command)
    handler(*command_args)


if __name__ == "__main__":
    main()