398 lines
9.4 KiB
Go
398 lines
9.4 KiB
Go
package bloom
|
||
|
||
import (
|
||
"encoding/binary"
|
||
"errors"
|
||
"math"
|
||
"os"
|
||
"sort"
|
||
"sync"
|
||
|
||
"github.com/RoaringBitmap/roaring/roaring64"
|
||
"github.com/klauspost/compress/zstd"
|
||
"google.golang.org/protobuf/proto"
|
||
)
|
||
|
||
/*
|
||
主体代码来源于https://github.com/bits-and-blooms/bloom
|
||
在此文件中,将bitmap的管理改用roaring bitmap实现
|
||
algotao 2022-08-29
|
||
*/
|
||
|
||
type BloomFilter struct {
|
||
m uint64 // 存贮空间上限
|
||
k uint64 // hash函数个数
|
||
elementsMax uint64 // 元素数量上限
|
||
elementsAdded uint64 // 已加入的元素数量
|
||
falsePositiveRate float64 // 假阳率
|
||
rb *roaring64.Bitmap // 位图
|
||
chOne chan []uint64 // 接收每个插入的hash索引
|
||
chInsert chan []uint64 // 接收排好序的hash索引进Bitmap
|
||
chSortJobQuota chan int // 排序工作的配额控制
|
||
buf []uint64 // 缓冲
|
||
wgJobs sync.WaitGroup
|
||
}
|
||
|
||
// BloomFilterStat is a snapshot of a BloomFilter's parameters and counters,
// as returned by GetStat.
type BloomFilterStat struct {
	M                 uint64  // number of bits in the filter
	K                 uint64  // number of hash functions
	ElementsMax       uint64  // expected element count the filter was built for
	ElementsAdded     uint64  // elements added so far
	FalsePositiveRate float64 // target false-positive rate
}
|
||
|
||
// headerVersion1 is the Header.Version value written by SaveToFile and the only
// version LoadFromFile accepts.
const headerVersion1 = 1
|
||
|
||
// bitmapFileHeader is the fixed-size prefix of a saved filter file. It is
// written and read with encoding/binary in big-endian order, so its layout (a
// single uint64) is part of the on-disk format and must not change.
type bitmapFileHeader struct {
	// Size is the length in bytes of the protobuf-encoded Header that follows.
	Size uint64
}
|
||
|
||
// NewWithEstimates 创建一个BloomFilter,并期望有n个元素,<fp的误匹配率
|
||
func NewWithEstimates(e uint64, fr float64) *BloomFilter {
|
||
m, k := EstimateParameters(e, fr)
|
||
return newBloomFilter(m, k, e, fr)
|
||
}
|
||
|
||
// new 创建一个新的BloomFilter,具有 _m_ bits 和 _k_ hashing 函数
|
||
func newBloomFilter(m uint64, k uint64, e uint64, fr float64) *BloomFilter {
|
||
b := &BloomFilter{
|
||
m: max(1, m),
|
||
k: max(1, k),
|
||
elementsMax: e,
|
||
falsePositiveRate: fr,
|
||
rb: roaring64.New(),
|
||
chOne: make(chan []uint64, 1024*1024), // 索引缓冲区chan
|
||
chInsert: make(chan []uint64, 2), // 插入队列
|
||
chSortJobQuota: make(chan int, 8), // 排序队列
|
||
}
|
||
|
||
for i := 0; i < cap(b.chSortJobQuota); i++ {
|
||
b.chSortJobQuota <- 0
|
||
}
|
||
|
||
//log.Printf("Init quota len(%v), cap(%v)\n", len(b.chSortJobQuota), cap(b.chSortJobQuota))
|
||
|
||
go b.consumeOne()
|
||
go b.consumeInsert()
|
||
|
||
return b
|
||
}
|
||
|
||
// EstimateParameters returns the bitmap size m (in bits) and the number of hash
// functions k for a Bloom filter expected to hold n elements at a target
// false-positive rate p, using the standard formulas
//
//	m = ceil(-n * ln(p) / (ln 2)^2)
//	k = ceil(ln 2 * m / n)
//
// Out-of-range inputs are normalised rather than fed into float-to-integer
// conversions, whose result is implementation-specific when the value is NaN,
// infinite, negative, or too large:
//   - n == 0 is treated as n == 1 (otherwise k would be 0/0 = NaN);
//   - p <= 0 or NaN is treated as the smallest positive float64 (ln(0) = -Inf);
//   - p >= 1 yields m == 0 and k == 0 (newBloomFilter raises both to 1);
//   - an m too large for uint64 saturates at math.MaxUint64.
func EstimateParameters(n uint64, p float64) (m uint64, k uint64) {
	if n == 0 {
		n = 1
	}
	if !(p > 0) { // also catches NaN
		p = math.SmallestNonzeroFloat64
	}

	fm := math.Ceil(-1 * float64(n) * math.Log(p) / math.Pow(math.Log(2), 2))
	switch {
	case fm <= 0:
		m = 0
	case fm >= math.MaxUint64:
		m = math.MaxUint64
	default:
		m = uint64(fm)
	}

	k = uint64(math.Ceil(math.Log(2) * float64(m) / float64(n)))
	return m, k
}
|
||
|
||
// location 返回当前位置的hash值
|
||
func (b *BloomFilter) location(h [4]uint64, i uint64) uint64 {
|
||
return (h[i%2] + i*h[2+(((i+(i%2))%4)/2)]) % b.m
|
||
//return 0
|
||
}
|
||
|
||
// baseHashes 生成4个hash值,用于生产key
|
||
func (b *BloomFilter) baseHashes(data []byte) [4]uint64 {
|
||
h := New128()
|
||
h.Write(data)
|
||
|
||
h1, h2 := h.Sum128()
|
||
|
||
h.Write([]byte{1})
|
||
h3, h4 := h.Sum128()
|
||
|
||
return [4]uint64{
|
||
h1, h2, h3, h4,
|
||
}
|
||
}
|
||
|
||
// consumeOne runs as a background goroutine until chOne is closed. It receives
// the per-element position slices sent by Add, counts one element for every
// non-empty slice, and accumulates the positions in b.buf. The buffer is handed
// off as a batch when it holds at least threshold positions, or when an empty
// slice arrives (the flush marker sent by Flush).
//
// wgJobs protocol: each handed-off batch adds 1 (consumeInsert calls Done after
// inserting it). For a flush marker, the 1 that Flush added is released here,
// only after the final batch has been counted, so Flush's Wait cannot return early.
// Sorting runs in a separate goroutine, gated by a token from chSortJobQuota.
//
// NOTE(review): elementsAdded is written here without synchronisation, while
// GetStat, And and Or read it from other goroutines. That is a data race
// whenever Adds are still in flight.
func (b *BloomFilter) consumeOne() {
	// Hand off once the buffer holds half of cap(chOne)*k positions.
	threshold := cap(b.chOne) * int(b.k) / 2

	for positions := range b.chOne {
		flush := len(positions) == 0
		if !flush {
			b.elementsAdded++
		}
		b.buf = append(b.buf, positions...)

		if !flush && len(b.buf) < threshold {
			continue
		}

		batch := b.buf
		b.buf = []uint64{}
		b.wgJobs.Add(1)
		if flush {
			// Release the count Flush added for the marker; the batch above is
			// already accounted for, so the counter cannot reach zero early.
			b.wgJobs.Done()
		}

		<-b.chSortJobQuota // take a sort token; blocks while all are in use
		go func() {
			sort.Slice(batch, func(x, y int) bool { return batch[x] < batch[y] })
			b.chInsert <- batch
			b.chSortJobQuota <- 1 // give the token back
		}()
	}
}
|
||
|
||
// 将批量bits写到bitmap
|
||
func (b *BloomFilter) consumeInsert() {
|
||
for bitsBatch := range b.chInsert {
|
||
b.rb.AddMany(bitsBatch)
|
||
b.wgJobs.Done()
|
||
}
|
||
}
|
||
|
||
// Add 添加数据的Hash位图
|
||
func (b *BloomFilter) Add(data []byte) *BloomFilter {
|
||
h := b.baseHashes(data)
|
||
bits := make([]uint64, b.k)
|
||
for i := uint64(0); i < b.k; i++ {
|
||
bits[i] = b.location(h, i)
|
||
}
|
||
b.chOne <- bits //将一个计算好的bloom bits发送到待处理队列
|
||
|
||
return b
|
||
}
|
||
|
||
// AddString 添加字符串的Hash位图
|
||
func (b *BloomFilter) AddString(data string) *BloomFilter {
|
||
return b.Add([]byte(data))
|
||
}
|
||
|
||
// Test 如果命中Hash位图,则返回真 (有误匹配率)
|
||
func (b *BloomFilter) Test(data []byte) bool {
|
||
h := b.baseHashes(data)
|
||
for i := uint64(0); i < b.k; i++ {
|
||
if !b.rb.Contains(b.location(h, i)) {
|
||
return false
|
||
}
|
||
}
|
||
return true
|
||
}
|
||
|
||
// TestString 如果命中字符串Hash位图,则返回真 (有误匹配率)
|
||
func (b *BloomFilter) TestString(data string) bool {
|
||
return b.Test([]byte(data))
|
||
}
|
||
|
||
// Flush 将缓冲中的待处理Bit写入Bitmap
|
||
func (b *BloomFilter) Flush() {
|
||
b.wgJobs.Add(1)
|
||
|
||
//发出Flush指令
|
||
b.chOne <- []uint64{}
|
||
|
||
b.wgJobs.Wait()
|
||
}
|
||
|
||
// free 将缓冲中的待处理Bit写入Bitmap
|
||
func (b *BloomFilter) free() {
|
||
close(b.chOne)
|
||
close(b.chInsert)
|
||
b.rb.Clear()
|
||
}
|
||
|
||
// Iterator 位图遍历器
|
||
func (b *BloomFilter) Iterator() roaring64.IntPeekable64 {
|
||
return b.rb.Iterator()
|
||
}
|
||
|
||
// GetSizeInBytes 返回位图大小
|
||
func (b *BloomFilter) GetSizeInBytes() uint64 {
|
||
return b.rb.GetSerializedSizeInBytes()
|
||
}
|
||
|
||
// 获得统计信息,主要用于运行期间获取状态
|
||
func (b *BloomFilter) GetStat() BloomFilterStat {
|
||
return BloomFilterStat{
|
||
M: b.m,
|
||
K: b.k,
|
||
ElementsMax: b.elementsMax,
|
||
ElementsAdded: b.elementsAdded,
|
||
FalsePositiveRate: b.falsePositiveRate,
|
||
}
|
||
}
|
||
|
||
// SaveToFile flushes pending insertions and writes the filter to filename,
// creating or truncating the file.
//
// File layout:
//   - bitmapFileHeader: the protobuf header length as a big-endian uint64;
//   - the protobuf-encoded Header (version, m, k, element counts, FP rate);
//   - the roaring64 bitmap serialization, zstd-compressed.
//
// Errors from closing the zstd writer (which emits the final compressed frame)
// and from closing the file are returned, since either can mean the file on
// disk is incomplete. Previously both were silently ignored.
//
// NOTE: once the bitmap has been written to the compressor (successfully or
// not), free() is called. It closes the ingest channels and clears the bitmap,
// so the filter must not be used afterwards. In particular, a second
// SaveToFile or an Add would send on a closed channel.
func (b *BloomFilter) SaveToFile(filename string) (err error) {
	// A filter without a bitmap (for example one loaded header-only) has
	// nothing to save. Without this check, RunOptimize/WriteTo would
	// dereference a nil bitmap.
	if b.rb == nil {
		return errors.New("bloom filter has no bitmap to save")
	}

	b.Flush()

	headerData, err := proto.Marshal(&Header{
		Version:           headerVersion1,
		M:                 b.m,
		K:                 b.k,
		ElementsMax:       b.elementsMax,
		ElementsAdded:     b.elementsAdded,
		FalsePositiveRate: b.falsePositiveRate,
	})
	if err != nil {
		return err
	}

	fi, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer func() {
		// Close errors on a written file can mean lost data; report the first error only.
		if cerr := fi.Close(); err == nil {
			err = cerr
		}
	}()

	// Prefix: the byte length of the protobuf header, then the header itself.
	if err = binary.Write(fi, binary.BigEndian, bitmapFileHeader{Size: uint64(len(headerData))}); err != nil {
		return err
	}
	if err = binary.Write(fi, binary.BigEndian, headerData); err != nil {
		return err
	}

	b.rb.RunOptimize()

	zw, err := zstd.NewWriter(fi)
	if err != nil {
		return err
	}

	_, err = b.rb.WriteTo(zw)
	// Close writes the last zstd frame; if it fails the stored bitmap is truncated.
	if cerr := zw.Close(); err == nil {
		err = cerr
	}

	b.free()

	return err
}
|
||
|
||
// LoadFromFile 从文件中读取
|
||
func LoadFromFile(filename string, headerOnly bool) (bft *BloomFilter, err error) {
|
||
fi, fe := os.Open(filename)
|
||
|
||
if fe != nil {
|
||
return nil, fe
|
||
}
|
||
|
||
defer fi.Close()
|
||
|
||
fh := bitmapFileHeader{}
|
||
fe = binary.Read(fi, binary.BigEndian, &fh)
|
||
if fe != nil {
|
||
return nil, fe
|
||
}
|
||
|
||
headerData := make([]byte, fh.Size)
|
||
|
||
n, err := fi.Read(headerData)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if n != len(headerData) {
|
||
return nil, errors.New("unknown file format")
|
||
}
|
||
headerPB := &Header{}
|
||
|
||
err = proto.Unmarshal(headerData, headerPB)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
if headerPB.Version != headerVersion1 {
|
||
return nil, errors.New("unsupported version")
|
||
}
|
||
|
||
bft = &BloomFilter{}
|
||
bft.m = headerPB.GetM()
|
||
bft.k = headerPB.GetK()
|
||
bft.elementsMax = headerPB.GetElementsMax()
|
||
bft.elementsAdded = headerPB.GetElementsAdded()
|
||
bft.falsePositiveRate = headerPB.GetFalsePositiveRate()
|
||
|
||
if headerOnly {
|
||
return bft, nil
|
||
}
|
||
|
||
bft.rb = roaring64.New()
|
||
|
||
zr, err := zstd.NewReader(fi)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer zr.Close()
|
||
|
||
_, fe = bft.rb.ReadFrom(zr)
|
||
if fe != nil {
|
||
return nil, fe
|
||
}
|
||
|
||
return bft, nil
|
||
}
|
||
|
||
// And returns a new filter whose bitmap is the bitwise AND of b's and other's
// bitmaps.
//
// Both filters must share m, k and falsePositiveRate. Bit positions are
// computed modulo m, so filters with different m set different bits for the
// same element, and combining them would be meaningless. Previously m was not
// checked. An error is also returned if other is nil or either bitmap is
// missing (for example a filter loaded header-only).
//
// The result's elementsAdded is only an estimate (the smaller of the two
// counts); the exact count cannot be recovered from the bitmaps. Positions
// still queued by Add on either operand are not included, so Flush both first.
func (b *BloomFilter) And(other *BloomFilter) (*BloomFilter, error) {
	if other == nil {
		return nil, errors.New("bloom filter operand is nil")
	}
	if b.m != other.m || b.k != other.k || b.falsePositiveRate != other.falsePositiveRate {
		return nil, errors.New("bloom filters must have same m, k and falsePositiveRate for AND operation")
	}
	if b.rb == nil || other.rb == nil {
		return nil, errors.New("bloom filter bitmap not loaded")
	}

	result := newBloomFilter(b.m, b.k, b.elementsMax, b.falsePositiveRate)
	result.rb = roaring64.And(b.rb, other.rb)
	result.elementsAdded = min(b.elementsAdded, other.elementsAdded)

	return result, nil
}
|
||
|
||
// Or returns a new filter whose bitmap is the bitwise OR of b's and other's
// bitmaps.
//
// Both filters must share m, k and falsePositiveRate. Bit positions are
// computed modulo m, so filters with different m set different bits for the
// same element, and combining them would be meaningless. Previously m was not
// checked. An error is also returned if other is nil or either bitmap is
// missing (for example a filter loaded header-only).
//
// The result's elementsAdded is only an estimate: the larger of the two
// counts, capped at the result's elementsMax. Positions still queued by Add on
// either operand are not included, so Flush both first.
func (b *BloomFilter) Or(other *BloomFilter) (*BloomFilter, error) {
	if other == nil {
		return nil, errors.New("bloom filter operand is nil")
	}
	if b.m != other.m || b.k != other.k || b.falsePositiveRate != other.falsePositiveRate {
		return nil, errors.New("bloom filters must have same m, k and falsePositiveRate for OR operation")
	}
	if b.rb == nil || other.rb == nil {
		return nil, errors.New("bloom filter bitmap not loaded")
	}

	result := newBloomFilter(b.m, b.k, b.elementsMax, b.falsePositiveRate)
	result.rb = roaring64.Or(b.rb, other.rb)
	result.elementsAdded = min(max(b.elementsAdded, other.elementsAdded), result.elementsMax)

	return result, nil
}
|