73dec053e17dbe57238096e0211b5905b2d3d824
[clean-tests.git] / cloudiTasks / cloudiTasks.icl
1 module cloudiTasks
2
3 import Data.Func, Data.Functor, Data.Tuple
4 import StdEnv
5 import iTasks
6
7 Start w = doTasks master w
8
9 master :: Task ()
10 master
11 = get applicationOptions
12 >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort)
13 // >-| asyncTask (ExistingNode "localhost" 9099) (blockWait 5)
14 // >-| asyncTask "localhost" 9090 (blockWait 5)
15 // >-| asyncTask (PrivateNode 9099) (traceValue 5 >-| traceValue 42)
16 >-| asyncTaskSpawn 9099 (return 42)
17 >-| asyncTaskSpawn 9099 (updateInformation [] 42)
18 // >-| (updateInformation [] 42 -&&- updateInformation [] 22) @. ("localhost", 9099)
19 // >-| asyncTaskSpawn 9099 (updateInformation [] 42 -&&- updateInformation [] 22)
20 // >-| sleepSortPar [5,1,3,8]
21 >&^ viewSharedInformation []
22 @! ()
23
24 asyncTaskChannel :: !String !Int !((sds () (Queue r) w) -> Task a) !((sds () (Queue w) r) -> Task b) -> Task (a, b)
25 asyncTaskChannel host port remote local
26 = asyncTask host port (remote shareTo)
27 -&&-
28 where
29 shareTo :: (sds () (Queue r) (Queue r))
30 shareTo = sdsFocus ("to-" +++ host +++ toString port) $ memoryStore "asyncITasks-channels" (Just newQueue)
31
32 shareFro :: (sds () (Queue w) (Queue w))
33 shareFro = sdsFocus ("fro-" +++ host +++ toString port) $ memoryStore "asyncITasks-channels" (Just newQueue)
34
35 blockWait :: Int -> Task Int
36 blockWait i = accWorld (sleep i)
37 where
38 sleep :: !Int !*e -> (!Int, !*e)
39 sleep _ _ = code {
40 ccall sleep "I:I:A"
41 }
42
43 sleepSortPar :: [Int] -> Task [Int]
44 sleepSortPar numbers = parallel
45 [ (Embedded, \stl->
46 asyncTaskSpawn port (blockWait num)
47 >-| appendTask Embedded (\_->return num) stl
48 @? const NoValue)
49 \\ num <- numbers
50 & port <- [9092..9099]
51 ] [] @? \tv->case tv of
52 NoValue = NoValue
53 (Value ts _)
54 # r = [v\\(_, Value v True)<-ts]
55 = Value r (length r == length numbers)