Workflow-0.8.3: Workflow patterns over a monad for thread state logging & recovery

Safe Haskell: None
Language: Haskell98

Control.Workflow

Description

A workflow can be seen as a persistent thread. The workflow monad writes a log that permits restoring the thread at the point where it was interrupted. step is the (partial) monad transformer for the Workflow monad. A workflow is defined by its name and, optionally, by the key of the single parameter passed. The primitives for starting workflows also restart an interrupted workflow when it has been in execution previously.

A small example that prints the sequence of integers in the console. If you interrupt the program, it will start from the last printed number when restarted:

module Main where
import Control.Workflow
import Control.Concurrent(threadDelay)
import System.IO (hFlush,stdout)

mcount n= do step $  do
                       putStr (show n ++ " ")
                       hFlush stdout
                       threadDelay 1000000
             mcount (n+1)
             return () -- to disambiguate the return type

main= exec1  "count"  $ mcount (0 :: Int)
>>> runghc demos\sequence.hs
>0 1 2 3
>CTRL-C Pressed
>>> runghc demos\sequence.hs
>3 4 5 6 7
>CTRL-C Pressed
>>> runghc demos\sequence.hs
>7 8 9 10 11
...

The program restarts at the last saved step.

As you can see, some side effects can be re-executed after recovery if the log is not complete. This may happen after an unexpected shutdown (as in this case) or due to an asynchronous log writing policy (see syncWrite).
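
The recovery behaviour can be pictured with a toy model of log replay. This is only a sketch under stated assumptions, not the library's implementation: Replay, newReplay and stepToy are hypothetical names, results are stored with show and read, and the log lives in memory instead of in persistent storage.

```haskell
import Data.IORef

-- | Toy replay log (hypothetical, not part of Control.Workflow).
data Replay = Replay
  { pending  :: IORef [String]  -- results logged by earlier runs, not yet consumed
  , recorded :: IORef [String]  -- the complete log, including this run
  }

-- | Start a run from the log left behind by a previous run.
newReplay :: [String] -> IO Replay
newReplay old = Replay <$> newIORef old <*> newIORef old

-- | Hypothetical stand-in for 'step': return the logged result if one
-- exists, otherwise run the action and append its result to the log.
stepToy :: (Show a, Read a) => Replay -> IO a -> IO a
stepToy r action = do
  logged <- readIORef (pending r)
  case logged of
    (x:rest) -> do
      writeIORef (pending r) rest   -- consume one logged result, skip the action
      return (read x)
    [] -> do
      v <- action                   -- nothing left in the log: really execute
      modifyIORef (recorded r) (++ [show v])
      return v
```

In this model, a step whose result never reached the log is executed again after a restart, which is how a number can be printed twice in the session shown earlier.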

When the step results are big and complex, use the Data.RefSerialize package to define the (de)serialization instances; the log size will be reduced. The showHistory method will print the structure changes in each step.

If, instead of RefSerialize, you use Read and Show instances, there will be no reduction, but it will still work, and the log will be readable for debugging purposes. The RefSerialize instance is automatically derived from the Read and Show instances.

Data.Binary instances are also fine for serialization. To use Binary, just define a binary instance of your data by using showpBinary and readpBinary.

Within the RefSerialize instance of a structure, you can freely mix Show, Read, RefSerialize and Data.Binary instances.

Control.Workflow.Patterns contains higher level workflow patterns for handling multiple workflows

Control.Workflow.Configuration permits the use of workflows for configuration purposes

Documentation

data Stat #

Instances

Serialize Stat # 

Methods

showp :: Stat -> STW () #

readp :: STR Stat #

IResource Stat # 
(Monad m, MonadIO m, Serialize a, Typeable * a) => PMonadTrans (WF Stat) m a #
plift= step

Methods

plift :: m a -> WF Stat m a #

(HasFork io, MonadIO io, MonadCatch io) => HasFork (WF Stat io) # 

Methods

fork :: WF Stat io () -> WF Stat io ThreadId #

type Workflow m = WF Stat m #

type WorkflowList m a b = Map String (a -> Workflow m b) #

class PMonadTrans t m a where #

PMonadTrans permits defining a partial monad transformer. Such transformers are not defined for all kinds of data, only for those that have instances of certain classes. That is because the lift instance code makes hidden use of these classes. This may also permit accurate control of effects. An instance of MonadTrans is an instance of PMonadTrans.
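
As an illustration of what "partial" means here, the sketch below defines a lift that exists only for result types with a Show instance, because the lift itself calls show. All names (PLift, pliftToy, Traced) are hypothetical and unrelated to the library's internals; the sketch only mirrors the idea of making the result type a class parameter.

```haskell
{-# LANGUAGE MultiParamTypeClasses, FlexibleInstances #-}

-- | Hypothetical analogue of 'PMonadTrans': the result type 'a' is a
-- class parameter, so instances can constrain it.
class PLift t m a where
  pliftToy :: m a -> t m a

-- | Toy transformer that pairs each result with a textual trace.
newtype Traced m a = Traced { runTraced :: m (a, [String]) }

-- | Lifting is available only when the result can be shown, because
-- the lift records 'show' of the result in the trace.
instance (Monad m, Show a) => PLift Traced m a where
  pliftToy action = Traced $ do
    x <- action
    return (x, [show x])
```

An ordinary MonadTrans lift could not demand Show a, since its type must work for every a; moving a into the class head is what allows the extra constraint.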

Minimal complete definition

plift

Methods

plift :: Monad m => m a -> t m a #

Instances

(MonadTrans t, Monad m) => PMonadTrans t m a #

An instance of MonadTrans is an instance of PMonadTrans

Methods

plift :: m a -> t m a #

(Monad m, MonadIO m, Serialize a, Typeable * a) => PMonadTrans (WF Stat) m a #
plift= step

Methods

plift :: m a -> WF Stat m a #

class MonadCatchIO m a where #

Adapted from the MonadCatchIO-mtl package. However, in this case it is needed to express serializable constraints about the returned values, so the usual class definitions for lifting IO functions are not suitable.

Minimal complete definition

catch, block, unblock

Methods

catch :: Exception e => m a -> (e -> m a) -> m a #

Generalized version of catch

block :: m a -> m a #

Generalized version of block

unblock :: m a -> m a #

Generalized version of unblock

class MonadIO io => HasFork io where #

Minimal complete definition

fork

Methods

fork :: io () -> io ThreadId #

Instances

HasFork IO # 

Methods

fork :: IO () -> IO ThreadId #

(HasFork io, MonadIO io, MonadCatch io) => HasFork (WF Stat io) # 

Methods

fork :: WF Stat io () -> WF Stat io ThreadId #

throw :: (MonadIO m, Exception e) => e -> m a #

Generalized version of throwIO

class Indexable a where #

Indexable is a utility class used to derive instances of IResource

Example:

data Person= Person{ pname :: String, cars :: [DBRef Car]} deriving (Show, Read, Typeable)
data Car= Car{owner :: DBRef Person , cname:: String} deriving (Show, Read, Eq, Typeable)

Since Person and Car are instances of Read and Show, defining the Indexable instance will implicitly define the IResource instance for file persistence:

instance Indexable Person where  key Person{pname=n} = "Person " ++ n
instance Indexable Car where key Car{cname= n} = "Car " ++ n

Minimal complete definition

key

Methods

key :: a -> String #

defPath :: a -> String #

Instances

Indexable Int 

Methods

key :: Int -> String #

defPath :: Int -> String #

Indexable Integer 
Indexable () 

Methods

key :: () -> String #

defPath :: () -> String #

Indexable String 

Methods

key :: String -> String #

defPath :: String -> String #

Indexable (WFRef a) # 

Methods

key :: WFRef a -> String #

defPath :: WFRef a -> String #

keyWF :: Indexable a => String -> a -> String #

Return the unique name of a workflow with a parameter (executed with exec or start)

Start/restart workflows

start #

Arguments

:: (MonadCatch m, MonadIO m, Indexable a, Serialize a, Typeable a) 
=> String

name that identifies the workflow.

-> (a -> Workflow m b)

workflow to execute

-> a

initial value (always use the initial value when restarting the workflow)

-> m (Either WFErrors b)

result of the computation

Start or continue a workflow. WFErrors and exceptions are returned as Left err (even if they were triggered as exceptions). Other exceptions are returned as Left (Exception e). Use killWF or delWF in case of error to clear the log.

exec :: (Indexable a, Serialize a, Typeable a, Monad m, MonadIO m, MonadCatch m) => String -> (a -> Workflow m b) -> a -> m b #

Start or continue a workflow with exception handling. The workflow flags are updated even in case of exception. WFErrors are raised as exceptions.

exec1d :: (MonadIO m, MonadCatch m) => String -> Workflow m b -> m b #

A version of exec1 that deletes its state after complete execution or when the thread is killed

exec1 :: (Monad m, MonadIO m, MonadCatch m) => String -> Workflow m a -> m a #

A version of exec with no seed parameter.

exec1nc :: (Monad m, MonadIO m, MonadMask m) => String -> Workflow m a -> m a #

Executes a workflow, but does not mark it as finished even if the process ended. In this case, the workflow will just return the last result. If the workflow was gathering data from user questions for a configuration, then this primitive will store them in the log the first time, and they can be retrieved the next time.

wfExec :: (Serialize a, Typeable a, MonadCatch m, MonadIO m) => Workflow m a -> Workflow m a #

Start or restart an anonymous workflow inside another workflow. Its state is deleted when finished and the result is stored in the parent's WF state.

startWF #

Arguments

:: (MonadCatch m, MonadIO m, Serialize a, Serialize b, Typeable a, Indexable a) 
=> String

Name of workflow in the workflow list

-> a

Initial value (always use the initial value, even to restart the workflow)

-> WorkflowList m a b

function to execute

-> m (Either WFErrors b)

Result of the computation

Start or continue a workflow from a list of workflows with exception handling. See start for details about exception and error handling

restartWorkflows :: (Serialize a, Typeable a) => Map String (a -> Workflow IO b) -> IO () #

Restart the unfinished workflows in the list, for all the initial values with which they may have been invoked. The list contains the identifiers of the workflows and the procedures to be called. All the workflows initiated with exec* or start* will be restarted with all possible seed values.
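
The dispatch idea behind a workflow list can be sketched as a lookup table from names to procedures. This is a simplified, pure model: ProcList and restartAll are hypothetical names, the procedures are plain functions instead of Workflow actions, and the pending (name, seed) pairs are passed in explicitly, whereas the library finds them in persistent storage.

```haskell
import qualified Data.Map as Map

-- | Hypothetical, simplified counterpart of 'WorkflowList':
-- workflow names mapped to the procedure to run for a seed value.
type ProcList a b = Map.Map String (a -> b)

-- | Re-run every unfinished (name, seed) pair whose name is registered.
-- Pairs with an unknown name are skipped.
restartAll :: ProcList a b -> [(String, a)] -> [(String, b)]
restartAll procs pending =
  [ (name, f seed)
  | (name, seed) <- pending
  , Just f <- [Map.lookup name procs]
  ]
```

Note that the same procedure may be restarted several times, once for each seed value it was invoked with.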

Lifting to the Workflow monad

step :: (MonadIO m, Serialize a, Typeable a) => m a -> Workflow m a #

Lifts a monadic computation to the WF monad, and provides transparent state logging and resuming of the computation. Note: side effects can be repeated at recovery time if the log was not complete before shutdown; see the integer sequence example above.

stepExec :: (Typeable t, Serialize t, MonadIO m) => DBRef Stat -> m t -> m (DBRef Stat, t) #

unsafeIOtoWF :: Monad m => IO a -> Workflow m a #

Executes a computation inside the workflow monad, whatever the monad encapsulated in the workflow. Warning: this computation is executed whenever the workflow restarts, no matter if it has already been executed previously. This is useful for initializations or debugging. To avoid re-execution when restarting use: step $ unsafeIOtoWF...

To perform IO actions in a workflow that encapsulates an IO monad, use step over the IO action directly:

 step $ action

instead of

  step $ unsafeIOtoWF $ action

References to intermediate values in the workflow log

data WFRef a #

Instances

Read (WFRef a) # 
Show (WFRef a) # 

Methods

showsPrec :: Int -> WFRef a -> ShowS #

show :: WFRef a -> String #

showList :: [WFRef a] -> ShowS #

Serialize (WFRef a) # 

Methods

showp :: WFRef a -> STW () #

readp :: STR (WFRef a) #

Indexable (WFRef a) # 

Methods

key :: WFRef a -> String #

defPath :: WFRef a -> String #

newWFRef :: (Serialize a, Typeable a, MonadIO m, MonadCatch m) => a -> Workflow m (WFRef a) #

Log a value in the workflow log and return a reference to it.

newWFRef x= stepWFRef (return  x) >>= return . fst

stepWFRef :: (Serialize a, Typeable a, MonadIO m) => m a -> Workflow m (WFRef a, a) #

Execute a step and return a reference to the result besides the result itself

readWFRef :: (Serialize a, Typeable a) => WFRef a -> STM (Maybe a) #

Read the content of a Workflow reference. Note that its result is not in the Workflow monad

State manipulation

writeWFRef :: (Serialize a, Typeable a) => WFRef a -> a -> STM () #

Writes a new value in the workflow reference, that is, in the workflow log. Why would you use this? Don't do that! Modifying the content of the workflow log would change the execution flow when the workflow restarts. This method is used internally in the package. The best way to communicate with a workflow is through a persistent queue, using Data.Persistent.Collection:

workflow= exec1 "wf" $ do
         r <- stepWFRef  expr
         push "queue" r
         back <- pop "queueback"
         ...

moveState :: (MonadIO m, Indexable a, Serialize a, Typeable a) => String -> a -> a -> m () #

Moves the state of a workflow with a seed value to become the state of another seed value. This may be of interest when the entry value changes its key value but should not initiate a new workflow and should instead continue with the current one

Workflow inspection

waitWFActive :: String -> STM () #

wait until the workflow is restarted

getAll :: Monad m => Workflow m [IDynamic] #

Return all the steps of the workflow log. The values are dynamic

To get all the steps with results of type Int:

 all <- getAll
 let lfacts = mapMaybe safeFromIDyn all :: [Int]

getWFKeys :: String -> IO [String] #

Return the keys of the workflows that are running with a given prefix

getWFHistory :: (Indexable a, Serialize a) => String -> a -> IO (Maybe Stat) #

Return the current state of the computation, in the IO monad

waitFor #

Arguments

:: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) 
=> (b -> Bool)

The condition that the retrieved object must meet

-> String

The workflow name

-> a

the INITIAL value used in the workflow to start it

-> IO b

The first event that meets the condition

Observe the workflow log until a condition is met.

waitForSTM #

Arguments

:: (Indexable a, Serialize a, Serialize b, Typeable a, Indexable b, Typeable b) 
=> (b -> Bool)

The condition that the retrieved object must meet

-> String

The workflow name

-> a

The INITIAL value used in the workflow

-> STM b

The first event that meets the condition

Persistent timeouts

waitUntilSTM :: TVar Bool -> STM () #

Wait until a certain clock time has passed by monitoring its flag, in the STM monad. This permits composing timeouts with locks waiting for data, using orElse

  • Example: wait for any response from a queue; if no response arrives within 5 minutes, the timeout branch wins and Nothing is returned.
  flag <- getTimeoutFlag $  5 * 60
  ap   <- step  .  atomically $  (readSomewhere >>= return . Just)  `orElse`  (waitUntilSTM flag  >> return Nothing)
  case ap of
       Nothing -> do logWF "timeout" ...
       Just x -> do logWF $ "received" ++ show x ...
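
The orElse composition used in this example can be tried in isolation with plain STM. The following is a minimal sketch that assumes only the stm package; waitFlag and takeOrTimeout are hypothetical names, and a TVar (Maybe a) stands in for the queue.

```haskell
import Control.Concurrent.STM

-- | Block (retry) until the flag becomes True.
-- Toy counterpart of 'waitUntilSTM'.
waitFlag :: TVar Bool -> STM ()
waitFlag flag = readTVar flag >>= check

-- | Take the value from the mailbox if there is one; otherwise
-- return Nothing once the timeout flag is set; otherwise block.
takeOrTimeout :: TVar (Maybe a) -> TVar Bool -> STM (Maybe a)
takeOrTimeout box flag = takeValue `orElse` timedOut
  where
    takeValue = do
      m <- readTVar box
      case m of
        Nothing -> retry                                  -- no data yet: try the alternative
        Just x  -> writeTVar box Nothing >> return (Just x)
    timedOut = waitFlag flag >> return Nothing
```

Because the left branch is tried first, data that is already available takes priority over an expired timeout.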
 

getTimeoutFlag :: MonadIO m => Integer -> Workflow m (TVar Bool) #

Start the timeout and return the flag to be monitored by waitUntilSTM. This timeout is persistent: the counter is initialized in the first call to getTimeoutFlag, no matter if the workflow is restarted. The time during which the workflow has been stopped counts also. Thus, the wait time can exceed the time between failures. A timeout of 0 means no timeout.
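
The arithmetic of a persistent timeout can be sketched as a pure function. This is an illustration under assumptions, not the library's code: remainingSeconds is a hypothetical name, times are plain seconds, and the time of the first call is assumed to be recovered from the log after a restart.

```haskell
-- | Seconds still to wait, given the timeout, the time of the first
-- call (as recovered from the log) and the current time.
-- Nothing means "never expires", matching the rule that 0 disables the timeout.
remainingSeconds :: Integer -> Integer -> Integer -> Maybe Integer
remainingSeconds 0 _ _ = Nothing
remainingSeconds timeout firstCall now =
  Just (max 0 (timeout - (now - firstCall)))
```

Since the elapsed time is measured from the first call, any period during which the workflow was stopped is subtracted as well, so a workflow restarted after the deadline times out immediately.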

withTimeout :: (MonadIO m, Typeable a, Serialize a) => Integer -> STM a -> Workflow m (Maybe a) #

Return either the result of the STM computation or Nothing in case of timeout. The computation can retry. This timeout is persistent: the counter is initialized in the first call to getTimeoutFlag, no matter if the workflow is restarted. The time during which the workflow has been stopped counts also. Thus, the wait time can exceed the time between failures. A timeout of 0 means no timeout.

withKillTimeout :: (MonadIO m, MonadCatch m) => String -> Int -> Integer -> m a -> m a #

Executes a computation, understanding that it is inside the workflow identified by id. If f has not finished when time has elapsed, a Timeout exception is generated, which may end the workflow if the programmer does not catch it. If the workflow is restarted after time2 has elapsed, the workflow will restart from the beginning. If not, it will restart after the last logged step.

Usually time2 > time

time2=0 means time2 is infinite

withKillTimeout :: CMC.MonadCatchIO m => String -> Int -> Integer -> m a -> m a
withKillTimeout id time time2 f = do
  tid <- liftIO myThreadId
  tstart <- liftIO getTimeSeconds
  let final= liftIO $ do
        tnow <- getTimeSeconds
        let ref = getDBRef $ keyResource $ stat0{wfName=id} -- !> (keyResource $ stat0{wfName=id} )
        when (time2 /=0) . atomically $ do
          s <- readDBRef ref `onNothing` error ( "withKillTimeout: Workflow not found: "++ id)
          writeDBRef ref s{lastActive= tnow,timeout= Just (time2 - fromIntegral (tnow - tstart))}
        clearRunningFlag id
  let proc= do
        twatchdog <- liftIO $ case time of
          0 -> return tid
          _ -> forkIO $ threadDelay (time * 1000000) >> throwTo tid Timeout
        r <- f
        liftIO $ killThread twatchdog
        return r

  proc `finally` final

Trace logging

logWF :: MonadIO m => String -> Workflow m () #

Log a message in the workflow history. It can be printed out with showHistory. The message is printed to standard output too

Termination of workflows

killThreadWF :: (Indexable a, Serialize a, Typeable a, MonadIO m) => String -> a -> m () #

Kill the executing thread if not killed, but not its state. exec, start or restartWorkflows will continue the workflow

killWF :: (Indexable a, MonadIO m) => String -> a -> m () #

Kill the process (if running) and drop it from the list of restartable workflows. Its state history remains, so it can be inspected with getWFHistory, showHistory and so on.

When the workflow has been called with no parameter, use: ()

delWF :: (Indexable a, MonadIO m, Typeable a) => String -> a -> m () #

Delete the WF from the running list and delete the workflow state from persistent storage. Use it to perform cleanup if the process has been killed.

When the workflow has been called with no parameter, use: ()

killThreadWF1 :: MonadIO m => String -> m () #

A version of killThreadWF for workflows started with no parameter by exec1

delWFHistory :: Indexable t => String -> t -> IO () #

Delete the history of a workflow. Be sure that this WF has finished.

Log writing policy

syncWrite :: SyncMode -> IO () #

Specify the cache synchronization policy with permanent storage. See SyncMode for details

data SyncMode :: * #

Constructors

Synchronous

sync state to permanent storage when atomicallySync is invoked

Asyncronous 

Fields

SyncManual

use syncCache to write the state

Print log history

showHistory :: Stat -> ByteString #

Show the state changes along the workflow, that is, all the intermediate results

isInRecover :: Monad m => Workflow m Bool #

True if the workflow is in recovery mode, reading the log to recover the process state

Low level, internal use

runWF1 :: MonadIO m => String -> WF Stat m b -> Stat -> Bool -> m b #

getState :: (Monad m, MonadIO m, Indexable a, Serialize a, Typeable a) => String -> x -> a -> m (Either WFErrors (String, x, Stat)) #

Orphan instances

Monad m => Monad (WF s m) # 

Methods

(>>=) :: WF s m a -> (a -> WF s m b) -> WF s m b #

(>>) :: WF s m a -> WF s m b -> WF s m b #

return :: a -> WF s m a #

fail :: String -> WF s m a #

(Monad m, Functor m) => Functor (WF s m) # 

Methods

fmap :: (a -> b) -> WF s m a -> WF s m b #

(<$) :: a -> WF s m b -> WF s m a #

(Monad m, Functor m) => Applicative (WF s m) # 

Methods

pure :: a -> WF s m a #

(<*>) :: WF s m (a -> b) -> WF s m a -> WF s m b #

(*>) :: WF s m a -> WF s m b -> WF s m b #

(<*) :: WF s m a -> WF s m b -> WF s m a #

MonadIO m => MonadIO (WF Stat m) # 

Methods

liftIO :: IO a -> WF Stat m a #