cloud
authorMart Lubbers <mart@martlubbers.net>
Wed, 26 Feb 2020 14:28:44 +0000 (15:28 +0100)
committerMart Lubbers <mart@martlubbers.net>
Wed, 26 Feb 2020 14:28:44 +0000 (15:28 +0100)
cloudiTasks/cloudiTasks.icl [new file with mode: 0644]
cloudiTasks/slave/cloudiTasks [new file with mode: 0755]

diff --git a/cloudiTasks/cloudiTasks.icl b/cloudiTasks/cloudiTasks.icl
new file mode 100644 (file)
index 0000000..c5c3375
--- /dev/null
@@ -0,0 +1,104 @@
+module cloudiTasks
+
+import iTasks.Internal.Serialization
+import iTasks.Internal.TaskEval
+import iTasks.UI.Editor.Common
+import StdEnv
+import Data.Func
+import Data.Tuple
+import Data.Map.GenJSON
+import iTasks
+import iTasks.Extensions.DateTime
+import iTasks.Internal.Distributed.RemoteTask
+import qualified Data.Map
+import System.Time
+
+:: CloudTaskType = ExistingNode String Int
+
+//mapReadWrite :: !(!r -> r`,!w` r -> Maybe w) !(Maybe (SDSReducer p w w`)) !(sds p r w) -> SDSLens p r` w` | gText{|*|} p & TC p & TC r & TC w & RWShared sds
+
+asyncTask :: CloudTaskType (Task a) -> Task a | iTask a
+asyncTask (ExistingNode host port) t
+       = upd (\tid->(tid, TaskWrapper t)) (remoteShare cloudITasksQueue {domain=host, port=port})
+       >>- \(id, _)->viewInformation [] id <<@ Title "result"
+       @? const NoValue
+//     = upd (\queue
+//             >>- \sp->externalProcess
+//                             {tv_sec=10,tv_nsec=0}
+//                             eo.appPath
+//                             ["--slave", toString sp]
+//                             Nothing//(Just tdir)
+//                             Nothing
+//                             ishare
+//                             oshare
+//             -|| traceValue ("Slave started at port " +++ toString sp) >-| forever (watch oshare >>* [OnValue $ ifValue (\t->case t of
+//                             ([], []) = False
+//                             x = True) \v->set ([], []) oshare >-| traceValue v])
+//             >>- traceValue
+//             @? const NoValue
+
+Start w = flip doTasksWithOptions w \args eo
+       # (eo, s) = case args of
+               [argv0,"--slave",p] = ({eo & sdsPort=toInt p}, onStartup o slave)
+               _ = (eo, onRequest "/" o master)
+       = Ok (s args, {eo & distributed=True})
+
+JSONEncode{|TaskWrapper|} _ t = [dynamicJSONEncode t]
+JSONDecode{|TaskWrapper|} _ [t:c] = (dynamicJSONDecode t, c)
+JSONDecode{|TaskWrapper|} _ c = (Nothing, c)
+gEq{|TaskWrapper|} _ _ = False
+gEditor{|TaskWrapper|} = emptyEditor
+gText{|TaskWrapper|} tf ma = maybe [] (\_->["TaskWrapper"]) ma
+
+slave :: [String] -> Task ()
+slave args
+       =   get applicationOptions
+       >>- \eo->traceValue ("Slave started on port " +++ toString eo.sdsPort)
+       >-| parallel
+               [(Embedded, \stl->flip (@!) () $ forever $
+                           watch cloudITasksQueueInt
+                       >>* [OnValue $ ifValue (not o isEmpty) \[(tid, TaskWrapper task):xs]->
+                                   set xs cloudITasksQueueInt
+                               >-| appendTask Embedded (\_->wrapTask tid task) stl
+                       ]
+               )] []
+       @? const NoValue
+where
+       wrapTask :: (Task a) -> Task () | iTask a
+       wrapTask (Task evalorig) = Task eval @! ()
+       where
+               eval :: Event TaskEvalOpts *IWorld -> *(TaskResult a, *IWorld)
+               eval event ops iworld = case evalorig 
+
+nextTaskIdShare :: SDSSource () TaskId ()
+nextTaskIdShare = SDSSource
+       { SDSSourceOptions
+       | name  = "nextTaskIdShare"
+       , read  = \_->appFst Ok o getNextTaskId
+       , write = \_ _->tuple $ Ok (\_ _->True)
+       }
+
+cloudITasksQueue :: SDSLens () TaskId (TaskId, TaskWrapper)
+cloudITasksQueue =
+       mapReadWrite
+               ( \(nextTaskId, _)->nextTaskId
+               , \newTask (nextTaskId, tasks)->Just ((), [newTask:tasks])
+               ) Nothing (nextTaskIdShare >*< cloudITasksQueueInt)
+
+cloudITasksQueueInt :: SimpleSDSLens [(TaskId, TaskWrapper)]
+cloudITasksQueueInt = sharedStore "cloudQueue" []
+
+master :: [String] -> Task ()
+master args
+       = get applicationOptions
+       >>- \eo->traceValue ("Master started on port " +++ toString eo.serverPort)
+       >-| asyncTask (ExistingNode "localhost" 9099) (traceValue "boink")
+       @! ()
+
+blockWait :: Int -> Task Int
+blockWait i = accWorld (sleep i)
+where
+       sleep :: !Int !*e -> (!Int, !*e)
+       sleep _ _ = code {
+                       ccall sleep "I:I:A"
+               }
diff --git a/cloudiTasks/slave/cloudiTasks b/cloudiTasks/slave/cloudiTasks
new file mode 100755 (executable)
index 0000000..6c6bf4d
Binary files /dev/null and b/cloudiTasks/slave/cloudiTasks differ