X-Git-Url: https://git.martlubbers.net/?a=blobdiff_plain;f=cloudiTasks%2FcloudiTasks.icl;h=f5dbaceea63b921beb1879ddd183a7e149f5e8a9;hb=dd851e8574c37cad729a3ba3b4e64a548bb4422e;hp=c5c337510e7010916bb861da2b6dcd09c0909fdd;hpb=7db58a8b1702130a00cce2a957730e96cf477b77;p=clean-tests.git diff --git a/cloudiTasks/cloudiTasks.icl b/cloudiTasks/cloudiTasks.icl index c5c3375..f5dbace 100644 --- a/cloudiTasks/cloudiTasks.icl +++ b/cloudiTasks/cloudiTasks.icl @@ -1,100 +1,35 @@ module cloudiTasks -import iTasks.Internal.Serialization -import iTasks.Internal.TaskEval -import iTasks.UI.Editor.Common +import Data.Func, Data.Functor, Data.Tuple import StdEnv -import Data.Func -import Data.Tuple -import Data.Map.GenJSON import iTasks -import iTasks.Extensions.DateTime -import iTasks.Internal.Distributed.RemoteTask -import qualified Data.Map -import System.Time -:: CloudTaskType = ExistingNode String Int +Start w = doTasks master w -//mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds - -asyncTask :: CloudTaskType (Task a) -> Task a | iTask a -asyncTask (ExistingNode host port) t - = upd (\tid->(tid, TaskWrapper t)) (remoteShare cloudITasksQueue {domain=host, port=port}) - >>- \(id, _)->viewInformation [] id <<@ Title "result" - @? const NoValue -// = upd (\queue -// >>- \sp->externalProcess -// {tv_sec=10,tv_nsec=0} -// eo.appPath -// ["--slave", toString sp] -// Nothing//(Just tdir) -// Nothing -// ishare -// oshare -// -|| traceValue ("Slave started at port " +++ toString sp) >-| forever (watch oshare >>* [OnValue $ ifValue (\t->case t of -// ([], []) = False -// x = True) \v->set ([], []) oshare >-| traceValue v]) -// >>- traceValue -// @? const NoValue - -Start w = flip doTasksWithOptions w \args eo - # (eo, s) = case args of - [argv0,"--slave",p] = ({eo & sdsPort=toInt p}, onStartup o slave) - _ = (eo, onRequest "/" o master) - = Ok (s args, {eo & distributed=True}) - -JSONEncode{|TaskWrapper|} _ t = [dynamicJSONEncode t] -JSONDecode{|TaskWrapper|} _ [t:c] = (dynamicJSONDecode t, c) -JSONDecode{|TaskWrapper|} _ c = (Nothing, c) -gEq{|TaskWrapper|} _ _ = False -gEditor{|TaskWrapper|} = emptyEditor -gText{|TaskWrapper|} tf ma = maybe [] (\_->["TaskWrapper"]) ma - -slave :: [String] -> Task () -slave args - = get applicationOptions - >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort) - >-| parallel - [(Embedded, \stl->flip (@!) () $ forever $ - watch cloudITasksQueueInt - >>* [OnValue $ ifValue (not o isEmpty) \[(tid, TaskWrapper task):xs]-> - set xs cloudITasksQueueInt - >-| appendTask Embedded (\_->wrapTask tid task) stl - ] - )] [] - @? const NoValue -where - wrapTask :: (Task a) -> Task () | iTask a - wrapTask (Task evalorig) = Task eval @! () - where - eval :: Event TaskEvalOpts *IWorld -> *(TaskResult a, *IWorld) - eval event ops iworld = case evalorig - -nextTaskIdShare :: SDSSource () TaskId () -nextTaskIdShare = SDSSource - { SDSSourceOptions - | name = "nextTaskIdShare" - , read = \_->appFst Ok o getNextTaskId - , write = \_ _->tuple $ Ok (\_ _->True) - } - -cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper) -cloudITasksQueue = - mapReadWrite - ( \(nextTaskId, _)->nextTaskId - , \newTask (nextTaskId, tasks)->Just ((), [newTask:tasks]) - ) Nothing (nextTaskIdShare >*< cloudITasksQueueInt) - -cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)] -cloudITasksQueueInt = sharedStore "cloudQueue" [] - -master :: [String] -> Task () -master args +master :: Task () +master = get applicationOptions >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort) - >-| asyncTask (ExistingNode "localhost" 9099) (traceValue "boink") +// >-| set 42 (remoteShare (sharedStore "bork" 42) {domain="localhost",port=9099}) +// >-| asyncTask (ExistingNode "localhost" 9099) (blockWait 5) +// >-| asyncTask "localhost" 9090 (blockWait 5) +// >-| asyncTask (PrivateNode 9099) (traceValue 5 >-| traceValue 42) + >-| asyncTask "localhost" 9099 (updateInformation [] 5) +// >-| sleepSortPar [5,1,3,8] + >&^ viewSharedInformation [] @! () +asyncTaskChannel :: !String !Int !((sds () (Queue r) w) -> Task a) !((sds () (Queue w) r) -> Task b) -> Task (a, b) +asyncTaskChannel host port remote local + = asyncTask host port (remote shareTo) + -&&- +where + shareTo :: (sds () (Queue r) (Queue r)) + shareTo = sdsFocus ("to-" +++ host +++ toString port) $ memoryStore "asyncITasks-channels" (Just newQueue) + + shareFro :: (sds () (Queue w) (Queue w)) + shareFro = sdsFocus ("fro-" +++ host +++ toString port) $ memoryStore "asyncITasks-channels" (Just newQueue) + blockWait :: Int -> Task Int blockWait i = accWorld (sleep i) where @@ -102,3 +37,17 @@ where sleep _ _ = code { ccall sleep "I:I:A" } + +sleepSortPar :: [Int] -> Task [Int] +sleepSortPar numbers = parallel + [ (Embedded, \stl-> + asyncTaskSpawn port (blockWait num) + >-| appendTask Embedded (\_->return num) stl + @? const NoValue) + \\ num <- numbers + & port <- [9092..9099] + ] [] @? \tv->case tv of + NoValue = NoValue + (Value ts _) + # r = [v\\(_, Value v True)<-ts] + = Value r (length r == length numbers)