09d6fde892b47dfd7988e6475620121cb7a79d05
[clean-tests.git] / slave / cloudiTasks / cloudiTasks.icl
1 module cloudiTasks
2
3 import Data.Functor
4 import Data.Queue
5 import iTasks.Internal.Serialization
6 import iTasks.Internal.TaskEval
7 import iTasks.UI.Editor.Common
8 import StdEnv
9 import Data.Func
10 import Data.Tuple
11 import Data.Map.GenJSON
12 import iTasks
13 import iTasks.Extensions.DateTime
14 import iTasks.Internal.Distributed.RemoteTask
15 import qualified Data.Map
16 import System.Time
17
// Describes where an asynchronous task is evaluated.
// ExistingNode carries a host name and a port; asyncTask uses them as the
// `domain` and `port` of the remote shares it opens.
:: CloudTaskType
	= ExistingNode String Int
19
20 //mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds
21
// Evaluate a task on another node and replay its values and UI changes here.
//
// The task is wrapped in a TaskWrapper and written, through a remote share,
// to cloudITasksQueue on the node at host:port. The read side of that share
// supplies the task id under which the task is queued. The resulting task
// then follows the value queue stored under that id in cloudITasksValues on
// the same node.
//
// First argument:  the node that has to evaluate the task
// Second argument: the task to evaluate
asyncTask :: CloudTaskType (Task a) -> Task a | iTask a
asyncTask (ExistingNode host port) t
	=   upd (\tid->(tid, TaskWrapper t)) (remoteShare cloudITasksQueue {domain=host, port=port})
	>>- \(tid, _)->let rvalue = remoteShare (sdsFocus tid cloudITasksValues) {domain=host,port=port}
		in Task (proxy NoValue rvalue rvalue)
where
	// One evaluation step of the local stand-in for the remote task.
	// It reads (and registers on) the temporary share and then either
	// - keeps waiting on the share handed back by an unfinished read,
	// - emits the front element of the queue and writes the remainder back, or
	// - reports the last known value when the queue turned out to be empty.
	proxy ::
		//The old task value
		(TaskValue a)
		//The original value queue
		(sds1 () (Queue (TaskValue a, UIChange)) (Queue (TaskValue a, UIChange)))
		//The temporary value queue
		(sds2 () (Queue (TaskValue a, UIChange)) (Queue (TaskValue a, UIChange)))
		Event
		TaskEvalOpts
		!*IWorld
		-> *(TaskResult a, *IWorld) | RWShared sds1 & Readable, Registrable sds2 & iTask a
	proxy lastVal valueShare tValueShare event {TaskEvalOpts|taskId,lastEval} iworld
		= case readRegister taskId tValueShare iworld of
			(Ok (ReadingDone queue), iworld)
				= case dequeue queue of
					// Nothing queued: keep the previous value and restart polling
					// from the original share, exactly as the branch below does
					// after consuming an element.
					// NOTE(review): the original continued with the finished
					// temporary share here; presumably a finished read cannot
					// deliver newer data -- confirm against the SDS implementation.
					(Nothing, queue)
						= (ValueResult
							lastVal
							{lastEvent=lastEval, removedTasks=[]}
							NoChange
							(Task (proxy lastVal valueShare valueShare))
						, iworld)
					// Emit the front element and store the rest of the queue
					(Just (tv, ui), queue)
						= case write queue valueShare (TaskContext taskId) iworld of
							(Ok _, iworld)
								= (ValueResult
									tv
									{lastEvent=lastEval, removedTasks=[]}
									ui
									(Task (proxy tv valueShare valueShare))
								, iworld)
							(Error e, iworld) = (ExceptionResult e, iworld)
			// The read has not finished: continue with the share it handed back
			(Ok (Reading tValueShare), iworld)
				= (ValueResult
					lastVal
					{lastEvent=lastEval, removedTasks=[]}
					NoChange
					(Task (proxy lastVal valueShare tValueShare))
				, iworld)
			(Error e, iworld) = (ExceptionResult e, iworld)
68
// Program entry point.
// With the arguments [program, "--slave", port] the slave task is run on
// startup and `sdsPort` is set to the given port; with any other arguments
// the master task is served on "/". `distributed` is switched on in both cases.
Start world = doTasksWithOptions configure world
where
	configure args eo = Ok (startable args, {opts & distributed=True})
	where
		(opts, startable) = case args of
			[_,"--slave",p] = ({eo & sdsPort=toInt p}, onStartup o slave)
			_               = (eo, onRequest "/" o master)
74
// Hand-written generic instances for TaskWrapper.

// Encoding and decoding go through the dynamic (de)serialisation helpers.
JSONEncode{|TaskWrapper|} _ wrapper = [dynamicJSONEncode wrapper]

JSONDecode{|TaskWrapper|} _ nodes = case nodes of
	[t:rest] = (dynamicJSONDecode t, rest)
	_        = (Nothing, nodes)

// Two wrappers are never considered equal
gEq{|TaskWrapper|} _ _ = False

// There is nothing to edit or show for a wrapped task
gEditor{|TaskWrapper|} = emptyEditor

