diff --git a/sharding/src/distribution/mod.rs b/sharding/src/distribution/mod.rs index a3ffb65f7..f53f2d87b 100644 --- a/sharding/src/distribution/mod.rs +++ b/sharding/src/distribution/mod.rs @@ -76,6 +76,7 @@ impl Distribute { "secmod" => Self::SecMod(SecMod::from(names.len())), _ => { log::warn!("'{}' is not valid , use modula instead", distribution); + println!("'{}' is not valid , use modula instead!", distribution); Self::Modula(Modula::from(names.len(), false)) } } diff --git a/sharding/src/hash/fnv1.rs b/sharding/src/hash/fnv1.rs index f2272459e..27d4039fe 100644 --- a/sharding/src/hash/fnv1.rs +++ b/sharding/src/hash/fnv1.rs @@ -1,5 +1,26 @@ -/// 按需支持fnv1系列所有相关的hash算法,目前支持fnv1a_64 +/// 按需支持fnv1系列所有相关的hash算法,目前支持fnv1_32、fnv1a_64; +/// 对应算法源自twemproxy +/// fnv1_32相关 +#[derive(Debug, Default, Clone)] +pub struct Fnv1F32; + +const FNV_32_INIT: u32 = 2166136261; +const FNV_32_PRIME: u32 = 16777619; + +impl super::Hash for Fnv1F32 { + fn hash(&self, key: &S) -> i64 { + let mut hash = FNV_32_INIT; + for i in 0..key.len() { + hash = hash.wrapping_mul(FNV_32_PRIME); + hash = hash ^ (key.at(i) as u32); + } + hash as i64 + } +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +/// fnv1a_64相关 #[derive(Debug, Default, Clone)] pub struct Fnv1aF64; @@ -13,7 +34,6 @@ impl super::Hash for Fnv1aF64 { hash ^= key.at(i) as u32; hash = hash.wrapping_mul(FNV_64_PRIME as u32); } - hash as i64 } } diff --git a/sharding/src/hash/mod.rs b/sharding/src/hash/mod.rs index 58b388fbf..a8214ba43 100644 --- a/sharding/src/hash/mod.rs +++ b/sharding/src/hash/mod.rs @@ -26,8 +26,7 @@ pub use rawsuffix::RawSuffix; pub mod crc; use enum_dispatch::enum_dispatch; - -use self::{bkdrsub::Bkdrsub, crc64::Crc64, fnv1::Fnv1aF64}; +use self::{bkdrsub::Bkdrsub, crc64::Crc64, fnv1::Fnv1F32, fnv1::Fnv1aF64}; // 占位hash,主要用于兼容服务框架,供mq等业务使用 pub const HASH_PADDING: &str = "padding"; @@ -88,6 +87,7 @@ pub enum Hasher { Fnv1aF64(Fnv1aF64), Random(RandomHash), // random hash RawSuffix(RawSuffix), + Fnv1_32(Fnv1F32), } impl Hasher { @@ -132,10 +132,12 @@ impl Hasher { "crc32abs" => Self::Crc32Abs(Default::default()), "crc64" => Self::Crc64(Default::default()), "random" => Self::Random(Default::default()), + "fnv1_32" => Self::Fnv1_32(Default::default()), "fnv1a_64" => Self::Fnv1aF64(Default::default()), _ => { // 默认采用mc的crc32-s hash log::error!("found unknown hash:{}, use crc32-short instead", alg); + println!("found unknown hash:{}, use crc32-short instead!", alg); return Self::Crc32Short(Default::default()); } }; diff --git a/tests/sharding_datas/common/all_data.txt b/tests/sharding_datas/common/all_data.txt new file mode 100644 index 000000000..96ba82009 --- /dev/null +++ b/tests/sharding_datas/common/all_data.txt @@ -0,0 +1,30 @@ +# 文件格式:首先设置hash、distribution、shard_count,然后设置每个分片的数据 +# header +hash=crc64 +distribution=modula +shard_count=6 + +shard_idx=0 +hot_band_conf_6041884361 +hot_band_conf_7457077031 +hot_band_conf_5983133201 + +shard_idx=1 +hot_band_conf_5658472751 +hot_band_conf_3306842351 +hot_band_conf_7550365451 +hot_band_conf_3306842351 + +shard_idx=2 +hot_band_conf_2759747141 +hot_band_conf_7202166971 +hot_band_conf_7723136021 + +shard_idx=3 +hot_band_conf_7816767401 + +shard_idx=4 +hot_band_conf_5211889061 + +shard_idx=5 +hot_band_conf_2739628991 diff --git a/tests/sharding_datas/common/compare.txt b/tests/sharding_datas/common/compare.txt new file mode 100644 index 000000000..cea3138e6 --- /dev/null +++ b/tests/sharding_datas/common/compare.txt @@ -0,0 +1,7 @@ +4467599223029960 +3735267922960943 +3543172671127490 +3542600890981532 +3736215076519398 +4480826111688800 +4673790045847657 diff --git a/tests/sharding_datas/common/redis.data b/tests/sharding_datas/common/redis.data new file mode 100644 index 000000000..357162ff2 --- /dev/null +++ b/tests/sharding_datas/common/redis.data @@ -0,0 +1,72 @@ +# 文件格式:首先设置hash、dist、shard_count,然后设置每个分片的数据 +# header +hash=fnv1_32 +distribution=modula +shard_count=16 + +shard_idx=0 +ocpx_app_22320534_hot_tweets_feed_20240307 +ocpx_ctcvr_cache_v2_22333703_20240313 + +shard_idx=1 +ocpx_cpl_22268383_hot_tweets_feed_20240305lastslot + +shard_idx=2 +ocpx_cpl_22332413_hot_tweets_feed_20240311lastslot + +shard_idx=3 +ocpx_cpl_22304432_discover_hotspot_feed_20240311lastslot +coldstart:ad_info:22315012 +ocpx_cpl_22229734_queryfeed_20240307lastslot + +shard_idx=4 +ocpx_app_22292303_other_20240313 + +shard_idx=5 +ocpx_24h_pid_22309234_20240309 +ocpx_app_22216665_other_20240311lastslot + +shard_idx=6 +ocpx_cpl_22186766_discover_hotspot_feed_20240307 +ocpx_cpl_22333450_general_queryfeed_20240311lastslot + +shard_idx=7 +ocpx_app_22216118_hot_tweets_feed_20240313 +ocpx_cpl_22302263_vertical_video_20240308 +ocpx_cpl_22280249_mainfeed_20240302lastslot + +shard_idx=8 +ocpx_cpl_22255305_discover_hotspot_feed_20240312 +ocpx_ctcvr_v2_22309626 + +shard_idx=9 +ocpx_creative_custopt_3275145341_86004001_20240224 +ocpx_fan_19964051_other_20240310lastslot +ocpx_fan_22287547_other_20240307 + +shard_idx=10 +ocpx_cpl_22327987_other_20240310 +ocpx_cpl_22025614_vertical_video_20240309lastslot +ocpx_app_22202692_other_20240308 + +shard_idx=11 +ocpx_app_22243530_mainfeed_20240306 +ocpx_app_18774341_other_20240305lastslot + +shard_idx=12 +ocpx_app_22276505_other_20240302lastslot +ocpx_app_20041469_vertical_video_20240310lastslot + +shard_idx=13 +ocpx_cpl_22316879_queryfeed_20240308 +ocpx_app_22306747_other_20240307 + +shard_idx=14 +ocpx_app_22024254_other_20240308 +ocpx_app_21877105_comments_list_20240303lastslot + +shard_idx=15 +ocpx_adjustcost_22134355_20240312 +ocpx_app_22257806_other_20240308 +ocpx_cpl_21862394_hot_tweets_feed_20240306 + diff --git a/tests/src/all.rs b/tests/src/all.rs index 24db6ab1b..0ee6426d5 100644 --- a/tests/src/all.rs +++ b/tests/src/all.rs @@ -27,5 +27,6 @@ mod mysql_strategy; mod number; mod ring_buffer; mod select; +mod shard_checker; mod time; mod tx_buffer; diff --git a/tests/src/bkdrsub.rs b/tests/src/bkdrsub.rs index 44a0b7b0a..b312f0b49 100644 --- a/tests/src/bkdrsub.rs +++ b/tests/src/bkdrsub.rs @@ -65,6 +65,8 @@ fn check_file(idx: usize, fname: &str, hasher: Hasher, dist: Distribute) { let idx_line = dist.index(hash); if idx_line != idx { println!("line:{}, hash:{}, idx:{}", line, hash, idx_line); + } else { + println!("succeed line:{}, hash:{}, idx:{}", line, hash, idx_line); } assert_eq!( idx, idx_line, diff --git a/tests/src/dns.rs b/tests/src/dns.rs index df758d790..d6bf65f5d 100644 --- a/tests/src/dns.rs +++ b/tests/src/dns.rs @@ -29,6 +29,7 @@ impl Lookup for Dns { } } } +#[ignore = "暂时注释掉,#439 有修正"] #[test] fn dns_lookup() { let mut query: Vec> = Vec::new(); diff --git a/tests/src/shard_checker.rs b/tests/src/shard_checker.rs new file mode 100644 index 000000000..3cddb9c4c --- /dev/null +++ b/tests/src/shard_checker.rs @@ -0,0 +1,245 @@ +use core::panic; +use std::{ + fs::File, + io::{BufRead, BufReader, BufWriter, Write}, +}; + +use sharding::{ + distribution::Distribute, + hash::{Hash, Hasher}, +}; + +/// shard_checker 用于校验任何hash/distribution/shard_count 的正确性,校验数据的格式如下: +/// 1. 首先设置好待check文件的header: +/// hash=bkdirsub +/// distribution=modula +/// shard_count=180 +/// 2. 为每一个分片的第一行记录分片idx,格式如: +/// shard_idx=0 +/// 3. 每个分片后续的行,记录该分片的key,格式如: +/// 123456.abcd +/// 456789.abcd +/// +/// 备注: 对分片的顺序无要求,但分片需要从0开始计数。 + +const HASH_PREIFX: &str = "hash="; +const DISTRIBUTION_PREIFX: &str = "distribution="; +const SHARD_COUNT_PREFIX: &str = "shard_count="; +const IDX_SHARD_PREFIX: &str = "shard_idx="; + +/// 使用姿势很简单,按指定格式准备好分片文件,然后设置文件名,调用check_shard_data即可 +#[test] +fn check_shard_data() { + let root_dir = "sharding_datas/common"; + let data_file = "redis.data"; + + shard_checker(root_dir, data_file); +} + +#[test] +fn build_shard_data() { + let shard_conf = ShardConf { + hash: "crc32-point".to_string(), + distribution: "secmod".to_string(), + shards: 2, + }; + + let root_dir = "sharding_datas/common"; + let src_data_file = "compare.txt"; + let dest_data_file = "compare_shard.txt"; + + write_shard_data(&shard_conf, root_dir, src_data_file, dest_data_file); +} + +/// shard校验的基础配置 +#[derive(Debug, Default)] +struct ShardConf { + hash: String, + distribution: String, + shards: usize, +} + +impl ShardConf { + /// check 是否parse 完毕 + fn parsed(&self) -> bool { + if self.hash.len() == 0 || self.distribution.len() == 0 || self.shards == 0 { + return false; + } + true + } +} + +fn parse_header(reader: &mut BufReader) -> ShardConf { + let mut shard_conf: ShardConf = Default::default(); + loop { + let mut line = String::with_capacity(64); + match reader.read_line(&mut line) { + Ok(len) => { + line = line.trim().to_string(); + // len 为0,说明读到文件末尾,退出loop + if len == 0 { + println!("completed parse header!"); + break; + } + + // parse header + if line.trim().len() == 0 || line.starts_with("#") { + // 读到空行或者注释行,跳过 + // println!("ignoe line: {}", line); + } else if line.starts_with(HASH_PREIFX) { + // 读到配置项,解析配置项 + shard_conf.hash = line.split("=").nth(1).unwrap().trim().to_string(); + // println!("hash: {}", shard_conf.hash); + } else if line.starts_with(DISTRIBUTION_PREIFX) { + // 解析dist + shard_conf.distribution = line.split("=").nth(1).unwrap().trim().to_string(); + // println!("distribution: {}", shard_conf.distribution); + } else if line.starts_with(SHARD_COUNT_PREFIX) { + // 解析shard_count + shard_conf.shards = line.split("=").nth(1).unwrap().trim().parse().unwrap(); + // println!("shards: {}", shard_conf.shards); + } + if shard_conf.parsed() { + break; + } + } + Err(err) => { + panic!("read file error: {}", err); + } + } + } + if shard_conf.hash.len() == 0 || shard_conf.distribution.len() == 0 || shard_conf.shards == 0 { + panic!("parse header failed: {:?}", shard_conf); + } + shard_conf +} +fn shard_checker(root_dir: &str, data_file: &str) { + // 首先读取文件header配置 + let data_file = format!("{}/{}", root_dir, data_file); + let file = File::open(&data_file).unwrap(); + let mut reader = BufReader::new(file); + + let shard_conf = parse_header(&mut reader); + println!("+++ hash file header: {:?}", shard_conf); + + // 做check的初始化:读取文件,初始化hash、dist等 + + let shards = mock_servers(shard_conf.shards); + let mut success_count = 0; + let hasher = Hasher::from(shard_conf.hash.as_str()); + let dist = Distribute::from(shard_conf.distribution.as_str(), &shards); + + // 开始loop文件,check key + let mut shard_idx_real = 0; + loop { + let mut line = String::with_capacity(64); + match reader.read_line(&mut line) { + Ok(len) => { + // len 为0,说明读到文件末尾,停止loop + if len == 0 { + println!("read all data"); + break; + } + + // 忽略空行和注释行 + line = line.trim().to_string(); + if line.len() == 0 || line.starts_with("#") { + // println!("ignore line:{}", line); + continue; + } + + // 确认shard idx + if line.starts_with(IDX_SHARD_PREFIX) { + shard_idx_real = line.split("=").nth(1).unwrap().parse::().unwrap(); + // println!("will start check new shard: {}...", shard_idx_real); + continue; + } + + // 把每行作为一个key,计算hash和dist + let key = line; + let hash = hasher.hash(&key.as_bytes()); + let idx = dist.index(hash); + assert_eq!( + shard_idx_real, idx, + "key: {} - {}:{}, expected shard: {}", + key, hash, idx, shard_idx_real + ); + success_count += 1; + + println!("proc succeed line:{}", key); + } + Err(e) => { + println!("found err: {:?}", e); + break; + } + } + } + println!("file:{}, succeed count:{}", data_file, success_count); + assert!(success_count > 0); +} + +fn mock_servers(shard_count: usize) -> Vec { + // let shard_count = 8; + let mut servers = Vec::with_capacity(shard_count); + for i in 0..shard_count { + servers.push(format!("192.168.0.{}", i).to_string()); + } + servers +} + +fn write_shard_data( + shard_conf: &ShardConf, + root_dir: &str, + src_data_file: &str, + dest_data_file: &str, +) { + let src = format!("{}/{}", root_dir, src_data_file); + let dest = format!("{}/{}", root_dir, dest_data_file); + let src_file = File::open(&src).unwrap(); + + let mut writer = BufWriter::new(File::create(&dest).unwrap()); + let header = format!( + "# 文件格式:首先设置hash、dist、shard_count,然后设置每个分片的数据\n# header\nhash={}\ndistribution={}\nshard_count={}\n", + shard_conf.hash, shard_conf.distribution, shard_conf.shards + ); + writer.write(header.as_bytes()).unwrap(); + + // 计算每一行的key,记录到对应的分片位置 + let hasher = Hasher::from(&shard_conf.hash); + let mock_servers = mock_servers(shard_conf.shards); + let dist = Distribute::from(&shard_conf.distribution, &mock_servers); + let mut shard_keys = Vec::new(); + for _i in 0..shard_conf.shards { + shard_keys.push(Vec::with_capacity(128)); + } + let mut reader = BufReader::new(src_file); + loop { + let mut line = String::with_capacity(64); + match reader.read_line(&mut line) { + Ok(len) => { + if len == 0 { + // 读到文件末尾 + break; + } + line = line.trim().to_string(); + let hash = hasher.hash(&line.as_bytes()); + let idx = dist.index(hash); + shard_keys.get_mut(idx).unwrap().push(line); + } + Err(err) => { + panic!("read error: {}", err); + } + } + } + + // 将每个分片的key记录入目标文件 + for (i, keys) in shard_keys.into_iter().enumerate() { + let shard_str = format!("\nshard_idx={}\n", i); + writer.write_all(shard_str.as_bytes()).unwrap(); + for key in keys { + writer.write(key.as_bytes()).unwrap(); + writer.write(b"\n").unwrap(); + } + writer.flush().unwrap(); + } +}