Browse Source

map-reduce (DDS): added processing with unboxed vectors

addedsingle
Alberto G. Corona 6 years ago
parent
commit
7dd58e7d4a
  1. 50
      .gitignore
  2. 15
      .project-settings.yml
  3. 1495
      DistrbDataSets.prof
  4. 48
      Dockerfile
  5. 1348
      LICENSE
  6. 1358
      README.md
  7. 4
      Setup.hs
  8. 8
      buildrun.bat
  9. 4
      buildrun.sh
  10. 92
      examples/DistrbDataSets.hs
  11. 230
      examples/PiDistribCountinuous.hs
  12. 112
      examples/PiDistribOnce.hs
  13. 316
      examples/distributedExamples.hs
  14. BIN
      examples/webapp
  15. BIN
      examples/webapp.exe
  16. 277
      examples/webapp.hs
  17. 788
      src/Transient/DDS.hs
  18. 2535
      src/Transient/Move.hs
  19. 312
      src/Transient/Move/Services.hs
  20. 125
      src/Transient/Stream/Resource.hs
  21. 4
      stack.yaml
  22. 130
      tests/Test.hs
  23. 144
      tests/Test2.hs
  24. 108
      tests/Test3.hs
  25. 174
      tests/TestSuite.hs
  26. 26
      tests/Testspark.hs
  27. 94
      tests/ghcjs-websockets.hs
  28. 22
      tests/snippet
  29. BIN
      tests/test3.exe
  30. 110
      tests/test5.hs
  31. 254
      tests/teststream.hs
  32. 148
      tests/teststreamsocket.hs
  33. 142
      transient-universe.cabal
  34. 24
      transient.lkshf

50
.gitignore

@ -0,0 +1,50 @@
Demos/old-trash
Demos/db
Test
errlog
.tcachedata
.cabal-sandbox
cabal.sandbox*
favicon
IDE.session
MFlow.lkshf
notes.txt
notes.lhs
dist
*.js*
*.o
*.hi
.cabal-sandbox
cabal.sanbox.config
.stack*
# emacs stuff
*~
\#*\#
/.emacs.desktop
/.emacs.desktop.lock
*.elc
auto-save-list
tramp
.\#*
# Org-mode
.org-id-locations
*_archive
# flymake-mode
*_flymake.*
# eshell files
/eshell/history
/eshell/lastdir
# elpa packages
/elpa/
# vim stuff
*.swp
*.swo
*.key
_darcs
darcs*

15
.project-settings.yml

@ -0,0 +1,15 @@
binary-ghc-args:
- -O
- -threaded
module-template: ! 'module MODULE_NAME where
'
extensions: {}
environment: ghc-7.8-stable-14.09
auto-hidden: []
cabal-file: project.cabal
version: 1
extra-packages: ''
ghc-args: []
excluded-modules:
- Setup.hs

1495
DistrbDataSets.prof

File diff suppressed because it is too large

48
Dockerfile

@ -1,24 +1,24 @@
FROM agocorona/herokughcjs
RUN git clone https://github.com/agocorona/transient transgit \
&& cd transgit \
&& git checkout ghcjs \
&& cabal install
RUN cd transgit && cabal install --ghcjs
RUN git clone https://github.com/agocorona/ghcjs-perch \
&& cd ghcjs-perch \
&& cabal install \
&& cabal install --ghcjs #
RUN git clone https://github.com/agocorona/ghcjs-hplay \
&& cd ghcjs-hplay \
&& cabal install --ghcjs \
&& cabal install
ADD . /transient/
CMD cd /transient && chmod 777 buildrun.sh && ./buildrun.sh
FROM agocorona/herokughcjs
RUN git clone https://github.com/agocorona/transient transgit \
&& cd transgit \
&& git checkout ghcjs \
&& cabal install
RUN cd transgit && cabal install --ghcjs
RUN git clone https://github.com/agocorona/ghcjs-perch \
&& cd ghcjs-perch \
&& cabal install \
&& cabal install --ghcjs #
RUN git clone https://github.com/agocorona/ghcjs-hplay \
&& cd ghcjs-hplay \
&& cabal install --ghcjs \
&& cabal install
ADD . /transient/
CMD cd /transient && chmod 777 buildrun.sh && ./buildrun.sh

1348
LICENSE

File diff suppressed because it is too large

1358
README.md

File diff suppressed because it is too large

4
Setup.hs

@ -1,2 +1,2 @@
import Distribution.Simple
main = defaultMain
import Distribution.Simple
main = defaultMain

8
buildrun.bat

@ -1,4 +1,4 @@
ghcjs -isrc -i../ghcjs-hplay/src %1 -o static/out
if %errorlevel% neq 0 exit
runghc -isrc -i../ghcjs-hplay/src %1
ghcjs -isrc -i../ghcjs-hplay/src %1 -o static/out
if %errorlevel% neq 0 exit
runghc -isrc -i../ghcjs-hplay/src %1

4
buildrun.sh

@ -1,5 +1,5 @@
#!/bin/bash
set -e
ghcjs -isrc -i../ghcjs-hplay/src $1 -o static/out
runghc -isrc -i../ghcjs-hplay/src $1
ghcjs -isrc -i../transient/src -i../ghcjs-hplay/src -i../ghcjs-perch/src $1 -o static/out
runghc -isrc -i../transient/src -i../ghcjs-hplay/src -i../ghcjs-perch/src $1

92
examples/DistrbDataSets.hs

File diff suppressed because one or more lines are too long

230
examples/PiDistribCountinuous.hs

