module cloudiTasks import iTasks.Internal.Serialization import iTasks.Internal.TaskEval import iTasks.UI.Editor.Common import StdEnv import Data.Func import Data.Tuple import Data.Map.GenJSON import iTasks import iTasks.Extensions.DateTime import iTasks.Internal.Distributed.RemoteTask import qualified Data.Map import System.Time :: CloudTaskType = ExistingNode String Int //mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds asyncTask :: CloudTaskType (Task a) -> Task a | iTask a asyncTask (ExistingNode host port) t = upd (\tid->(tid, TaskWrapper t)) (remoteShare cloudITasksQueue {domain=host, port=port}) >>- \(id, _)->viewInformation [] id <<@ Title "result" @? const NoValue // = upd (\queue // >>- \sp->externalProcess // {tv_sec=10,tv_nsec=0} // eo.appPath // ["--slave", toString sp] // Nothing//(Just tdir) // Nothing // ishare // oshare // -|| traceValue ("Slave started at port " +++ toString sp) >-| forever (watch oshare >>* [OnValue $ ifValue (\t->case t of // ([], []) = False // x = True) \v->set ([], []) oshare >-| traceValue v]) // >>- traceValue // @? const NoValue Start w = flip doTasksWithOptions w \args eo # (eo, s) = case args of [argv0,"--slave",p] = ({eo & sdsPort=toInt p}, onStartup o slave) _ = (eo, onRequest "/" o master) = Ok (s args, {eo & distributed=True}) JSONEncode{|TaskWrapper|} _ t = [dynamicJSONEncode t] JSONDecode{|TaskWrapper|} _ [t:c] = (dynamicJSONDecode t, c) JSONDecode{|TaskWrapper|} _ c = (Nothing, c) gEq{|TaskWrapper|} _ _ = False gEditor{|TaskWrapper|} = emptyEditor gText{|TaskWrapper|} tf ma = maybe [] (\_->["TaskWrapper"]) ma slave :: [String] -> Task () slave args = get applicationOptions >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort) >-| parallel [(Embedded, \stl->flip (@!) () $ forever $ watch cloudITasksQueueInt >>* [OnValue $ ifValue (not o isEmpty) \[(tid, TaskWrapper task):xs]-> set xs cloudITasksQueueInt >-| appendTask Embedded (\_->wrapTask tid task) stl ] )] [] @? const NoValue where wrapTask :: (Task a) -> Task () | iTask a wrapTask (Task evalorig) = Task eval @! () where eval :: Event TaskEvalOpts *IWorld -> *(TaskResult a, *IWorld) eval event ops iworld = case evalorig nextTaskIdShare :: SDSSource () TaskId () nextTaskIdShare = SDSSource { SDSSourceOptions | name = "nextTaskIdShare" , read = \_->appFst Ok o getNextTaskId , write = \_ _->tuple $ Ok (\_ _->True) } cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper) cloudITasksQueue = mapReadWrite ( \(nextTaskId, _)->nextTaskId , \newTask (nextTaskId, tasks)->Just ((), [newTask:tasks]) ) Nothing (nextTaskIdShare >*< cloudITasksQueueInt) cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)] cloudITasksQueueInt = sharedStore "cloudQueue" [] master :: [String] -> Task () master args = get applicationOptions >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort) >-| asyncTask (ExistingNode "localhost" 9099) (traceValue "boink") @! () blockWait :: Int -> Task Int blockWait i = accWorld (sleep i) where sleep :: !Int !*e -> (!Int, !*e) sleep _ _ = code { ccall sleep "I:I:A" }