.
[clean-tests.git] / cloudiTasks / cloudiTasks.icl
1 module cloudiTasks
2
3 import Data.Func, Data.Functor, Data.Tuple
4 import StdEnv
5 import iTasks
6
7 Start w = doTasks master w
8
9 master :: Task ()
10 master
11 = get applicationOptions
12 >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort)
13 >-| updateSharedInformation [] (remoteShare (sharedStore "bork" 42) {domain="localhost",port=9099})
14 // >-| set 42 (remoteShare (sharedStore "bork" 42) {domain="localhost",port=9099})
15 // >-| asyncTask (ExistingNode "localhost" 9099) (blockWait 5)
16 // >-| asyncTask "localhost" 9090 (blockWait 5)
17 // >-| asyncTask (PrivateNode 9099) (traceValue 5 >-| traceValue 42)
18 // >-| asyncTask "localhost" 9099 (updateInformation [] 5)
19 // >-| sleepSortPar [5,1,3,8]
20 >&^ viewSharedInformation []
21 @! ()
22 //
23 //asyncTaskChannel :: !String !Int !((sds () (Queue r) w) -> Task a) !((sds () (Queue w) r) -> Task b) -> Task (a, b)
24 //asyncTaskChannel host port remote local
25 // = asyncTask host port (remote shareTo)
26 // -&&-
27 //where
28 // shareTo :: (sds () (Queue r) (Queue r))
29 // shareTo = sdsFocus ("to-" +++ host +++ toString port) $ memoryStore "asyncITasks-channels" (Just newQueue)
30 //
31 // shareFro :: (sds () (Queue w) (Queue w))
32 // shareFro = sdsFocus ("fro-" +++ host +++ toString port) $ memoryStore "asyncITasks-channels" (Just newQueue)
33
34 blockWait :: Int -> Task Int
35 blockWait i = accWorld (sleep i)
36 where
37 sleep :: !Int !*e -> (!Int, !*e)
38 sleep _ _ = code {
39 ccall sleep "I:I:A"
40 }
41
42 sleepSortPar :: [Int] -> Task [Int]
43 sleepSortPar numbers = parallel
44 [ (Embedded, \stl->
45 /* asyncTaskSpawn port (blockWait num)
46 >-| */appendTask Embedded (\_->return num) stl
47 @? const NoValue)
48 \\ num <- numbers
49 & port <- [9092..9099]
50 ] [] @? \tv->case tv of
51 NoValue = NoValue
52 (Value ts _)
53 # r = [v\\(_, Value v True)<-ts]
54 = Value r (length r == length numbers)