@ -1,115 +1,115 @@
-- Distributed streaming using Transient
-- See the article: https://www.fpcomplete.com/user/agocorona/streaming-transient-effects-vi
{-# LANGUAGE ScopedTypeVariables, DeriveDataTypeable, MonadComprehensions #-}
module MainCountinuous where
import Transient.Base
import Transient.Move
import Transient.Indeterminism
import Transient.Logged
import Transient.Stream.Resource
import Control.Applicative
import Data.Char
import Control.Monad
import Control.Monad.IO.Class
import System.Random
import Data.IORef
import System.IO
import GHC.Conc
import System.Environment
-- continuos streaming version
-- Perform the same calculation but it does not stop, and the results are accumulated in in a mutable reference within the calling node,
-- so the precision in the value of pi is printed with more and more precision. every 1000 calculations.
-- Here instead of `collect` that finish the calculation when the number of samples has been reached, i use `group` which simply
-- group the number of results in a list and then sum of the list is returned to the calling node.
-- Since `group` do not finish the calculation, new sums are streamed from the nodes again and again.
main= do
let numNodes= 5
numCalcsNode= 100
ports= [2000.. 2000+ numNodes -1]
createLocalNode p= createNode "localhost" p
nodes= map createLocalNode ports
rresults <- newIORef (0,0)
keep $ freeThreads $ threads 10 $ runCloud $ do
--setBufSize 1024
local $ addNodes nodes
foldl (<|>) empty (map listen nodes) <|> return()
r <- clustered $ do
--Connection (Just (_,h,_,_)) _ <- getSData <|> error "no connection"
--liftIO $ hSetBuffering h $ BlockBuffering Nothing
r <- local $ group numCalcsNode $ do
n <- liftIO getNumCapabilities
threads n .
spawn $ do
x <- randomIO :: IO Double
y <- randomIO
return $ if x * x + y * y < 1 then 1 else (0 :: Int)
return $ sum r
(n,c) <- local $ liftIO $ atomicModifyIORef' rresults $ \(num, count) ->
let num' = num + r
count'= count + numCalcsNode
in ((num', count'),(num',count'))
when ( c `rem` 1000 ==0) $ local $ liftIO $ do
th <- myThreadId
putStrLn $ "Samples: "++ show c ++ " -> " ++
show( 4.0 * fromIntegral n / fromIntegral c)++ "\t" ++ show th
-- really distributed version
-- generate an executable with this main and invoke it as such:
--
-- program myport remotehost remoteport
--
-- where remotehost remoteport are from a previously initialized node
-- The first node initialize it with:
--
-- program myport localhost myport
mainDistributed= do
args <- getArgs
let localPort = read (args !! 0)
seedHost = read (args !! 1)
seedPort = read (args !! 2)
mynode = createNode "localhost" localPort
seedNode = createNode seedHost seedPort
numCalcsNode= 100
rresults <- liftIO $ newIORef (0,0)
runCloud' $ do
connect mynode seedNode
local $ option "start" "start the calculation once all the nodes have been started" :: Cloud String
r <- clustered $ do
--Connection (Just (_,h,_,_)) _ <- getSData <|> error "no connection"
--liftIO $ hSetBuffering h $ BlockBuffering Nothing
r <- local $ group numCalcsNode $ do
n <- liftIO getNumCapabilities
threads n .
spawn $ do
x <- randomIO :: IO Double
y <- randomIO
return $ if x * x + y * y < 1 then 1 else (0 :: Int)
return $ sum r
(n,c) <- local $ liftIO $ atomicModifyIORef' rresults $ \(num, count) ->
let num' = num + r
count'= count + numCalcsNode
in ((num', count'),(num',count'))
when ( c `rem` 1000 ==0) $ local $ liftIO $ do
th <- myThreadId
putStrLn $ "Samples: "++ show c ++ " -> " ++
show( 4.0 * fromIntegral n / fromIntegral c)++ "\t" ++ show th
-- Distributed streaming using Transient
-- See the article: https://www.fpcomplete.com/user/agocorona/streaming-transient-effects-vi
{-# LANGUAGE ScopedTypeVariables, DeriveDataTypeable, MonadComprehensions #-}
module MainCountinuous where
import Transient.Base
import Transient.Move
import Transient.Indeterminism
import Transient.Logged
import Transient.Stream.Resource
import Control.Applicative
import Data.Char
import Control.Monad
import Control.Monad.IO.Class
import System.Random
import Data.IORef
import System.IO
import GHC.Conc
import System.Environment
-- continuos streaming version
-- Perform the same calculation but it does not stop, and the results are accumulated in in a mutable reference within the calling node,
-- so the precision in the value of pi is printed with more and more precision. every 1000 calculations.
-- Here instead of `collect` that finish the calculation when the number of samples has been reached, i use `group` which simply
-- group the number of results in a list and then sum of the list is returned to the calling node.
-- Since `group` do not finish the calculation, new sums are streamed from the nodes again and again.
main= do
let numNodes= 5
numCalcsNode= 100
ports= [2000.. 2000+ numNodes -1]
createLocalNode p= createNode "localhost" p
nodes= map createLocalNode ports
rresults <- newIORef (0,0)
keep $ freeThreads $ threads 10 $ runCloud $ do
--setBufSize 1024
local $ addNodes nodes
foldl (<|>) empty (map listen nodes) <|> return()
r <- clustered $ do
--Connection (Just (_,h,_,_)) _ <- getSData <|> error "no connection"
--liftIO $ hSetBuffering h $ BlockBuffering Nothing
r <- local $ group numCalcsNode $ do
n <- liftIO getNumCapabilities
threads n .
spawn $ do
x <- randomIO :: IO Double
y <- randomIO
return $ if x * x + y * y < 1 then 1 else (0 :: Int)
return $ sum r
(n,c) <- local $ liftIO $ atomicModifyIORef' rresults $ \(num, count) ->
let num' = num + r
count'= count + numCalcsNode
in ((num', count'),(num',count'))
when ( c `rem` 1000 ==0) $ local $ liftIO $ do
th <- myThreadId
putStrLn $ "Samples: "++ show c ++ " -> " ++
show( 4.0 * fromIntegral n / fromIntegral c)++ "\t" ++ show th
-- really distributed version
-- generate an executable with this main and invoke it as such:
--
-- program myport remotehost remoteport
--
-- where remotehost remoteport are from a previously initialized node
-- The first node initialize it with:
--
-- program myport localhost myport
mainDistributed= do
args <- getArgs
let localPort = read (args !! 0)
seedHost = read (args !! 1)
seedPort = read (args !! 2)
mynode = createNode "localhost" localPort
seedNode = createNode seedHost seedPort
numCalcsNode= 100
rresults <- liftIO $ newIORef (0,0)
runCloud' $ do
connect mynode seedNode
local $ option "start" "start the calculation once all the nodes have been started" :: Cloud String
r <- clustered $ do
--Connection (Just (_,h,_,_)) _ <- getSData <|> error "no connection"
--liftIO $ hSetBuffering h $ BlockBuffering Nothing
r <- local $ group numCalcsNode $ do
n <- liftIO getNumCapabilities
threads n .
spawn $ do
x <- randomIO :: IO Double
y <- randomIO
return $ if x * x + y * y < 1 then 1 else (0 :: Int)
return $ sum r
(n,c) <- local $ liftIO $ atomicModifyIORef' rresults $ \(num, count) ->
let num' = num + r
count'= count + numCalcsNode
in ((num', count'),(num',count'))
when ( c `rem` 1000 ==0) $ local $ liftIO $ do
th <- myThreadId
putStrLn $ "Samples: "++ show c ++ " -> " ++
show( 4.0 * fromIntegral n / fromIntegral c)++ "\t" ++ show th

112
examples/PiDistribOnce.hs

@ -1,56 +1,56 @@
-- Distributed streaming using Transient
-- See the article: https://www.fpcomplete.com/user/agocorona/streaming-transient-effects-vi
{-# LANGUAGE ScopedTypeVariables, DeriveDataTypeable, MonadComprehensions #-}
module MainOnce where
import Transient.Base
import Transient.Move
import Transient.Indeterminism
import Transient.Logged
import Transient.Stream.Resource
import Control.Applicative
import Data.Char
import Control.Monad
import Control.Monad.IO.Class
import System.Random
import Data.IORef
import System.IO
import GHC.Conc
import System.Environment
-- distributed calculation of PI
-- This example program is the closest one to the defined in the spark examples: http://tldrify.com/bpr
-- But while the spark example does not contain the setup of the cluster and the confuguration/initalization
-- this examples includes everything
-- The nodes are simulated within the local process, but they communicate trough sockets and serialize data
-- just like real nodes. Each node spawn threads and return the result to the calling node.
-- when the number of result are reached `colect` kill the threads, the sockets are closed and the stream is stopped
-- for more details look at the article: https://www.fpcomplete.com/tutorial-edit/streaming-transient-effects-vi
--
main= do
let numNodes= 5
numSamples= 1000
ports= [2000.. 2000 + numNodes -1]
createLocalNode p= createNode "localhost" p
nodes= map createLocalNode ports
keep $ do
addNodes nodes
-- option "start" "start"
xs <- collect numSamples $ runCloud $ do
foldl (<|>) empty (map listen nodes) <|> return()
local $ threads 2 $ runCloud $
clustered[if x * x + y * y < (1 :: Double) then 1 else (0 :: Int)| x <- random, y <-random]
liftIO $ print (4.0 * (fromIntegral $ sum xs) / (fromIntegral numSamples) :: Double)
exit
where
random= local $ waitEvents' randomIO :: Cloud Double
-- Distributed streaming using Transient
-- See the article: https://www.fpcomplete.com/user/agocorona/streaming-transient-effects-vi
{-# LANGUAGE ScopedTypeVariables, DeriveDataTypeable, MonadComprehensions #-}
module MainOnce where
import Transient.Base
import Transient.Move
import Transient.Indeterminism
import Transient.Logged
import Transient.Stream.Resource
import Control.Applicative
import Data.Char
import Control.Monad
import Control.Monad.IO.Class
import System.Random
import Data.IORef
import System.IO
import GHC.Conc
import System.Environment
-- distributed calculation of PI
-- This example program is the closest one to the defined in the spark examples: http://tldrify.com/bpr
-- But while the spark example does not contain the setup of the cluster and the confuguration/initalization
-- this examples includes everything
-- The nodes are simulated within the local process, but they communicate trough sockets and serialize data
-- just like real nodes. Each node spawn threads and return the result to the calling node.
-- when the number of result are reached `colect` kill the threads, the sockets are closed and the stream is stopped
-- for more details look at the article: https://www.fpcomplete.com/tutorial-edit/streaming-transient-effects-vi
--
main= do
let numNodes= 5
numSamples= 1000
ports= [2000.. 2000 + numNodes -1]
createLocalNode p= createNode "localhost" p
nodes= map createLocalNode ports
keep $ do
addNodes nodes
-- option "start" "start"
xs <- collect numSamples $ runCloud $ do
foldl (<|>) empty (map listen nodes) <|> return()
local $ threads 2 $ runCloud $
clustered[if x * x + y * y < (1 :: Double) then 1 else (0 :: Int)| x <- random, y <-random]
liftIO $ print (4.0 * (fromIntegral $ sum xs) / (fromIntegral numSamples) :: Double)
exit
where
random= local $ waitEvents' randomIO :: Cloud Double

316
examples/distributedExamples.hs

@ -1,158 +1,158 @@
{-# LANGUAGE ScopedTypeVariables, DeriveDataTypeable, MonadComprehensions #-}
module Main where
import Transient.Move
import Transient.Logged
import Transient.Base
import Transient.Indeterminism
import Transient.EVars
import Network
import Control.Applicative
import Control.Monad.IO.Class
import System.Environment
import System.IO.Unsafe
import Data.Monoid
import System.IO
import Control.Monad
import Data.Maybe
import Control.Exception
import Control.Concurrent (threadDelay)
import Data.Typeable
import Data.IORef
import Data.List((\\))
-- to be executed with two or more nodes
main = do
args <- getArgs
if length args < 2
then do
putStrLn "The program need at least two parameters: localHost localPort remoteHost RemotePort"
putStrLn "Start one node alone. The rest, connected to this node."
return ()
else keep $ do
let localHost= head args
localPort= read $ args !! 1
(remoteHost,remotePort) =
if length args >=4
then(args !! 2, read $ args !! 3)
else (localHost,localPort)
let localNode= createNode localHost localPort
remoteNode= createNode remoteHost remotePort
connect localNode remoteNode
examples
examples = do
logged $ option "main" "to see the menu" <|> return ""
r <- logged $ option "move" "move to another node"
<|> option "call" "call a function in another node"
<|> option "chat" "chat"
<|> option "netev" "events propagating trough the network"
case r of
"call" -> callExample
"move" -> moveExample
"chat" -> chat
"netev" -> networkEvents
data Environ= Environ (IORef String) deriving Typeable
callExample = do
node <- logged $ do
nodes <- getNodes
myNode <- getMyNode
return . head $ nodes \\ [myNode]
logged $ putStrLnhp node "asking for the remote data"
s <- callTo node $ do
putStrLnhp node "remote callTo request"
liftIO $ readIORef environ
liftIO $ putStrLn $ "resp=" ++ show s
{-# NOINLINE environ #-}
environ= unsafePerformIO $ newIORef "Not Changed"
moveExample = do
node <- logged $ do
nodes <- getNodes
myNode <- getMyNode
return . head $ nodes \\ [myNode]
putStrLnhp node "enter a string. It will be inserted in the other node by a migrating program"
name <- logged $ input (const True)
beamTo node
putStrLnhp node "moved!"
putStrLnhp node $ "inserting "++ name ++" as new data in this node"
liftIO $ writeIORef environ name
return()
chat :: TransIO ()
chat = do
name <- logged $ do liftIO $ putStrLn "Name?" ; input (const True)
text <- logged $ waitEvents $ putStr ">" >> hFlush stdout >> getLine' (const True)
let line= name ++": "++ text
clustered $ liftIO $ putStrLn line
networkEvents = do
node <- logged $ do
nodes <- getNodes
myNode <- getMyNode
return . head $ nodes \\ [myNode]
logged $ putStrLnhp node "<- write \"fire\" in this other node"
r <- callTo node $ do
option "fire" "fire event"
return "event fired"
putStrLnhp node $ r ++ " in remote node"
putStrLnhp p msg= liftIO $ putStr (show p) >> putStr " ->" >> putStrLn msg
--call host port proc params= do
-- port <- getPort
-- listen port <|> return
-- parms <- logged $ return params
-- callTo host port proc parms
-- close
--
--distribute proc= do
-- case dataFor proc
-- Nothing -> proc
-- Just dataproc -> do
-- (h,p) <- bestNode dataproc
-- callTo h p proc
--
--bestNode dataproc=
-- nodes <- getNodes
-- (h,p) <- bestMatch dataproc nodes <- user defined
--
--bestMatch (DataProc nodesAccesed cpuLoad resourcesNeeded) nodes= do
-- nodesAccesed: node, response
--
--bestMove= do
-- thisproc <- gets myProc
-- case dataFor thisproc
-- Nothing -> return ()
-- Just dataproc -> do
-- (h,p) <- bestNode dataproc
-- moveTo h p
--
--
--inNetwork= do
-- p <- getPort
-- listen p <|> return ()
{-# LANGUAGE ScopedTypeVariables, DeriveDataTypeable, MonadComprehensions #-}
module Main where
import Transient.Move
import Transient.Logged
import Transient.Base
import Transient.Indeterminism
import Transient.EVars
import Network
import Control.Applicative
import Control.Monad.IO.Class
import System.Environment
import System.IO.Unsafe
import Data.Monoid
import System.IO
import Control.Monad
import Data.Maybe
import Control.Exception
import Control.Concurrent (threadDelay)
import Data.Typeable
import Data.IORef
import Data.List((\\))
-- to be executed with two or more nodes
main = do
args <- getArgs
if length args < 2
then do
putStrLn "The program need at least two parameters: localHost localPort remoteHost RemotePort"
putStrLn "Start one node alone. The rest, connected to this node."
return ()
else keep $ do
let localHost= head args
localPort= read $ args !! 1
(remoteHost,remotePort) =
if length args >=4
then(args !! 2, read $ args !! 3)
else (localHost,localPort)
let localNode= createNode localHost localPort
remoteNode= createNode remoteHost remotePort
connect localNode remoteNode
examples
examples = do
logged $ option "main" "to see the menu" <|> return ""
r <- logged $ option "move" "move to another node"
<|> option "call" "call a function in another node"
<|> option "chat" "chat"
<|> option "netev" "events propagating trough the network"
case r of
"call" -> callExample
"move" -> moveExample
"chat" -> chat
"netev" -> networkEvents
data Environ= Environ (IORef String) deriving Typeable
callExample = do
node <- logged $ do
nodes <- getNodes
myNode <- getMyNode
return . head $ nodes \\ [myNode]
logged $ putStrLnhp node "asking for the remote data"
s <- callTo node $ do
putStrLnhp node "remote callTo request"
liftIO $ readIORef environ
liftIO $ putStrLn $ "resp=" ++ show s
{-# NOINLINE environ #-}
environ= unsafePerformIO $ newIORef "Not Changed"
moveExample = do
node <- logged $ do
nodes <- getNodes
myNode <- getMyNode
return . head $ nodes \\ [myNode]
putStrLnhp node "enter a string. It will be inserted in the other node by a migrating program"
name <- logged $ input (const True)
beamTo node
putStrLnhp node "moved!"
putStrLnhp node $ "inserting "++ name ++" as new data in this node"
liftIO $ writeIORef environ name
return()
chat :: TransIO ()
chat = do
name <- logged $ do liftIO $ putStrLn "Name?" ; input (const True)
text <- logged $ waitEvents $ putStr ">" >> hFlush stdout >> getLine' (const True)
let line= name ++": "++ text
clustered $ liftIO $ putStrLn line
networkEvents = do
node <- logged $ do
nodes <- getNodes
myNode <- getMyNode
return . head $ nodes \\ [myNode]
logged $ putStrLnhp node "<- write \"fire\" in this other node"
r <- callTo node $ do
option "fire" "fire event"
return "event fired"
putStrLnhp node $ r ++ " in remote node"
putStrLnhp p msg= liftIO $ putStr (show p) >> putStr " ->" >> putStrLn msg
--call host port proc params= do
-- port <- getPort
-- listen port <|> return
-- parms <- logged $ return params
-- callTo host port proc parms
-- close
--
--distribute proc= do
-- case dataFor proc
-- Nothing -> proc
-- Just dataproc -> do
-- (h,p) <- bestNode dataproc
-- callTo h p proc
--
--bestNode dataproc=
-- nodes <- getNodes
-- (h,p) <- bestMatch dataproc nodes <- user defined
--
--bestMatch (DataProc nodesAccesed cpuLoad resourcesNeeded) nodes= do
-- nodesAccesed: node, response
--
--bestMove= do
-- thisproc <- gets myProc
-- case dataFor thisproc
-- Nothing -> return ()
-- Just dataproc -> do
-- (h,p) <- bestNode dataproc
-- moveTo h p
--
--
--inNetwork= do
-- p <- getPort
-- listen p <|> return ()

BIN
examples/webapp

Binary file not shown.

BIN
examples/webapp.exe

Binary file not shown.

277
examples/webapp.hs

@ -1,141 +1,136 @@
{-# LANGUAGE CPP #-}
module Main where
import Prelude hiding (div,id,span)
import Transient.Base
#ifdef ghcjs_HOST_OS
hiding ( option,runCloud')
#endif
import GHCJS.HPlay.View
#ifdef ghcjs_HOST_OS
hiding (map)
#else
hiding (map, option,runCloud')
#endif
import Transient.Move
import Transient.Indeterminism
import Control.Applicative
import Control.Monad
import Data.Typeable
import Data.IORef
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class
-- Show the composability of transient web aplications
-- with three examples composed together, each one is a widget that execute
-- code in the browser AND the server.
main = simpleWebApp 2020 $ demo <|> demo2 <|> counters
notebook= do
r <- local . render $ textArea <*** wbutton () "send"
atServer $ do
-- execute with a pipe
exec
demo= do
name <- local . render $ do
rawHtml $ do
hr
p "this snippet captures the essence of this demonstration"
p $ span "it's a blend of server and browser code in a "
>> (span $ b "composable") >> span " piece"
div ! id (fs "fibs") $ i "Fibonacci numbers should appear here"
local . render $ wlink () (p " stream fibonacci numbers")
-- stream fibonancci
r <- atServer $ do
let fibs= 0 : 1 : zipWith (+) fibs (tail fibs) :: [Int] --fibonacci numb. definition
r <- local . threads 1 . choose $ take 10 fibs
lliftIO $ print r
lliftIO $ threadDelay 1000000
return r
local . render . at (fs "#fibs") Append $ rawHtml $ (h2 r)
demo2= do
name <- local . render $ do
rawHtml $ do
hr
br;br
p "In this example you enter your name and the server will salute you"
br
-- inputString (Just "Your name") `fire` OnKeyUp -- send once a char is entered
inputSubmit "sand" `fire` OnClick
<*** inputSubmit "send" `fire` OnClick -- optional, to resend it
<++ br -- new line
r <- atServer $ lliftIO $ print (name ++ " calling") >> return ("Hi " ++ name)
local . render . rawHtml $ do
p " returned"
h2 r
fs= toJSString
-- To demonstrate wormhole, teleport, widgets, interactive streaming
-- and composability in a web application.
--
-- This is one of the most complicated interactions: how to control a stream in the server
-- by means of a web interface without loosing composability.
--
-- in this example, events flow from the server to the browser (a counter) and back from
-- the browser to the server (initiating and cancelling the counters)
counters= do
local . render . rawHtml $ do
hr
p "To demonstrate wormhole, teleport, widgets, interactive streaming"
p "and composability in a web application."
br
p "This is one of the most complicated interactions: how to control a stream in the server"
p "by means of a web interface without loosing composability."
br
p "in this example, events flow from the server to the browser (a counter) and back from"
p "the browser to the server (initiating and cancelling the counters)"
server <- local $ getSData <|> error "no server???"
counter server <|> counter server
where
counter server = wormhole server $ do
op <- startOrCancel
teleport -- translates the computation to the server
r <- local $ case op of
"start" -> killChilds >> stream
"cancel" -> killChilds >> stop
teleport -- back to the browser again
local $ render $ rawHtml $ h1 r
-- generates a sequence of numbers
stream= do
counter <- liftIO $ newIORef (0 :: Int)
waitEvents $ do
n <- atomicModifyIORef counter $ \r -> (r +1,r)
threadDelay 1000000
putStr "generating: " >> print n
return n
startOrCancel :: Cloud String
startOrCancel= local $ render $ (inputSubmit "start" `fire` OnClick)
<|> (inputSubmit "cancel" `fire` OnClick)
<++ br
{-# LANGUAGE CPP #-}
module Main where
import Prelude hiding (div,id,span)
import Transient.Base
#ifdef ghcjs_HOST_OS
hiding ( option,runCloud')
#endif
import GHCJS.HPlay.View
#ifdef ghcjs_HOST_OS
hiding (map)
#else
hiding (map, option,runCloud')
#endif
import Transient.Move
import Transient.Indeterminism
import Control.Applicative
import Control.Monad
import Data.Typeable
import Data.IORef
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class
-- Show the composability of transient web aplications
-- with three examples composed together, each one is a widget that execute
-- code in the browser AND the server.
main = simpleWebApp 2020 $ demo <|> demo2 <|> counters
demo= do
name <- local . render $ do
rawHtml $ do
hr
p "this snippet captures the essence of this demonstration"
p $ span "it's a blend of server and browser code in a "
>> (span $ b "composable") >> span " piece"
div ! id (fs "fibs") $ i "Fibonacci numbers should appear here"
local . render $ wlink () (p " stream fibonacci numbers")
-- stream fibonancci
r <- atServer $ do
let fibs= 0 : 1 : zipWith (+) fibs (tail fibs) :: [Int] --fibonacci numb. definition
r <- local . threads 1 . choose $ take 10 fibs
lliftIO $ print r
lliftIO $ threadDelay 1000000
return r
local . render . at (fs "#fibs") Append $ rawHtml $ (h2 r)
demo2= do
name <- local . render $ do
rawHtml $ do
hr
br;br
p "In this example you enter your name and the server will salute you"
br
-- inputString (Just "Your name") `fire` OnKeyUp -- send once a char is entered
inputString (Just "enter your name") `fire` OnKeyUp
<++ br -- new line
r <- atServer $ lliftIO $ print (name ++ " calling") >> return ("Hi " ++ name)
local . render . rawHtml $ do
p " returned"
h2 r
fs= toJSString
-- To demonstrate wormhole, teleport, widgets, interactive streaming
-- and composability in a web application.
--
-- This is one of the most complicated interactions: how to control a stream in the server
-- by means of a web interface without loosing composability.
--
-- in this example, events flow from the server to the browser (a counter) and back from
-- the browser to the server (initiating and cancelling the counters)
counters= do
local . render . rawHtml $ do
hr
p "To demonstrate wormhole, teleport, widgets, interactive streaming"
p "and composability in a web application."
br
p "This is one of the most complicated interactions: how to control a stream in the server"
p "by means of a web interface without loosing composability."
br
p "in this example, events flow from the server to the browser (a counter) and back from"
p "the browser to the server (initiating and cancelling the counters)"
server <- local $ getSData <|> error "no server???"
counter server <|> counter server
where
counter server = wormhole server $ do
op <- startOrCancel
teleport -- translates the computation to the server
r <- local $ case op of
"start" -> killChilds >> stream
"cancel" -> killChilds >> stop
teleport -- back to the browser again
local $ render $ rawHtml $ h1 r
-- generates a sequence of numbers
stream= do
counter <- liftIO $ newIORef (0 :: Int)
waitEvents $ do
n <- atomicModifyIORef counter $ \r -> (r +1,r)
threadDelay 1000000
putStr "generating: " >> print n
return n
startOrCancel :: Cloud String
startOrCancel= local $ render $ (inputSubmit "start" `fire` OnClick)
<|> (inputSubmit "cancel" `fire` OnClick)
<++ br

788
src/Transient/DDS.hs

@ -1,386 +1,402 @@
{-# LANGUAGE ExistentialQuantification, DeriveDataTypeable
, FlexibleInstances, FlexibleContexts, UndecidableInstances, MultiParamTypeClasses #-}
module Transient.DDS (distribute, getText, getUrl, getFile,textUrl, textFile, mapKey, reduce) where
import Transient.Base
import Transient.Move hiding (pack)
import Transient.Logged
import Transient.Indeterminism
import Control.Applicative
import System.Random
import Control.Monad.IO.Class
import System.IO
import Control.Monad
import Data.Monoid
import Data.Maybe
import Data.Typeable
import Data.List hiding (delete, foldl')
import Control.Exception
import Control.Concurrent
--import Data.Time.Clock
import Network.HTTP
import Data.TCache hiding (onNothing)
import Data.TCache.Defs
import Data.ByteString.Lazy.Char8 (pack,unpack)
import Control.Monad.STM
import qualified Data.Map as M
import Control.Arrow (second)
import qualified Data.Vector.Unboxed as DVU
import qualified Data.Vector as DV
import Data.Hashable
import System.IO.Unsafe
import qualified Data.Foldable as F
import qualified Data.Text as Text
data DDS a= Loggable a => DDS (Cloud (PartRef a))
data PartRef a= Ref Node Path Save deriving (Typeable, Read, Show)
data Partition a= Part Node Path Save a deriving (Typeable,Read,Show)
type Save= Bool
instance Indexable (Partition a) where
key (Part _ string b _)= keyp string b
keyp s True= "PartP@"++s
keyp s False="PartT@"++s
instance Loggable a => IResource (Partition a) where
keyResource= key
readResourceByKey k= r
where
typePart :: IO (Maybe a) -> a
typePart = undefined
r = if k !! 4 /= 'P' then return Nothing else
defaultReadByKey (defPath (typePart r) ++ k) >>= return . fmap ( read . unpack)
writeResource (s@(Part _ _ save _))=
unless (not save) $ defaultWrite (defPath s ++ key s) (pack $ show s)
eval :: DDS a -> TransIO (PartRef a)
eval (DDS mx) = runCloud mx
type Path=String
instance F.Foldable DVU.Vector where
{-# INLINE foldr #-}
foldr = foldr
{-# INLINE foldl #-}
foldl = foldl
{-# INLINE foldr1 #-}
foldr1 = foldr1
{-# INLINE foldl1 #-}
foldl1 = foldl1
--foldlIt' :: V.Unbox a => (b -> a -> b) -> b -> V.Vector a -> b
--foldlIt' f z0 xs= V.foldr f' id xs z0
-- where f' x k z = k $! f z x
--
--foldlIt1 :: V.Unbox a => (a -> a -> a) -> V.Vector a -> a
--foldlIt1 f xs = fromMaybe (error "foldl1: empty structure")
-- (V.foldl mf Nothing xs)
-- where
-- mf m y = Just (case m of
-- Nothing -> y
-- Just x -> f x y)
-- | Containers that can be distributed among nodes: they can be built
-- from lists, split into chunks and hold single elements, besides being
-- foldable, combinable ('Monoid') and serializable ('Loggable').
class (F.Foldable c, Typeable c, Typeable a, Monoid (c a), Loggable (c a)) => Distributable c a where
   singleton :: a -> c a
   splitAt :: Int -> c a -> (c a, c a)
   fromList :: [a] -> c a

-- | Boxed vectors are distributable for any loggable element type.
instance (Loggable a) => Distributable DV.Vector a where
   singleton = DV.singleton
   splitAt= DV.splitAt
   fromList = DV.fromList

-- | Unboxed vectors additionally require the element to be unboxable.
instance (Loggable a,DVU.Unbox a) => Distributable DVU.Vector a where
   singleton= DVU.singleton
   splitAt= DVU.splitAt
   fromList= DVU.fromList
-- | perform a map and partition the result with different keys.
-- The result will be used by reduce.
--
-- The transformation runs at the node that owns each partition
-- ('runAt node'), so no data moves during the map phase; only the new
-- partition reference travels back.
mapKey :: (Distributable vector a,Distributable vector b, Loggable k,Ord k)
       => (a -> (k,b))
       -> DDS (vector a)
       -> DDS (M.Map k(vector b))
mapKey f (DDS mx)= DDS $ do
        refs <- mx
        process refs

  where
--  process :: Partition a -> Cloud [Partition b]
  -- fetch the local partition data, fold it into a per-key map and
  -- register the result as a new local partition
  process (ref@(Ref node path sav))= runAt node $ local $ do
          xs <- getPartitionData ref  -- !> "CMAP"
          generateRef node $ map1 f xs

-- map1 :: (Ord k, F.Foldable vector) => (a -> (k,b)) -> vector a -> M.Map k(vector b)
  -- group each mapped (key,value) pair into the map, appending values of
  -- equal keys with the container's monoid
  map1 f v= F.foldl' f1 M.empty v
     where
     f1 map x=
           let (k,r) = f x
           in M.insertWith (<>) k (Transient.DDS.singleton r) map
--instance Show a => Show (MVar a) where
-- show mvx= "MVar " ++ show (unsafePerformIO $ readMVar mvx)
--
--instance Read a => Read (MVar a) where
-- readsPrec n ('M':'V':'a':'r':' ':s)=
-- let [(x,s')]= readsPrec n s
-- in [(unsafePerformIO $ newMVar x,s')]
-- | Control messages exchanged during the shuffle phase of 'reduce':
-- either a partial (key,value) chunk or the end-of-stream marker.
data ReduceChunk a= EndReduce | Reduce a deriving (Typeable, Read, Show)

-- | Distributed reduce: combine, for each key, all the values spread over
-- the cluster with the binary operation @red@.  Two cooperating phases:
-- a /shuffler/ that sends each key's locally pre-reduced value to the
-- node chosen by hashing the key, and a /reducer/ in every node that
-- accumulates incoming chunks until all nodes have signalled the end.
reduce :: (Hashable k,Ord k, Distributable vector a, Loggable k,Loggable a)
       => (a -> a -> a) -> DDS (M.Map k (vector a)) ->Cloud (M.Map k a)
reduce red (dds@(DDS mx))= do
     -- mailbox through which shuffled chunks reach the local reducer
     box <- local newMailBox
     nodes <- onAll getNodes
     let lengthNodes = length nodes

         -- read the local partition, shuffle every (key,values) pair and,
         -- once all pairs are sent, broadcast 'EndReduce' to every node
         shuffler= do
           ref <- mx
           local $ do
             m <- getPartitionData ref     -- !> "GETPARTITIONDATA"
             let ass= M.assocs m
             runCloud (parallelize shuffle ass) `atEnd'` runCloud sendEnd
           stop

--       shuffle ::(k,vector a) -> Cloud ()
         -- pre-reduce the values of one key locally, then send the result
         -- to the node selected by the hash of the key.
         -- NOTE(review): 'part' is a pair, so under the Foldable instance
         -- for tuples 'null part' is always False -- the guard appears
         -- ineffective; confirm the intended emptiness check.
         shuffle part | null part = empty
                      | otherwise = do
           let (k,vs)= part
               v= foldl1 red vs
               i= abs $ hash k `rem` length nodes
           runAt (nodes !! i) $ local $ putMailBox box $ Reduce (k,v)  -- !> (" PUTMAILBOX ",i,v)
           empty :: Cloud ()

         -- announce in every node that this node has no more chunks
         sendEnd = (clustered $ local $ putMailBox box (EndReduce `asTypeOf` paramOf dds))  -- !> "ENDREDUCE"

         reducer= mclustered reduce     -- a reduce process in each node

--       reduce :: (Ord k) => Cloud (M.Map k v)
         -- accumulate incoming chunks.  NOTE(review): the MVars are
         -- created before 'getMailBox' and the continuation after it
         -- re-runs for every received message, sharing those MVars --
         -- assumes transient's mailbox semantics; confirm.
         reduce = local $ do
           reduceResults <- liftIO $ newMVar M.empty     -- !> "CREATE MVAR for results"
           numberSent <- liftIO $ newMVar 0
           minput <- getMailBox box     -- get the chunk once it arrives to the mailbox
           case minput of     -- !> ("received",minput) of
             -- fold the incoming value into the per-key accumulator
             Reduce (k,inp) -> do
               let input= inp `asTypeOf` atype dds
               liftIO $ modifyMVar_ reduceResults
                      $ \map -> do
                          let maccum = M.lookup k map
                          return $ M.insert k (case maccum of
                                                 Just accum -> red input accum
                                                 Nothing -> input) map
               empty

             -- count end markers; deliver the results when all nodes ended
             EndReduce -> do
               n <- liftIO $ modifyMVar numberSent $ \r -> return (r+1, r+1)
               if n == lengthNodes     -- !> ( n, lengthNodes)
                 then liftIO $ readMVar reduceResults     -- !> "END reduce"
                 else empty

     reducer <|> shuffler
   where
   -- type-level helpers, never evaluated: fix the element and chunk types
   atype ::DDS(M.Map k (vector a)) -> a
   atype = undefined    -- type level

   paramOf :: DDS (M.Map k (vector a)) -> ReduceChunk( k, a)
   paramOf = undefined  -- type level

--parallelize :: Loggable b => (a -> Cloud b) -> [a] -> Cloud b
   -- run one Cloud action per element, non-deterministically in parallel
   parallelize f xs = foldr (<|>) empty $ map f xs
-- | Fetch the payload of a partition from the local TCache store,
-- failing loudly when the reference cannot be resolved.
getPartitionData :: Loggable a => PartRef a -> TransIO a
getPartitionData (Ref _ path save) = do
    let dbkey = keyp path save
        lookupPart = liftIO . atomically . readDBRef $ getDBRef dbkey
    Part _ _ _ content <- lookupPart
                 `onNothing` error ("not found DDS data: "++ dbkey)
    return content     -- !> "getPartitionData"
-- On node failure, a clustered search for the path is launched;
-- if only one node holds it, it is copied to another one and that
-- node is set as the reference in 'Part'.

-- | Run an action at a node, failing over to another node that holds the
-- same chunk (identified by @uuid@) if the remote execution errors out.
runAtP :: Loggable a => Node -> (Path -> IO a) -> Path -> Cloud a
runAtP node f uuid= do
   r <- streamFrom node $ onAll . liftIO $ (SLast <$> f uuid) `catch` sendAnyError
   case r of
     -- NOTE(review): only 'SLast' and 'SError' are matched; an 'SMore' or
     -- 'SDone' result would die with a pattern-match failure -- confirm
     -- that this 'streamFrom' can produce nothing else.
     SLast r -> return r
     SError e -> do
          nodes <- mclustered $ search uuid
          -- NOTE(review): when the search finds no node, 'head nodes'
          -- below is partial even after duplicating the chunk -- verify
          -- the failover ordering.
          when(length nodes < 1) $ asyncDuplicate node uuid
          runAtP ( head nodes) f uuid

-- | Placeholder for the chunk failover search (not yet implemented).
search uuid= error $ "chunk failover not yet defined. Lookin for: "++ uuid
-- | Copy the chunk file @uuid@ from this node to some other node of the
-- cluster, as a crude replication mechanism for failover.
asyncDuplicate node uuid= do
    forkTo node
    nodes <- onAll getNodes
    -- NOTE(review): 'head' is partial -- assumes the cluster contains at
    -- least one node besides 'node'; confirm.
    let node'= head $ nodes \\ [node]
    -- lazy 'readFile' defers I/O errors and keeps the handle open until
    -- the content is fully consumed by the remote write
    content <- onAll . liftIO $ readFile uuid
    runAt node' $ local $ liftIO $ writeFile uuid content
-- | Wrap any exception raised by the remote action into an 'SError'
-- stream element so it can be transported back to the caller.
sendAnyError :: SomeException -> IO (StreamData a)
sendAnyError = return . SError
-- | distribute a vector of values among many nodes.
-- If the vector is static and sharable, better use the get* primitives
-- since each node will load the data independently.
distribute :: (Loggable a, Distributable vector a ) => vector a -> DDS (vector a)
distribute = DDS . distribute'

-- | Split the container in as many chunks as there are nodes and store
-- one chunk at each node, yielding the reference of the local partition.
distribute' xs= loggedc $ do
   nodes <- local getNodes     -- !> "DISTRIBUTE"
   let lnodes = length nodes
   let size= case F.length xs `div` (length nodes) of 0 ->1 ; n -> n
       xss= split size lnodes 1 xs     -- !> size
   distribute'' xss nodes
   where
   -- cut the container into chunks of @n@ elements; the last chunk keeps
   -- the whole remainder, so no element is lost
   split n s s' xs | s==s' = [xs]
   split n s s' xs=
      let (h,t)= Transient.DDS.splitAt n xs
      in h : split n s (s'+1) t
-- | Store each chunk at its corresponding node and return the reference
-- of the partition created there.
distribute'' :: (Loggable a, Distributable vector a) => [vector a] -> [Node] -> Cloud (PartRef (vector a))
distribute'' xss nodes =
   parallelize move $ zip nodes xss     -- !> show xss
   where
   -- the redundant 'par <- generateRef ...; return par' was collapsed
   -- into a direct call
   move (node, xs)= runAt node $ local $ generateRef node xs
-- | input data from a text that must be static and shared by all the nodes.
-- The first parameter parses the elements out of the text.
getText :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getText part str= DDS $ do
     nodes <- onAll getNodes     -- !> "DISTRIBUTE"
     let lnodes = length nodes
     -- exactly one chunk index per node
     parallelize (process lnodes) $ zip nodes [0..lnodes-1]
     where
     -- each node parses the shared text itself and keeps only its chunk
     process lnodes (node,i)= runAt node $ local $ do
          let xs = part str
              size= case length xs `div` lnodes of 0 -> 1 ; n -> n
              -- the last node takes the whole remainder so that elements
              -- beyond size*lnodes are not silently dropped (bug fix,
              -- consistent with the 'split' used by distribute')
              chunk= if i == lnodes-1 then drop (i*size) xs
                                      else take size $ drop (i*size) xs
              xss= Transient.DDS.fromList chunk
          generateRef node xss
-- | get the words of a URL
textUrl :: String -> DDS (DV.Vector Text.Text)
textUrl url= getUrl wordParser url
    where
    wordParser= map Text.pack . words
-- | generate a DDS from the content of a URL.
-- The first parameter parses the elements out of the downloaded body.
getUrl :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getUrl partitioner url= DDS $ do
     nodes <- onAll getNodes     -- !> "DISTRIBUTE"
     let lnodes = length nodes
     -- exactly one chunk index per node
     parallelize (process lnodes) $ zip nodes [0..lnodes-1]
     where
     -- each node downloads the document itself and keeps only its chunk
     process lnodes (node,i)= runAt node $ local $ do
          r <- liftIO . simpleHTTP $ getRequest url
          body <- liftIO $ getResponseBody r
          let xs = partitioner body
              size= case length xs `div` lnodes of 0 -> 1 ; n -> n
              -- the last node takes the whole remainder so that elements
              -- beyond size*lnodes are not silently dropped (bug fix,
              -- consistent with the 'split' used by distribute')
              chunk= if i == lnodes-1 then drop (i*size) xs
                                      else take size $ drop (i*size) xs
              xss= Transient.DDS.fromList chunk
          generateRef node xss
-- | get the words of a file
textFile :: String -> DDS (DV.Vector Text.Text)
textFile name= getFile wordParser name
    where
    wordParser= map Text.pack . words
-- | generate a DDS from a file. All the nodes must access the file with the same path
-- the first parameter is the parser that generates elements from the content
getFile :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getFile partitioner file= DDS $ do
     nodes <- local getNodes     -- !> "DISTRIBUTE"
     let lnodes = length nodes
     -- exactly one chunk index per node
     parallelize (process lnodes) $ zip nodes [0..lnodes-1]
     where
     -- each node reads the shared file itself and keeps only its chunk
     process lnodes (node, i)= runAt node $ local $ do
          content <- liftIO $ readFile file
          let xs = partitioner content
              size= case length xs `div` lnodes of 0 -> 1 ; n -> n
              -- the last node takes the whole remainder so that elements
              -- beyond size*lnodes are not silently dropped (bug fix,
              -- consistent with the 'split' used by distribute')
              chunk= if i == lnodes-1 then drop (i*size) xs
                                      else take size $ drop (i*size) xs
              xss= Transient.DDS.fromList chunk
          generateRef node xss
-- | Store a value as a new persistent partition at the given node,
-- registering it in TCache under a fresh temporary name, and return the
-- reference that points at it.
generateRef :: Loggable a => Node -> a -> TransIO (PartRef a)
generateRef node payload= liftIO $ do
       name <- getTempName
       let partition= Part node name True payload
       atomically $ newDBRef partition
       return $ getRef partition

-- | Project the location metadata of a partition into a reference.
getRef (Part node path save _)= Ref node path save
-- | Generate a fresh pseudo-random partition name: the prefix "DDS"
-- followed by five random lowercase letters.
getTempName :: IO String
getTempName= do
    suffix <- replicateM 5 $ randomRIO ('a','z')
    return $ "DDS" ++ suffix
-------------- Distributed Datasource Streams ---------

-- | produce a stream of DDS's that can be map-reduced. Similar to spark streams.
-- each interval of time, a new DDS is produced.
streamDDS
  :: (Loggable a, Distributable vector a) =>
     Integer -> IO (StreamData a) -> DDS (vector a)
streamDDS time io= DDS $ do
     -- collect the elements produced during each time window ...
     xs <- local . groupByTime time $ do
             r <- parallel io
             case r of
                 SDone -> empty            -- end of stream: contribute nothing
                 SLast x -> return x
                 SMore x -> return x
                 SError e -> error $ show e
     -- ... and distribute them over the cluster as a fresh DDS
     distribute' $ Transient.DDS.fromList xs
--data CloudArray a= Cloud [a]
--
--instance Functor CloudArray where
-- fmap f mx= do
-- xs <- mx
-- xss <- partition xs
-- rss <- clustered f xss
-- return $ concat rss
{-# LANGUAGE ExistentialQuantification, DeriveDataTypeable
, FlexibleInstances, FlexibleContexts, UndecidableInstances, MultiParamTypeClasses #-}
module Transient.DDS (distribute, getText, getUrl, getFile,textUrl, textFile, mapKeyB, mapKeyU, reduce) where
import Transient.Base
import Transient.Move hiding (pack)
import Transient.Logged
import Transient.Indeterminism
import Control.Applicative
import System.Random
import Control.Monad.IO.Class
import System.IO
import Control.Monad
import Data.Monoid
import Data.Maybe
import Data.Typeable
import Data.List hiding (delete, foldl')
import Control.Exception
import Control.Concurrent
--import Data.Time.Clock
import Network.HTTP
import Data.TCache hiding (onNothing)
import Data.TCache.Defs
import Data.ByteString.Lazy.Char8 (pack,unpack)
import Control.Monad.STM
import qualified Data.Map as M
import Control.Arrow (second)
import qualified Data.Vector.Unboxed as DVU
import qualified Data.Vector as DV
import Data.Hashable
import System.IO.Unsafe
import qualified Data.Foldable as F
import qualified Data.Text as Text
-- | A Distributed Data Set: a computation that yields a reference to a
-- partition of data located at some node.  The payload must be 'Loggable'
-- so it can be serialized and moved between nodes.
data DDS a= Loggable a => DDS (Cloud (PartRef a))

-- | Remote reference to a partition: the node that holds it, the path
-- (temporary name) under which it is stored, and its persistence flag.
data PartRef a= Ref Node Path Save deriving (Typeable, Read, Show)

-- | A partition: its location metadata plus the data itself.
data Partition a= Part Node Path Save a deriving (Typeable,Read,Show)

-- | Whether the partition is persisted ('True') or transient ('False').
type Save= Bool

instance Indexable (Partition a) where
    key (Part _ string b _)= keyp string b
-- | Build the TCache key for a partition path: persisted partitions get
-- the "PartP@" prefix, transient ones "PartT@".
keyp path saved
    | saved     = "PartP@" ++ path
    | otherwise = "PartT@" ++ path
-- | Persistence of partitions via TCache.  Only keys marked persistent
-- (prefix \"PartP\@\") are ever read back from storage; transient
-- partitions (\"PartT\@\") live in memory only.
instance Loggable a => IResource (Partition a) where
    keyResource= key

    readResourceByKey k= r
       where
       -- Type-level helper: fixes the element type of the partition being
       -- read so that 'defPath' can be resolved.  Never evaluated.
       typePart :: IO (Maybe a) -> a
       typePart = undefined
       -- Keys have the shape "PartP@..." or "PartT@...", so index 4 is the
       -- persistence marker ('P' or 'T').  NOTE(review): this indexing is
       -- partial for malformed keys -- assumes every key comes from 'keyp'.
       r = if k !! 4 /= 'P' then return Nothing else
            fmap (read . unpack) <$> defaultReadByKey (defPath (typePart r) ++ k)

    writeResource (s@(Part _ _ save _))=
        -- 'when save' replaces the double negation 'unless (not save)'
        when save $ defaultWrite (defPath s ++ key s) (pack $ show s)
-- | Run the distributed computation of a 'DDS' down to the reference of
-- its materialized partition.
eval :: DDS a -> TransIO (PartRef a)
eval (DDS mx) = runCloud mx

-- | Temporary name under which a partition is stored.
type Path=String
-- NOTE(review): this instance looks wrong.  The right-hand sides
-- ('foldr = foldr', ...) do not refer to the unboxed-vector folds (which
-- would be 'DVU.foldr', ...), so each either resolves to the Prelude list
-- fold (a type error) or to this very instance's method (an infinite
-- loop).  A lawful 'F.Foldable' instance for 'DVU.Vector' cannot be
-- written at all, because every method would need a 'DVU.Unbox'
-- constraint on the element type -- TODO confirm the intent and replace
-- the uses with DVU-specific folds instead.
instance F.Foldable DVU.Vector where
   {-# INLINE foldr #-}
   foldr = foldr

   {-# INLINE foldl #-}
   foldl = foldl

   {-# INLINE foldr1 #-}
   foldr1 = foldr1

   {-# INLINE foldl1 #-}
   foldl1 = foldl1
--foldlIt' :: V.Unbox a => (b -> a -> b) -> b -> V.Vector a -> b
--foldlIt' f z0 xs= V.foldr f' id xs z0
-- where f' x k z = k $! f z x
--
--foldlIt1 :: V.Unbox a => (a -> a -> a) -> V.Vector a -> a
--foldlIt1 f xs = fromMaybe (error "foldl1: empty structure")
-- (V.foldl mf Nothing xs)
-- where
-- mf m y = Just (case m of
-- Nothing -> y
-- Just x -> f x y)
-- | Containers that can be distributed among nodes: they can be built
-- from lists, split into chunks and hold single elements, besides being
-- foldable, combinable ('Monoid') and serializable ('Loggable').
class (F.Foldable c, Typeable c, Typeable a, Monoid (c a), Loggable (c a)) => Distributable c a where
   singleton :: a -> c a
   splitAt :: Int -> c a -> (c a, c a)
   fromList :: [a] -> c a

-- | Boxed vectors are distributable for any loggable element type.
instance (Loggable a) => Distributable DV.Vector a where
   singleton = DV.singleton
   splitAt= DV.splitAt
   fromList = DV.fromList

-- | Unboxed vectors additionally require the element to be unboxable.
instance (Loggable a,DVU.Unbox a) => Distributable DVU.Vector a where
   singleton= DVU.singleton
   splitAt= DVU.splitAt
   fromList= DVU.fromList
-- | Map each element to a (key,value) pair and regroup the results by
-- key, using boxed vectors for the grouped values.  The output is the
-- usual input of 'reduce'.
mapKeyB :: (Loggable a, Loggable b, Loggable k,Ord k)
        => (a -> (k,b))
        -> DDS (DV.Vector a)
        -> DDS (M.Map k(DV.Vector b))
mapKeyB keyedMap dds= mapKey keyedMap dds
-- | Map each element to a (key,value) pair and regroup the results by
-- key, using unboxed vectors for the grouped values.  The output is the
-- usual input of 'reduce'.
mapKeyU :: (Loggable a, Loggable b, Loggable k,Ord k)
        => (a -> (k,b))
        -> DDS (DVU.Vector a)
        -> DDS (M.Map k(DVU.Vector b))
mapKeyU keyedMap dds= mapKey keyedMap dds
-- | perform a map and partition the result with different keys.
-- The final result will be used by reduce.
--
-- The transformation runs at the node that owns each partition
-- ('runAt node'), so no data moves during the map phase; only the new
-- partition reference travels back.
mapKey :: (Distributable vector a,Distributable vector b, Loggable k,Ord k)
       => (a -> (k,b))
       -> DDS (vector a)
       -> DDS (M.Map k (vector b))
mapKey f (DDS mx)= DDS $ do
        refs <- mx
        process refs

  where
--  process :: Partition a -> Cloud [Partition b]
  -- fetch the local partition data, fold it into a per-key map and
  -- register the result as a new local partition
  process (ref@(Ref node path sav))= runAt node $ local $ do
          xs <- getPartitionData ref  -- !> "CMAP"
          generateRef node $ map1 f xs

-- map1 :: (Ord k, F.Foldable vector) => (a -> (k,b)) -> vector a -> M.Map k(vector b)
  -- group each mapped (key,value) pair into the map, appending values of
  -- equal keys with the container's monoid
  map1 f v= F.foldl' f1 M.empty v
     where
     f1 map x=
           let (k,r) = f x
           in M.insertWith (<>) k (Transient.DDS.singleton r) map