Asynchronní programování: Zdroje a jejich uvolňování (počítání referencí?) rubrika: Návrh

8 v6ak
položil/-a 23.7.2014

Líbí se mi asynchronní programování pomocí futures, promises apod. Chtěl bych se ale zeptat, jak se dá ideálně řešit následující problém a případně pod jakými klíčovými slovy mám hledat knihovnu.

Mám nějaký zdroj, který musím zase odevzdat a případně poté spustit další události. Pro příklad bohatě stačí třeba takový otevřený lokální logovací soubor. (Tj. nemám jeden globální log soubor, ale mnoho malých. Po dokončení práce jej musím zavřít.) Takovýchto resources ale může být víc a mohu chtít různou granularitu. Včasné uvolnění zdroje je pro mě z nějakého důvodu podstatné, nemohu vše uvolnit až po skončení celé práce.

Pokud bych aplikaci napsal blokujícím způsobem, bylo by vše jasné: Resource bych otevřel a pomocí try-finally nebo podobného mechanismu (příslušná higher-order funkce) zase zavřel po dokončení práce. To u asynchronního kódu nelze, protože podstatná část kódu (callbacky) bude volána až poté, co to vyleze z try bloku. Tím bych ten resource vrátil předčasně a kód by failoval.

Teoreticky bych mohl udělat nějaké čekání na všechny podúkoly, ale nechci blokovat. Moc se mi to nelíbí. Čekání je vděčný zdroj deadlocků a podobných problémů.

Napadá mě nějaký systém, který by evidoval, kdo se může přihlásit o které zdroje. Nešlo by o exkluzivní přístup a nenahrazovalo by to zámky či obdobné mechanismy. Pouze by to značilo, že daná úloha má v úmyslu ten zdroj někdy případně využít či dát k využití nějaké další úloze. Jakmile by o zdroj nikdo nestál, mohl bych jej uklidit (např. zavřít soubor nebo třeba vypsat uživateli hlášku o dokončení práce). To by mělo jít s počítáním referencí - udržoval bych si počet zájemců o zdroj a v případě poklesu na nulu jej uvolnil. Musel bych si hlídat tasky, které skončí výjimkou apod., aby se ve všech případech po úspěšném či neúspěšném ukončení snížil počet zájemců přesně o 1. (To je pohoda - finally a AtomicInt.)

Pod čím to ale hledat? Trošku mi to připomíná semafor naruby nebo AtomicInt s handlerem.

Našel jsem jiné řešení, ale continuations moc neznám, takže z toho nejsem moudrej: http://jsuereth.com/scala-arm/continuations.html

Řeším to konkrétně ve Scale, ale jde mi spíš o obecný koncept.

odkaz Vyřešeno
9 Honza Břešťan
odpověděl/-a 23.7.2014

To reseni v kontinuacich je jedina schudna cesta, kterou znam (a pouzivam). V zasade to funguje vsude stejne: kontinuace je akce zavesena na tvuj future, ktera se provede, kdyz future dobehne (nebo selze, nebo ho nekdo zrusi). Nikde nic neblokuje ani synchronne neceka, o spusteni te kontinuace se postara framework (nejaky scheduler).

Ve Scale nedokazu poradit kodem, ale zaklad treba v C# vypada nasledovne. Pouziju C# 4 bez await, protoze ten (skoro) vsech tech problemu zbavuje a pise se vetsinou stejne jako synchronni kod, coz by pro ucely ukazky moc nepomohlo :)

IDisposable resource = GetResource();
Task task = DoSomethingWithResourceAsync(resource)
    .ContinueWith(() =>
    {
         // Tohle je ta kontinuace, provede se az po skonceni/zruseni predchoziho tasku.
 
         if (resource != null) { resource.Dispose(); }
    });

Chybi tomu spousta error handlingu, cancellation handlingu apod., ale v zasade je tohle minimum. Daji se takhle obecne retezit asynchronni/paralelni workflows, ale hlavne z nich jdou delat stromy, nebo naopak zavesit kontinuaci na moment, kdy dobehne nejaka urcena mnozina futures - v podstate jako automaticky semafor nebo barrier.

O neco slozitejsi pripad se sdilenim resource jsem zrovna nedavno v C# 4 resil, situace pak vypadala zjednodusene nejak takhle (zase tam chybi error a cancellation handling, protoze uff):

var resource = default (IDisposable);
return Task.Factory.StartNew(() =>
    {
        // Tohle vlastne chci synchronne, ale potrebuju zachytit kontext pro pripadne chyby.
        resource = GetResource();
        // Udelam nekolik asynchronnich akci nad stejnou resource.
        Task<Result>[] intermediateResults = GetIntermediateResultsUsingResourceAsync(resource);
        // Nasledujici ContinueWhenAll je vlastne asynchronni varianta Thread.Join
        return Task.Factory.ContinueWhenAll(intermediateResults, AggregateResults)
    })
    .Unwrap() // Unwrap, protoze tady mam Task<Task<AggregatedResult>> a ja to chci zavesit na ten vnitrni.
    .ContinueWith(() =>
    {
        // Spousta error handlingu kvuli frameworku.
        if (resource != null) { resource.Dispose(); }
    })

Jen pro uplnost prvni priklad, ale v C# 5 s awaitem:

IDisposable resource = null;
try
{
    resource = GetResource();
    await DoSomethingWithResourceAsync(resource);
    // Vsechno po awaitu (vcetne zachovani exception handlingu) kompilator zabali do kontinuace a nascheduluje ji,
    // zatimco tahle metoda rovnou vrati. Az po prvni await je ale kod synchronni.
}
finally
{
    if (resource != null) { resource.Dispose(); }
}

Komentáře

  • v6ak : Jo, navěsit to do onComplete vypadá jako dobrý nápad, něco podobného asi asi napíšu. Pokud se budu nudit, pořeším ještě případné vyhození OutOfMemoryError či StackOverflowError (a podobných chyb) v nevhodnou chvíli. 23.7.2014
  • Augi : Přesně tak, napsal jsi vše podstatné. Už v průběhu čtení otázky jsem si říkal "to je jasný, continuations". 23.7.2014

Pro zobrazení všech 4 odpovědí se prosím přihlaste:

Rychlé přihlášení přes sociální sítě:

Nebo se přihlaste jménem a heslem:

Zadejte prosím svou e-mailovou adresu.
Zadejte své heslo.