ska::flat_hash_map 源码分析

背景

最近在调研各种hashmap.. 发现ska::flat hash map性能优秀。。于是来看看代码。。 发现最大的特点是,ska::flat_hash_map使用了带probe count上限的robin hood hashing

相关概念

Distance_from_desired

对于采用了open addressing的hash实现,当插入发生冲突时,会以一定方式(如线性探测、平方探测等)来探测下一个可以插入的slot. 因而实际插入的slot位置与理想的slot位置通常不相同,这段距离定义为distance_from_desired 在没有冲突的理想情况下,所有distance_from_desired的值应该都为0 distance_from_desired的一种更常见的说法叫做probe sequence lengths(PSL)

robin hood hashing

robin hood hashing的核心思想是"劫富济贫" distance_from_desired小的slot被认为更"富有",distance_from_desired大的slot被认为更"贫穷" 具体来说,当去插入一个新的元素时,如果当前位置的元素的distance_from_desired要比待插入元素的distance_from_desired要小,那么就将待插入元素放入当前位置,将当前位置的元素取出,寻找一个新的位置。

这样做使得所有元素的distance_from_desired的分布更为平均,variance更小。 这样的分布对cache更友好(几乎全部元素distance_from_desired都小于一个cache line的长度,因此在find的时候只需要fetch一次cache line),从而拥有更好的性能。

一般的robin hashing 在find时,一般用一个全局的最大distance_from_desired作为没有找到该元素终止条件。 一种常见的改进是,不维护全局最大distance_from_desired,而是在看到当前位置元素的distance_from_desired比要插入的元素的distance_from_desired小时终止。

 1
 2  iterator find(const FindKey& key) {
 3    size_t index =
 4        hash_policy.index_for_hash(hash_object(key), num_slots_minus_one);
 5    EntryPointer it = entries + ptrdiff_t(index);
 6    for (int8_t distance = 0; it->distance_from_desired >= distance;
 7         ++distance, ++it) {
 8      if (compares_equal(key, it->value)) return {it};
 9    }
10    return end();
11  }
12

带上限的robin hashing

一般的robin hashing在insert时,会不断进行寻找(包括了可能的swap过程),直到找到一个空的slot为止。该过程在hash table较满时可能接近线性的时间复杂度。 ska::flat_hash_map对这一点的改进是,限制了insert时尝试的上限次数,作者给出的经验值为log(N),其中N为slots的个数。 这样保证每个slot的最大distance_from_desired不会超过log(N)

关键实现

emplace

插入一个元素 分析见注释 其中emplace 函数主要是查找是否已经存在该元素+调整到合适的插入位置 emplace_new_key函数执行真正的emplace操作

 1
 2  template <typename Key, typename... Args>
 3  std::pair<iterator, bool> emplace(Key&& key, Args&&... args) {
 4    size_t index =
 5        hash_policy.index_for_hash(hash_object(key), num_slots_minus_one);
 6    EntryPointer current_entry = entries + ptrdiff_t(index);
 7    int8_t distance_from_desired = 0;
 8    // 插入前先查找是否存在。。。
 9    // 只需要查找有限的距离
10
11    // trick在于。。初始 current_entry->distance_from_desired为-1
12    // 此时不会进入for loop,直接进行emplace_new_key。
13    // 该for loop有两层意义: 1.在index位置不为空时找到合适的位置: 空的slot或者更富有的slot(也就是current_entry->distance_from_desired < distance_from_desired的slot)
14    // 2.在该过程中找一下是否已经插入了该值
15    for (; current_entry->distance_from_desired >= distance_from_desired;
16         ++current_entry, ++distance_from_desired) {
17      if (compares_equal(key, current_entry->value))
18        return {{current_entry}, false};
19    }
20    return emplace_new_key(distance_from_desired, current_entry,
21                           std::forward<Key>(key), std::forward<Args>(args)...);
22  }
23
 1
 2  template <typename Key, typename... Args>
 3  SKA_NOINLINE(std::pair<iterator, bool>)
 4  emplace_new_key(int8_t distance_from_desired, EntryPointer current_entry,
 5                  Key&& key, Args&&... args) {
 6    using std::swap;
 7    // num_slots_minus_one初始值为0,表示第一次进行插入。。需要先进行grow..很合理。。
 8    // 如果得到max_load_factor,或者查找次数达到max_lookups,就进行rehash
 9    if (num_slots_minus_one == 0 || distance_from_desired == max_lookups ||
10        num_elements + 1 >
11            (num_slots_minus_one + 1) * static_cast<double>(_max_load_factor)) {
12      grow();
13      return emplace(std::forward<Key>(key), std::forward<Args>(args)...);
14    } else if (current_entry->is_empty()) {
15      current_entry->emplace(distance_from_desired, std::forward<Key>(key),
16                             std::forward<Args>(args)...);
17      ++num_elements;
18      return {{current_entry}, true};
19    }
20
21    // 执行到这里,说明有更富有的slot。于是进行swap,转而为被换出的pair<key,value>找一个新的slot
22    // to_insert是当前要插入的,由于swap的发生,可能并不是最初要插入的那一对值
23    value_type to_insert(std::forward<Key>(key), std::forward<Args>(args)...);
24    swap(distance_from_desired, current_entry->distance_from_desired);
25    swap(to_insert, current_entry->value);
26    iterator result = {current_entry};
27    for (++distance_from_desired, ++current_entry;; ++current_entry) {
28      if (current_entry->is_empty()) {
29        // 如果被换过的slot后面某个slot是空的。。就直接放置了
30        current_entry->emplace(distance_from_desired, std::move(to_insert));
31        ++num_elements;
32        return {result, true};
33      } else if (current_entry->distance_from_desired < distance_from_desired) {
34        // 在找新slot的过程中,仍然进行劫富济贫的操作, 转而为被换出的pair<key,value>找一个新的slot
35        swap(distance_from_desired, current_entry->distance_from_desired);
36        swap(to_insert, current_entry->value);
37        ++distance_from_desired;
38      } else {
39        // 如果没有空的slot,也没有更富有的slot,那就只能继续往前寻找了,直到达到上限
40        ++distance_from_desired;
41        if (distance_from_desired == max_lookups) {
42          // 如果找了max_lookups个位置还没找到,就进行rehash
43          swap(to_insert, result.current->value);
44          grow();
45          return emplace(std::move(to_insert));
46        }
47      }
48    }
49  }
50

