Browse Source

added single

addedsingle
Alberto G. Corona 5 years ago
parent
commit
6a51f0e1df
  1. 278
      src/Transient/Move.hs

278
src/Transient/Move.hs

@ -10,15 +10,9 @@
--
-- | see <https://www.fpcomplete.com/user/agocorona/moving-haskell-processes-between-nodes-transient-effects-iv>
-----------------------------------------------------------------------------
{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DeriveDataTypeable , ExistentialQuantification, OverloadedStrings
,ScopedTypeVariables, StandaloneDeriving, RecordWildCards, FlexibleContexts, CPP
,GeneralizedNewtypeDeriving #-}
module Transient.Move(
Cloud(..),runCloudIO, runCloudIO',local,onAll,lazy, loggedc, lliftIO,localIO,
@ -36,6 +30,9 @@ clustered, mclustered, callNodes,
-- * messaging
newMailbox, putMailbox,getMailbox,cleanMailbox,
-- * thread control
single,
#ifndef ghcjs_HOST_OS
setBuffSize, getBuffSize,
#endif
@ -52,8 +49,8 @@ addNodes, shuffleNodes,
) where
import Transient.Base
import Transient.Internals hiding ((!>))
import Transient.Internals hiding ((!>))
import Transient.Logged
import Transient.Indeterminism(choose)
import Transient.Backtrack
@ -63,7 +60,7 @@ import Control.Applicative
#ifndef ghcjs_HOST_OS
import Network
import Network.Info
import qualified Data.IP as IP
--import qualified Data.IP as IP
import qualified Network.Socket as NS
import qualified Network.BSD as BSD
import qualified Network.WebSockets as NWS(RequestHead(..))
@ -128,10 +125,8 @@ import System.Random
import Data.Dynamic
import Data.String
-- * Handful Aliases.
type EventMapRef = IORef (M.Map T.Text (EVar Dynamic))
import System.Mem.StableName
import Unsafe.Coerce
#ifdef ghcjs_HOST_OS
type HostName = String
@ -189,12 +184,12 @@ local = Cloud . logged
-- #ifndef ghcjs_HOST_OS
-- | run the cloud computation.
runCloudIO :: Typeable a => Cloud a -> IO a
runCloudIO (Cloud mx) = keep mx
runCloudIO :: Typeable a => Cloud a -> IO a
runCloudIO (Cloud mx)= keep mx
-- | run the cloud computation with no console input
runCloudIO' :: Typeable a => Cloud a -> IO a
runCloudIO' (Cloud mx) = keep' mx
runCloudIO' :: Typeable a => Cloud a -> IO a
runCloudIO' (Cloud mx)= keep' mx
-- #endif
@ -280,13 +275,42 @@ atRemote proc= loggedc $ do
runAt :: Loggable a => Node -> Cloud a -> Cloud a
runAt= callTo
-- | execute a single thread for each connection
--
-- > box <- foo
-- > r <- runAt node . local . single $ getMailbox box
-- > localIO $ print r
--
-- if foo would return many values, the above code would monitor one remote mailbox
-- each time: the last one entered.
-- Without single, it would monitor all of them.
single :: TransIO a -> TransIO a
single f= do
con@Connection{closChildren=rmap} <- getSData <|> error "single: only works within a wormhole"
mapth <- liftIO $ readIORef rmap
id <- liftIO $ makeStableName f >>= return . hashStableName
chs <-
let mx = M.lookup id mapth
in case mx of
Just tv -> return tv
Nothing -> liftIO $ newTVarIO []
modify $ \ s -> s{children= chs} -- to allow his own thread control
liftIO $ killChildren chs
f <** do
id <- liftIO $ makeStableName f >>= return . hashStableName
liftIO $ modifyIORef rmap $ \mapth -> M.insert id chs mapth
msend :: Loggable a => Connection -> StreamData a -> TransIO ()
#ifndef ghcjs_HOST_OS
msend (Connection _(Just (Node2Node _ sock _)) _ _ blocked _ _ ) r= do
msend (Connection _(Just (Node2Node _ sock _)) _ _ blocked _ _ _) r= do
r <- liftIO $ do
withMVar blocked $
const $ do
@ -298,20 +322,20 @@ msend (Connection _(Just (Node2Node _ sock _)) _ _ blocked _ _ ) r= do
juste -> finish juste
msend (Connection _(Just (Node2Web sconn)) _ _ blocked _ _) r=liftIO $
msend (Connection _(Just (Node2Web sconn)) _ _ blocked _ _ _) r=liftIO $
withMVar blocked $ const $ WS.sendTextData sconn $ BS.pack (show r)
#else
msend (Connection _ (Just (Web2Node sconn)) _ _ blocked _ _) r= liftIO $
msend (Connection _ (Just (Web2Node sconn)) _ _ blocked _ _ _) r= liftIO $
withMVar blocked $ const $ JavaScript.Web.WebSocket.send (JS.pack $ show r) sconn -- !!> "MSEND SOCKET"
#endif
msend (Connection _ Nothing _ _ _ _ _ ) _= error "msend out of wormhole context"
msend (Connection _ Nothing _ _ _ _ _ _) _= error "msend out of wormhole context"
mread :: Loggable a => Connection -> TransIO (StreamData a)
@ -319,7 +343,7 @@ mread :: Loggable a => Connection -> TransIO (StreamData a)
#ifdef ghcjs_HOST_OS
mread (Connection _ (Just (Web2Node sconn)) _ _ _ _ _)= wsRead sconn
mread (Connection _ (Just (Web2Node sconn)) _ _ _ _ _ _)= wsRead sconn
@ -328,7 +352,7 @@ wsRead ws= do
dat <- react (hsonmessage ws) (return ())
case JM.getData dat of
JM.StringData str -> return (read' $ JS.unpack str)
-- !> ("Browser webSocket read", str) !> "<------<----<----<------"
-- !> ("Browser webSocket read", str) !> "<------<----<----<------"
JM.BlobData blob -> error " blob"
JM.ArrayBufferData arrBuffer -> error "arrBuffer"
@ -427,10 +451,10 @@ foreign import javascript safe
#else
mread (Connection _(Just (Node2Node _ _ _)) _ _ blocked _ _ ) = parallelReadHandler -- !> "mread"
mread (Connection _(Just (Node2Node _ _ _)) _ _ blocked _ _ _) = parallelReadHandler -- !> "mread"
mread (Connection node (Just (Node2Web sconn )) bufSize events blocked _ _)=
mread (Connection node (Just (Node2Web sconn )) bufSize events blocked _ _ _)=
parallel $ do
s <- WS.receiveData sconn
return . read' $ BS.unpack s
@ -585,72 +609,90 @@ mclose :: Connection -> IO ()
#ifndef ghcjs_HOST_OS
mclose (Connection _
(Just (Node2Node _ sock _ )) _ _ _ _ _)= NS.sClose sock
(Just (Node2Node _ sock _ )) _ _ _ _ _ _)= NS.sClose sock
mclose (Connection node
(Just (Node2Web sconn ))
bufSize events blocked _ _)=
bufSize events blocked _ _ _)=
WS.sendClose sconn ("closemsg" :: BS.ByteString)
#else
mclose (Connection _ (Just (Web2Node sconn)) _ _ blocked _ _)=
mclose (Connection _ (Just (Web2Node sconn)) _ _ blocked _ _ _)=
JavaScript.Web.WebSocket.close Nothing Nothing sconn
#endif
liftIOF :: IO b -> TransIO b
liftIOF mx =
do ex <- liftIO $
fmap Right mx `catch` (\(e :: SomeException) -> return $ Left e)
case ex of
Right x -> return x
Left e -> finish (Just e)
mconnect :: Node -> TransIO Connection
mconnect node@Node{} =
do nodes <- getNodes
let fnode = filter (==node) nodes
case fnode of
[] -> addNodes [node] >> mconnect node
Node{..}:_ ->
do let pool = connection -- `connection` is a function of `Node`.
plist <- liftIO $ readMVar pool
conn <- case plist of
handle':_ -> return handle'
_ ->
do my <- getMyNode
Connection { comEvent = ev } <-
getSData <|> error "connect: listen not set for this node"
c <- getConnection my nodeHost ev nodePort
liftIO . modifyMVar_ pool $ \cs -> return (c:cs)
putMailbox "connections" (c, node)
return c
delData $ Closure undefined
return conn
where
getConnection :: Node -> HostName -> EventMapRef -> Int -> TransIO Connection
liftIOF mx=do
ex <- liftIO $ (mx >>= return . Right) `catch` (\(e :: SomeException) -> return $ Left e)
case ex of
Left e -> finish $ Just e
Right x -> return x
mconnect :: Node -> TransIO Connection
mconnect node@(Node _ _ _ _ )= do
nodes <- getNodes -- !> ("connecting node", node)
let fnode = filter (==node) nodes
case fnode of
[] -> addNodes [node] >> mconnect node
[Node host port pool _] -> do
plist <- liftIO $ readMVar pool
case plist of
handle:_ -> do
delData $ Closure undefined
return handle -- !> ("REUSED!", node)
_ -> do
-- liftIO $ putStr "*****CONNECTING NODE: " >> print node
my <- getMyNode
-- liftIO $ putStr "OPENING CONNECTION WITH :" >> print port
Connection{comEvent= ev} <- getSData <|> error "connect: listen not set for this node"
#ifndef ghcjs_HOST_OS
getConnection n h ev p = liftIOF $
do let s = 8192
u = undefined
sock <- connectTo' s h (PortNumber $ fromIntegral p)
conn <- defConnection >>= \c ->
return c { myNode = n
, comEvent = ev
, connData = Just $
Node2Node u sock (error "addr: outgoing connection")
}
SBS.send sock "CLOS a b\n\n"
return conn
conn <- liftIOF $ do
let size=8192
sock <- connectTo' size host $ PortNumber $ fromIntegral port
-- !> ("CONNECTING ",port)
conn <- defConnection >>= \c -> return c{myNode=my,comEvent= ev,connData= Just $ Node2Node u sock (error $ "addr: outgoing connection")}
SBS.send sock "CLOS a b\n\n" -- !> "sending CLOS"
return conn
#else
getConnection _ h ev p =
do ws <- connectToWS h (PortNumber $ fromIntegral p)
defConnection >>= \c ->
return c { comEvent = ev
, connData = Just (Web2Node ws)
}
conn <- do
ws <- connectToWS host $ PortNumber $ fromIntegral port
conn <- defConnection >>= \c -> return c{comEvent= ev,connData= Just $ Web2Node ws}
return conn -- !> ("websocker CONNECION")
#endif
chs <- liftIO $ newIORef M.empty
let conn'= conn{closChildren= chs}
liftIO $ modifyMVar_ pool $ \plist -> return $ conn':plist
putMailbox "connections" (conn',node)
delData $ Closure undefined
return conn
where u= undefined
-- mconnect _ = empty
#ifndef ghcjs_HOST_OS
@ -664,13 +706,13 @@ connectTo' bufSize hostname (PortNumber port) = do
NS.setSocketOption sock NS.SendBuffer bufSize
he <- BSD.getHostByName hostname
NS.connect sock (NS.SockAddrInet port (BSD.hostAddress he))
return sock
-- NS.socketToHandle sock ReadWriteMode
)
return sock)
#else
connectToWS h (PortNumber p) =
wsOpen $ JS.pack $ "ws://"++ h++ ":"++ show p
#endif
#ifndef ghcjs_HOST_OS
-- | A connectionless version of callTo for long running remote calls
callTo' :: (Show a, Read a,Typeable a) => Node -> Cloud a -> Cloud a
@ -702,10 +744,19 @@ data ConnectionData=
data Connection= Connection{myNode :: Node
,connData :: Maybe(ConnectionData)
,bufferSize :: BuffSize
-- Used by getMailBox, putMailBox
,comEvent :: IORef (M.Map T.Text (EVar Dynamic))
-- multiple wormhole/teleport use the same connection concurrently
,blocked :: Blocked
,calling :: Bool
,closures :: MVar (M.Map Int ([LogElem], EventF))}
-- local closures with his log and his continuation
,closures :: MVar (M.Map IdClosure ([LogElem], EventF))
-- for each remote closure that points to local closure 0,
-- a new container of child processes
-- in order to treat them separately
-- so that 'killChilds' do not kill unrelated processes
,closChildren :: IORef (M.Map Int (TVar[EventF]))}
deriving Typeable
@ -773,9 +824,10 @@ defConnection :: MonadIO m => m Connection
defConnection = liftIO $ do
x <- newMVar ()
y <- newMVar M.empty
z <- return $ error "closchildren newIORef M.empty"
return $ Connection (error "node in default connection") Nothing 8192
(error "defConnection: accessing network events out of listen")
x False (y)
x False y z
@ -800,14 +852,15 @@ listen (node@(Node _ port _ _ )) = onAll $ do
conn' <- getSData <|> defConnection
ev <- liftIO $ newIORef M.empty
let conn= conn'{myNode=node, comEvent=ev}
chs <- liftIO $ newIORef M.empty
let conn= conn'{myNode=node, comEvent=ev,closChildren=chs}
setData conn
addNodes [node]
mlog <- listenNew (fromIntegral port) conn <|> listenResponses
execLog mlog
execLog mlog
@ -823,7 +876,7 @@ listenNew port conn= do
(sock,addr) <- waitEvents $ NS.accept sock -- !!> "BEFORE ACCEPT"
chs <- liftIO $ newIORef M.empty
-- case addr of
-- NS.SockAddrInet port host -> liftIO $ print("connection from", port, host)
-- NS.SockAddrInet6 a b c d -> liftIO $ print("connection from", a, b,c,d)
@ -842,7 +895,7 @@ listenNew port conn= do
"CLOS" ->
do
-- return () !> "CLOS detected"
setData $ conn{connData=Just (Node2Node (PortNumber port) sock addr)}
setData $ conn{connData=Just (Node2Node (PortNumber port) sock addr),closChildren=chs}
-- killOnFinish $ parallel $ readHandler -- !> "read Listen" -- :: TransIO (StreamData [LogElem])
parallelReadHandler
@ -850,7 +903,7 @@ listenNew port conn= do
_ -> do
sconn <- httpMode (method, uri, headers) sock -- stay serving pages until a websocket request is received
setData conn{connData= Just (Node2Web sconn )}
setData conn{connData= Just (Node2Web sconn ),closChildren=chs}
killOnFinish $ parallel $ do
@ -879,7 +932,7 @@ listenResponses= do
setData conn
#ifndef ghcjs_HOST_OS
case conn of
Connection _(Just (Node2Node _ sock _)) _ _ _ _ _ -> do
Connection _(Just (Node2Node _ sock _)) _ _ _ _ _ _ -> do
input <- liftIO $ SBSL.getContents sock
setData $ (ParseContext (error "SHOULD NOT READ 2") input :: ParseContext BS.ByteString)
#endif
@ -901,7 +954,7 @@ type IdClosure= Int
data Closure= Closure IdClosure
execLog mlog = Transient $ do
execLog mlog = Transient $ do
case mlog of -- !> ("RECEIVED ", mlog ) of
SError e -> do
runTrans $ finish $ Just e
@ -913,21 +966,36 @@ execLog mlog = Transient $ do
where
process (closl,closr,log) deleteClosure= do
Connection {closures=closures} <- getData `onNothing` error "Listen: myNode not set"
conn@Connection {closures=closures,closChildren=mapThreads} <- getData `onNothing` error "Listen: myNode not set"
if closl== 0 then do
setData $ Log True log $ reverse log
setData $ Closure closr
return $ Just () -- !> "executing top level closure"
--
-- chs <- liftIO $ atomicModifyIORef mapThreads
-- $ \mapth ->
-- let mx = M.lookup closr mapth
-- in case mx of
-- Just tv -> (mapth,tv) !> "FOUND"
-- Nothing ->
-- let tv = unsafePerformIO $ newTVarIO []
-- in (M.insert closr tv mapth, tv)
--
-- modify $ \ s -> s{children= chs} -- to allow his own thread control
return $ Just () -- !> "executing top level closure"
else do
mcont <- liftIO $ modifyMVar closures $ \map ->
return (if deleteClosure then
M.delete closl map
else map, M.lookup closl map)
-- !> ("closures=", M.size map)
mcont <- liftIO $ modifyMVar closures
$ \map -> return (if deleteClosure then
M.delete closl map
else map, M.lookup closl map)
-- !> ("closures=", M.size map)
case mcont of
Nothing -> error ("request received for non existent closure: " ++ show closl)
Nothing -> do
runTrans $ msend conn $ SLast (closr,closl, [] :: [()] )
-- to delete the remote closure
error ("request received for non existent closure: " ++ show closl)
-- execute the closure
Just (fulLog,cont) -> liftIO $ runStateT (do
let nlog= reverse log ++ fulLog
@ -942,11 +1010,13 @@ execLog mlog = Transient $ do
#ifdef ghcjs_HOST_OS
listen node = onAll $ do
addNodes [node]
events <- liftIO $ newIORef M.empty
conn <- defConnection >>= \c -> return c{myNode=node,comEvent=events}
setData conn
r <- listenResponses
execLog r
execLog r
#endif
type Pool= [Connection]
@ -1134,7 +1204,7 @@ connect' remotenode= do
newNodes <- runAt remotenode $ do
local $ do
conn@(Connection _(Just (Node2Node _ _ _)) _ _ _ _ _) <- getSData <|>
conn@(Connection _(Just (Node2Node _ _ _)) _ _ _ _ _ _) <- getSData <|>
error ("connect': need to be connected to a node: use wormhole/connect/listen")
let nodeConnecting= head nodes
liftIO $ modifyMVar_ (connection nodeConnecting) $ const $ return [conn]
@ -1292,12 +1362,12 @@ endline c= c== '\n' || c =='\r'
--tGetLine= tTakeWhile . not . endline
readStream :: Read a => BS.ByteString -> [StreamData a]
readStream :: (Typeable a, Read a) => BS.ByteString -> [StreamData a]
readStream s= readStream1 $ BS.unpack s
where
readStream1 s=
let [(x,r)] = reads s
in x : readStream1 r
let [(x,r)] = readsPrec' 0 s
in x : readStream1 r

Loading…
Cancel
Save