+++ /dev/null
-module cloudiTasks
-
-import Data.Functor
-import Data.Queue
-import iTasks.Internal.Serialization
-import iTasks.Internal.TaskEval
-import iTasks.UI.Editor.Common
-import StdEnv
-import Data.Func
-import Data.Tuple
-import Data.Map.GenJSON
-import iTasks
-import iTasks.Extensions.DateTime
-import iTasks.Internal.Distributed.RemoteTask
-import qualified Data.Map
-import System.Time
-
-:: CloudTaskType = ExistingNode String Int
-
-//mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds
-
-asyncTask :: CloudTaskType (Task a) -> Task a | iTask a
-asyncTask (ExistingNode host port) t
- = upd (\tid->(tid, TaskWrapper t)) (remoteShare cloudITasksQueue {domain=host, port=port})
- >>- \(tid, _)->let rvalue = remoteShare (sdsFocus tid cloudITasksValues) {domain=host,port=port}
- in Task (proxy NoValue rvalue rvalue)
-where
- proxy ::
- //The old task value
- (TaskValue a)
- //The original value queue
- (sds1 () (Queue (TaskValue a, UIChange)) (Queue (TaskValue a, UIChange)))
- //The temporary value queue
- (sds2 () (Queue (TaskValue a, UIChange)) (Queue (TaskValue a, UIChange)))
- Event
- TaskEvalOpts
- !*IWorld
- -> *(TaskResult a, *IWorld) | RWShared sds1 & Readable, Registrable sds2 & iTask a
- proxy lastVal valueShare tValueShare event {TaskEvalOpts|taskId,lastEval} iworld
- = case readRegister taskId tValueShare iworld of
- (Ok (ReadingDone queue), iworld)
- = case dequeue queue of
- (Nothing, queue)
- = (ValueResult
- lastVal
- {lastEvent=lastEval, removedTasks=[]}
- NoChange
- (Task (proxy lastVal valueShare tValueShare))
- , iworld)
- (Just (tv, ui), queue)
- = case write queue valueShare (TaskContext taskId) iworld of
- (Ok _, iworld)
- = (ValueResult
- tv
- {lastEvent=lastEval, removedTasks=[]}
- ui
- (Task (proxy tv valueShare valueShare))
- , iworld)
- (Error e, iworld) = (ExceptionResult e, iworld)
- (Ok (Reading tValueShare), iworld)
- = (ValueResult
- lastVal
- {lastEvent=lastEval, removedTasks=[]}
- NoChange
- (Task (proxy lastVal valueShare tValueShare))
- , iworld)
- (Error e, iworld) = (ExceptionResult e, iworld)
-
-Start w = flip doTasksWithOptions w \args eo
- # (eo, s) = case args of
- [argv0,"--slave",p] = ({eo & sdsPort=toInt p}, onStartup o slave)
- _ = (eo, onRequest "/" o master)
- = Ok (s args, {eo & distributed=True})
-
-JSONEncode{|TaskWrapper|} _ t = [dynamicJSONEncode t]
-JSONDecode{|TaskWrapper|} _ [t:c] = (dynamicJSONDecode t, c)
-JSONDecode{|TaskWrapper|} _ c = (Nothing, c)
-gEq{|TaskWrapper|} _ _ = False
-gEditor{|TaskWrapper|} = emptyEditor
-gText{|TaskWrapper|} tf ma = maybe [] (\_->["TaskWrapper"]) ma
-
-slave :: [String] -> Task ()
-slave args
- = get applicationOptions
- >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort)
- >-| parallel
- [(Embedded, \stl->flip (@!) () $ forever $
- watch cloudITasksQueueInt
- >>* [OnValue $ ifValue (not o isEmpty) \[(tid, TaskWrapper task):xs]->
- set xs cloudITasksQueueInt
- >-| appendTask Embedded (\_->wrapTask tid task) stl
- ]
- )] []
- @? const NoValue
-where
- wrapTask :: TaskId (Task a) -> Task () | iTask a
- wrapTask taskId (Task teval) = Task \event opts iworld->
- case teval event {TaskEvalOpts|opts & taskId=taskId} iworld of
- (ValueResult tv tei uic newtask, iworld)
- = case modify (enqueue (tv, uic)) (sdsFocus taskId cloudITasksValues) EmptyContext iworld of
- (Ok (ModifyingDone _), iworld)
- = (ValueResult (() <$ tv) tei uic $ wrapTask taskId newtask, iworld)
- (Ok _, iworld) = (ExceptionResult $ exception "wrapTask async share????", iworld)
- (Error e, iworld) = (ExceptionResult e, iworld)
- (ExceptionResult e, iworld) = (ExceptionResult e, iworld)
- (DestroyedResult, iworld) = (DestroyedResult, iworld)
-
-derive JSONEncode Queue, Event, Set
-derive JSONDecode Queue, Event, Set
-cloudITasksValues :: SDSLens TaskId (Queue (TaskValue a, UIChange)) (Queue (TaskValue a, UIChange)) | TC, JSONEncode{|*|}, JSONDecode{|*|} a
-cloudITasksValues = sdsTranslate "" toString
- $ memoryStore "cloudITasks-values" $ Just newQueue
-
-cloudITasksEvents :: SDSLens TaskId (Queue Event) (Queue Event)
-cloudITasksEvents = sdsTranslate "" toString
- $ memoryStore "cloudITasks-events" $ Just newQueue
-
-nextTaskIdShare :: SDSSource () TaskId ()
-nextTaskIdShare = SDSSource
- { SDSSourceOptions
- | name = "nextTaskIdShare"
- , read = \_->appFst Ok o getNextTaskId
- , write = \_ _->tuple $ Ok (\_ _->True)
- }
-
-cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper)
-cloudITasksQueue =
- mapReadWrite
- ( \(nextTaskId, _)->nextTaskId
- , \newTask (nextTaskId, tasks)->Just ((), [newTask:tasks])
- ) Nothing (nextTaskIdShare >*< cloudITasksQueueInt)
-
-cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)]
-cloudITasksQueueInt = sdsFocus "queue" $ memoryStore "cloudITasks" (Just [])
-
-master :: [String] -> Task ()
-master args
- = get applicationOptions
- >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort)
- >-| asyncTask (ExistingNode "localhost" 9099) (traceValue "boink")
- @! ()
-
-blockWait :: Int -> Task Int
-blockWait i = accWorld (sleep i)
-where
- sleep :: !Int !*e -> (!Int, !*e)
- sleep _ _ = code {
- ccall sleep "I:I:A"
- }