Pyramid Sketch: a Sketch Framework for Frequency Estimation of Data Streams

Tong Yang, Yang Zhou, Hao Jin, Shigang Chen, Xiaoming Li

1 Department of Computer Science and Technology, Peking University, China

2 Collaborative Innovation Center of High Performance Computing, NUDT, Changsha, China

3 Department of Computer & Information Science & Engineering, University of Florida, USA

Email: yangtongemail@gmail.com, zhou.yang@pku.edu.cn, jin.hao@pku.edu.cn, sgchen@cise.ufl.edu, lxm@pku.edu.cn

ABSTRACT

A sketch is a probabilistic data structure used to store and query the frequency of any item in a given multiset. Due to its high memory efficiency, it has been applied to various fields in computer science, such as stream databases and network traffic measurement. The key metrics of sketches for data streams are accuracy, speed, and memory usage. Various sketches have been proposed, but they cannot achieve both high accuracy and high speed using limited memory, especially for skewed datasets. To address this issue, we propose a sketch framework, the Pyramid sketch, which can significantly improve accuracy as well as update and query speed. To verify the effectiveness and efficiency of our framework, we applied it to four typical sketches. Extensive experimental results show that the accuracy is improved by up to 3.50 times, while the speed is improved by up to 2.10 times. We have released our source code on GitHub [1].

1. INTRODUCTION

1.1 Background and Motivation

Given a multiset, estimating the frequency of each item is a critical problem in data stream applications. A multiset refers to a set in which each item can appear multiple times. In scenarios such as real-time IP traffic, graph streams, web clicks and crawls, sensor databases, and natural language processing (NLP) [2–6], the massive data are often organized as high-speed streams, requiring servers to record stream information in real time. Due to the high speed of data streams, it is often impractical to record and estimate item frequencies exactly. Therefore, estimating item frequencies with probabilistic data structures has become popular and gained wide acceptance [7–9]. Sketches were initially designed for the estimation of item frequencies in data streams [10–15], and have since been applied to many more fields, such as sparse approximation in compressed sensing [16], natural language processing [17, 18], data graph [19, 20], and more [21]. Note that we mainly focus on the sketches used for frequency estimation in this paper.

∗ Shigang Chen is the corresponding author of this paper. This work is partially supported by National Basic Research Program of China (2014CB340400). This work is licensed under the Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License. To view a copy of this license, visit http://creativecommons.org/licenses/by-ncnd/4.0/. For any use beyond those covered by this license, obtain permission by emailing info@vldb.org.

Proceedings of the VLDB Endowment, Vol. 10, No. 11 Copyright 2017 VLDB Endowment 2150-8097/17/07.

According to our analysis of real datasets and the literature [7, 9], the item frequencies in data streams are often highly skewed. In other words, most items are cold (i.e., have a low frequency), while a few items are hot (i.e., have a high frequency). For convenience, we use hot items and cold items to represent them in this paper. All existing sketches use counters to store frequencies, but it is difficult to find a proper size for the counters to fit these highly skewed data streams. For example, the frequencies of most items are small (< 16), while the frequencies of a few hot items are larger than 40,000. Given a fixed amount of memory, 1) if each counter is 4 bits wide, the number of counters (C) will be large, and the estimation of cold items will be very accurate. Unfortunately, hot items will cause counters to overflow, which is hardly acceptable in many applications. 2) If we allocate 16 bits to each counter, the number of counters will decrease to C/4, and the accuracy of the sketch will drop drastically. What is worse, the frequency of the hottest item is unknown in many applications, which makes it hard to determine the counter size. Unfortunately, existing sketches (CM sketches [8], CU sketches [22], Count sketches [23], and Augmented sketches [7]) have to allocate enough bits for each counter, and thus can hardly work well on real data streams that are highly skewed. The design goal of this paper is to devise a framework that not only prevents counters from overflowing without knowing the frequency of the hottest item in advance, but also achieves high accuracy, high update speed, and high query speed at the same time.

1.2 The Proposed Solution

In this paper, we propose a sketch framework, namely the Pyramid sketch, as it employs a pyramid-shaped data structure. The key idea of our Pyramid framework is to automatically enlarge the size of the corresponding counters according to the current frequency of the incoming item, while achieving close to 1 memory access and 1 hash computation for each insertion. Our proposed enlarging strategy uses geometric progression to guarantee that any practical large frequency can be represented within bounded memory usage. The pivotal technique is counter-pair sharing, which can significantly improve the accuracy.

Considering the significance of insertion speed in high-speed data streams, we further propose another four techniques: 1) word constraint, 2) word sharing, 3) one hashing, and 4) Ostrich policy, to accelerate insertion while keeping the high accuracy.

To verify the universality of our sketch framework, we apply Pyramid to four typical sketches: CM sketches, CU sketches, Count sketches, and A sketches. In real data streams, we recommend using the $P_{CU}$ sketch, which represents the CU sketch using Pyramid, as it achieves the highest accuracy and highest insertion speed at the same time.

2. RELATED WORK

Estimating the frequency of items in a multiset is a fundamental problem in databases. The solutions can be divided into the following three categories: sketches, Bloom filter variants, and counter variants.

Sketches: Typical sketches include CM sketches [8], CU sketches [22], Count sketches [23], Augmented sketches [7], and more [24, 25]. A comprehensive survey of sketch algorithms is provided in [9]. A CM sketch [8] consists of d arrays, $A_1 \ldots A_d$, and each array consists of w counters. There are d hash functions, $h_1(\cdot) \ldots h_d(\cdot)$ ($1 \leq h(\cdot) \leq w$). When inserting an item e, the CM sketch increments all the d mapped counters¹ $A_1[h_1(e)] \ldots A_d[h_d(e)]$ by 1. When querying an item e, the CM sketch reports the minimal one among the d mapped counters. The CU sketch [22] is similar to the CM sketch except that it only increments the smallest counter(s)² among the d mapped counters for each insertion. The Count sketch [23] is also similar to the CM sketch except that each array is associated with two hash functions. The Augmented sketch (A sketch) [7] targets improving accuracy by using one additional filter to dynamically capture hot items, at the cost of added complexity and slower update and query speed. The A sketch adds an additional filter (which is actually a queue with k items) to an existing sketch T. The A sketch is very accurate only for those items in the filter. The authors focus on querying the hot items by using query sets sampled from the whole multiset. That is why, in their experimental results, the A sketch significantly outperforms the CM sketch. Among these sketches, the CU sketch [22] achieves the highest accuracy when using the same amount of memory. Unfortunately, all these sketches have three shortcomings for skewed datasets: 1) they are not accurate enough; 2) they require multiple memory accesses and hash computations for each insertion; 3) they require knowing the approximate frequency of the hottest item in advance.
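As a rough illustration of the CM and CU update rules described above, the following Python sketch keeps d arrays of w counters and switches between the two rules with a flag. The class name, the `conservative` switch, and the salted BLAKE2b hash family are assumptions made for this example; they are not taken from the cited sketches.

```python
import hashlib

class CountMinSketch:
    """d arrays of w counters; `conservative=True` applies the CU update rule."""

    def __init__(self, d, w, conservative=False):
        self.d, self.w = d, w
        self.conservative = conservative
        self.arrays = [[0] * w for _ in range(d)]

    def _index(self, i, item):
        # Hypothetical hash family: BLAKE2b salted with the array index (i < 256).
        digest = hashlib.blake2b(item.encode(), digest_size=8,
                                 salt=bytes([i])).digest()
        return int.from_bytes(digest, "big") % self.w

    def insert(self, item):
        idx = [self._index(i, item) for i in range(self.d)]
        vals = [self.arrays[i][idx[i]] for i in range(self.d)]
        low = min(vals)
        for i in range(self.d):
            # CM increments all d mapped counters; CU only the smallest one(s).
            if not self.conservative or vals[i] == low:
                self.arrays[i][idx[i]] += 1

    def query(self, item):
        # Both variants report the minimum of the d mapped counters.
        return min(self.arrays[i][self._index(i, item)] for i in range(self.d))

cm = CountMinSketch(d=3, w=64)
cu = CountMinSketch(d=3, w=64, conservative=True)
for item in ["a"] * 5 + ["b"] * 2:
    cm.insert(item)
    cu.insert(item)
```

Neither variant underestimates a frequency, and on the same stream with the same hash functions the CU estimate is never larger than the CM estimate.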

Bloom filter variants: The second kind of solution is based on the Bloom filter [26]. A standard Bloom filter can tell whether an item belongs to a set or not, but cannot estimate its frequency. Counting Bloom filters (CBF) [27] can be used to estimate the frequency of items in a multiset. CBF is quite similar to the CM sketch, except that CBF uses only one array. Several improvements based on CBF have been proposed, such as the Spectral Bloom Filter (SBF) [28], the Dynamic Count Filter (DCF) [29], and the Shifting Bloom filter [30, 31], and they all can store frequencies of items.

Counter variants: Two typical algorithms are Counter Braids [32] and Random Counters [2]. Counter Braids can report the frequencies of all items at once by post-processing, but it needs to know the IDs of all distinct items. The authors of Counter Braids also admit that it cannot support instant point queries [32]. The estimation method in Random Counters [2] is called CSM. It achieves fast update speed at the cost of accuracy.

1 For convenience, we call them d mapped counters.

2 We use counter(s) because there may be multiple smallest counters.

Summary: Although there are various algorithms for frequency estimation of multisets, no existing sketch can achieve high accuracy and one memory access per insertion, especially for skewed datasets.

3. PYRAMID SKETCH FRAMEWORK

In this section, we present two key techniques of our Pyramid framework: counter-pair sharing and word acceleration. Counter-pair sharing is used to dynamically assign an appropriate number of bits to items with different frequencies. Word acceleration can achieve one memory access and one hash computation per update, so as to significantly accelerate the update speed of sketches. We also present one further optimization method: Ostrich policy. Note that we introduce the techniques not in isolation, but one at a time on top of all previous techniques.

3.1 Technique I: Counter-pair Sharing

Data Structure: As shown in Figure 1, our Pyramid framework consists of λ layers, where we represent the i-th layer with $L_i$. $L_i$ consists of $w_i$ counters where $w_{i+1} = w_i/2$ ($1 \leq i \leq \lambda - 1$), and each counter contains δ bits. We represent the j-th counter of the i-th layer with $L_i[j]$. The first layer $L_1$ is associated with d independent hash functions $h_i(\cdot)$ ($1 \leq i \leq d$), whose outputs are uniformly distributed in the range $[1, w_1]$. The i-th layer $L_i$ is associated with the (i+1)-th layer $L_{i+1}$ in the following way: two adjacent counters at $L_i$ are associated with one counter at $L_{i+1}$. In other words, $L_i[2j-1]$ and $L_i[2j]$ are associated with $L_{i+1}[j]$. $L_i[2j-1]$ and $L_i[2j]$ are defined as the sibling counters. $L_{i+1}[j]$ is defined as the parent counter of $L_i[2j-1]$ and $L_i[2j]$. $L_i[2j-1]$ is defined as the left child counter of $L_{i+1}[j]$, and $L_i[2j]$ is defined as the right child counter of $L_{i+1}[j]$.

There are two types of counters: pure counters and hybrid counters. The first layer $L_1$ is composed of pure counters, while the other layers are composed of hybrid counters. The pure counter is only used for recording the frequencies. In other words, all the δ bits of the pure counters are used for counting, representing a range $[0, 2^{\delta} - 1]$. The hybrid counter with δ bits is split into three parts: the left flag, the counting part, and the right flag. The left flag (1 bit) indicates whether its left child counter has overflowed, while the right flag (1 bit) indicates whether its right child counter has overflowed. The counting part (δ − 2 bits), ranging from 0 to $2^{\delta-2} - 1$, is used for counting. For convenience, we use $L_i[j].lflag$, $L_i[j].count$, and $L_i[j].rflag$ to represent the three parts of counter $L_i[j]$.
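To make the layout concrete, here is a minimal Python sketch of one way to pack a hybrid counter into δ bits and to find a counter's parent. The bit order (left flag in the most significant bit, right flag in the least significant bit) and all function names are assumptions for illustration; the text above only fixes the widths of the three parts.

```python
DELTA = 4  # bits per counter, the value used in the paper's examples

def pack_hybrid(lflag, count, rflag, delta=DELTA):
    """Pack <lflag, count, rflag> into one delta-bit integer (assumed bit order)."""
    assert 0 <= count < (1 << (delta - 2))
    return (int(lflag) << (delta - 1)) | (count << 1) | int(rflag)

def unpack_hybrid(raw, delta=DELTA):
    """Inverse of pack_hybrid: return (lflag, count, rflag)."""
    lflag = bool((raw >> (delta - 1)) & 1)
    count = (raw >> 1) & ((1 << (delta - 2)) - 1)
    rflag = bool(raw & 1)
    return lflag, count, rflag

def parent_index(j):
    """1-based index of the parent: L_i[2j-1] and L_i[2j] share L_{i+1}[j]."""
    return (j + 1) // 2

def is_left_child(j):
    """With 1-based indices, odd positions are left children."""
    return j % 2 == 1

def layer_widths(w1, num_layers):
    """Counters per layer: each layer has half as many as the one below."""
    return [w1 >> i for i in range(num_layers)]
```

For example, `layer_widths(16, 3)` gives `[16, 8, 4]`, and counters 5 and 6 of a layer share parent 3 of the next layer.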

There are three primary operations in our Pyramid framework: insertion, deletion, and query. Initially, all the counters at all the layers are 0.

Insertion: When inserting an item e, we first compute the d hash functions $h_1(e), h_2(e), \ldots, h_d(e)$ ($1 \leq h(\cdot) \leq w_1$) to locate the d mapped counters $L_1[h_1(e)], L_1[h_2(e)], \ldots, L_1[h_d(e)]$ at layer $L_1$. Different sketches will perform different incrementing operations on these d counters. During the incrementing process, if any of the d counters overflows, we simply record the overflow in the parent counter. This is called carryin.

Figure 1: Counter-pair sharing technique. [Panels (a), (b), and (c) show a pure counter at the first layer and its hybrid ancestor counters (left flag, counting part, right flag), with parent, left child, and right child counters marked; the step from (a) to (b) adds 12 and the step from (b) to (c) subtracts 1.]

The carryin operation is based on the following observation about practical datasets that are skewed: the number of overflowed counters is small, and in most cases at most one of the sibling counters overflows. Consider that $L_i[j]$ overflows. Let its parent counter be $L_{i+1}[j']$. Without loss of generality, assume $L_i[j]$ is the left child of $L_{i+1}[j']$. We check $L_{i+1}[j'].lflag$. If the flag is off, we turn it on and increment $L_{i+1}[j'].count$; if the flag is on, we only increment $L_{i+1}[j'].count$. If $L_{i+1}[j'].count$ does not overflow, insertion ends. Otherwise, we repeat the carryin operation at layer $L_{i+2}$, and the operation will be performed layer by layer until there is no overflow. In this way, we dynamically assign an appropriate number of higher-order bits to record the incoming items, so as to minimize memory waste.

Example I: As shown in Figure 1 (a)(b), each pure counter consists of 4 bits, and so does each hybrid counter. In each hybrid counter, the counting part consists of 2 bits. As shown in the figure, the value of $L_1[j_1]$ and the three parts of $L_2[j_2]$, $L_3[j_3]$, and $L_4[j_4]$ are 4, <1, 3, 1>, <0, 2, 1>, and <0, 0, 0>, respectively. Suppose $L_1[j_1]$ is incremented by 12. Then $L_1[j_1]$ overflows, and the carryin operations are performed as follows.

1) $L_1[j_1]$ is set to 0;

2) $L_2[j_2].lflag$ keeps on;

3) $L_2[j_2].count$ is set to 0;

4) $L_3[j_3].rflag$ keeps on;

5) $L_3[j_3].count$ is incremented to 3.
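The carryin steps of Example I can be replayed with a small Python sketch. The list-based representation (`pure` for layer 1, `hybrid` for the higher layers, each hybrid counter stored as `[lflag, count, rflag]`), the 0-based indices, and the handling of an overflow out of the top layer are assumptions made for this illustration.

```python
DELTA = 4                           # bits per counter, as in Example I
PURE_MAX = (1 << DELTA) - 1         # largest pure-counter value (15)
COUNT_MAX = (1 << (DELTA - 2)) - 1  # largest counting-part value (3)

def increment(pure, hybrid, j, amount=1):
    """Add `amount` to pure counter j (0-based) and carry overflows upward."""
    total = pure[j] + amount
    pure[j], carry = total & PURE_MAX, total >> DELTA
    child = j
    for layer in hybrid:
        if carry == 0:
            return
        parent = layer[child // 2]
        # Even 0-based positions are left children, odd ones right children.
        parent[0 if child % 2 == 0 else 2] = True
        total = parent[1] + carry
        parent[1], carry = total & COUNT_MAX, total >> (DELTA - 2)
        child //= 2
    # Assumption: an overflow out of the top layer is silently dropped here.

# State of Figure 1(a); the positions are chosen so that L1[j1] is a left
# child and L2[j2] is a right child, as in the example.
pure = [0, 0, 4, 0, 0, 0, 0, 0]
hybrid = [
    [[False, 0, False], [True, 3, True], [False, 0, False], [False, 0, False]],
    [[False, 2, True], [False, 0, False]],
    [[False, 0, False]],
]
increment(pure, hybrid, 2, amount=12)
# pure[2] is now 0, hybrid[0][1] is <1, 0, 1>, and hybrid[1][0] is <0, 3, 1>.
```

Adding 12 in one call is a shortcut; repeating `increment(pure, hybrid, 2)` twelve times reaches the same state.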

Deletion: This framework supports deletions if and only if the sketch used in our framework supports deletions, such as the CM sketch [8], the Count sketch [23], the CSM sketch [2], etc. To delete an item e, we first check the d mapped counters $L_1[h_1(e)], L_1[h_2(e)], \ldots, L_1[h_d(e)]$, and perform different deletion strategies according to the specific sketch. The operation of decrementing a counter is exactly the reverse process of incrementing a counter. Specifically, to decrement a pure counter $L_1[j_1]$, if it is non-zero, we just decrement it by 1. Otherwise, we set $L_1[j_1]$ to the maximum value ($2^{\delta} - 1$), and then decrement its parent counter recursively. Without loss of generality, we only show the case where only left flags are accessed or modified. To decrement a hybrid counter $L_i[j_i]$, $L_i[j_i].lflag$ must be on. There are three cases. 1) If $L_i[j_i].count$ is larger than 1, we just decrement it by 1. 2) If $L_i[j_i].count$ is 0, we set it to $2^{\delta-2} - 1$, and then decrement its parent counter recursively. 3) If $L_i[j_i].count$ is 1, we first set it to 0, and then turn $L_i[j_i].lflag$ off if the left flag of its parent counter is off.

Example II: As shown in Figure 1 (b)(c), the parameters of the pure counter and hybrid counters are the same as those of Example I. As shown in the figure, the value of $L_1[j_1]$ and the three parts of $L_2[j_2]$, $L_3[j_3]$, and $L_4[j_4]$ are 0, <1, 0, 1>, <0, 3, 1>, and <0, 0, 0>, respectively. Suppose $L_1[j_1]$ is decremented by 1. The deletion operations are performed as follows.

1) $L_1[j_1]$ is set to 15;

2) $L_2[j_2].lflag$ keeps on, and $L_2[j_2].count$ is set to 3;

3) $L_3[j_3].rflag$ keeps on, and $L_3[j_3].count$ is set to 2.
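The three deletion cases can be sketched in Python and checked against Example II. As an assumption for this illustration, layer 1 is a list `pure`, the higher layers are lists of `[lflag, count, rflag]` triples in `hybrid`, indices are 0-based, and the call is assumed to undo an earlier insertion so that the relevant flags are on.

```python
DELTA = 4
PURE_MAX = (1 << DELTA) - 1         # 15
COUNT_MAX = (1 << (DELTA - 2)) - 1  # 3

def flag_slot(child):
    """Slot of the flag that a 0-based child position uses in its parent."""
    return 0 if child % 2 == 0 else 2

def decrement(pure, hybrid, j):
    """Subtract 1 from pure counter j (0-based), borrowing from ancestors."""
    if pure[j] > 0:
        pure[j] -= 1
        return
    pure[j] = PURE_MAX                      # borrow one unit from the parent
    child = j
    for depth, layer in enumerate(hybrid):
        node = layer[child // 2]
        if node[1] > 1:                     # case 1: plain decrement
            node[1] -= 1
            return
        if node[1] == 1:                    # case 3: count drops to 0
            node[1] = 0
            up = child // 2
            higher = (depth + 1 < len(hybrid)
                      and hybrid[depth + 1][up // 2][flag_slot(up)])
            if not higher:                  # nothing recorded above: clear flag
                node[flag_slot(child)] = False
            return
        node[1] = COUNT_MAX                 # case 2: count was 0, keep borrowing
        child //= 2

# State of Figure 1(b), with the same illustrative positions as before.
pure = [0, 0, 0, 0, 0, 0, 0, 0]
hybrid = [
    [[False, 0, False], [True, 0, True], [False, 0, False], [False, 0, False]],
    [[False, 3, True], [False, 0, False]],
    [[False, 0, False]],
]
decrement(pure, hybrid, 2)
# pure[2] is now 15, hybrid[0][1] is <1, 3, 1>, and hybrid[1][0] is <0, 2, 1>.
```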

Algorithm 1: ReportVal(i, $j_i$).

/* assume that all the child counters are the left child counters */

1 if i == 1 then
2     return $L_1[j_1]$ + ReportVal(i + 1, $j_2$);
3 if $L_i[j_i].lflag$ == false then
4     return 0;
5 if $L_i[j_i].lflag$ == true && $L_i[j_i].rflag$ == true then
6     return ($L_i[j_i].count$ − 1) × $2^{\delta+(i-2)\times(\delta-2)}$ + ReportVal(i + 1, $j_{i+1}$)
7 else
8     return $L_i[j_i].count$ × $2^{\delta+(i-2)\times(\delta-2)}$ + ReportVal(i + 1, $j_{i+1}$)

Query: When querying an item e, we first compute the d hash functions to find the d mapped counters $L_1[h_1(e)], L_1[h_2(e)], \ldots, L_1[h_d(e)]$. These d mapped counters at layer $L_1$ share the same array of $w_1$ counters. Below we focus on describing the operations of querying the first mapped counter $L_1[j_1]$ ($j_1 = h_1(e)$); the operations for the other d − 1 counters are analogous. Let the parent counter and the ancestor counters of $L_1[j_1]$ be $L_2[j_2], L_3[j_3], \ldots, L_\lambda[j_\lambda]$, respectively. Without loss of generality, we consider the case where $L_i[j_i]$ is the left child of $L_{i+1}[j_{i+1}]$, for $1 \leq i < \lambda$. According to Algorithm 1, we recursively assemble the counter value top-down, layer by layer, until the left flag is off. The result is a final reported value ReportVal(1, $j_1$), also denoted as $R(L_1[j_1])$ for convenience. In lines 5–8, if the left flag and right flag of $L_i[j_i]$ are both on, its left child counter and right child counter must have both overflowed at least once. Therefore, we subtract 1 from $L_i[j_i].count$, so as to reduce the overestimation error incurred by collisions in counter-pair sharing. After obtaining the d reported values $R(L_1[h_1(e)]), R(L_1[h_2(e)]), \ldots, R(L_1[h_d(e)])$, we produce the query output based on the specific sketch under use. For example, for CM and CU, we simply report the minimum of the d reported values.

Example III: As shown in Figure 1 (c), the parameters of the pure counter and hybrid counters are the same as those of Example I. As shown in the figure, the value of $L_1[j_1]$ and the three parts of $L_2[j_2]$, $L_3[j_3]$, and $L_4[j_4]$ are 15, <1, 3, 1>, <0, 2, 1>, and <0, 0, 0>, respectively. The process of getting $R(L_1[j_1])$ is as follows.

1) $L_1[j_1]$ is 15;

2) $L_2[j_2].lflag$ is on and $L_2[j_2].count$ is 3;

3) $L_3[j_3].rflag$ is on and $L_3[j_3].count$ is 2;

4) $L_4[j_4].lflag$ is off and the recursive operation ends;

5) The reported value is $15 + (3 - 1) \times 2^4 + (2 - 0) \times 2^6 = 175$.
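The assembly of the reported value can be written as an iterative Python sketch of Algorithm 1 that also handles right children, and checked against Example III. The list-based representation (`pure`, and `hybrid` layers of `[lflag, count, rflag]` triples), the 0-based indices, and the function name are assumptions for this illustration.

```python
DELTA = 4  # bits per counter, as in the examples

def report_val(pure, hybrid, j):
    """Assemble the value of pure counter j (0-based) from its ancestors."""
    value = pure[j]
    child = j
    shift = DELTA                       # layer 2 holds bits starting at 2^delta
    for layer in hybrid:
        lflag, count, rflag = layer[child // 2]
        mine = lflag if child % 2 == 0 else rflag
        if not mine:
            break                       # this child never overflowed: stop
        if lflag and rflag:
            count -= 1                  # both children overflowed: discount one
        value += count << shift
        shift += DELTA - 2              # each hybrid layer adds delta-2 bits
        child //= 2
    return value

# State of Figure 1(c): L1[j1] = 15, L2[j2] = <1, 3, 1>, L3[j3] = <0, 2, 1>.
pure = [0, 0, 15, 0, 0, 0, 0, 0]
hybrid = [
    [[False, 0, False], [True, 3, True], [False, 0, False], [False, 0, False]],
    [[False, 2, True], [False, 0, False]],
    [[False, 0, False]],
]
print(report_val(pure, hybrid, 2))  # 175 = 15 + (3-1)*2^4 + (2-0)*2^6
```

The shift grows by δ − 2 per hybrid layer, which matches the exponent $\delta + (i-2)(\delta-2)$ used for layer i in Algorithm 1.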

3.2 Technique II: Word Acceleration

Word Constraint Technique: In the word constraint technique, we make two minor modifications: 1) we set the counter size δ to 4 bits; 2) we confine the d mapped counters at layer $L_1$ to a single machine word. In this way, the average number of memory accesses per insertion or query is significantly reduced. Let the size of a machine word be W bits. Each machine word is comprised of W/4 counters. Because there are $w_1$ counters at layer $L_1$, that translates to $4w_1/W$ machine words. Layer $L_1$ is associated with d + 1 hash functions $h_i(\cdot)$ ($1 \leq i \leq d + 1$). The first hash function is used to associate each item with a specific word Ω, and the remaining d hash functions are used to identify d mapped counters in the word Ω. Our word constraint technique is based on the following facts: 1) In our framework, each counter is small (e.g., 4 bits), while a machine word is usually 64 bits wide on many of today’s CPUs. 2) The size of a machine word can be much larger on GPU platforms. For example, one memory access can read up to 1024 bits in some commodity GPUs [33].

Therefore, one machine word on CPU or GPU can typically contain a reasonably large number of small counters used in our framework. Note that the actual operations of insertion, deletion and query stay the same under the word constraint.
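To illustrate why one word access suffices, the following Python sketch treats an integer as a 64-bit machine word holding sixteen 4-bit counters and reads or updates counters inside it. The function names and the little-endian placement of counters within the word are assumptions for this example.

```python
COUNTER_BITS = 4                   # counter size delta under word constraint
MASK = (1 << COUNTER_BITS) - 1     # 0xF

def read_counters(word, offsets):
    """Extract the counters at the given offsets from one machine word."""
    return [(word >> (COUNTER_BITS * o)) & MASK for o in offsets]

def bump_counter(word, offset):
    """Increment one 4-bit counter; return (new_word, overflowed)."""
    shift = COUNTER_BITS * offset
    value = (word >> shift) & MASK
    if value == MASK:
        # The counter wraps to 0; the caller would then perform the carryin.
        return word & ~(MASK << shift), True
    return word + (1 << shift), False

word = 0
word, overflowed = bump_counter(word, 3)   # one counter of the word is now 1
```

All d mapped counters of an item live in the same `word`, so a single load and a single store cover the whole update unless a carryin is needed.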

Obviously, after using the word constraint technique, the average number of memory accesses per insertion, deletion, or query is reduced to around 1/d of the original, as all the d mapped counters can be read or written within one memory access. Next, we derive an upper bound on the average number of memory accesses for each insertion. When inserting an item, suppose Pr(layer $L_1$ overflows) < ρ and Pr(layer $L_{i+1}$ overflows | layer $L_i$ overflows) < σ (< 1) ($1 \leq i < \lambda$). The average number of memory accesses, denoted by t, is bounded as follows.

$$t < 1 + \sum_{k=0}^{\infty} \rho\sigma^{k} = 1 + \frac{\rho}{1 - \sigma} \qquad (1)$$

In our experiments on IP traces, ρ is approximately 0.05 and σ is approximately 0.25. Therefore, the average number of memory accesses for each insertion t is less than 1 + 0.05/(1 − 0.25) ≈ 1.07, which is consistent with our experimental results shown in Figure 19. Note that one can simply improve the accuracy by confining the d mapped counters into two or more separate machine words.
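The arithmetic behind this bound can be checked with a few lines of Python. The function name is hypothetical, and the optional `layers` argument, which truncates the geometric series to a pyramid with a finite number of layers, is an assumption added for illustration.

```python
def access_bound(rho, sigma, layers=None):
    """Upper bound on average memory accesses per insertion, per Formula (1).

    With layers=None the infinite geometric series is used; otherwise the
    sum is truncated to the layers - 1 possible carryin steps.
    """
    if layers is None:
        return 1 + rho / (1 - sigma)
    return 1 + sum(rho * sigma ** k for k in range(layers - 1))

bound = access_bound(0.05, 0.25)           # about 1.0667, i.e. roughly 1.07
finite = access_bound(0.05, 0.25, layers=4)
```

The truncated sum is always below the closed form, so the closed form remains a safe bound for any number of layers.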

On top of the Pyramid sketch with only counter-pair sharing, adding the technique of word constraint helps reduce memory accesses per insertion or query, but incurs severe accuracy loss in the meantime. The main reason for accuracy loss is that after implementing the carryin operation, the probability of collision among counters in the same machine word increases sharply at higher layers. More specifically, given an item e, its d counters at layer $L_1$ are mapped to one machine word, while the parent counters of these d counters are mapped to half of a word at layer $L_2$. The ancestor counters are mapped to even smaller ranges in a word at higher layers, resulting in more collisions. To address this issue, we propose a new technique as follows.

Figure 2: Example of word sharing technique.

Word Sharing Technique: The idea of this technique is to make the parent counters and ancestor counters of the d mapped counters always fall in a constant range (i.e., a machine word) instead of smaller and smaller ranges, so as to reduce collisions. Specifically, our word sharing technique works as follows: 1) Similar to the definitions of parent counter, left child counter, and right child counter, we have left child word, right child word, and parent word, where the left child word and the right child word are adjacent at layer $L_i$, sharing the same parent word at the next layer $L_{i+1}$. 2) The i-th counter in the left child word and the i-th counter in the right child word share the i-th counter in the parent word. In this way, the collisions in counter-pair sharing are alleviated, and the accuracy is significantly improved.
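The difference between the two parent mappings can be seen in a short Python sketch that tracks which slots the counters of one layer-1 word can reach after two carryin steps. The 0-based (word, offset) addressing, the function names, and the choice of four counters per word are assumptions for illustration.

```python
COUNTERS_PER_WORD = 4  # illustrative word size: four counters per machine word

def parent_plain(word, offset, k=COUNTERS_PER_WORD):
    """Counter-pair sharing only: two adjacent counters share one parent."""
    idx = (word * k + offset) // 2
    return idx // k, idx % k

def parent_word_sharing(word, offset, k=COUNTERS_PER_WORD):
    """Word sharing: two adjacent words share a parent word, offsets unchanged."""
    return word // 2, offset

def ancestors_of_word(step, word, levels, k=COUNTERS_PER_WORD):
    """Distinct (word, offset) slots reachable from one layer-1 word."""
    slots = {(word, o) for o in range(k)}
    for _ in range(levels):
        slots = {step(w, o, k) for w, o in slots}
    return slots

plain = ancestors_of_word(parent_plain, word=1, levels=2)
shared = ancestors_of_word(parent_word_sharing, word=1, levels=2)
# plain collapses to a single slot, while shared still spans a whole word.
```

With plain counter-pair sharing the four counters of a word funnel into one layer-3 counter, whereas word sharing keeps four distinct ancestor slots at every layer.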

Example IV: In Figure 2, we set λ = 3, d = 2, and $w_1 = 16$. Each machine word consists of 4 counters. Without the word sharing technique, the d mapped counters at layer $L_1$ belong to a machine word, their parent counters belong to half of a machine word at layer $L_2$, and their layer-3 ancestors belong to a quarter word, which is one counter in this example. In contrast, with the word sharing technique, the parent/ancestor counters of the d mapped counters always fall in a single machine word at each layer.

One Hashing Technique: More hash computations result in slower update and query speeds. Ideally, only one hash computation is performed for each insertion, deletion, or query. Towards this goal, we propose the one hashing technique. The idea is to split the value that a hash function produces into several segments, and to use each segment to locate a word or a counter, so as to reduce the hash computation. A hash function usually produces a value of 32 or 64 bits. Given a hash function with 32-bit output, we may use the first 16 bits to locate a word in a Pyramid sketch. Suppose a word is 64 bits long. Locating one of the 16 counters in a word requires 4 hash bits. In total, we need 16 hash bits to locate 4 counters in this word. In this way, we can use only one hash computation to handle a Pyramid sketch with at most $2^{16}$ words at layer $L_1$, which originally requires 4 hash computations. Similarly, we can use a hash function with 64-bit output to support a Pyramid sketch (d = 4) with at most $2^{48}$ words, i.e., 2048 TB of memory at layer $L_1$, which should be large enough for all practical cases.
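The segment splitting can be sketched in Python for the 32-bit case. CRC32 from the standard library stands in for the hash function purely for illustration, and the function name and the choice of which bits form which segment are assumptions; the text only specifies the segment widths.

```python
import zlib

def one_hash_locations(item, d=4, counters_per_word=16, hash_bits=32):
    """Split one hash value into a word index and d counter offsets."""
    h = zlib.crc32(item)                                  # stand-in 32-bit hash
    offset_bits = (counters_per_word - 1).bit_length()    # 4 bits per counter
    word_bits = hash_bits - d * offset_bits               # 16 bits for the word
    word = h >> (hash_bits - word_bits)                   # high bits pick the word
    offsets = [(h >> (i * offset_bits)) & (counters_per_word - 1)
               for i in range(d)]                         # low bits pick counters
    return word, offsets

word, offsets = one_hash_locations(b"10.0.0.1")
```

With a 64-bit hash the same split leaves 48 bits for the word index, which corresponds to the $2^{48}$ words mentioned above; a real deployment would also reduce `word` modulo the actual number of words.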