module cloudiTasks

// Experiment with evaluating iTasks tasks on another node.
//
// A master wraps a task in a TaskWrapper and writes it, together with a fresh
// task id, to the pending list of a remote node (asyncTask). The slave watches
// that list, evaluates every task it finds and enqueues each resulting
// (task value, UI change) pair in a per-task-id store (slave/wrapTask). The
// master side then replays those pairs as if the task ran locally (proxy).

import Data.Functor
import Data.Queue
import iTasks.Internal.Serialization
import iTasks.Internal.TaskEval
import iTasks.UI.Editor.Common
import StdEnv
import Data.Func
import Data.Tuple
import Data.Map.GenJSON
import iTasks
import iTasks.Extensions.DateTime
import iTasks.Internal.Distributed.RemoteTask
import qualified Data.Map
import System.Time

// Where a task should be evaluated. ExistingNode carries the host and port
// that are handed to remoteShare as its domain and port options.
:: CloudTaskType = ExistingNode String Int

//mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds

// Evaluate a task on the given node.
//
// The task is written to the node's cloudITasksQueue, which yields the task id
// under which it was stored. The resulting task is a proxy that reads the
// node's value queue for that id.
asyncTask :: CloudTaskType (Task a) -> Task a | iTask a
asyncTask (ExistingNode host port) t
	=   upd (\newId->(newId, TaskWrapper t)) (remoteShare cloudITasksQueue {domain=host, port=port})
	>>- \(newId, _)->
		let results = remoteShare (sdsFocus newId cloudITasksValues) {domain=host, port=port}
		in  Task (proxy NoValue results results)
where
	proxy ::
		//The old task value
		(TaskValue a)
		//The original value queue
		(sds1 () (Queue (TaskValue a, UIChange)) (Queue (TaskValue a, UIChange)))
		//The temporary value queue
		(sds2 () (Queue (TaskValue a, UIChange)) (Queue (TaskValue a, UIChange)))
		Event TaskEvalOpts !*IWorld -> *(TaskResult a, *IWorld) | RWShared sds1 & Readable, Registrable sds2 & iTask a
	proxy lastVal valueShare tValueShare event {TaskEvalOpts|taskId,lastEval} iworld
		# (mbQueue, iworld) = readRegister taskId tValueShare iworld
		= case mbQueue of
			Error e
				= (ExceptionResult e, iworld)
			// The read did not complete: keep the old value and continue
			// with the share that the read handed back.
			Ok (Reading pending)
				= (ValueResult lastVal info NoChange (Task (proxy lastVal valueShare pending)), iworld)
			Ok (ReadingDone queue) = case dequeue queue of
				// Nothing new has been produced: keep the old value.
				(Nothing, _)
					= (ValueResult lastVal info NoChange (Task (proxy lastVal valueShare tValueShare)), iworld)
				// Take the oldest pair, write the remainder of the queue back
				// and start over from the original share.
				(Just (tv, ui), rest)
					# (mbWritten, iworld) = write rest valueShare (TaskContext taskId) iworld
					= case mbWritten of
						Error e = (ExceptionResult e, iworld)
						Ok _    = (ValueResult tv info ui (Task (proxy tv valueShare valueShare)), iworld)
	where
		info = {lastEvent=lastEval, removedTasks=[]}

// Entry point. With exactly the arguments [program, "--slave", port] the engine
// gets that SDS port and runs slave on startup; with any other arguments it
// serves master on "/". The distributed option is switched on in both cases.
Start world = doTasksWithOptions configure world
where
	configure args options
		# (options, startable) = case args of
			[_, "--slave", p] = ({options & sdsPort=toInt p}, onStartup o slave)
			_                 = (options, onRequest "/" o master)
		= Ok (startable args, {options & distributed=True})

// TaskWrapper is (de)serialised through the dynamic JSON encoding.
JSONEncode{|TaskWrapper|} _ wrapper = [dynamicJSONEncode wrapper]
JSONDecode{|TaskWrapper|} _ [json:rest] = (dynamicJSONDecode json, rest)
JSONDecode{|TaskWrapper|} _ rest = (Nothing, rest)
// Two wrappers are never considered equal.
gEq{|TaskWrapper|} _ _ = False
gEditor{|TaskWrapper|} = emptyEditor
gText{|TaskWrapper|} _ Nothing  = []
gText{|TaskWrapper|} _ (Just _) = ["TaskWrapper"]

