Multiprocesing - procesy se po case stavaji zombie rubrika: Programování: Python

6 Twista
položil/-a 13.9.2015

Zdravim,

Uz nejakou dobu resime problem s multiprocesingem v pythonu. Pouzivame standartni multiprocesing modul (python 2.7.x)

Zakladni uvaha:

Budeme mit Message Queue servicu (RabbitMQ v nasem pripade) a pokud prijde zprava - tak spawneme novy proces ktery task zpracuje.

Problem:

Po case se procesy stavaji zombi procesmi.

Implementace:

pripravil jsme zakladni implementaci, snazil jsem se to osekat co nejvic to slo aby to bylo co nejprehlednejsi

# -*- coding: utf-8 -*-
from multiprocessing import Process
import signal
from threading import Lock
 
 
class Task(Process):
 
    def __init__(self, data):
        super(Task, self).__init__()
        self.data = data
 
    def run(self):
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        self.do_job() # long job there
        pass
 
    def do_job(self):
        # very long job
        pass
 
 
class MQListener(object):
 
    def __init__(self):
        self.tasks = []
        self.tasks_lock = Lock()
        self.register_signal_handler()
        mq = RabbitMQ()
        mq.listen("task_queue", self.on_message)
 
    def register_signal_handler(self):
        signal.signal(signal.SIGCHLD, self.on_signal_received)
 
    def on_signal_received(self, *_):
        self._check_existing_processes()
 
    def on_message(self, message):
        # ack message and create task
        task = Task(message)
        with self.tasks_lock:
            self.tasks.append(task)
            task.start()
        pass
 
    def _check_existing_processes(self):
        """
        go over all created task, if some is not alive - remove them from tasks collection
        """
        try:
            with self.tasks_lock:
                running_tasks = []
                for w in self.tasks:
                    if not w.is_alive():
                        w.join()
                    else:
                        running_tasks.append(w)
 
                self.tasks = running_tasks
        except Exception:
            # log
            pass
 
 
 
if __name__ == '__main__':
    m = MQListener()

Myslim si ze problem je v tom ze s novym procesem (forknutim) se zdedi i lock a proto to pak nejde, Zkouseli jsme vic veci - tohle je posledni verze.

Dekuji za jakoukoliv pomoc pri reseni. Nebranime se ani pouziti nejake knihovny - za doporuceni budu taky rad

odkaz Vyřešeno
Anonym
odpověděl/-a 18.9.2015
 
upravil/-a 18.9.2015

Díval ses na Celery? Celery implementuje to, co se snažíš hledat - když do něho dojde zpráva, tak ji zpracuje (a nebo restartuje zpracování).

Úspěšně ji používám (a nejen já) právě na odložené zpracování úloh z front-endu. Front-end pošle zprávu do Celery a když má worker na pozadí čas, tak si úlohu vyzvedne a spustí.

Komentáře

  • Twista : diky, nakonec jsem nasadil Celery a vypada to ze se vyresily problemy 22.9.2015
  • Anonym : A pokud nepotřebuješ extrémní spolehlivost (např. doručení zprávy i když celery neběží), tak jako broker je možné použít Redis - je nepříjemně rychlý a oproti *MQ lehkotonážní... A pohodlněji se nasazuje - za to Vás budou mít vaši Opsové rádi. 29.9.2015
  • Twista : Zas tolik zprav to neni - takze zatim urcite rabbitmq staci. navic nasazeni moc neresime - vse pohodlne pres Ansible :) kazdopadne diky za tip :) 18.10.2015

Pro zobrazení všech 3 odpovědí se prosím přihlaste:

Rychlé přihlášení přes sociální sítě:

Nebo se přihlaste jménem a heslem:

Zadejte prosím svou e-mailovou adresu.
Zadejte své heslo.