module cloudiTasks
-import iTasks.Internal.Serialization
-import iTasks.Internal.TaskEval
-import iTasks.UI.Editor.Common
+import Data.Func, Data.Functor, Data.Tuple
import StdEnv
-import Data.Func
-import Data.Tuple
-import Data.Map.GenJSON
import iTasks
-import iTasks.Extensions.DateTime
-import iTasks.Internal.Distributed.RemoteTask
-import qualified Data.Map
-import System.Time
-:: CloudTaskType = ExistingNode String Int
+Start w = doTasks master w
-//mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds
-
-asyncTask :: CloudTaskType (Task a) -> Task a | iTask a
-asyncTask (ExistingNode host port) t
- = upd (\tid->(tid, TaskWrapper t)) (remoteShare cloudITasksQueue {domain=host, port=port})
- >>- \(id, _)->viewInformation [] id <<@ Title "result"
- @? const NoValue
-// = upd (\queue
-// >>- \sp->externalProcess
-// {tv_sec=10,tv_nsec=0}
-// eo.appPath
-// ["--slave", toString sp]
-// Nothing//(Just tdir)
-// Nothing
-// ishare
-// oshare
-// -|| traceValue ("Slave started at port " +++ toString sp) >-| forever (watch oshare >>* [OnValue $ ifValue (\t->case t of
-// ([], []) = False
-// x = True) \v->set ([], []) oshare >-| traceValue v])
-// >>- traceValue
-// @? const NoValue
-
-Start w = flip doTasksWithOptions w \args eo
- # (eo, s) = case args of
- [argv0,"--slave",p] = ({eo & sdsPort=toInt p}, onStartup o slave)
- _ = (eo, onRequest "/" o master)
- = Ok (s args, {eo & distributed=True})
-
-JSONEncode{|TaskWrapper|} _ t = [dynamicJSONEncode t]
-JSONDecode{|TaskWrapper|} _ [t:c] = (dynamicJSONDecode t, c)
-JSONDecode{|TaskWrapper|} _ c = (Nothing, c)
-gEq{|TaskWrapper|} _ _ = False
-gEditor{|TaskWrapper|} = emptyEditor
-gText{|TaskWrapper|} tf ma = maybe [] (\_->["TaskWrapper"]) ma
-
-slave :: [String] -> Task ()
-slave args
- = get applicationOptions
- >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort)
- >-| parallel
- [(Embedded, \stl->flip (@!) () $ forever $
- watch cloudITasksQueueInt
- >>* [OnValue $ ifValue (not o isEmpty) \[(tid, TaskWrapper task):xs]->
- set xs cloudITasksQueueInt
- >-| appendTask Embedded (\_->wrapTask tid task) stl
- ]
- )] []
- @? const NoValue
-where
- wrapTask :: (Task a) -> Task () | iTask a
- wrapTask (Task evalorig) = Task eval @! ()
- where
- eval :: Event TaskEvalOpts *IWorld -> *(TaskResult a, *IWorld)
- eval event ops iworld = case evalorig
-
-nextTaskIdShare :: SDSSource () TaskId ()
-nextTaskIdShare = SDSSource
- { SDSSourceOptions
- | name = "nextTaskIdShare"
- , read = \_->appFst Ok o getNextTaskId
- , write = \_ _->tuple $ Ok (\_ _->True)
- }
-
-cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper)
-cloudITasksQueue =
- mapReadWrite
- ( \(nextTaskId, _)->nextTaskId
- , \newTask (nextTaskId, tasks)->Just ((), [newTask:tasks])
- ) Nothing (nextTaskIdShare >*< cloudITasksQueueInt)
-
-cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)]
-cloudITasksQueueInt = sharedStore "cloudQueue" []
-
-master :: [String] -> Task ()
-master args
+master :: Task ()
+master
= get applicationOptions
>>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort)
- >-| asyncTask (ExistingNode "localhost" 9099) (traceValue "boink")
+// >-| asyncTask (ExistingNode "localhost" 9099) (blockWait 5)
+// >-| asyncTask "localhost" 9090 (blockWait 5)
+// >-| asyncTask (PrivateNode 9099) (traceValue 5 >-| traceValue 42)
+// >-| asyncTaskSpawn 9099 (return 42)
+ >-| sleepSortPar [5,1,3,8]
+ >&^ viewSharedInformation []
@! ()
blockWait :: Int -> Task Int
sleep _ _ = code {
ccall sleep "I:I:A"
}
+
+sleepSortPar :: [Int] -> Task [Int]
+sleepSortPar numbers = parallel
+ [ (Embedded, \stl->
+ asyncTaskSpawn port (blockWait num)
+ >-| appendTask Embedded (\_->return num) stl
+ @? const NoValue)
+ \\ num <- numbers
+ & port <- [9092..9099]
+ ] [] @? \tv->case tv of
+ NoValue = NoValue
+ (Value ts _)
+ # r = [v\\(_, Value v True)<-ts]
+ = Value r (length r == length numbers)