rehash

 1  void rehash(size_t num_buckets) {
 2    num_buckets = std::max(
 3        num_buckets,
 4        static_cast<size_t>(
 5            std::ceil(num_elements / static_cast<double>(_max_load_factor))));
 6    if (num_buckets == 0) {
 7      reset_to_empty_state();
 8      return;
 9    }
10    auto new_prime_index = hash_policy.next_size_over(num_buckets);
11    if (num_buckets == bucket_count()) return;
12    int8_t new_max_lookups = compute_max_lookups(num_buckets);
13    // 额外分配了max_lookups个slots,避免了find时bound checking的开销
14    EntryPointer new_buckets(
15        AllocatorTraits::allocate(*this, num_buckets + new_max_lookups));
16    EntryPointer special_end_item =
17        new_buckets + static_cast<ptrdiff_t>(num_buckets + new_max_lookups - 1);
18    for (EntryPointer it = new_buckets; it != special_end_item; ++it)
19      it->distance_from_desired = -1;
20    special_end_item->distance_from_desired = Entry::special_end_value;
21    std::swap(entries, new_buckets);
22    std::swap(num_slots_minus_one, num_buckets);
23    --num_slots_minus_one;
24    hash_policy.commit(new_prime_index);
25    int8_t old_max_lookups = max_lookups;
26    max_lookups = new_max_lookups;
27    num_elements = 0;
28    // new_buckets其实是旧的entries..
29    // num_buckets其实也是旧的值。。因为已经被swap了
30    for (EntryPointer
31             it = new_buckets,
32             end = it + static_cast<ptrdiff_t>(num_buckets + old_max_lookups);
33         it != end; ++it) {
34      if (it->has_value()) {
35        emplace(std::move(it->value));
36        it->destroy_value();
37      }
38    }
39    deallocate_data(new_buckets, num_buckets, old_max_lookups);
40  }
41

一些其他trick

通过多分配log(N)的slot消除bound checking的开销

代码见上面的rehash部分 由于每个元素的最大distance_from_desired不会超过log(N),因此可以保证查找时不需要做Bound checking. 使得find部分的实现非常简洁:

使用素数个slot而不是2的整数次幂个slot

2的整数次幂个slot是一种很常见的实现。 这种实现的主要好处是在将hash转换为index时,避免了代价高昂取模操作,而是用代价很小的按位与(&)替代

但是使用2的整数次幂个slot的缺点是,取模后得到的结果较少,比起使用素数个slot更容易发生冲突。 ska::flat_hash_map的作者借鉴了boost::multi_index中的做法,将变量展开为compile time const(对compile time const做取模运算要远远快于变量的取模运算),从而减小了这部分开销的影响。

 1
 2struct prime_number_hash_policy {
 3  static size_t mod0(size_t) {
 4    return 0llu;
 5  }
 6  static size_t mod2(size_t hash) {
 7    return hash % 2llu;
 8  }
 9  static size_t mod3(size_t hash) {
10    return hash % 3llu;
11  }
12  static size_t mod5(size_t hash) {
13    return hash % 5llu;
14  }
15  static size_t mod7(size_t hash) {
16    return hash % 7llu;
17  }
18  static size_t mod11(size_t hash) {
19    return hash % 11llu;
20  }
21  static size_t mod13(size_t hash) {
22    return hash % 13llu;
23  }
24  static size_t mod17(size_t hash) {
25    return hash % 17llu;
26  }
27  static size_t mod23(size_t hash) {
28    return hash % 23llu;
29  }
30  static size_t mod29(size_t hash) {
31    return hash % 29llu;
32  }
33  static size_t mod37(size_t hash) {
34    return hash % 37llu;
35  }
36  static size_t mod47(size_t hash) {
37    return hash % 47llu;
38  }
39  ...
40