From b7f9b83f9e8f1d954044c271f516a939cbee3a22 Mon Sep 17 00:00:00 2001 From: Yan Zhao Date: Wed, 7 Dec 2022 15:50:09 +0800 Subject: [PATCH] Fix RegionAwareEnsemblePlacementPolicy update rack info problem. (#3666) (cherry picked from commit 4574ba02333308ad648c9b78963a381ad83ea564) --- .../RegionAwareEnsemblePlacementPolicy.java | 85 ++++++++-- ...estRegionAwareEnsemblePlacementPolicy.java | 154 ++++++++++++++++++ 2 files changed, 225 insertions(+), 14 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java index 383260dc80a..b7a7f0e48ee 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java @@ -20,6 +20,7 @@ import io.netty.util.HashedWheelTimer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -36,6 +37,7 @@ import org.apache.bookkeeper.net.BookieNode; import org.apache.bookkeeper.net.DNSToSwitchMapping; import org.apache.bookkeeper.net.NetworkTopology; +import org.apache.bookkeeper.net.NetworkTopologyImpl; import org.apache.bookkeeper.net.Node; import org.apache.bookkeeper.net.NodeBase; import org.apache.bookkeeper.proto.BookieAddressResolver; @@ -85,30 +87,34 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme address2Region = new ConcurrentHashMap(); } + protected String getLocalRegion(BookieNode node) { + if (null == node || null == node.getAddr()) { + return UNKNOWN_REGION; + } + return getRegion(node.getAddr()); + } + protected String getRegion(BookieId addr) { String region = address2Region.get(addr); if (null == region) { - String networkLocation = resolveNetworkLocation(addr); - if (NetworkTopology.DEFAULT_REGION_AND_RACK.equals(networkLocation)) { - region = UNKNOWN_REGION; - } else { - String[] parts = networkLocation.split(NodeBase.PATH_SEPARATOR_STR); - if (parts.length <= 1) { - region = UNKNOWN_REGION; - } else { - region = parts[1]; - } - } + region = parseBookieRegion(addr); address2Region.putIfAbsent(addr, region); } return region; } - protected String getLocalRegion(BookieNode node) { - if (null == node || null == node.getAddr()) { + protected String parseBookieRegion(BookieId addr) { + String networkLocation = resolveNetworkLocation(addr); + if (NetworkTopology.DEFAULT_REGION_AND_RACK.equals(networkLocation)) { return UNKNOWN_REGION; + } else { + String[] parts = networkLocation.split(NodeBase.PATH_SEPARATOR_STR); + if (parts.length <= 1) { + return UNKNOWN_REGION; + } else { + return parts[1]; + } } - return getRegion(node.getAddr()); } @Override @@ -163,6 +169,57 @@ public void handleBookiesThatJoined(Set joinedBookies) { } } + @Override + public void onBookieRackChange(List bookieAddressList) { + rwLock.writeLock().lock(); + try { + bookieAddressList.forEach(bookieAddress -> { + try { + BookieNode node = knownBookies.get(bookieAddress); + if (node != null) { + // refresh the rack info if its a known bookie + BookieNode newNode = createBookieNode(bookieAddress); + if (!newNode.getNetworkLocation().equals(node.getNetworkLocation())) { + topology.remove(node); + topology.add(newNode); + knownBookies.put(bookieAddress, newNode); + historyBookies.put(bookieAddress, newNode); + } + //Handle per region placement policy. + String oldRegion = getRegion(bookieAddress); + String newRegion = parseBookieRegion(newNode.getAddr()); + if (oldRegion.equals(newRegion)) { + TopologyAwareEnsemblePlacementPolicy regionPlacement = perRegionPlacement.get(oldRegion); + regionPlacement.onBookieRackChange(Collections.singletonList(bookieAddress)); + } else { + address2Region.put(bookieAddress, newRegion); + TopologyAwareEnsemblePlacementPolicy oldRegionPlacement = perRegionPlacement.get(oldRegion); + oldRegionPlacement.handleBookiesThatLeft(Collections.singleton(bookieAddress)); + TopologyAwareEnsemblePlacementPolicy newRegionPlacement = perRegionPlacement.get( + newRegion); + if (newRegionPlacement == null) { + newRegionPlacement = new RackawareEnsemblePlacementPolicy() + .initialize(dnsResolver, timer, this.reorderReadsRandom, + this.stabilizePeriodSeconds, this.reorderThresholdPendingRequests, + this.isWeighted, this.maxWeightMultiple, + this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, + this.ignoreLocalNodeInPlacementPolicy, statsLogger, + bookieAddressResolver) + .withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK); + perRegionPlacement.put(newRegion, newRegionPlacement); + } + newRegionPlacement.handleBookiesThatJoined(Collections.singleton(bookieAddress)); + } + } + } catch (IllegalArgumentException | NetworkTopologyImpl.InvalidTopologyException e) { + LOG.error("Failed to update bookie rack info: {} ", bookieAddress, e); + } + }); + } finally { + rwLock.writeLock().unlock(); + } + } + @Override public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf, Optional optionalDnsResolver, diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java index fc4e02cae91..7fe5b83db48 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java @@ -28,6 +28,7 @@ import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues; import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.HashedWheelTimer; import java.net.InetAddress; @@ -1528,6 +1529,7 @@ public void testNodeWithFailures() throws Exception { assertEquals(ensemble.get(reoderSet.get(7)), addr4.toBookieId()); } + @Test public void testNewEnsembleSetWithFiveRegions() throws Exception { repp.uninitalize(); repp = new RegionAwareEnsemblePlacementPolicy(); @@ -1573,6 +1575,7 @@ public void testNewEnsembleSetWithFiveRegions() throws Exception { } } + @Test public void testRegionsWithDiskWeight() throws Exception { repp.uninitalize(); repp = new RegionAwareEnsemblePlacementPolicy(); @@ -1607,4 +1610,155 @@ public void testRegionsWithDiskWeight() throws Exception { assertEquals(3, ensemble.size()); } + + @Test + public void testNotifyRackChangeWithOldRegion() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181); + + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/rack-1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/rack-1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/rack-1"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/rack-1"); + + // Update cluster + Set addrs = Sets.newHashSet(addr1.toBookieId(), + addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<>()); + + assertEquals(4, repp.knownBookies.size()); + assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + assertEquals("/region1/rack-1", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation()); + assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation()); + assertEquals("/region2/rack-1", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation()); + + assertEquals(2, repp.perRegionPlacement.size()); + TopologyAwareEnsemblePlacementPolicy region1Placement = repp.perRegionPlacement.get("region1"); + assertEquals(2, region1Placement.knownBookies.keySet().size()); + assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation()); + + TopologyAwareEnsemblePlacementPolicy region2Placement = repp.perRegionPlacement.get("region2"); + assertEquals(2, region2Placement.knownBookies.keySet().size()); + assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation()); + assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation()); + + assertEquals("region1", repp.address2Region.get(addr1.toBookieId())); + assertEquals("region1", repp.address2Region.get(addr2.toBookieId())); + assertEquals("region2", repp.address2Region.get(addr3.toBookieId())); + assertEquals("region2", repp.address2Region.get(addr4.toBookieId())); + + // Update the rack. + // change addr2 rack info. /region1/rack-1 -> /region1/rack-2. + // change addr4 rack info. /region2/rack-1 -> /region1/rack-2 + List bookieAddressList = new ArrayList<>(); + List rackList = new ArrayList<>(); + bookieAddressList.add(addr2); + rackList.add("/region1/rack-2"); + bookieAddressList.add(addr4); + rackList.add("/region1/rack-2"); + StaticDNSResolver.changeRack(bookieAddressList, rackList); + + assertEquals(4, repp.knownBookies.size()); + assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + assertEquals("/region1/rack-2", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation()); + assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation()); + assertEquals("/region1/rack-2", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation()); + + assertEquals(2, repp.perRegionPlacement.size()); + region1Placement = repp.perRegionPlacement.get("region1"); + assertEquals(3, region1Placement.knownBookies.keySet().size()); + assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + assertEquals("/region1/rack-2", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation()); + assertEquals("/region1/rack-2", region1Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation()); + + region2Placement = repp.perRegionPlacement.get("region2"); + assertEquals(1, region2Placement.knownBookies.keySet().size()); + assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation()); + + assertEquals("region1", repp.address2Region.get(addr1.toBookieId())); + assertEquals("region1", repp.address2Region.get(addr2.toBookieId())); + assertEquals("region2", repp.address2Region.get(addr3.toBookieId())); + assertEquals("region1", repp.address2Region.get(addr4.toBookieId())); + } + + @Test + public void testNotifyRackChangeWithNewRegion() throws Exception { + BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181); + BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181); + BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181); + BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181); + + // update dns mapping + StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/rack-1"); + StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/rack-1"); + StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/rack-1"); + StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/rack-1"); + + // Update cluster + Set addrs = Sets.newHashSet(addr1.toBookieId(), + addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId()); + repp.onClusterChanged(addrs, new HashSet<>()); + + assertEquals(4, repp.knownBookies.size()); + assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + assertEquals("/region1/rack-1", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation()); + assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation()); + assertEquals("/region2/rack-1", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation()); + + assertEquals(2, repp.perRegionPlacement.size()); + TopologyAwareEnsemblePlacementPolicy region1Placement = repp.perRegionPlacement.get("region1"); + assertEquals(2, region1Placement.knownBookies.keySet().size()); + assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation()); + + TopologyAwareEnsemblePlacementPolicy region2Placement = repp.perRegionPlacement.get("region2"); + assertEquals(2, region2Placement.knownBookies.keySet().size()); + assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation()); + assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation()); + + assertEquals("region1", repp.address2Region.get(addr1.toBookieId())); + assertEquals("region1", repp.address2Region.get(addr2.toBookieId())); + assertEquals("region2", repp.address2Region.get(addr3.toBookieId())); + assertEquals("region2", repp.address2Region.get(addr4.toBookieId())); + + // Update the rack. + // change addr2 rack info. /region1/rack-1 -> /region3/rack-1. + // change addr4 rack info. /region2/rack-1 -> /region3/rack-1 + List bookieAddressList = new ArrayList<>(); + List rackList = new ArrayList<>(); + bookieAddressList.add(addr2); + rackList.add("/region3/rack-1"); + bookieAddressList.add(addr4); + rackList.add("/region3/rack-1"); + StaticDNSResolver.changeRack(bookieAddressList, rackList); + + assertEquals(4, repp.knownBookies.size()); + assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + assertEquals("/region3/rack-1", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation()); + assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation()); + assertEquals("/region3/rack-1", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation()); + + assertEquals(3, repp.perRegionPlacement.size()); + region1Placement = repp.perRegionPlacement.get("region1"); + assertEquals(1, region1Placement.knownBookies.keySet().size()); + assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation()); + + region2Placement = repp.perRegionPlacement.get("region2"); + assertEquals(1, region2Placement.knownBookies.keySet().size()); + assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation()); + + TopologyAwareEnsemblePlacementPolicy region3Placement = repp.perRegionPlacement.get("region3"); + assertEquals(2, region3Placement.knownBookies.keySet().size()); + assertEquals("/region3/rack-1", region3Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation()); + assertEquals("/region3/rack-1", region3Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation()); + + assertEquals("region1", repp.address2Region.get(addr1.toBookieId())); + assertEquals("region3", repp.address2Region.get(addr2.toBookieId())); + assertEquals("region2", repp.address2Region.get(addr3.toBookieId())); + assertEquals("region3", repp.address2Region.get(addr4.toBookieId())); + } }