cloud
[clean-tests.git] / cloudiTasks / cloudiTasks.icl
1 module cloudiTasks
2
3 import iTasks.Internal.Serialization
4 import iTasks.Internal.TaskEval
5 import iTasks.UI.Editor.Common
6 import StdEnv
7 import Data.Func
8 import Data.Tuple
9 import Data.Map.GenJSON
10 import iTasks
11 import iTasks.Extensions.DateTime
12 import iTasks.Internal.Distributed.RemoteTask
13 import qualified Data.Map
14 import System.Time
15
// Describes where a task handed to `asyncTask` is sent.
// `ExistingNode host port` carries a host name and a port number; `asyncTask`
// uses them as the `domain` and `port` fields of the remote share it updates.
:: CloudTaskType
	= ExistingNode String Int
17
18 //mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds
19
// Hands `task` over to the node described by the first argument.
// The task is wrapped in a `TaskWrapper`, paired with the task id read from
// the remote view of `cloudITasksQueue`, and that pair is written back to the
// share. Afterwards the id is shown in a view titled "result"; that view is
// mapped to NoValue, so no task value is ever produced here.
asyncTask :: CloudTaskType (Task a) -> Task a | iTask a
asyncTask (ExistingNode host port) task
	=   upd enqueue remoteQueue
	>>- \(taskId, _) -> viewInformation [] taskId <<@ Title "result"
	@?  const NoValue
where
	// `cloudITasksQueue` accessed through `remoteShare` at host:port.
	remoteQueue = remoteShare cloudITasksQueue {domain=host, port=port}
	// Pairs the id read from the share with the wrapped task.
	enqueue nextId = (nextId, TaskWrapper task)
25 // = upd (\queue
26 // >>- \sp->externalProcess
27 // {tv_sec=10,tv_nsec=0}
28 // eo.appPath
29 // ["--slave", toString sp]
30 // Nothing//(Just tdir)
31 // Nothing
32 // ishare
33 // oshare
34 // -|| traceValue ("Slave started at port " +++ toString sp) >-| forever (watch oshare >>* [OnValue $ ifValue (\t->case t of
35 // ([], []) = False
36 // x = True) \v->set ([], []) oshare >-| traceValue v])
37 // >>- traceValue
38 // @? const NoValue
39
// Program entry point.
// The command-line arguments select the role of this process:
//   [argv0, "--slave", port]  stores `toInt port` in the `sdsPort` option and
//                             passes `slave` to `onStartup`;
//   anything else             passes `master` to `onRequest "/"`.
// In both cases the `distributed` engine option is set to True.
Start world = doTasksWithOptions configure world
where
	configure args options
		# (options, mkStartable) = selectRole args options
		= Ok (mkStartable args, {options & distributed=True})

	// Yields the (possibly adjusted) options and a function from the
	// argument list to the startable task.
	selectRole [_, "--slave", portArg] options = ({options & sdsPort=toInt portArg}, onStartup o slave)
	selectRole _ options = (options, onRequest "/" o master)
45
// Generic instances for TaskWrapper.

// Encodes the wrapper as a single JSON node via dynamicJSONEncode.
JSONEncode{|TaskWrapper|} _ wrapper = [dynamicJSONEncode wrapper]

// Decodes the first JSON node via dynamicJSONDecode and returns the remaining
// nodes; with no node available the result is Nothing and the input is
// returned unchanged.
JSONDecode{|TaskWrapper|} _ nodes = case nodes of
	[node:rest] = (dynamicJSONDecode node, rest)
	_           = (Nothing, nodes)

// Two wrappers are never considered equal.
gEq{|TaskWrapper|} _ _ = False

// Wrappers are edited with the empty editor.
gEditor{|TaskWrapper|} = emptyEditor

