X-Git-Url: https://git.martlubbers.net/?a=blobdiff_plain;f=cloudiTasks%2FcloudiTasks.icl;h=1cd0d36232e6d63e13cc885c95723ef86295df1e;hb=9343419d1659f24d616ef9a64292c2896b603163;hp=c5c337510e7010916bb861da2b6dcd09c0909fdd;hpb=7db58a8b1702130a00cce2a957730e96cf477b77;p=clean-tests.git diff --git a/cloudiTasks/cloudiTasks.icl b/cloudiTasks/cloudiTasks.icl index c5c3375..1cd0d36 100644 --- a/cloudiTasks/cloudiTasks.icl +++ b/cloudiTasks/cloudiTasks.icl @@ -1,98 +1,34 @@ module cloudiTasks -import iTasks.Internal.Serialization -import iTasks.Internal.TaskEval -import iTasks.UI.Editor.Common -import StdEnv import Data.Func -import Data.Tuple -import Data.Map.GenJSON +import StdEnv import iTasks import iTasks.Extensions.DateTime -import iTasks.Internal.Distributed.RemoteTask -import qualified Data.Map -import System.Time - -:: CloudTaskType = ExistingNode String Int - -//mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds - -asyncTask :: CloudTaskType (Task a) -> Task a | iTask a -asyncTask (ExistingNode host port) t - = upd (\tid->(tid, TaskWrapper t)) (remoteShare cloudITasksQueue {domain=host, port=port}) - >>- \(id, _)->viewInformation [] id <<@ Title "result" - @? const NoValue -// = upd (\queue -// >>- \sp->externalProcess -// {tv_sec=10,tv_nsec=0} -// eo.appPath -// ["--slave", toString sp] -// Nothing//(Just tdir) -// Nothing -// ishare -// oshare -// -|| traceValue ("Slave started at port " +++ toString sp) >-| forever (watch oshare >>* [OnValue $ ifValue (\t->case t of -// ([], []) = False -// x = True) \v->set ([], []) oshare >-| traceValue v]) -// >>- traceValue -// @? const NoValue +import iTasks.Extensions.Distributed Start w = flip doTasksWithOptions w \args eo # (eo, s) = case args of [argv0,"--slave",p] = ({eo & sdsPort=toInt p}, onStartup o slave) - _ = (eo, onRequest "/" o master) + //_ = (eo, onRequest "/" o master) + _ = (eo, onStartup o master) = Ok (s args, {eo & distributed=True}) -JSONEncode{|TaskWrapper|} _ t = [dynamicJSONEncode t] -JSONDecode{|TaskWrapper|} _ [t:c] = (dynamicJSONDecode t, c) -JSONDecode{|TaskWrapper|} _ c = (Nothing, c) -gEq{|TaskWrapper|} _ _ = False -gEditor{|TaskWrapper|} = emptyEditor -gText{|TaskWrapper|} tf ma = maybe [] (\_->["TaskWrapper"]) ma - slave :: [String] -> Task () slave args = get applicationOptions >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort) - >-| parallel - [(Embedded, \stl->flip (@!) () $ forever $ - watch cloudITasksQueueInt - >>* [OnValue $ ifValue (not o isEmpty) \[(tid, TaskWrapper task):xs]-> - set xs cloudITasksQueueInt - >-| appendTask Embedded (\_->wrapTask tid task) stl - ] - )] [] - @? const NoValue -where - wrapTask :: (Task a) -> Task () | iTask a - wrapTask (Task evalorig) = Task eval @! () - where - eval :: Event TaskEvalOpts *IWorld -> *(TaskResult a, *IWorld) - eval event ops iworld = case evalorig - -nextTaskIdShare :: SDSSource () TaskId () -nextTaskIdShare = SDSSource - { SDSSourceOptions - | name = "nextTaskIdShare" - , read = \_->appFst Ok o getNextTaskId - , write = \_ _->tuple $ Ok (\_ _->True) - } - -cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper) -cloudITasksQueue = - mapReadWrite - ( \(nextTaskId, _)->nextTaskId - , \newTask (nextTaskId, tasks)->Just ((), [newTask:tasks]) - ) Nothing (nextTaskIdShare >*< cloudITasksQueueInt) - -cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)] -cloudITasksQueueInt = sharedStore "cloudQueue" [] + >-| asyncTaskListener master :: [String] -> Task () master args = get applicationOptions >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort) - >-| asyncTask (ExistingNode "localhost" 9099) (traceValue "boink") +// >-| asyncTask (PrivateNode 9099 ["--slave", "9099"]) (blockWait 5) +// >-| asyncTask (PrivateNode 9099 ["--slave", "9099"]) (traceValue 5 >-| traceValue 42) +// >-| asyncTask (ExistingNode "localhost" 9099) (blockWait 5) + >-| sleepSortPar [1,2,3,4] +// >&> viewSharedInformation [] + >>- traceValue @! () blockWait :: Int -> Task Int @@ -102,3 +38,22 @@ where sleep _ _ = code { ccall sleep "I:I:A" } + +sleepSort :: [Int] -> Task [Int] +sleepSort numbers = parallel + [ (Embedded, \_->waitForTimer False num >-| return num) + \\ num <- numbers + ] [] @? tresult (length numbers) + +sleepSortPar :: [Int] -> Task [Int] +sleepSortPar numbers = parallel + [ (Embedded, \_->asyncTask + (PrivateNode port ["--slave", toString port]) + $ blockWait num >-| return num) + \\ num <- numbers + & port <- [9091..9099] + ] [] @? tresult (length numbers) + +tresult l NoValue = NoValue +tresult l (Value ts _) + = let r = [v\\(_, Value v True)<-ts] in Value r (length r == l)