Browse Source

restore previous code

addedsingle
Alberto G. Corona 6 years ago
parent
commit
cf46e19067
  1. 42
      src/Transient/MapReduce.hs

42
src/Transient/MapReduce.hs

@ -33,7 +33,7 @@ data PartRef a=PartRef a
#else
import Transient.Base
--import Transient.Internals((!>))
import Transient.Internals((!>))
import Transient.Move hiding (pack)
import Transient.Logged
import Transient.Indeterminism
@ -207,7 +207,7 @@ reduce red (dds@(DDS mx))= loggedc $ do
let lengthNodes = length nodes
shuffler nodes = do
ref@(Ref node path sav) <- mx
return () -- !> ref
-- return () !> ref
runAt node $ foldAndSend nodes ref
stop
@ -222,30 +222,42 @@ reduce red (dds@(DDS mx))= loggedc $ do
-- foldAndSend :: (Hashable k, Distributable vector a)=> (Int,[(k,vector a)]) -> Cloud ()
foldAndSend nodes ref= do
-- nsent <- onAll $ liftIO $ newMVar 0
pairs <- onAll $ getPartitionData1 ref
<|> return (error $ "DDS computed out of his node:"++ show ref)
let mpairs = groupByDestiny pairs
length <- local . return $ M.size mpairs
nsent <- onAll $ liftIO $ newMVar 0
(i,folded) <- local $ parallelize foldthem (M.assocs mpairs) -- <|> return Nothing
loggedc $ mparallelize foldthemAndSend (M.assocs mpairs)
n <- lliftIO $ modifyMVar nsent $ \r -> return (r+1, r+1)
runAt (nodes !! i) $ local $ putMailbox box $ Reduce folded
sendEnd nodes
when (n == length) $ sendEnd nodes
where
foldthem (i,kvs)= local . async . return . Just
count n proc proc2= do
nsent <- onAll $ liftIO $ newMVar 0
proc
n' <- lliftIO $ modifyMVar nsent $ \r -> return (r+1, r+1)
when (n'==n) proc2
foldthem (i,kvs)= async . return
$ (i,map (\(k,vs) -> (k,foldl1 red vs)) kvs)
foldthemAndSend ikvs = do
mpart <- foldthem ikvs
case mpart of
Just (i,folded) ->
runAt (nodes !! i) $ local $ putMailbox box $ Reduce folded
-- !> ("SEND REDUCE DATA",mynode))
Nothing -> return ()
foldthemAndSend ikvs = loggedc $ do
(i,folded) <- local $ foldthem ikvs
runAt (nodes !! i) $ local $ putMailbox box $ Reduce folded
sendEnd nodes = onNodes nodes . local $ putMailbox box (EndReduce `asTypeOf` paramOf dds)
@ -307,7 +319,7 @@ reduce red (dds@(DDS mx))= loggedc $ do
--parallelize :: Loggable b => (a -> Cloud b) -> [a] -> Cloud b
parallelize f xs = foldr (<|>) empty $ map f xs
mparallelize f xs = foldr (<>) mempty $ map f xs
mparallelize f xs = loggedc $ foldr (<>) mempty $ map f xs
@ -329,7 +341,7 @@ getPartitionData1 (Ref node path save) = Transient $ do
$ readDBRef
$ getDBRef
$ keyp path save
-- `onNothing` error ("not found DDS data: "++ keyp path save)
case mp of
Just (Part _ _ _ xs) -> return $ Just xs
Nothing -> return Nothing

Loading…
Cancel
Save