Skip to content

Commit

Permalink
Fix RegionAwareEnsemblePlacementPolicy update rack info problem. (#3666)
Browse files Browse the repository at this point in the history
(cherry picked from commit 4574ba0)
  • Loading branch information
horizonzy authored and hangc0276 committed Dec 8, 2022
1 parent 8e352d4 commit b7f9b83
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.util.HashedWheelTimer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -36,6 +37,7 @@
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.net.NetworkTopologyImpl;
import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.net.NodeBase;
import org.apache.bookkeeper.proto.BookieAddressResolver;
Expand Down Expand Up @@ -85,30 +87,34 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
address2Region = new ConcurrentHashMap<BookieId, String>();
}

protected String getLocalRegion(BookieNode node) {
if (null == node || null == node.getAddr()) {
return UNKNOWN_REGION;
}
return getRegion(node.getAddr());
}

protected String getRegion(BookieId addr) {
String region = address2Region.get(addr);
if (null == region) {
String networkLocation = resolveNetworkLocation(addr);
if (NetworkTopology.DEFAULT_REGION_AND_RACK.equals(networkLocation)) {
region = UNKNOWN_REGION;
} else {
String[] parts = networkLocation.split(NodeBase.PATH_SEPARATOR_STR);
if (parts.length <= 1) {
region = UNKNOWN_REGION;
} else {
region = parts[1];
}
}
region = parseBookieRegion(addr);
address2Region.putIfAbsent(addr, region);
}
return region;
}

