Skip to content

Commit

Permalink
ZOOKEEPER-765. Add python example script (Travis and Andrei via mahadev)
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@979742 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Mahadev Konar committed Jul 27, 2010
1 parent 1f5bf01 commit 34abd24
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ IMPROVEMENTS:
ZOOKEEPER-821. Add ZooKeeper version information to zkpython (Rich
Schumacher via mahadev)

ZOOKEEPER-765. Add python example script (Travis and Andrei via mahadev)

NEW FEATURES:
ZOOKEEPER-729. Java client API to recursively delete a subtree.
(Kay Kay via henry)
Expand Down
8 changes: 8 additions & 0 deletions src/contrib/zkpython/src/examples/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

This folder contains sample showing how you can use ZooKeeper from Python.

You should also check the following projects:

* http://github.com/phunt/zk-smoketest
* http://github.com/henryr/pyzk-recipes

202 changes: 202 additions & 0 deletions src/contrib/zkpython/src/examples/watch_znode_for_changes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
#!/usr/bin/env python2.6
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" ZNode Change Watcher Skeleton Script
This script shows you how to write a python program that watches a specific
znode for changes and reacts to them.
Steps to understand how this script works:
1. start a standalone ZooKeeper server (by default it listens on localhost:2181)
Did you know you can deploy "local clusters" by using zkconf[1]?
[1] http://github.com/phunt/zkconf
2. enter the command line console
3. create the test node:
[zk: (CONNECTED) 1] create /watch-test dummy-data
Created /watch-test
4. in another shell start this script in verbose mode
$ python watch_znode_for_changes.py -v
# you should see a lot of log messages. have a look over them because
# you can easily understand how zookeeper works
5. update the node data:
[zk: (CONNECTED) 2] set /watch-test new-data
cZxid = 0xa0000001a
ctime = Fri Jul 09 19:14:45 EEST 2010
mZxid = 0xa0000001e
mtime = Fri Jul 09 19:18:18 EEST 2010
pZxid = 0xa0000001a
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0
... and you should see similar log messages:
2010-07-09 19:18:18,537:11542(0xb6ea5b70):ZOO_DEBUG@process_completions@1765: Calling a watcher for node [/watch-test], type = -1 event=ZOO_CHANGED_EVENT
2010-07-09 19:18:18,537 watch_znode_for_changes.py:83 - Running watcher: zh=0 event=3 state=3 path=/watch-test
2010-07-09 19:18:18,537:11542(0xb6ea5b70):ZOO_DEBUG@zoo_awget@2400: Sending request xid=0x4c374b33 for path [/watch-test] to 127.0.0.1:2181
2010-07-09 19:18:18,545:11542(0xb76a6b70):ZOO_DEBUG@zookeeper_process@1980: Queueing asynchronous response
2010-07-09 19:18:18,545:11542(0xb6ea5b70):ZOO_DEBUG@process_completions@1772: Calling COMPLETION_DATA for xid=0x4c374b33 rc=0
2010-07-09 19:18:18,545 watch_znode_for_changes.py:54 - This is where your application does work.
You can repeat this step multiple times.
6. that's all. in the end you can delete the node and you should see a ZOO_DELETED_EVENT
"""

import logging
import logging.handlers
import signal
import sys
import time
import threading
import zookeeper

from optparse import OptionParser

logger = logging.getLogger()

class MyClass(threading.Thread):
znode = '/watch-test'

def __init__(self, options, args):
threading.Thread.__init__(self)

logger.debug('Initializing MyClass thread.')
if options.verbose:
zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG)

self.zh = zookeeper.init(options.servers)
if zookeeper.OK != zookeeper.aget(self.zh, self.znode,
self.watcher, self.handler):
logger.critical('Unable to get znode! Exiting.')
sys.exit(1)

def __del__(self):
zookeeper.close(self.zh)

def aget(self):
return zookeeper.aget(self.zh, self.znode, self.watcher, self.handler)

def handler(self, zh, rc, data, stat):
"""Handle zookeeper.aget() responses.
This code handles the zookeeper.aget callback. It does not handle watches.
Numeric arguments map to constants. See ``DATA`` in ``help(zookeeper)``
for more information.
Args:
zh Zookeeper handle that made this request.
rc Return code.
data Data stored in the znode.
Does not provide a return value.
"""
if zookeeper.OK == rc:
logger.debug('This is where your application does work.')
else:
if zookeeper.NONODE == rc:
# avoid sending too many requests if the node does not yet exists
logger.info('Node not found. Trying again to set the watch.')
time.sleep(1)

if zookeeper.OK != self.aget():
logger.critical('Unable to get znode! Exiting.')
sys.exit(1)

def watcher(self, zh, event, state, path):
"""Handle zookeeper.aget() watches.
This code is called when a znode changes and triggers a data watch.
It is not called to handle the zookeeper.aget call itself.
Numeric arguments map to constants. See ``DATA`` in ``help(zookeeper)``
for more information.
Args:
zh Zookeeper handle that set this watch.
event Event that caused the watch (often called ``type`` elsewhere).
state Connection state.
path Znode that triggered this watch.
Does not provide a return value.
"""
out = ['Running watcher:',
'zh=%d' % zh,
'event=%d' % event,
'state=%d' % state,
'path=%s' % path]
logger.debug(' '.join(out))
if event == zookeeper.CHANGED_EVENT and \
state == zookeeper.CONNECTED_STATE and \
self.znode == path:
if zookeeper.OK != self.aget():
logger.critical('Unable to get znode! Exiting.')
sys.exit(1)

def run(self):
while True:
time.sleep(86400)


def main(argv=None):
# Allow Ctrl-C
signal.signal(signal.SIGINT, signal.SIG_DFL)

parser = OptionParser()
parser.add_option('-v', '--verbose',
dest='verbose',
default=False,
action='store_true',
help='Verbose logging. (default: %default)')
parser.add_option('-s', '--servers',
dest='servers',
default='localhost:2181',
help='Comma-separated list of host:port pairs. (default: %default)')

(options, args) = parser.parse_args()

if options.verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)

formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

logger.info('Starting Zookeeper python example: %s' % ' '.join(sys.argv))

mc = MyClass(options, args)
mc.start()
mc.join()


if __name__ == '__main__':
main()

0 comments on commit 34abd24

Please sign in to comment.