使用Pyleus开发Storm应用程序(0.9.4) 2015-05-14 20:30
介绍
Pyleus一款基于Python的Storm API组件。通过Pyleus,可以使用Python开发语言开发Storm应用程序。
安装
- 安装
pip install pyleus
- 配置
vi ~/.pyleus.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | [storm]
# path to Storm executable (pyleus will automatically look in PATH)
storm_cmd_path: /opt/hadoop/client/storm/bin/storm
# optional: use -n option of pyleus CLI instead
#nimbus_ip: 192.168.1.13
# java options to pass to Storm CLI
jvm_opts: -Djava.io.tmpdir=/tmp
[build]
# PyPI server to use during the build of your topologies
pypi_index_url: http://pypi.ninjacorp.com/simple/
# always use system-site-packages for pyleus virtualenvs (default: false)
system_site_packages: true
|
使用
- 目录结构
├── net
│ ├── __init__.py
│ └── cheyo
│ ├── __init__.py
│ └── pyleus
│ ├── dummy_bolt.py
│ ├── dummy_spout.py
│ └── __init__.py
└── pyleus_topo.yaml
- 代码
dummy_spout.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 | from pyleus.storm import Spout
class DummySpout(Spout):
OUTPUT_FIELDS = ['sentence', 'name']
def next_tuple(self):
self.emit(("This is a sentence.", "spout",))
if __name__ == '__main__':
DummySpout().run()
|
dummy_bolt.py:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | from pyleus.storm import SimpleBolt
class DummyBolt(SimpleBolt):
OUTPUT_FIELDS = ['sentence']
def process_tuple(self, tup):
sentence, name = tup.values
new_sentence = "{0} says, \"{1}\"".format(name, sentence)
self.emit((new_sentence,), anchors=[tup])
if __name__ == '__main__':
DummyBolt().run()
|
pyleus_topo.yaml:
1 2 3 4 5 6 7 8 9 10 11 12 13 | name: my_first_topology
topology:
- spout:
name: my-first-spout
module: net.cheyo.pyleus.dummy_spout
- bolt:
name: my-first-bolt
module: net.cheyo.pyleus.dummy_bolt
groupings:
- shuffle_grouping: my-first-spout
|
- 编译
pyleus build pyleus_topo.yaml
- 运行
pyleus submit -n data03 my_first_topology.jar
注意
Storm集群上也需要安装pyleus:
pip install pyleus