protected String getLocalRegion(BookieNode node) {
if (null == node || null == node.getAddr()) {
protected String parseBookieRegion(BookieId addr) {
String networkLocation = resolveNetworkLocation(addr);
if (NetworkTopology.DEFAULT_REGION_AND_RACK.equals(networkLocation)) {
return UNKNOWN_REGION;
} else {
String[] parts = networkLocation.split(NodeBase.PATH_SEPARATOR_STR);
if (parts.length <= 1) {
return UNKNOWN_REGION;
} else {
return parts[1];
}
}
return getRegion(node.getAddr());
}

@Override
Expand Down Expand Up @@ -163,6 +169,57 @@ public void handleBookiesThatJoined(Set<BookieId> joinedBookies) {
}
}

@Override
public void onBookieRackChange(List<BookieId> bookieAddressList) {
rwLock.writeLock().lock();
try {
bookieAddressList.forEach(bookieAddress -> {
try {
BookieNode node = knownBookies.get(bookieAddress);
if (node != null) {
// refresh the rack info if its a known bookie
BookieNode newNode = createBookieNode(bookieAddress);
if (!newNode.getNetworkLocation().equals(node.getNetworkLocation())) {
topology.remove(node);
topology.add(newNode);
knownBookies.put(bookieAddress, newNode);
historyBookies.put(bookieAddress, newNode);
}
//Handle per region placement policy.
String oldRegion = getRegion(bookieAddress);
String newRegion = parseBookieRegion(newNode.getAddr());
if (oldRegion.equals(newRegion)) {
TopologyAwareEnsemblePlacementPolicy regionPlacement = perRegionPlacement.get(oldRegion);
regionPlacement.onBookieRackChange(Collections.singletonList(bookieAddress));
} else {
address2Region.put(bookieAddress, newRegion);
TopologyAwareEnsemblePlacementPolicy oldRegionPlacement = perRegionPlacement.get(oldRegion);
oldRegionPlacement.handleBookiesThatLeft(Collections.singleton(bookieAddress));
TopologyAwareEnsemblePlacementPolicy newRegionPlacement = perRegionPlacement.get(
newRegion);
if (newRegionPlacement == null) {
newRegionPlacement = new RackawareEnsemblePlacementPolicy()
.initialize(dnsResolver, timer, this.reorderReadsRandom,
this.stabilizePeriodSeconds, this.reorderThresholdPendingRequests,
this.isWeighted, this.maxWeightMultiple,
this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum,
this.ignoreLocalNodeInPlacementPolicy, statsLogger,
bookieAddressResolver)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
perRegionPlacement.put(newRegion, newRegionPlacement);
}
newRegionPlacement.handleBookiesThatJoined(Collections.singleton(bookieAddress));
}
}
} catch (IllegalArgumentException | NetworkTopologyImpl.InvalidTopologyException e) {
LOG.error("Failed to update bookie rack info: {} ", bookieAddress, e);
}
});
} finally {
rwLock.writeLock().unlock();
}
}

@Override
public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf,
Optional<DNSToSwitchMapping> optionalDnsResolver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
import java.net.InetAddress;
Expand Down Expand Up @@ -1528,6 +1529,7 @@ public void testNodeWithFailures() throws Exception {
assertEquals(ensemble.get(reoderSet.get(7)), addr4.toBookieId());
}

@Test
public void testNewEnsembleSetWithFiveRegions() throws Exception {
repp.uninitalize();
repp = new RegionAwareEnsemblePlacementPolicy();
Expand Down Expand Up @@ -1573,6 +1575,7 @@ public void testNewEnsembleSetWithFiveRegions() throws Exception {
}
}

@Test
public void testRegionsWithDiskWeight() throws Exception {
repp.uninitalize();
repp = new RegionAwareEnsemblePlacementPolicy();
Expand Down Expand Up @@ -1607,4 +1610,155 @@ public void testRegionsWithDiskWeight() throws Exception {

assertEquals(3, ensemble.size());
}

@Test
public void testNotifyRackChangeWithOldRegion() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);

// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/rack-1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/rack-1");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/rack-1");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/rack-1");

// Update cluster
Set<BookieId> addrs = Sets.newHashSet(addr1.toBookieId(),
addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId());
repp.onClusterChanged(addrs, new HashSet<>());

assertEquals(4, repp.knownBookies.size());
assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
assertEquals("/region1/rack-1", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-1", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

assertEquals(2, repp.perRegionPlacement.size());
TopologyAwareEnsemblePlacementPolicy region1Placement = repp.perRegionPlacement.get("region1");
assertEquals(2, region1Placement.knownBookies.keySet().size());
assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());

TopologyAwareEnsemblePlacementPolicy region2Placement = repp.perRegionPlacement.get("region2");
assertEquals(2, region2Placement.knownBookies.keySet().size());
assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
assertEquals("region2", repp.address2Region.get(addr4.toBookieId()));

// Update the rack.
// change addr2 rack info. /region1/rack-1 -> /region1/rack-2.
// change addr4 rack info. /region2/rack-1 -> /region1/rack-2
List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
List<String> rackList = new ArrayList<>();
bookieAddressList.add(addr2);
rackList.add("/region1/rack-2");
bookieAddressList.add(addr4);
rackList.add("/region1/rack-2");
StaticDNSResolver.changeRack(bookieAddressList, rackList);

assertEquals(4, repp.knownBookies.size());
assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
assertEquals("/region1/rack-2", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
assertEquals("/region1/rack-2", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

assertEquals(2, repp.perRegionPlacement.size());
region1Placement = repp.perRegionPlacement.get("region1");
assertEquals(3, region1Placement.knownBookies.keySet().size());
assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
assertEquals("/region1/rack-2", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
assertEquals("/region1/rack-2", region1Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

region2Placement = repp.perRegionPlacement.get("region2");
assertEquals(1, region2Placement.knownBookies.keySet().size());
assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());

assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
assertEquals("region1", repp.address2Region.get(addr4.toBookieId()));
}

@Test
public void testNotifyRackChangeWithNewRegion() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.1.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.1.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.1.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.1.4", 3181);

// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/rack-1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/rack-1");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/rack-1");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/rack-1");

// Update cluster
Set<BookieId> addrs = Sets.newHashSet(addr1.toBookieId(),
addr2.toBookieId(), addr3.toBookieId(), addr4.toBookieId());
repp.onClusterChanged(addrs, new HashSet<>());

assertEquals(4, repp.knownBookies.size());
assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
assertEquals("/region1/rack-1", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-1", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

assertEquals(2, repp.perRegionPlacement.size());
TopologyAwareEnsemblePlacementPolicy region1Placement = repp.perRegionPlacement.get("region1");
assertEquals(2, region1Placement.knownBookies.keySet().size());
assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());

TopologyAwareEnsemblePlacementPolicy region2Placement = repp.perRegionPlacement.get("region2");
assertEquals(2, region2Placement.knownBookies.keySet().size());
assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
assertEquals("region1", repp.address2Region.get(addr2.toBookieId()));
assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
assertEquals("region2", repp.address2Region.get(addr4.toBookieId()));

// Update the rack.
// change addr2 rack info. /region1/rack-1 -> /region3/rack-1.
// change addr4 rack info. /region2/rack-1 -> /region3/rack-1
List<BookieSocketAddress> bookieAddressList = new ArrayList<>();
List<String> rackList = new ArrayList<>();
bookieAddressList.add(addr2);
rackList.add("/region3/rack-1");
bookieAddressList.add(addr4);
rackList.add("/region3/rack-1");
StaticDNSResolver.changeRack(bookieAddressList, rackList);

assertEquals(4, repp.knownBookies.size());
assertEquals("/region1/rack-1", repp.knownBookies.get(addr1.toBookieId()).getNetworkLocation());
assertEquals("/region3/rack-1", repp.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
assertEquals("/region2/rack-1", repp.knownBookies.get(addr3.toBookieId()).getNetworkLocation());
assertEquals("/region3/rack-1", repp.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

assertEquals(3, repp.perRegionPlacement.size());
region1Placement = repp.perRegionPlacement.get("region1");
assertEquals(1, region1Placement.knownBookies.keySet().size());
assertEquals("/region1/rack-1", region1Placement.knownBookies.get(addr1.toBookieId()).getNetworkLocation());

region2Placement = repp.perRegionPlacement.get("region2");
assertEquals(1, region2Placement.knownBookies.keySet().size());
assertEquals("/region2/rack-1", region2Placement.knownBookies.get(addr3.toBookieId()).getNetworkLocation());

TopologyAwareEnsemblePlacementPolicy region3Placement = repp.perRegionPlacement.get("region3");
assertEquals(2, region3Placement.knownBookies.keySet().size());
assertEquals("/region3/rack-1", region3Placement.knownBookies.get(addr2.toBookieId()).getNetworkLocation());
assertEquals("/region3/rack-1", region3Placement.knownBookies.get(addr4.toBookieId()).getNetworkLocation());

assertEquals("region1", repp.address2Region.get(addr1.toBookieId()));
assertEquals("region3", repp.address2Region.get(addr2.toBookieId()));
assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
assertEquals("region3", repp.address2Region.get(addr4.toBookieId()));
}
}

0 comments on commit b7f9b83

Please sign in to comment.