.'
[clean-tests.git] / cloudiTasks / cloudiTasks.icl
index c5c3375..1cd0d36 100644 (file)
@@ -1,98 +1,34 @@
 module cloudiTasks
 
-import iTasks.Internal.Serialization
-import iTasks.Internal.TaskEval
-import iTasks.UI.Editor.Common
-import StdEnv
 import Data.Func
-import Data.Tuple
-import Data.Map.GenJSON
+import StdEnv
 import iTasks
 import iTasks.Extensions.DateTime
-import iTasks.Internal.Distributed.RemoteTask
-import qualified Data.Map
-import System.Time
-
-:: CloudTaskType = ExistingNode String Int
-
-//mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds
-
-asyncTask :: CloudTaskType (Task a) -> Task a | iTask a
-asyncTask (ExistingNode host port) t
-       = upd (\tid->(tid, TaskWrapper t)) (remoteShare cloudITasksQueue {domain=host, port=port})
-       >>- \(id, _)->viewInformation [] id <<@ Title "result"
-       @? const NoValue
-//     = upd (\queue
-//             >>- \sp->externalProcess
-//                             {tv_sec=10,tv_nsec=0}
-//                             eo.appPath
-//                             ["--slave", toString sp]
-//                             Nothing//(Just tdir)
-//                             Nothing
-//                             ishare
-//                             oshare
-//             -|| traceValue ("Slave started at port " +++ toString sp) >-| forever (watch oshare >>* [OnValue $ ifValue (\t->case t of
-//                             ([], []) = False
-//                             x = True) \v->set ([], []) oshare >-| traceValue v])
-//             >>- traceValue
-//             @? const NoValue
+import iTasks.Extensions.Distributed
 
 Start w = flip doTasksWithOptions w \args eo
        # (eo, s) = case args of
                [argv0,"--slave",p] = ({eo & sdsPort=toInt p}, onStartup o slave)
-               _ = (eo, onRequest "/" o master)
+               //_ = (eo, onRequest "/" o master)
+               _ = (eo, onStartup o master)
        = Ok (s args, {eo & distributed=True})
 
-JSONEncode{|TaskWrapper|} _ t = [dynamicJSONEncode t]
-JSONDecode{|TaskWrapper|} _ [t:c] = (dynamicJSONDecode t, c)
-JSONDecode{|TaskWrapper|} _ c = (Nothing, c)
-gEq{|TaskWrapper|} _ _ = False
-gEditor{|TaskWrapper|} = emptyEditor
-gText{|TaskWrapper|} tf ma = maybe [] (\_->["TaskWrapper"]) ma
-
 slave :: [String] -> Task ()
 slave args
        =   get applicationOptions
        >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort)
-       >-| parallel
-               [(Embedded, \stl->flip (@!) () $ forever $
-                           watch cloudITasksQueueInt
-                       >>* [OnValue $ ifValue (not o isEmpty) \[(tid, TaskWrapper task):xs]->
-                                   set xs cloudITasksQueueInt
-                               >-| appendTask Embedded (\_->wrapTask tid task) stl
-                       ]
-               )] []
-       @? const NoValue
-where
-       wrapTask :: (Task a) -> Task () | iTask a
-       wrapTask (Task evalorig) = Task eval @! ()
-       where
-               eval :: Event TaskEvalOpts *IWorld -> *(TaskResult a, *IWorld)
-               eval event ops iworld = case evalorig 
-
-nextTaskIdShare :: SDSSource () TaskId ()
-nextTaskIdShare = SDSSource
-       { SDSSourceOptions
-       | name  = "nextTaskIdShare"
-       , read  = \_->appFst Ok o getNextTaskId
-       , write = \_ _->tuple $ Ok (\_ _->True)
-       }
-
-cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper)
-cloudITasksQueue =
-       mapReadWrite
-               ( \(nextTaskId, _)->nextTaskId
-               , \newTask (nextTaskId, tasks)->Just ((), [newTask:tasks])
-               ) Nothing (nextTaskIdShare >*< cloudITasksQueueInt)
-
-cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)]
-cloudITasksQueueInt = sharedStore "cloudQueue" []
+       >-| asyncTaskListener
 
 master :: [String] -> Task ()
 master args
        = get applicationOptions
        >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort)
-       >-| asyncTask (ExistingNode "localhost" 9099) (traceValue "boink")
+//     >-| asyncTask (PrivateNode 9099 ["--slave", "9099"]) (blockWait 5)
+//     >-| asyncTask (PrivateNode 9099 ["--slave", "9099"]) (traceValue 5 >-| traceValue 42)
+//     >-| asyncTask (ExistingNode "localhost" 9099) (blockWait 5)
+       >-| sleepSortPar [1,2,3,4]
+//     >&> viewSharedInformation []
+       >>- traceValue
        @! ()
 
 blockWait :: Int -> Task Int
@@ -102,3 +38,22 @@ where
        sleep _ _ = code {
                        ccall sleep "I:I:A"
                }
+
+sleepSort :: [Int] -> Task [Int]
+sleepSort numbers = parallel
+       [ (Embedded, \_->waitForTimer False num >-| return num)
+       \\ num  <- numbers
+       ] [] @? tresult (length numbers)
+
+sleepSortPar :: [Int] -> Task [Int]
+sleepSortPar numbers = parallel
+       [ (Embedded, \_->asyncTask
+               (PrivateNode port ["--slave", toString port])
+               $ blockWait num >-| return num)
+       \\ num  <- numbers
+       &  port <- [9091..9099]
+       ] [] @? tresult (length numbers)
+
+tresult l NoValue = NoValue
+tresult l (Value ts _)
+       = let r = [v\\(_, Value v True)<-ts] in Value r (length r == l)