gText{|TaskWrapper|} _ Nothing  = []
gText{|TaskWrapper|} _ (Just _) = ["TaskWrapper"]
81
// The slave node: traces the port it was started on and then keeps a single
// embedded dispatcher running. The combined task itself never has a value.
slave :: [String] -> Task ()
slave args
	=   get applicationOptions
	>>- \opts->traceValue ("Slave started on port " +++ toString opts.sdsPort)
	>-| parallel [(Embedded, dispatcher)] []
	@?  const NoValue
where
	// Watches the internal queue; whenever it is non-empty the head is removed
	// from the queue and appended to the task list as a wrapped task.
	dispatcher stl = forever
		(   watch cloudITasksQueueInt
		>>* [OnValue $ ifValue (not o isEmpty) \[(tid, TaskWrapper task):pending]->
				    set pending cloudITasksQueueInt
				>-| appendTask Embedded (\_->wrapTask tid task) stl
			]
		) @! ()

	// Evaluates the given task under the given task id and enqueues every
	// (value, UI change) pair it produces in the value queue of that id.
	// Locally only a unit value is kept.
	wrapTask :: TaskId (Task a) -> Task () | iTask a
	wrapTask taskId (Task teval) = Task eval
	where
		eval event opts iworld
			# (result, iworld) = teval event {TaskEvalOpts|opts & taskId=taskId} iworld
			= case result of
				ValueResult tv tei uic newtask
					# (stored, iworld) = modify (enqueue (tv, uic)) (sdsFocus taskId cloudITasksValues) EmptyContext iworld
					= case stored of
						Ok (ModifyingDone _)
							= (ValueResult (() <$ tv) tei uic (wrapTask taskId newtask), iworld)
						// Any other successful outcome is not handled
						Ok _
							= (ExceptionResult (exception "wrapTask async share????"), iworld)
						Error e
							= (ExceptionResult e, iworld)
				ExceptionResult e
					= (ExceptionResult e, iworld)
				DestroyedResult
					= (DestroyedResult, iworld)
107
// JSON (de)serialisation for the types that are stored in the shares below
derive JSONEncode Queue, Event, Set
derive JSONDecode Queue, Event, Set

// Per task id, the queue of (task value, UI change) pairs produced for it.
// Backed by the memory store "cloudITasks-values", focused on the string
// form of the task id; `newQueue` is passed as the store's (presumably
// default) value.
cloudITasksValues :: SDSLens TaskId (Queue (TaskValue a, UIChange)) (Queue (TaskValue a, UIChange)) | TC, JSONEncode{|*|}, JSONDecode{|*|} a
cloudITasksValues = sdsTranslate "" toString (memoryStore "cloudITasks-values" (Just newQueue))
113
// Per task id, a queue of events. Backed by the memory store
// "cloudITasks-events", focused on the string form of the task id;
// `newQueue` is passed as the store's (presumably default) value.
cloudITasksEvents :: SDSLens TaskId (Queue Event) (Queue Event)
cloudITasksEvents = sdsTranslate "" toString (memoryStore "cloudITasks-events" (Just newQueue))
117
// Share whose read side hands out the result of getNextTaskId.
// Writing does not store anything; it only yields a notification predicate
// that always holds.
nextTaskIdShare :: SDSSource () TaskId ()
nextTaskIdShare = SDSSource
	{ SDSSourceOptions
	| name  = "nextTaskIdShare"
	, read  = freshId
	, write = ignore
	}
where
	freshId _ iworld
		# (tid, iworld) = getNextTaskId iworld
		= (Ok tid, iworld)

	ignore _ _ iworld = (Ok (\_ _->True), iworld)
125
// The public face of the task queue.
// Reading yields the task id supplied by nextTaskIdShare. Writing a
// (task id, wrapped task) pair adds it to cloudITasksQueueInt and writes
// unit to nextTaskIdShare.
//
// New entries are added at the back: the slave always takes the head of the
// list, so appending serves the tasks in the order in which they were
// submitted (prepending would let newer tasks overtake older ones).
cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper)
cloudITasksQueue =
	mapReadWrite
		( \(nextTaskId, _)->nextTaskId
		, \newTask (_, tasks)->Just ((), tasks ++ [newTask])
		) Nothing (nextTaskIdShare >*< cloudITasksQueueInt)
132
// The list of submitted (task id, wrapped task) pairs: the memory store
// "cloudITasks" focused on the key "queue"; the empty list is passed as the
// store's (presumably default) value.
cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)]
cloudITasksQueueInt = sdsFocus "queue" (memoryStore "cloudITasks" (Just []))
135
// The master node: traces the server port and then has a fixed node
// ("localhost", port 9099) evaluate a task that traces "boink".
// The value of that task is discarded.
master :: [String] -> Task ()
master args
	=   get applicationOptions
	>>- \opts->traceValue ("Master started on port " +++ toString opts.serverPort)
	>-| asyncTask worker job
	@!  ()
where
	worker = ExistingNode "localhost" 9099
	job    = traceValue "boink"
142
// Runs the foreign `sleep` routine on the world and yields its integer result.
// NOTE(review): presumably this is the C library's sleep, in which case the
// argument is a number of seconds and the call blocks -- confirm.
blockWait :: Int -> Task Int
blockWait duration = accWorld (sleep duration)
where
	// Implemented directly in ABC code as a C call
	sleep :: !Int !*e -> (!Int, !*e)
	sleep _ _ = code {
		ccall sleep "I:I:A"
	}
148 ccall sleep "I:I:A"
149 }