server: delay resource allocation only on startup rather than every reallocation (hstreamdb#1826)

Relates to hstreamdb#1824
Commelina authored May 31, 2024
1 parent 87a87f5 commit b1820d8
Showing 5 changed files with 91 additions and 76 deletions.
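
In short: instead of sleeping and retrying before every re-allocation, a node now waits once at startup. The hash ring carries a Bool "ready" flag that is flipped exactly once, a few seconds after startup, and readers block on it via STM retry. A minimal, self-contained sketch of that pattern (illustrative names only, not the repository's API):

{-# LANGUAGE LambdaCase #-}

import Control.Concurrent     (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad          (void)
import Data.Maybe             (fromMaybe)
import System.Environment     (lookupEnv)
import Text.Read              (readMaybe)

-- The shared state carries a Bool meaning "safe to make allocation
-- decisions"; (Int, [String]) stands in for the real (Epoch, HashRing).
type Ring = TVar (Int, [String], Bool)

-- Readers block (via STM retry) until the flag is set, instead of sleeping
-- and retrying on every lookup.
readReady :: Ring -> STM (Int, [String])
readReady ref = do
  (epoch, members, isReady) <- readTVar ref
  if isReady then return (epoch, members) else retry

main :: IO ()
main = do
  ring <- newTVarIO (0, ["node-1"], False)
  -- One-time startup delay, overridable through an (illustrative) env var.
  delayMs <- lookupEnv "STARTUP_EXTRA_DELAY_MS" >>= \case
    Nothing -> return (5000 :: Int)
    Just ms -> return (fromMaybe 5000 (readMaybe ms))
  void $ forkIO $ do
    threadDelay (delayMs * 1000)
    atomically $ modifyTVar' ring (\(e, ms, _) -> (e, ms, True))
  -- Blocks here until the flag flips, then never delays again.
  (epoch, members) <- atomically (readReady ring)
  putStrLn ("ready at epoch " ++ show epoch ++ " with " ++ show members)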
43 changes: 39 additions & 4 deletions common/server/HStream/Common/Server/HashRing.hs
@@ -1,33 +1,68 @@
module HStream.Common.Server.HashRing
  ( LoadBalanceHashRing
  , readLoadBalanceHashRing
  , initializeHashRing
  , updateHashRing
  ) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Data.List (sort)
import Data.Maybe (fromMaybe)
import System.Environment (lookupEnv)
import Text.Read (readMaybe)

import HStream.Common.ConsistentHashing (HashRing, constructServerMap)
import HStream.Gossip.Types (Epoch, GossipContext)
import HStream.Gossip.Utils (getMemberListWithEpochSTM)
import qualified HStream.Logger as Log

type LoadBalanceHashRing = TVar (Epoch, HashRing)
-- FIXME: The 'Bool' flag means "if we think the HashRing can be used for
-- resource allocation now". This is because a server node can
-- only see a part of the cluster during the early stage of startup.
-- FIXME: This is just a mitigation for the consistency problem.
type LoadBalanceHashRing = TVar (Epoch, HashRing, Bool)

readLoadBalanceHashRing :: LoadBalanceHashRing -> STM (Epoch, HashRing)
readLoadBalanceHashRing hashRing = do
  (epoch, hashRing, isReady) <- readTVar hashRing
  if isReady
    then return (epoch, hashRing)
    else retry

initializeHashRing :: GossipContext -> IO LoadBalanceHashRing
initializeHashRing gc = atomically $ do
  (epoch, serverNodes) <- getMemberListWithEpochSTM gc
  newTVar (epoch, constructServerMap . sort $ serverNodes)
  newTVar (epoch, constructServerMap . sort $ serverNodes, False)

-- However, reconstructing the hashRing every time can be expensive
-- when we have a large number of nodes in the cluster.
-- FIXME: We delay for several seconds to make sure the node has seen
--        the whole cluster. This is only a mitigation. See the comment
--        above.
-- FIXME: Hard-coded constant.
-- WARNING: This should be called exactly once on startup!
updateHashRing :: GossipContext -> LoadBalanceHashRing -> IO ()
updateHashRing gc hashRing = loop (0,[])
updateHashRing gc hashRing = do
  let defaultMs = 5000
  delayMs <- lookupEnv "HSTREAM_INTERNAL_STARTUP_EXTRA_DELAY_MS" >>= \case
    Nothing -> return defaultMs
    Just ms -> return (fromMaybe defaultMs (readMaybe ms))
  void $ forkIO (earlyStageDelay delayMs)
  loop (0,[])
  where
    earlyStageDelay timeoutMs = do
      Log.info $ "Delaying for " <> Log.buildString' timeoutMs <> "ms before I can make resource allocation decisions..."
      threadDelay (timeoutMs * 1000)
      atomically $ modifyTVar' hashRing (\(epoch, hashRing, _) -> (epoch, hashRing, True))
      Log.info "Cluster is ready!"

    loop (epoch, list) =
      loop =<< atomically
        ( do (epoch', list') <- getMemberListWithEpochSTM gc
             when (epoch == epoch' && list == list') retry
             writeTVar hashRing (epoch', constructServerMap list')
             modifyTVar' hashRing
               (\(_,_,isReady) -> (epoch', constructServerMap list', isReady))
             return (epoch', list')
        )
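
The loop above (the existing membership watcher) now rebuilds the ring with modifyTVar' so the readiness flag survives every rebuild. A stripped-down, runnable sketch of that watch-and-rebuild loop, using plain lists as stand-ins for the gossip and HashRing types (all names illustrative):

import Control.Concurrent     (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad          (forM_, when)

-- Stand-ins for the real Epoch / HashRing / gossip types.
type Epoch   = Int
type Members = [String]
type Ring    = TVar (Epoch, Members, Bool)   -- (epoch, "hash ring", isReady)

-- Block until membership differs from what we last saw, then rebuild the
-- ring in place, leaving the readiness flag untouched (the analogue of the
-- modifyTVar' call in updateHashRing above).
watchMembers :: TVar (Epoch, Members) -> Ring -> IO ()
watchMembers membersVar ring = loop (0, [])
  where
    loop (epoch, list) =
      loop =<< atomically
        ( do (epoch', list') <- readTVar membersVar
             when (epoch == epoch' && list == list') retry
             modifyTVar' ring (\(_, _, isReady) -> (epoch', list', isReady))
             return (epoch', list')
        )

main :: IO ()
main = do
  membersVar <- newTVarIO (0, [])
  ring       <- newTVarIO (0, [], False)
  _ <- forkIO (watchMembers membersVar ring)
  -- Simulate gossip discovering nodes one by one.
  forM_ [1 .. 3 :: Int] $ \i -> do
    atomically $ modifyTVar' membersVar (\(e, ms) -> (e + 1, ("node-" ++ show i) : ms))
    threadDelay 100000
  readTVarIO ring >>= print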
85 changes: 33 additions & 52 deletions common/server/HStream/Common/Server/Lookup.hs
@@ -10,7 +10,6 @@ module HStream.Common.Server.Lookup
  , kafkaResourceMetaId
  ) where

import Control.Concurrent (threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException (..), throwIO,
                          try)
@@ -19,7 +18,8 @@ import Data.Text (Text)
import qualified Data.Vector as V

import HStream.Common.ConsistentHashing (getResNode)
import HStream.Common.Server.HashRing (LoadBalanceHashRing)
import HStream.Common.Server.HashRing (LoadBalanceHashRing,
                                       readLoadBalanceHashRing)
import HStream.Common.Server.MetaData (TaskAllocation (..))
import HStream.Common.Types (fromInternalServerNodeWithKey)
import qualified HStream.Exception as HE
@@ -30,7 +30,7 @@ import qualified HStream.Server.HStreamApi as A

lookupNode :: LoadBalanceHashRing -> Text -> Maybe Text -> IO A.ServerNode
lookupNode loadBalanceHashRing key advertisedListenersKey = do
  (_, hashRing) <- readTVarIO loadBalanceHashRing
  (_, hashRing) <- atomically (readLoadBalanceHashRing loadBalanceHashRing)
  theNode <- getResNode hashRing key advertisedListenersKey
  return theNode

@@ -42,62 +42,43 @@ lookupNodePersist
  -> Text
  -> Maybe Text
  -> IO A.ServerNode
lookupNodePersist metaHandle_ gossipContext_ loadBalanceHashRing_
                  key_ metaId_ advertisedListenersKey_ =
  -- FIXME: This is only a mitigation for the case that the node has not
  --        known the full cluster info. Reinvestigate it!!!
  --        And as you see, a hard-coded constant...
  go metaHandle_ gossipContext_ loadBalanceHashRing_ key_ metaId_ advertisedListenersKey_ 5
  where
    -- TODO: Currently, 'leftRetries' only works before a re-allocation. It can also be
    --       used in other cases such as encountering an exception.
    go metaHandle gossipContext loadBalanceHashRing
       key metaId advertisedListenersKey leftRetries = do
      -- FIXME: it will insert the results of lookup no matter the resource exists
      -- or not
      M.getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
lookupNodePersist metaHandle gossipContext loadBalanceHashRing
                  key metaId advertisedListenersKey = do
  -- FIXME: it will insert the results of lookup no matter the resource exists
  -- or not
  M.getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
    Nothing -> do
      (epoch, hashRing) <- atomically (readLoadBalanceHashRing loadBalanceHashRing)
      theNode <- getResNode hashRing key advertisedListenersKey
      try (M.insertMeta @TaskAllocation
             metaId
             (TaskAllocation epoch (A.serverNodeId theNode))
             metaHandle) >>= \case
        Left (e :: SomeException) -> do
          -- TODO: add a retry limit here
          Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
                     <> ", retry..."
          lookupNodePersist metaHandle gossipContext loadBalanceHashRing
                            key metaId advertisedListenersKey
        Right () -> return theNode
    Just (TaskAllocation epoch nodeId, version) -> do
      serverList <- getMemberList gossipContext >>=
        fmap V.concat . mapM (fromInternalServerNodeWithKey advertisedListenersKey)
      case find ((nodeId == ) . A.serverNodeId) serverList of
        Just theNode -> return theNode
        Nothing -> do
          (epoch, hashRing) <- readTVarIO loadBalanceHashRing
          theNode <- getResNode hashRing key advertisedListenersKey
          try (M.insertMeta @TaskAllocation
                 metaId
                 (TaskAllocation epoch (A.serverNodeId theNode))
                 metaHandle) >>= \case
          (epoch', hashRing) <- atomically (readLoadBalanceHashRing loadBalanceHashRing)
          theNode' <- getResNode hashRing key advertisedListenersKey
          try (M.updateMeta @TaskAllocation metaId
                 (TaskAllocation epoch' (A.serverNodeId theNode'))
                 (Just version) metaHandle) >>= \case
            Left (e :: SomeException) -> do
              -- TODO: add a retry limit here
              Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
                         <> ", retry..."
              lookupNodePersist metaHandle gossipContext loadBalanceHashRing
                                key metaId advertisedListenersKey
            Right () -> return theNode
    Just (TaskAllocation epoch nodeId, version) -> do
      serverList <- getMemberList gossipContext >>=
        fmap V.concat . mapM (fromInternalServerNodeWithKey advertisedListenersKey)
      case find ((nodeId == ) . A.serverNodeId) serverList of
        Just theNode -> return theNode
        Nothing -> do
          if leftRetries > 0
            then do
              Log.info $ "<lookupNodePersist> on <key=" <> Log.buildString' key <> ", metaId=" <>
                Log.buildString' metaId <> ">: found on Node=" <> Log.buildString' nodeId <>
                ", but not sure if it's really dead. Left " <> Log.buildString' leftRetries <>
                " retries before re-allocate it..."
              threadDelay (1 * 1000 * 1000)
              go metaHandle gossipContext loadBalanceHashRing
                 key metaId advertisedListenersKey (leftRetries - 1)
            else do
              (epoch', hashRing) <- readTVarIO loadBalanceHashRing
              theNode' <- getResNode hashRing key advertisedListenersKey
              try (M.updateMeta @TaskAllocation metaId
                    (TaskAllocation epoch' (A.serverNodeId theNode'))
                    (Just version) metaHandle) >>= \case
                Left (e :: SomeException) -> do
                  -- TODO: add a retry limit here
                  Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
                             <> ", retry..."
                  lookupNodePersist metaHandle gossipContext loadBalanceHashRing
                                    key metaId advertisedListenersKey
                Right () -> return theNode'
            Right () -> return theNode'

data KafkaResource
  = KafkaResTopic Text
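With the startup gate in place, lookupNodePersist no longer needs the leftRetries/threadDelay dance: if the recorded node is no longer a member, it re-allocates immediately and writes the new record guarded by the version it read. A much-simplified sketch of that allocate-or-reuse flow, against a toy in-memory store rather than the real MetaHandle API (all names here are illustrative):

import           Control.Concurrent.STM
import qualified Data.Map.Strict        as Map

-- Illustrative stand-ins: the real code uses MetaHandle, TaskAllocation and
-- the gossip member list instead of this toy in-memory map.
type NodeId    = Int
type Version   = Int
type MetaStore = TVar (Map.Map String (NodeId, Version))

-- Reuse the stored node if it is still a live member; otherwise pick a new
-- node right away and overwrite the record, bumping the version (the real
-- code passes the version it read to updateMeta as an optimistic guard).
lookupPersist :: MetaStore -> [NodeId] -> ([NodeId] -> String -> NodeId) -> String -> IO NodeId
lookupPersist store members pickNode key = atomically $ do
  metas <- readTVar store
  case Map.lookup key metas of
    Nothing -> do
      let node = pickNode members key
      writeTVar store (Map.insert key (node, 0) metas)
      return node
    Just (node, version)
      | node `elem` members -> return node      -- allocation still valid
      | otherwise -> do                         -- node is gone: re-allocate now
          let node' = pickNode members key
          writeTVar store (Map.insert key (node', version + 1) metas)
          return node'

main :: IO ()
main = do
  store <- newTVarIO Map.empty
  let pick ms k = ms !! (length k `mod` length ms)   -- toy "hash ring"
  n1 <- lookupPersist store [0, 1] pick "stream-a"
  n2 <- lookupPersist store [1]    pick "stream-a"   -- node n1 has left
  print (n1, n2)
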
33 changes: 16 additions & 17 deletions common/server/HStream/Common/Server/TaskManager.hs
@@ -2,23 +2,22 @@

module HStream.Common.Server.TaskManager where

import Control.Concurrent (forkIO)
import qualified Control.Concurrent as C
import qualified Control.Concurrent.STM as C
import qualified Control.Exception as E
import qualified Control.Monad as M
import Control.Concurrent (forkIO)
import qualified Control.Concurrent as C
import qualified Control.Exception as E
import qualified Control.Monad as M
import Data.Int
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word (Word32)
import HStream.Common.ConsistentHashing (HashRing)
import HStream.Common.Server.Lookup (lookupNodePersist)
import qualified HStream.Exception as HE
import HStream.Gossip.Types (Epoch, GossipContext)
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (MetaHandle)
import HStream.Server.HStreamApi (ServerNode (serverNodeId))
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word (Word32)
import HStream.Common.Server.HashRing (LoadBalanceHashRing)
import HStream.Common.Server.Lookup (lookupNodePersist)
import qualified HStream.Exception as HE
import HStream.Gossip.Types (GossipContext)
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (MetaHandle)
import HStream.Server.HStreamApi (ServerNode (serverNodeId))

-------------------------------------------------------------------------------

@@ -51,7 +50,7 @@ data TaskDetector

  , metaHandle :: MetaHandle
  , gossipContext :: GossipContext
  , loadBalanceHashRing :: C.TVar (Epoch, HashRing)
  , loadBalanceHashRing :: LoadBalanceHashRing
  , advertisedListenersKey :: Maybe T.Text
  , serverID :: Word32
  }
Expand Down
2 changes: 1 addition & 1 deletion hstream/app/server.hs
@@ -200,7 +200,7 @@ serve sc@ServerContext{..} rpcOpts enableStreamV2 = do
#endif
            <> ", waiting for cluster to get ready"
  void $ forkIO $ do
    void (readMVar (clusterReady gossipContext)) >> Log.info "Cluster is ready!"
    void (readMVar (clusterReady gossipContext))
    readMVar (clusterInited gossipContext) >>= \case
      Gossip -> return ()
      _ -> do
4 changes: 2 additions & 2 deletions hstream/src/HStream/Server/Types.hs
@@ -36,7 +36,7 @@ import Control.Monad (when)
import Data.IORef (IORef)
import Data.Maybe (fromJust)
import HStream.Base.Timer (CompactedWorker)
import HStream.Common.ConsistentHashing (HashRing)
import HStream.Common.Server.HashRing (LoadBalanceHashRing)
import HStream.Common.Types (ShardKey)
import qualified HStream.Exception as HE
import HStream.Gossip.Types (Epoch, GossipContext)
@@ -100,7 +100,7 @@ data ServerContext = ServerContext
  , headerConfig :: AA.HeaderConfig AA.AdminAPI
#endif
  , scStatsHolder :: Stats.StatsHolder
  , loadBalanceHashRing :: TVar (Epoch, HashRing)
  , loadBalanceHashRing :: LoadBalanceHashRing
  , scIOWorker :: IO.Worker
  , gossipContext :: GossipContext
  , serverOpts :: ServerOpts
