Multiprocesing - procesy se po case stavaji zombie rubrika: Programování: Python

Zdravim,
Uz nejakou dobu resime problem s multiprocesingem v pythonu. Pouzivame standartni multiprocesing modul (python 2.7.x)
Zakladni uvaha:
Budeme mit Message Queue servicu (RabbitMQ v nasem pripade) a pokud prijde zprava - tak spawneme novy proces ktery task zpracuje.
Problem:
Po case se procesy stavaji zombi procesmi.
Implementace:
pripravil jsme zakladni implementaci, snazil jsem se to osekat co nejvic to slo aby to bylo co nejprehlednejsi
# -*- coding: utf-8 -*- from multiprocessing import Process import signal from threading import Lock class Task(Process): def __init__(self, data): super(Task, self).__init__() self.data = data def run(self): signal.signal(signal.SIGCHLD, signal.SIG_DFL) self.do_job() # long job there pass def do_job(self): # very long job pass class MQListener(object): def __init__(self): self.tasks = [] self.tasks_lock = Lock() self.register_signal_handler() mq = RabbitMQ() mq.listen("task_queue", self.on_message) def register_signal_handler(self): signal.signal(signal.SIGCHLD, self.on_signal_received) def on_signal_received(self, *_): self._check_existing_processes() def on_message(self, message): # ack message and create task task = Task(message) with self.tasks_lock: self.tasks.append(task) task.start() pass def _check_existing_processes(self): """ go over all created task, if some is not alive - remove them from tasks collection """ try: with self.tasks_lock: running_tasks = [] for w in self.tasks: if not w.is_alive(): w.join() else: running_tasks.append(w) self.tasks = running_tasks except Exception: # log pass if __name__ == '__main__': m = MQListener()
Myslim si ze problem je v tom ze s novym procesem (forknutim) se zdedi i lock a proto to pak nejde, Zkouseli jsme vic veci - tohle je posledni verze.
Dekuji za jakoukoliv pomoc pri reseni. Nebranime se ani pouziti nejake knihovny - za doporuceni budu taky rad

Díval ses na Celery? Celery implementuje to, co se snažíš hledat - když do něho dojde zpráva, tak ji zpracuje (a nebo restartuje zpracování).
Úspěšně ji používám (a nejen já) právě na odložené zpracování úloh z front-endu. Front-end pošle zprávu do Celery a když má worker na pozadí čas, tak si úlohu vyzvedne a spustí.
Pro zobrazení všech 3 odpovědí se prosím přihlaste:
Nebo se přihlaste jménem a heslem:
Komentáře