3 import iTasks.Internal.Serialization
4 import iTasks.Internal.TaskEval
5 import iTasks.UI.Editor.Common
9 import Data.Map.GenJSON
11 import iTasks.Extensions.DateTime
12 import iTasks.Internal.Distributed.RemoteTask
13 import qualified Data.Map
// Describes where a task handed to asyncTask should be evaluated.
// ExistingNode host port: asyncTask uses the String as the `domain` and the
// Int as the `port` of the remote share it writes the task to.
16 :: CloudTaskType = ExistingNode String Int
18 //mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds
// Queue a task for evaluation on another node.
// For `ExistingNode host port` the task is packed in a TaskWrapper, paired
// with the task id read from the share, and written to cloudITasksQueue
// accessed as a remote share at host:port.
20 asyncTask :: CloudTaskType (Task a) -> Task a | iTask a
21 asyncTask (ExistingNode host port) t
22 = upd (\tid->(tid, TaskWrapper t)) (remoteShare cloudITasksQueue {domain=host, port=port})
// Only the task id of the written entry is shown here, under the title "result".
// NOTE(review): this appears to yield the id rather than a value of type `a`;
// confirm how the result of the remote task is meant to be obtained.
23 >>- \(id, _)->viewInformation [] id <<@ Title "result"
26 // >>- \sp->externalProcess
27 // {tv_sec=10,tv_nsec=0}
29 // ["--slave", toString sp]
30 // Nothing//(Just tdir)
34 // -|| traceValue ("Slave started at port " +++ toString sp) >-| forever (watch oshare >>* [OnValue $ ifValue (\t->case t of
36 // x = True) \v->set ([], []) oshare >-| traceValue v])
// Program entry point. Chooses between slave and master mode from the command
// line arguments and always enables the `distributed` engine option.
//   [_, "--slave", port]  -> run `slave` on startup, with sdsPort set to `toInt port`
//   anything else         -> serve `master` on requests to "/"
Start world = doTasksWithOptions configure world
where
	configure args opts
		# (opts, startable) = case args of
			[_, "--slave", portArg] -> ({opts & sdsPort = toInt portArg}, onStartup o slave)
			_                       -> (opts, onRequest "/" o master)
		= Ok (startable args, {opts & distributed = True})
// Generic instances for TaskWrapper.

// Encoding goes through dynamicJSONEncode and produces a single JSON node.
JSONEncode{|TaskWrapper|} _ wrapper = [dynamicJSONEncode wrapper]

// Decoding consumes the first node via dynamicJSONDecode; an empty node list
// yields Nothing and is returned unchanged.
JSONDecode{|TaskWrapper|} _ nodes = case nodes of
	[encoded:rest] -> (dynamicJSONDecode encoded, rest)
	_              -> (Nothing, nodes)

// Two wrappers are never considered equal.
gEq{|TaskWrapper|} _ _ = False

// Wrappers have no editable representation.
gEditor{|TaskWrapper|} = emptyEditor

// A present wrapper is rendered as the fixed text "TaskWrapper", regardless of format.
gText{|TaskWrapper|} _ Nothing  = []
gText{|TaskWrapper|} _ (Just _) = ["TaskWrapper"]
// Slave mode: takes entries from the local task queue (cloudITasksQueueInt)
// and runs them as embedded tasks.
53 slave :: [String] -> Task ()
// Read the application options and trace the SDS port found in them.
55 = get applicationOptions
56 >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort)
// Embedded worker: repeatedly watch the queue and discard its own result (`@! ()`).
58 [(Embedded, \stl->flip (@!) () $ forever $
59 watch cloudITasksQueueInt
// Continue only on a non-empty queue; the list pattern relies on that guard.
// The head entry is taken, and cloudITasksQueue prepends new entries, so the
// most recently queued task is picked first.
60 >>* [OnValue $ ifValue (not o isEmpty) \[(tid, TaskWrapper task):xs]->
// Write the remaining entries back before starting the task.
// NOTE(review): the queue is read and written in two separate steps; confirm
// that no entry can be added in between and then lost.
61 set xs cloudITasksQueueInt
// NOTE(review): wrapTask is applied to two arguments here (tid and task) while
// its visible signature takes only a Task — confirm which one is intended.
62 >-| appendTask Embedded (\_->wrapTask tid task) stl
// Wraps a task of any iTask type into a Task ().
// The original evaluation function `evalorig` is unpacked and a new Task is
// built around the local `eval`; `@! ()` is applied to that new task.
67 wrapTask :: (Task a) -> Task () | iTask a
68 wrapTask (Task evalorig) = Task eval @! ()
// Evaluation function of the wrapper; it has the same result type `a` as the wrapped task.
70 eval :: Event TaskEvalOpts *IWorld -> *(TaskResult a, *IWorld)
// Dispatches on `evalorig`; the alternatives are not visible in this excerpt.
71 eval event ops iworld = case evalorig
// Source share whose read value is a TaskId and whose write type is ().
73 nextTaskIdShare :: SDSSource () TaskId ()
74 nextTaskIdShare = SDSSource
76 | name = "nextTaskIdShare"
// Reading ignores the parameter, calls getNextTaskId and wraps its first result in Ok.
77 , read = \_->appFst Ok o getNextTaskId
// Writing ignores both arguments and returns Ok with a predicate that is always True.
// NOTE(review): presumably this predicate decides which registrations are notified — confirm.
78 , write = \_ _->tuple $ Ok (\_ _->True)
// Queue share used to submit tasks: reading yields a TaskId, writing takes a
// (TaskId, TaskWrapper) entry.
81 cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper)
// Read side: of the paired shares, expose only the next task id.
84 ( \(nextTaskId, _)->nextTaskId
// Write side: prepend the new entry to the stored list; () is the value for the id share.
85 , \newTask (nextTaskId, tasks)->Just ((), [newTask:tasks])
// Built over nextTaskIdShare paired with the stored queue cloudITasksQueueInt.
// NOTE(review): the `Nothing` argument is presumably an optional reducer — confirm.
86 ) Nothing (nextTaskIdShare >*< cloudITasksQueueInt)
// Backing store of the task queue: a shared store named "cloudQueue" that
// starts out as the empty list of (TaskId, TaskWrapper) entries.
cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)]
cloudITasksQueueInt = sharedStore storeName initialQueue
where
	storeName    = "cloudQueue"
	initialQueue = []
// Master mode: reports its port and submits a task through asyncTask.
91 master :: [String] -> Task ()
// Read the application options and trace the server port found in them.
93 = get applicationOptions
94 >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort)
// Submit a task that traces "boink" to the node addressed as localhost:9099.
// NOTE(review): host and port are hard-coded here; confirm they match the
// port the slave is started with.
95 >-| asyncTask (ExistingNode "localhost" 9099) (traceValue "boink")
// Task wrapper around `sleep`: runs `sleep` with the given Int on the world
// and returns the Int that `sleep` produces.
blockWait :: Int -> Task Int
blockWait duration = accWorld pause
where
	pause world = sleep duration world
101 sleep :: !Int !*e -> (!Int, !*e)