// The slave: traces its SDS port and then keeps watching the pending list.
// Whenever the list is non-empty, the head is removed and its task is added to
// the task list as an embedded task. The task as a whole never has a value.
// NOTE(review): the list is read by watch and overwritten by set in two
// separate steps.
slave :: [String] -> Task ()
slave _
	=   get applicationOptions
	>>- \options->traceValue ("Slave started on port " +++ toString options.sdsPort)
	>-| parallel [(Embedded, dispatcher)] [] @? const NoValue
where
	dispatcher stl = forever
		(   watch cloudITasksQueueInt
		>>* [OnValue (ifValue (not o isEmpty) (\[(tid, TaskWrapper task):others]->
				    set others cloudITasksQueueInt
				>-| appendTask Embedded (\_->wrapTask tid task) stl))
			]
		) @! ()

	// Evaluate the task under the given task id and enqueue every
	// (task value, UI change) pair it produces in the value queue of that id.
	wrapTask :: TaskId (Task a) -> Task () | iTask a
	wrapTask taskId (Task teval) = Task \event opts iworld
		# (result, iworld) = teval event {TaskEvalOpts|opts & taskId=taskId} iworld
		= case result of
			ExceptionResult e = (ExceptionResult e, iworld)
			DestroyedResult   = (DestroyedResult, iworld)
			ValueResult tv tei uic newtask
				# (mbDone, iworld) = modify (enqueue (tv, uic)) (sdsFocus taskId cloudITasksValues) EmptyContext iworld
				= case mbDone of
					Ok (ModifyingDone _) = (ValueResult (() <$ tv) tei uic (wrapTask taskId newtask), iworld)
					// Any other successful outcome of modify is not handled here.
					Ok _                 = (ExceptionResult (exception "wrapTask async share????"), iworld)
					Error e              = (ExceptionResult e, iworld)

derive JSONEncode Queue, Event, Set
derive JSONDecode Queue, Event, Set

// Per task id: the queue of (task value, UI change) pairs, initially empty.
cloudITasksValues :: SDSLens TaskId (Queue (TaskValue a, UIChange)) (Queue (TaskValue a, UIChange)) | TC, JSONEncode{|*|}, JSONDecode{|*|} a
cloudITasksValues = sdsTranslate "" toString (memoryStore "cloudITasks-values" (Just newQueue))

// Per task id: a queue of events, initially empty.
cloudITasksEvents :: SDSLens TaskId (Queue Event) (Queue Event)
cloudITasksEvents = sdsTranslate "" toString (memoryStore "cloudITasks-events" (Just newQueue))

// Source whose read yields getNextTaskId; writes are accepted and ignored.
nextTaskIdShare :: SDSSource () TaskId ()
nextTaskIdShare = SDSSource
	{ SDSSourceOptions
	| name  = "nextTaskIdShare"
	, read  = \_->appFst Ok o getNextTaskId
	, write = \_ _->tuple (Ok (\_ _->True))
	}

// Reading yields the next task id; writing an (id, wrapped task) pair puts it
// at the front of the pending list, so the list is consumed newest-first.
cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper)
cloudITasksQueue = mapReadWrite (freshId, push) Nothing (nextTaskIdShare >*< cloudITasksQueueInt)
where
	freshId (nextTaskId, _) = nextTaskId
	push newTask (_, tasks) = Just ((), [newTask:tasks])

// The pending (id, wrapped task) pairs, initially empty.
cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)]
cloudITasksQueueInt = sdsFocus "queue" (memoryStore "cloudITasks" (Just []))

// The master: traces its server port and then runs a tracing task on the node
// at localhost:9099.
master :: [String] -> Task ()
master _
	=   get applicationOptions
	>>- \options->traceValue ("Master started on port " +++ toString options.serverPort)
	>-| asyncTask (ExistingNode "localhost" 9099) (traceValue "boink") @! ()

// Calls the foreign function sleep with the given argument on the world.
// NOTE(review): presumably the C library sleep, i.e. the argument is in
// seconds and the evaluation blocks -- confirm.
blockWait :: Int -> Task Int
blockWait i = accWorld (sleep i)
where
	sleep :: !Int !*e -> (!Int, !*e)
	sleep _ _ = code {
		ccall sleep "I:I:A"
	}