Merge branch 'master' of git.martlubbers.net:clean-tests
[clean-tests.git] / cloudiTasks / cloudiTasks.icl
1 module cloudiTasks
2
3 import Data.Func, Data.Functor, Data.Tuple
4 import StdEnv
5 import iTasks
6
// Program entry point: passes the `master` task and the world to `doTasks`.
Start world = doTasks master world
8
// Top-level task of this test program.
// Steps, in order:
//   1. read `applicationOptions` and trace the configured server port;
//   2. run an `updateInformation` editor (initial value 5) through `asyncTask`
//      against "localhost" port 9099;
//   3. combine that task with `viewSharedInformation []` using `>&^`;
//   4. discard the result and yield ().
// The commented-out lines are alternative experiments kept for reference.
master :: Task ()
master
	=   get applicationOptions
	>>- \options -> traceValue ("Master started on port " +++ toString options.serverPort)
//	>-| set 42 (remoteShare (sharedStore "bork" 42) {domain="localhost",port=9099})
//	>-| asyncTask (ExistingNode "localhost" 9099) (blockWait 5)
//	>-| asyncTask "localhost" 9090 (blockWait 5)
//	>-| asyncTask (PrivateNode 9099) (traceValue 5 >-| traceValue 42)
	>-| asyncTask "localhost" 9099 (updateInformation [] 5)
//	>-| sleepSortPar [5,1,3,8]
	>&^ viewSharedInformation []
	@!  ()
21
// Runs two cooperating tasks connected by a pair of queue shares.
//   host, port : node on which `remote` is started through `asyncTask`
//   remote     : task builder that receives the "to" share; run via `asyncTask`
//   local      : task builder that receives the "fro" share; run on this node
// Both tasks are combined with `-&&-`, so the result is the pair of their values.
//
// Fix: the right operand of `-&&-` was missing (the definition did not parse and
// `local` / `shareFro` were unused); it is now `local shareFro`.
//
// NOTE(review): the signature gives the shares differing read/write types
// (`Queue r` / `w`), while both stores below read and write the same queue type.
// Confirm the intended channel types before relying on this.
asyncTaskChannel :: !String !Int !((sds () (Queue r) w) -> Task a) !((sds () (Queue w) r) -> Task b) -> Task (a, b)
asyncTaskChannel host port remote local
	=    asyncTask host port (remote shareTo)
	-&&- local shareFro
where
	// Queue handed to the remote task; focused on a name derived from host and port.
	shareTo :: (sds () (Queue r) (Queue r))
	shareTo = sdsFocus ("to-" +++ host +++ toString port) $ memoryStore "asyncITasks-channels" (Just newQueue)

	// Queue handed to the local task; same memory store, distinct "fro-" name.
	shareFro :: (sds () (Queue w) (Queue w))
	shareFro = sdsFocus ("fro-" +++ host +++ toString port) $ memoryStore "asyncITasks-channels" (Just newQueue)
32
// Task that performs a blocking foreign call to `sleep` with the given argument
// and yields the Int returned by that call.
// NOTE(review): presumably this is libc's sleep(3), i.e. the argument is in
// seconds and the whole process blocks -- confirm on the target platform.
blockWait :: Int -> Task Int
blockWait duration = accWorld $ cSleep duration
where
	// Foreign-call stub: the arguments are consumed by the `ccall` below, which is
	// why they are not named on the left-hand side.
	cSleep :: !Int !*e -> (!Int, !*e)
	cSleep _ _ = code {
		ccall sleep "I:I:A"
	}
40
// "Sleep sort" over remote nodes.
// For every number a parallel branch is started that runs `blockWait num`
// through `asyncTaskSpawn` on its own port and afterwards appends a task
// returning `num` to the shared task list. The branches themselves never
// expose a value (`@? const NoValue`); only the appended tasks do.
// The overall value is the list of stable values currently present in the
// task list, and it is stable once that list is as long as `numbers`.
//
// NOTE(review): numbers are zipped with ports 9092..9099, so at most eight
// numbers get a branch; with a longer input the length check can never hold
// and the result never becomes stable. Confirm whether that limit is intended.
sleepSortPar :: [Int] -> Task [Int]
sleepSortPar numbers = parallel branches [] @? collect
where
	// One embedded branch per (number, port) pair.
	branches =
		[ (Embedded, branch port num)
		\\ num <- numbers
		& port <- [9092..9099]
		]

	// Sleep remotely, then publish `num` as a new task in the shared task list.
	branch port num taskList
		=   asyncTaskSpawn port (blockWait num)
		>-| appendTask Embedded (\_ -> return num) taskList
		@?  const NoValue

	// Keep only the stable values; stable overall when all numbers have arrived.
	collect NoValue = NoValue
	collect (Value entries _)
		# finished = [v \\ (_, Value v True) <- entries]
		= Value finished (length finished == length numbers)