This is a fork of NSQWorker. It implements a generic message router. See Examples.
Currently only a threaded worker is supported. it handles NSQ messaging with the official Python/Tornado library and executes a blocking message handler function in an executor thread pool.
The motivation behind this package is to replace Celery/RabbitMQ worker processes which perform long running tasks with NSQ.
pip install git+https://github.com/rikonor/nsqworker.git
import nsq
from nsqhandler import NSQHandler, load_routes, route
@load_routes
class Test(NSQHandler):
@route(lambda msg: True)
def test(self, message):
self.logger.info("Test")
Test("test4", "test")
nsq.run()
import nsq
import time
from nsqworker import nsqworker
def process_message(message):
print "start", message.id
time.sleep(2)
print "end", message.id
def handle_exc(message, e):
traceback.print_exc()
w.io_loop.add_callback(message.requeue)
w = nsqworker.ThreadWorker(message_handler=process_message,
exception_handler=handle_exc, concurrency=5, ...)
w.subscribe_reader()
nsq.run()
The arguments for the ThreadWorker constructor are a synchronous, blocking function that handles messages, concurrency, an optional exception_handler and all other arguments for the official NSQ Python library - pynsq.
-
The worker will explicitly call
message.finish()in case the handler function didn't callmessage.finish()ormessage.requeue(). -
The worker will periodically call
message.touch()every 30s for long running tasks so they won't timeout by nsqd. -
The exception handler is called with a message and an exception as the arguments in case it was given during the worker's initialization and an exception is raised while processing a message.
-
Multiple readers can be added for handing messages from multiple topics and channels.
-
Any interactions with NSQ from within a thread worker, such as
message.requeue(),message.finish()and publishing message must be added as callback to the ioloop -
An optional
timeout=<seconds>can be added to the worker constructor, if it is defined, after the defined timeout the optional exception handler will invoked with annsqworker.errors.TimeoutError. Due toconcurrent.futures.ThreadPoolExecutorlimitations it is impossible to cancel the running executor thread and it may continue running even after the timeout exception was raised. -
An optional
--loglevelcommand line argument can be provided to set the logging level, default isINFO. The same logging level can be used with other loggers by getting it form the worker withnumric_level = worker.logger.level -
TODO - message de-duping.