Browse Source

some cleanup

addedsingle
Alberto G. Corona 6 years ago
parent
commit
ac79e07e61
  1. 1
      examples/DistrbDataSets.hs
  2. 68
      src/Transient/Move.hs
  3. 6
      src/Transient/Move/Utils.hs

1
examples/DistrbDataSets.hs

@ -20,7 +20,6 @@ import qualified Data.Vector as V
main= do
let numNodes = 5
runCloudIO $ do
setData $ defConnection 2000
runTestNodes [2000 .. 2000 + numNodes - 1]

68
src/Transient/Move.hs

@ -18,18 +18,23 @@ module Transient.Move(
Cloud(..),runCloudIO, runCloudIO',local,onAll,lazy, loggedc, lliftIO,localIO,
listen, Transient.Move.connect, connect', fullStop,
-- * primitives for communication
wormhole, teleport, copyData,
-- * single node invocation
beamTo, forkTo, streamFrom, callTo, runAt, atRemote,
clustered, mclustered,
-- * invocation of many nodes
clustered, mclustered, callNodes,
-- * messaging
newMailbox, putMailbox,getMailbox,cleanMailbox,
#ifndef ghcjs_HOST_OS
setBuffSize, getBuffSize,
#endif
-- * node management
createNode, createWebNode, createNodeServ, getMyNode, getNodes,
addNodes, shuffleNodes,
@ -42,7 +47,7 @@ addNodes, shuffleNodes,
) where
import Transient.Base
import Transient.Internals((!>),IDynamic(..),killChildren,getCont,runCont,EventF(..),LogElem(..),Log(..)
import Transient.Internals(IDynamic(..),killChildren,getCont,runCont,EventF(..),LogElem(..),Log(..)
,onNothing,RemoteStatus(..),getCont,StateIO,readsPrec')
import Transient.Logged
import Transient.Indeterminism(choose)
@ -444,7 +449,7 @@ wormhole node (Cloud comp) = local $ Transient $ do
if not rec -- !> ("wormhole recovery", rec)
then runTrans $ (do
conn <- mconnect node !> (mynode,"connecting node ", node)
conn <- mconnect node -- !> (mynode,"connecting node ", node)
setData conn{calling= True}
#ifdef ghcjs_HOST_OS
addPrefix -- for the DOM identifiers
@ -571,7 +576,7 @@ mclose (Connection _ (Just (Web2Node sconn)) _ _ blocked _ _)=
mconnect :: Node -> TransIO Connection
mconnect node@(Node _ _ _ _ )= do
nodes <- getNodes !> ("connecting node", node)
nodes <- getNodes -- !> ("connecting node", node)
let fnode = filter (==node) nodes
case fnode of
@ -583,7 +588,7 @@ mconnect node@(Node _ _ _ _ )= do
case plist of
handle:_ -> do
delData $ Closure undefined
return handle !> ("REUSED!", node)
return handle -- !> ("REUSED!", node)
_ -> do
-- liftIO $ putStr "*****CONNECTING NODE: " >> print node
@ -595,7 +600,7 @@ mconnect node@(Node _ _ _ _ )= do
conn <- liftIO $ do
let size=8192
sock <- connectTo' size host $ PortNumber $ fromIntegral port
!> ("CONNECTING ",port)
-- !> ("CONNECTING ",port)
conn <- defConnection >>= \c -> return c{myNode=my,comEvent= ev,connData= Just $ Node2Node u sock (error $ "addr: outgoing connection")}
@ -694,18 +699,6 @@ newMailbox :: T.Text -> TransIO ()
newMailbox name= do
-- return () -- !> "newMailBox"
Connection{comEvent= mv} <- getData `onNothing` errorMailBox
-- onFinish . const $ liftIO $ do
---- return () -- !> "NEWMAILBOX finish"
-- mailboxes <- readIORef mv
-- let me = M.lookup name mailboxes
-- case me of
-- Nothing -> empty -- !> "EMPTY"
-- Just (EVar id rn ref1) -> do
-- n <- atomically $ do
-- (n,n') <- readTVar rn
-- writeTVar rn (n-1,n'-1) -- !> ("decreased rn",n-1)
-- return $ n-1
-- when (n==0) $ atomicModifyIORef mv $ \mboxes -> (M.delete name mboxes,())
ev <- newEVar
liftIO $ atomicModifyIORef mv $ \mailboxes -> (M.insert name ev mailboxes,())
@ -739,45 +732,6 @@ getMailbox name= do
Nothing -> empty
Just x -> return x
--getMailBox :: Typeable a => String -> TransIO a
--getMailBox name= Transient $ do
-- return () !> "getMailBox"
-- Connection{comEvent=(EVar id rn ref1)} <- getData `onNothing` error "getMailBox: accessing network events out of listen"
-- runTrans $ do
-- liftIO $ atomically $ readTVar rn >>= \(n,n') -> writeTVar rn (n +1,n'+1)
-- r <- parallel $ do
-- n' <- atomically $ do
-- (n,n') <- readTVar rn
-- writeTVar rn $ if n' > 1 then (n,n'-1) else (n, n)
-- return n'
-- return () !> ("rear rn",name,n')
-- atomically $ do
-- d <- if n'> 1 then peekTChan ref1 else readTChan ref1
--
--
-- return () !> ("readmailbox",name,n')
-- case d of
-- SDone -> return SDone
-- SMore x -> case fromDynamic x of
-- Nothing -> return $ SMore Nothing
-- Just (nam, Nothing) -> do
-- readTVar rn >>= \(n,n') -> writeTVar rn $ (n,n' -1)
-- return SDone
-- Just (nam,Just dat) ->
-- return $ SMore $ if nam /= name then Nothing else Just dat
---- SLast x -> case fromDynamic x of
---- Nothing -> retry
---- Just (nam, Nothing) -> return SDone
---- Just (nam,Just dat) ->
---- if nam /= name !>(nam,name) then retry else return $ SLast dat
-- SError e -> return $ SError e
--
-- case r of
-- SDone -> empty
-- SMore Nothing -> empty
-- SMore (Just x) -> return x
-- SLast (Just x) -> return x
-- SError e -> error $ show e
-- | delete all subscriptions for that mailbox expecting this kind of data.
cleanMailbox :: Typeable a => T.Text -> a -> TransIO ()

6
src/Transient/Move/Utils.hs

@ -50,13 +50,13 @@ import Data.IORef
--
initNode :: Cloud () -> TransIO ()
initNode app= do
node <- getPort
node <- getNodeParams
initWebApp node app
where
getPort :: TransIO Node
getPort =
getNodeParams :: TransIO Node
getNodeParams =
if isBrowserInstance then liftIO createWebNode else do
oneThread $ option "start" "re/start node"
host <- input (const True) "hostname of this node (must be reachable): "

Loading…
Cancel
Save