1cd0d36232e6d63e13cc885c95723ef86295df1e
[clean-tests.git] / cloudiTasks / cloudiTasks.icl
1 module cloudiTasks
2
3 import Data.Func
4 import StdEnv
5 import iTasks
6 import iTasks.Extensions.DateTime
7 import iTasks.Extensions.Distributed
8
// Program entry point.
// When the command line is exactly `<program> --slave <port>` the engine's
// `sdsPort` is set to `toInt <port>` and `slave` runs on startup; for any
// other command line `master` runs on startup. In both cases the
// `distributed` engine option is switched on.
Start w = doTasksWithOptions configure w
where
	configure args eo
		# (eo, mkStartable) = case args of
			[_, "--slave", p] -> ({eo & sdsPort = toInt p}, onStartup o slave)
			// Alternative kept from the original: serve the master on request instead.
			//_ -> (eo, onRequest "/" o master)
			_                 -> (eo, onStartup o master)
		= Ok (mkStartable args, {eo & distributed = True})
15
// Slave node: reads the application options, traces the SDS port it was
// started with and then hands control to `asyncTaskListener`.
// The command-line arguments are accepted but not used.
slave :: [String] -> Task ()
slave _ = get applicationOptions >>- announceAndListen
where
	announceAndListen opts
		=   traceValue ("Slave started on port " +++ toString opts.sdsPort)
		>-| asyncTaskListener
21
// Master node: reads the application options, traces the server port, runs
// `sleepSortPar` on a fixed four-element list, traces its result and finally
// discards it so the task yields `()`.
// The command-line arguments are accepted but not used.
master :: [String] -> Task ()
master _ = get applicationOptions >>- runDemo
where
	runDemo opts
		=   traceValue ("Master started on port " +++ toString opts.serverPort)
		// Earlier experiments, kept for reference:
		// >-| asyncTask (PrivateNode 9099 ["--slave", "9099"]) (blockWait 5)
		// >-| asyncTask (PrivateNode 9099 ["--slave", "9099"]) (traceValue 5 >-| traceValue 42)
		// >-| asyncTask (ExistingNode "localhost" 9099) (blockWait 5)
		>-| sleepSortPar [1,2,3,4]
		// >&> viewSharedInformation []
		>>- traceValue
		@!  ()
33
// Runs the foreign `sleep` function on the world with argument `i` and
// yields the integer that call returns.
// NOTE(review): presumably this is libc's `sleep`, i.e. `i` is in seconds and
// the call blocks the whole process - confirm.
blockWait :: Int -> Task Int
blockWait i = accWorld (cSleep i)
where
	// Thin wrapper around the C call; the environment is threaded through
	// uniquely so the call is sequenced like any other world operation.
	cSleep :: !Int !*e -> (!Int, !*e)
	cSleep seconds env = code {
		ccall sleep "I:I:A"
	}
41
// Starts one embedded parallel branch per input number; each branch waits on
// a timer parameterised by its number and then returns that number.
// The combined value is produced by `tresult`, which only reports the
// collection as complete once every branch has delivered a final value.
// NOTE(review): `tresult` keeps the order in which `parallel` lists its
// branches; whether that is completion order is not visible here - confirm.
sleepSort :: [Int] -> Task [Int]
sleepSort numbers = parallel branches [] @? tresult (length numbers)
where
	branches  = [(Embedded, \_ -> delayed n) \\ n <- numbers]
	delayed n = waitForTimer False n >-| return n
47
// Distributed variant of `sleepSort`: every input number gets its own embedded
// parallel branch that asks a private slave node (started with
// `--slave <port>`) to run `blockWait num` and return `num`.
//
// Ports are handed out consecutively starting at 9091. The port supply is
// deliberately unbounded: the zip comprehension stops at the shorter
// generator, so a finite port range would silently drop every number beyond
// the range's length while `tresult` still waits for `length numbers`
// results, leaving the task unstable forever.
// NOTE(review): very long inputs would run past the valid TCP port range, and
// a port already in use is not detected here.
sleepSortPar :: [Int] -> Task [Int]
sleepSortPar numbers = parallel
	[ (Embedded, \_->asyncTask
		(PrivateNode port ["--slave", toString port])
		$ blockWait num >-| return num)
	\\ num <- numbers
	& port <- [9091..]
	] [] @? tresult (length numbers)
56
// Collapses the value of a `parallel` task list into a plain list of results.
// `expected` is the number of branches that must have finished.
// - No value yet: stays `NoValue`.
// - Otherwise: keeps, in their original order, the results of all entries
//   whose own value is `Value v True`; the outer flag is `True` only when
//   exactly `expected` such results were found.
tresult :: Int (TaskValue [(a, TaskValue b)]) -> TaskValue [b]
tresult expected (Value entries _)
	= Value finished (length finished == expected)
where
	finished = [v \\ (_, Value v True) <- entries]
tresult _ NoValue = NoValue