1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
/***************************************** ConsistentHashing.h Copyright (c) 2018 David Feng Distributed under the MIT License. *****************************************/ #ifndef _CONSISTENT_HASHING_H #define _CONSISTENT_HASHING_H #include <list> #include <map> #include <string> #include <iostream> /* Implement a simple hash map template class that supports uniform distribution (load balance), efficient resizing buckets and rehashing (consistent hashing). */ template<class K, class V, class H = std::hash<K>> //<Key, Val, HashFunc> class ConsistentHashing { size_t m_sizeNodes; std::map<size_t, std::list<std::pair<K, V>>*> m_sortedHashRing; const size_t m_sizeReplicas; const std::hash<std::string> m_hash; const H m_keyHash; size_t m_sizeKeys; double m_loadFactor; public: ConsistentHashing(size_t sizeNodes = 10, size_t sizeReplicas = 10, double loadFactor = 0.75f) : m_sizeNodes(0), m_sortedHashRing(), m_sizeReplicas(sizeReplicas), m_hash(), m_keyHash(), m_sizeKeys(0), m_loadFactor(0.75f) { for (size_t i = 0; i < sizeNodes; ++i) AddNode(i); } ~ConsistentHashing() { for (size_t i = 0; i < m_sizeNodes; ++i) RemoveNode(i); } private: std::vector<std::pair<K, V>> AddNode(size_t nodeIdx) { std::vector<std::pair<K, V>> res; std::list<std::pair<K, V>> * bucket = new std::list<std::pair<K, V>>(); for (size_t i = 0; i < m_sizeReplicas; ++i) //add m_sizeReplicas virtual nodes { size_t curHash = m_hash(std::to_string(nodeIdx) + "," + std::to_string(i)); //need a separator to avoid duplicated composite number which causes same hash std::pair<typename std::map<size_t, std::list<std::pair<K, V>> *>::iterator, bool> curItr = m_sortedHashRing.insert({curHash, bucket}); //all virtual nodes point to a same bucket auto nxtItr = m_sortedHashRing.upper_bound(curHash); //delete and return all the keys that need to be re-hashed if (nxtItr == m_sortedHashRing.end()) nxtItr = m_sortedHashRing.begin(); std::list<std::pair<K, V>> * bucket = nxtItr->second; for (auto itr = bucket->begin(); itr != bucket->end(); ) { size_t nxtHash = m_keyHash(itr->first); if (m_sortedHashRing.lower_bound(nxtHash) == curItr.first //1. nxtHash hits the current node || m_sortedHashRing.lower_bound(nxtHash) == m_sortedHashRing.end()) //2. curHash is m_sortedHashRing.begin() { res.push_back({itr->first, itr->second}); itr = bucket->erase(itr); } else ++itr; } m_sortedHashRing.erase(curItr.first); //erase curHash to make sure later upperBound(curHash) won't hit a newly added hash } for (size_t i = 0; i < m_sizeReplicas; ++i) //add m_sizeReplicas virtual nodes { size_t curHash = m_hash(std::to_string(nodeIdx) + "," + std::to_string(i)); //need a separator to avoid duplicated composite number which causes same hash std::pair<typename std::map<size_t, std::list<std::pair<K, V>> *>::iterator, bool> curItr = m_sortedHashRing.insert({curHash, bucket}); //all virtual nodes point to a same bucket } ++m_sizeNodes; return res; //return <key, val> pairs that need to be re-hashed } std::vector<std::pair<K, V>> RemoveNode(size_t nodeIdx) { std::vector<std::pair<K, V>> res; for (size_t i = 0; i < m_sizeReplicas; ++i) { size_t curHash = m_hash(std::to_string(nodeIdx) + "," + std::to_string(i)); auto curItr = m_sortedHashRing.lower_bound(curHash); if (curItr != m_sortedHashRing.end() && curItr->first == curHash) { if (i == 0) //remove all nodes for once { std::list<std::pair<K, V>> * bucket = curItr->second; for (auto itr = bucket->begin(); itr != bucket->end(); ) { res.push_back({itr->first, itr->second}); itr = bucket->erase(itr); } delete bucket; bucket = NULL; } m_sortedHashRing.erase(curItr); } else std::cout << "Warning: not found virtual node [nodeIdx,replicaIdx,hash]: [" << nodeIdx << "," << i << "," << curHash << "]" << std::endl; } --m_sizeNodes; return res; //return <key, val> pairs that need to be re-hashed } public: void ResizeNode(size_t n) { if (n == m_sizeNodes) return; std::vector<std::pair<K, V>> res; if (n > m_sizeNodes) { for (size_t i = m_sizeNodes; i < n; ++i) { std::vector<std::pair<K, V>> keyVal = AddNode(i); res.insert(res.end(), keyVal.begin(), keyVal.end()); } } else //n < m_size { for (size_t i = m_sizeNodes - 1; i >= n; --i) { std::vector<std::pair<K, V>> keyVal = RemoveNode(i); res.insert(res.end(), keyVal.begin(), keyVal.end()); } } for (auto & p : res) put(p.first, p.second);//re-hash <key, val> pairs } bool Put(const K & key, const V & val) { if (m_sizeNodes == 0) ResizeNode(2); else if (((double)(m_sizeKeys + 1)/(double)m_sizeNodes) > m_loadFactor) ResizeNode(m_sizeNodes*2); return put(key, val); } private: bool put(const K & key, const V & val) { size_t curHash = m_keyHash(key); auto curItr = m_sortedHashRing.lower_bound(curHash); if (curItr == m_sortedHashRing.end()) curItr = m_sortedHashRing.begin(); curHash = curItr->first; std::list<std::pair<K, V>> * bucket = curItr->second; for (auto itr = bucket->begin(); itr != bucket->end(); ++itr) { if (itr->first == key) { itr->second = val; return false; } } bucket->push_front({key, val}); ++m_sizeKeys; return true; } public: std::pair<V, bool> Get(const K & key) const { if (m_sizeNodes == 0) return {V(), false}; size_t curHash = m_keyHash(key); auto curItr = m_sortedHashRing.lower_bound(curHash); if (curItr == m_sortedHashRing.end()) curItr = m_sortedHashRing.begin(); std::list<std::pair<K, V>> * bucket = curItr->second; for (auto itr = bucket->begin(); itr != bucket->end(); ++itr) { if (itr->first == key) return {itr->second, true}; } return {V(), false}; } void Remove(const K & key) { if (m_sizeNodes == 0) return; size_t curHash = m_keyHash(key); auto curItr = m_sortedHashRing.lower_bound(curHash); if (curItr == m_sortedHashRing.end()) curItr = m_sortedHashRing.begin(); std::list<std::pair<K, V>> * bucket = curItr->second; for (auto itr = bucket->begin(); itr != bucket->end(); ) { if (itr->first == key) { itr = bucket->erase(itr); --m_sizeKeys; } else ++itr; } } void SetLoadFactor(double f) { if (f > 0 && f <= 1.0f) m_loadFactor = f; } double GetLoadFactor() const { return m_loadFactor; } size_t GetNodeSize() const { return m_sizeNodes; } size_t GetKeySize() const { return m_sizeKeys; } }; |
Copyright © 2016-2020 Qualgame, LLC