// A present wrapper is rendered as the fixed text "TaskWrapper", an absent
// one as no text at all.
gText{|TaskWrapper|} _ Nothing  = []
gText{|TaskWrapper|} _ (Just _) = ["TaskWrapper"]
52
// Worker-side task.
// Reads the application options, traces the configured `sdsPort`, and then
// runs a `parallel` set containing one embedded task that keeps watching
// `cloudITasksQueueInt`. The `args` parameter is not used in the body.
// NOTE(review): this definition is unfinished as it stands in this file; see
// the notes at `wrapTask` and `eval` below.
53 slave :: [String] -> Task ()
54 slave args
55 = get applicationOptions
56 >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort)
57 >-| parallel
// The single embedded task loops forever; its own value is replaced by ().
58 [(Embedded, \stl->flip (@!) () $ forever $
59 watch cloudITasksQueueInt
// Only fires when the stored list is non-empty, so the list pattern in the
// lambda below always has a head to match.
60 >>* [OnValue $ ifValue (not o isEmpty) \[(tid, TaskWrapper task):xs]->
// Write the tail back to the store, then add the head's task to the task
// list `stl` as a new embedded task.
61 set xs cloudITasksQueueInt
62 >-| appendTask Embedded (\_->wrapTask tid task) stl
63 ]
64 )] []
// The task value is mapped to NoValue.
65 @? const NoValue
66 where
// NOTE(review): the signature declares one argument, but the call site above
// passes two (`tid` and `task`) - confirm the intended signature.
67 wrapTask :: (Task a) -> Task () | iTask a
68 wrapTask (Task evalorig) = Task eval @! ()
69 where
70 eval :: Event TaskEvalOpts *IWorld -> *(TaskResult a, *IWorld)
// NOTE(review): the `case` expression has no scrutinee arguments and no
// alternatives in this file; the body appears to be truncated.
71 eval event ops iworld = case evalorig
72
// Share source named "nextTaskIdShare".
// Reading calls `getNextTaskId` on the IWorld and wraps the result in Ok.
// Writing accepts a unit value, leaves the IWorld untouched and returns a
// notification predicate that is True for every argument pair.
nextTaskIdShare :: SDSSource () TaskId ()
nextTaskIdShare = SDSSource
	{ SDSSourceOptions
	| name  = "nextTaskIdShare"
	, read  = readNextId
	, write = ignoreWrite
	}
where
	readNextId _ iworld
		# (taskId, iworld) = getNextTaskId iworld
		= (Ok taskId, iworld)

	ignoreWrite _ _ iworld = (Ok notifyAlways, iworld)

	notifyAlways _ _ = True
80
// Lens over the pair of `nextTaskIdShare` and `cloudITasksQueueInt`.
// Reading yields only the task id component of the pair.
// Writing a (TaskId, TaskWrapper) entry writes () to the id share and puts
// the entry at the front of the currently stored list.
cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper)
cloudITasksQueue = mapReadWrite (readNextId, enqueue) Nothing combined
where
	combined = nextTaskIdShare >*< cloudITasksQueueInt

	readNextId (nextId, _) = nextId

	// New entries are prepended to the pending list.
	enqueue entry (_, pending) = Just ((), [entry:pending])
87
// Shared store holding the list of (task id, wrapped task) entries.
// It is stored under the name "cloudQueue" and starts out as the empty list.
cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)]
cloudITasksQueueInt = sharedStore storeName initialQueue
where
	storeName    = "cloudQueue"
	initialQueue = []
90
// Master-side task.
// Reads the application options, traces the configured `serverPort`, and then
// uses `asyncTask` to send a task that traces "boink" to the node at
// localhost:9099. The result is replaced by (). `args` is not used.
master :: [String] -> Task ()
master args
	=   get applicationOptions
	>>- \options -> traceValue ("Master started on port " +++ toString options.serverPort)
	>-| asyncTask worker remoteJob
	@!  ()
where
	// The node the task is sent to.
	worker = ExistingNode "localhost" 9099
	// The task that is sent.
	remoteJob = traceValue "boink"
97
// Runs the C function `sleep` on the world with the given Int argument and
// returns the Int that the call yields as the task value.
blockWait :: Int -> Task Int
blockWait duration = accWorld (cSleep duration)
where
	// Foreign call wrapper: one Int in, one Int out, with the unique
	// environment passed through.
	cSleep :: !Int !*e -> (!Int, !*e)
	cSleep _ _ = code {
		ccall sleep "I:I:A"
	}