From: Mart Lubbers Date: Thu, 27 Feb 2020 15:52:21 +0000 (+0100) Subject: cloud X-Git-Url: https://git.martlubbers.net/?a=commitdiff_plain;h=319e21ea8c819a21c273ea286bad8781e000c332;p=clean-tests.git cloud --- diff --git a/cloudiTasks/cloudiTasks.icl b/cloudiTasks/cloudiTasks.icl index c5c3375..081848e 100644 --- a/cloudiTasks/cloudiTasks.icl +++ b/cloudiTasks/cloudiTasks.icl @@ -1,41 +1,9 @@ module cloudiTasks -import iTasks.Internal.Serialization -import iTasks.Internal.TaskEval -import iTasks.UI.Editor.Common import StdEnv -import Data.Func -import Data.Tuple -import Data.Map.GenJSON import iTasks -import iTasks.Extensions.DateTime -import iTasks.Internal.Distributed.RemoteTask -import qualified Data.Map -import System.Time - -:: CloudTaskType = ExistingNode String Int - -//mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds - -asyncTask :: CloudTaskType (Task a) -> Task a | iTask a -asyncTask (ExistingNode host port) t - = upd (\tid->(tid, TaskWrapper t)) (remoteShare cloudITasksQueue {domain=host, port=port}) - >>- \(id, _)->viewInformation [] id <<@ Title "result" - @? const NoValue -// = upd (\queue -// >>- \sp->externalProcess -// {tv_sec=10,tv_nsec=0} -// eo.appPath -// ["--slave", toString sp] -// Nothing//(Just tdir) -// Nothing -// ishare -// oshare -// -|| traceValue ("Slave started at port " +++ toString sp) >-| forever (watch oshare >>* [OnValue $ ifValue (\t->case t of -// ([], []) = False -// x = True) \v->set ([], []) oshare >-| traceValue v]) -// >>- traceValue -// @? const NoValue +import Data.Func +import iTasks.Extensions.Distributed Start w = flip doTasksWithOptions w \args eo # (eo, s) = case args of @@ -43,56 +11,19 @@ Start w = flip doTasksWithOptions w \args eo _ = (eo, onRequest "/" o master) = Ok (s args, {eo & distributed=True}) -JSONEncode{|TaskWrapper|} _ t = [dynamicJSONEncode t] -JSONDecode{|TaskWrapper|} _ [t:c] = (dynamicJSONDecode t, c) -JSONDecode{|TaskWrapper|} _ c = (Nothing, c) -gEq{|TaskWrapper|} _ _ = False -gEditor{|TaskWrapper|} = emptyEditor -gText{|TaskWrapper|} tf ma = maybe [] (\_->["TaskWrapper"]) ma - slave :: [String] -> Task () slave args = get applicationOptions >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort) - >-| parallel - [(Embedded, \stl->flip (@!) () $ forever $ - watch cloudITasksQueueInt - >>* [OnValue $ ifValue (not o isEmpty) \[(tid, TaskWrapper task):xs]-> - set xs cloudITasksQueueInt - >-| appendTask Embedded (\_->wrapTask tid task) stl - ] - )] [] - @? const NoValue -where - wrapTask :: (Task a) -> Task () | iTask a - wrapTask (Task evalorig) = Task eval @! () - where - eval :: Event TaskEvalOpts *IWorld -> *(TaskResult a, *IWorld) - eval event ops iworld = case evalorig - -nextTaskIdShare :: SDSSource () TaskId () -nextTaskIdShare = SDSSource - { SDSSourceOptions - | name = "nextTaskIdShare" - , read = \_->appFst Ok o getNextTaskId - , write = \_ _->tuple $ Ok (\_ _->True) - } - -cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper) -cloudITasksQueue = - mapReadWrite - ( \(nextTaskId, _)->nextTaskId - , \newTask (nextTaskId, tasks)->Just ((), [newTask:tasks]) - ) Nothing (nextTaskIdShare >*< cloudITasksQueueInt) - -cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)] -cloudITasksQueueInt = sharedStore "cloudQueue" [] + >-| asyncTaskListener master :: [String] -> Task () master args = get applicationOptions >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort) - >-| asyncTask (ExistingNode "localhost" 9099) (traceValue "boink") + >-| asyncTask (PrivateNode 9099 ["--slave", "9099"]) (blockWait 5) +// >-| asyncTask (ExistingNode "localhost" 9099) (blockWait 5) + >>! \s->viewInformation [] s @! () blockWait :: Int -> Task Int