Browse Source

fix ordering of messages in mapReduce

addedsingle
Alberto G. Corona 6 years ago
parent
commit
28c094da8f
  1. 24
      src/Transient/MapReduce.hs

24
src/Transient/MapReduce.hs

@ -222,28 +222,31 @@ reduce red (dds@(DDS mx))= loggedc $ do
-- foldAndSend :: (Hashable k, Distributable vector a)=> (Int,[(k,vector a)]) -> Cloud ()
foldAndSend nodes ref= do
nsent <- onAll $ liftIO $ newMVar 0
-- nsent <- onAll $ liftIO $ newMVar 0
pairs <- onAll $ getPartitionData1 ref
<|> return (error $ "DDS computed out of his node:"++ show ref)
let mpairs = groupByDestiny pairs
length <- local . return $ M.size mpairs
mpart <- loggedc $ parallelize foldthem (M.assocs mpairs) <|> return Nothing
loggedc $ mparallelize foldthemAndSend (M.assocs mpairs)
n <- lliftIO $ modifyMVar nsent $ \r -> return (r+1, r+1)
case mpart of
Just (i,folded) ->
runAt (nodes !! i) $ local $ putMailbox box $ Reduce folded
-- !> ("SEND REDUCE DATA",mynode))
Nothing -> return ()
when (n > length) $ sendEnd nodes
sendEnd nodes
where
foldthem (i,kvs)= local . async . return . Just
$ (i,map (\(k,vs) -> (k,foldl1 red vs)) kvs)
foldthemAndSend ikvs = do
mpart <- foldthem ikvs
case mpart of
Just (i,folded) ->
runAt (nodes !! i) $ local $ putMailbox box $ Reduce folded
-- !> ("SEND REDUCE DATA",mynode))
Nothing -> return ()
sendEnd nodes = onNodes nodes . local $ putMailbox box (EndReduce `asTypeOf` paramOf dds)
-- !> ("send ENDREDUCE",mynode)
@ -304,6 +307,7 @@ reduce red (dds@(DDS mx))= loggedc $ do
--parallelize :: Loggable b => (a -> Cloud b) -> [a] -> Cloud b
parallelize f xs = foldr (<|>) empty $ map f xs
mparallelize f xs = foldr (<>) mempty $ map f xs
@ -465,7 +469,7 @@ generateRef x= do
node <- getMyNode
liftIO $ do
temp <- getTempName
let reg= Part node temp True x
let reg= Part node temp False x
atomically $ newDBRef reg
-- syncCache
(return $ getRef reg) -- !> ("generateRef",reg,node)

Loading…
Cancel
Save