X-Git-Url: https://git.martlubbers.net/?a=blobdiff_plain;f=cloudiTasks%2FcloudiTasks.icl;h=f5dbaceea63b921beb1879ddd183a7e149f5e8a9;hb=dd851e8574c37cad729a3ba3b4e64a548bb4422e;hp=081848e329fb4d382e43ac32b6f1e490fa6ad660;hpb=319e21ea8c819a21c273ea286bad8781e000c332;p=clean-tests.git diff --git a/cloudiTasks/cloudiTasks.icl b/cloudiTasks/cloudiTasks.icl index 081848e..f5dbace 100644 --- a/cloudiTasks/cloudiTasks.icl +++ b/cloudiTasks/cloudiTasks.icl @@ -1,31 +1,35 @@ module cloudiTasks +import Data.Func, Data.Functor, Data.Tuple import StdEnv import iTasks -import Data.Func -import iTasks.Extensions.Distributed -Start w = flip doTasksWithOptions w \args eo - # (eo, s) = case args of - [argv0,"--slave",p] = ({eo & sdsPort=toInt p}, onStartup o slave) - _ = (eo, onRequest "/" o master) - = Ok (s args, {eo & distributed=True}) +Start w = doTasks master w -slave :: [String] -> Task () -slave args - = get applicationOptions - >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort) - >-| asyncTaskListener - -master :: [String] -> Task () -master args +master :: Task () +master = get applicationOptions >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort) - >-| asyncTask (PrivateNode 9099 ["--slave", "9099"]) (blockWait 5) +// >-| set 42 (remoteShare (sharedStore "bork" 42) {domain="localhost",port=9099}) // >-| asyncTask (ExistingNode "localhost" 9099) (blockWait 5) - >>! \s->viewInformation [] s +// >-| asyncTask "localhost" 9090 (blockWait 5) +// >-| asyncTask (PrivateNode 9099) (traceValue 5 >-| traceValue 42) + >-| asyncTask "localhost" 9099 (updateInformation [] 5) +// >-| sleepSortPar [5,1,3,8] + >&^ viewSharedInformation [] @! () +asyncTaskChannel :: !String !Int !((sds () (Queue r) w) -> Task a) !((sds () (Queue w) r) -> Task b) -> Task (a, b) +asyncTaskChannel host port remote local + = asyncTask host port (remote shareTo) + -&&- +where + shareTo :: (sds () (Queue r) (Queue r)) + shareTo = sdsFocus ("to-" +++ host +++ toString port) $ memoryStore "asyncITasks-channels" (Just newQueue) + + shareFro :: (sds () (Queue w) (Queue w)) + shareFro = sdsFocus ("fro-" +++ host +++ toString port) $ memoryStore "asyncITasks-channels" (Just newQueue) + blockWait :: Int -> Task Int blockWait i = accWorld (sleep i) where @@ -33,3 +37,17 @@ where sleep _ _ = code { ccall sleep "I:I:A" } + +sleepSortPar :: [Int] -> Task [Int] +sleepSortPar numbers = parallel + [ (Embedded, \stl-> + asyncTaskSpawn port (blockWait num) + >-| appendTask Embedded (\_->return num) stl + @? const NoValue) + \\ num <- numbers + & port <- [9092..9099] + ] [] @? \tv->case tv of + NoValue = NoValue + (Value ts _) + # r = [v\\(_, Value v True)<-ts] + = Value r (length r == length numbers)