monomorph structgen
[clean-tests.git] / cloudiTasks / cloudiTasks.icl
index 081848e..f5dbace 100644 (file)
@@ -1,31 +1,35 @@
 module cloudiTasks
 
+import Data.Func, Data.Functor, Data.Tuple
 import StdEnv
 import iTasks
-import Data.Func
-import iTasks.Extensions.Distributed
 
-Start w = flip doTasksWithOptions w \args eo
-       # (eo, s) = case args of
-               [argv0,"--slave",p] = ({eo & sdsPort=toInt p}, onStartup o slave)
-               _ = (eo, onRequest "/" o master)
-       = Ok (s args, {eo & distributed=True})
+Start w = doTasks master w
 
-slave :: [String] -> Task ()
-slave args
-       =   get applicationOptions
-       >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort)
-       >-| asyncTaskListener
-
-master :: [String] -> Task ()
-master args
+master :: Task ()
+master
        = get applicationOptions
        >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort)
-       >-| asyncTask (PrivateNode 9099 ["--slave", "9099"]) (blockWait 5)
+//     >-| set 42 (remoteShare (sharedStore "bork" 42) {domain="localhost",port=9099})
 //     >-| asyncTask (ExistingNode "localhost" 9099) (blockWait 5)
-       >>! \s->viewInformation [] s
+//     >-| asyncTask "localhost" 9090 (blockWait 5)
+//     >-| asyncTask (PrivateNode 9099) (traceValue 5 >-| traceValue 42)
+       >-| asyncTask "localhost" 9099 (updateInformation [] 5)
+//     >-| sleepSortPar [5,1,3,8]
+       >&^ viewSharedInformation []
        @! ()
 
+asyncTaskChannel :: !String !Int !((sds () (Queue r) w) -> Task a) !((sds () (Queue w) r) -> Task b) -> Task (a, b)
+asyncTaskChannel host port remote local
+       =    asyncTask host port (remote shareTo)
+       -&&- 
+where
+       shareTo :: (sds () (Queue r) (Queue r))
+       shareTo = sdsFocus ("to-" +++ host +++ toString port) $ memoryStore "asyncITasks-channels" (Just newQueue)
+
+       shareFro :: (sds () (Queue w) (Queue w))
+       shareFro = sdsFocus ("fro-" +++ host +++ toString port) $ memoryStore "asyncITasks-channels" (Just newQueue)
+
 blockWait :: Int -> Task Int
 blockWait i = accWorld (sleep i)
 where
@@ -33,3 +37,17 @@ where
        sleep _ _ = code {
                        ccall sleep "I:I:A"
                }
+
+sleepSortPar :: [Int] -> Task [Int]
+sleepSortPar numbers = parallel
+       [ (Embedded, \stl->
+                   asyncTaskSpawn port (blockWait num)
+               >-| appendTask Embedded (\_->return num) stl
+               @? const NoValue)
+       \\ num  <- numbers
+       &  port <- [9092..9099]
+       ] [] @? \tv->case tv of
+               NoValue = NoValue
+               (Value ts _)
+                       # r = [v\\(_, Value v True)<-ts]
+                       = Value r (length r == length numbers)