From: Mart Lubbers Date: Wed, 26 Feb 2020 14:28:44 +0000 (+0100) Subject: cloud X-Git-Url: https://git.martlubbers.net/?a=commitdiff_plain;h=7db58a8b1702130a00cce2a957730e96cf477b77;p=clean-tests.git cloud --- diff --git a/cloudiTasks/cloudiTasks.icl b/cloudiTasks/cloudiTasks.icl new file mode 100644 index 0000000..c5c3375 --- /dev/null +++ b/cloudiTasks/cloudiTasks.icl @@ -0,0 +1,104 @@ +module cloudiTasks + +import iTasks.Internal.Serialization +import iTasks.Internal.TaskEval +import iTasks.UI.Editor.Common +import StdEnv +import Data.Func +import Data.Tuple +import Data.Map.GenJSON +import iTasks +import iTasks.Extensions.DateTime +import iTasks.Internal.Distributed.RemoteTask +import qualified Data.Map +import System.Time + +:: CloudTaskType = ExistingNode String Int + +//mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds + +asyncTask :: CloudTaskType (Task a) -> Task a | iTask a +asyncTask (ExistingNode host port) t + = upd (\tid->(tid, TaskWrapper t)) (remoteShare cloudITasksQueue {domain=host, port=port}) + >>- \(id, _)->viewInformation [] id <<@ Title "result" + @? const NoValue +// = upd (\queue +// >>- \sp->externalProcess +// {tv_sec=10,tv_nsec=0} +// eo.appPath +// ["--slave", toString sp] +// Nothing//(Just tdir) +// Nothing +// ishare +// oshare +// -|| traceValue ("Slave started at port " +++ toString sp) >-| forever (watch oshare >>* [OnValue $ ifValue (\t->case t of +// ([], []) = False +// x = True) \v->set ([], []) oshare >-| traceValue v]) +// >>- traceValue +// @? const NoValue + +Start w = flip doTasksWithOptions w \args eo + # (eo, s) = case args of + [argv0,"--slave",p] = ({eo & sdsPort=toInt p}, onStartup o slave) + _ = (eo, onRequest "/" o master) + = Ok (s args, {eo & distributed=True}) + +JSONEncode{|TaskWrapper|} _ t = [dynamicJSONEncode t] +JSONDecode{|TaskWrapper|} _ [t:c] = (dynamicJSONDecode t, c) +JSONDecode{|TaskWrapper|} _ c = (Nothing, c) +gEq{|TaskWrapper|} _ _ = False +gEditor{|TaskWrapper|} = emptyEditor +gText{|TaskWrapper|} tf ma = maybe [] (\_->["TaskWrapper"]) ma + +slave :: [String] -> Task () +slave args + = get applicationOptions + >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort) + >-| parallel + [(Embedded, \stl->flip (@!) () $ forever $ + watch cloudITasksQueueInt + >>* [OnValue $ ifValue (not o isEmpty) \[(tid, TaskWrapper task):xs]-> + set xs cloudITasksQueueInt + >-| appendTask Embedded (\_->wrapTask tid task) stl + ] + )] [] + @? const NoValue +where + wrapTask :: (Task a) -> Task () | iTask a + wrapTask (Task evalorig) = Task eval @! () + where + eval :: Event TaskEvalOpts *IWorld -> *(TaskResult a, *IWorld) + eval event ops iworld = case evalorig + +nextTaskIdShare :: SDSSource () TaskId () +nextTaskIdShare = SDSSource + { SDSSourceOptions + | name = "nextTaskIdShare" + , read = \_->appFst Ok o getNextTaskId + , write = \_ _->tuple $ Ok (\_ _->True) + } + +cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper) +cloudITasksQueue = + mapReadWrite + ( \(nextTaskId, _)->nextTaskId + , \newTask (nextTaskId, tasks)->Just ((), [newTask:tasks]) + ) Nothing (nextTaskIdShare >*< cloudITasksQueueInt) + +cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)] +cloudITasksQueueInt = sharedStore "cloudQueue" [] + +master :: [String] -> Task () +master args + = get applicationOptions + >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort) + >-| asyncTask (ExistingNode "localhost" 9099) (traceValue "boink") + @! () + +blockWait :: Int -> Task Int +blockWait i = accWorld (sleep i) +where + sleep :: !Int !*e -> (!Int, !*e) + sleep _ _ = code { + ccall sleep "I:I:A" + } diff --git a/cloudiTasks/slave/cloudiTasks b/cloudiTasks/slave/cloudiTasks new file mode 100755 index 0000000..6c6bf4d Binary files /dev/null and b/cloudiTasks/slave/cloudiTasks differ