Files
bloomtool/internal/bloom/bloom.go
2025-11-03 14:37:59 +08:00

346 lines
7.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package bloom
import (
"encoding/binary"
"errors"
"math"
"os"
"sort"
"sync"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/klauspost/compress/zstd"
"google.golang.org/protobuf/proto"
)
/*
主体代码来源于https://github.com/bits-and-blooms/bloom
在此文件中将bitmap的管理改用roaring bitmap实现
algotao 2022-08-29
*/
type BloomFilter struct {
m uint64 // 存贮空间上限
k uint64 // hash函数个数
elementsMax uint64 // 元素数量上限
elementsAdded uint64 // 已加入的元素数量
falsePositiveRate float64 // 假阳率
rb *roaring64.Bitmap // 位图
chOne chan []uint64 // 接收每个插入的hash索引
chInsert chan []uint64 // 接收排好序的hash索引进Bitmap
chSortJobQuota chan int // 排序工作的配额控制
buf []uint64 // 缓冲
wgJobs sync.WaitGroup
}
type BloomFilterStat struct {
M uint64
K uint64
ElementsMax uint64
ElementsAdded uint64
FalsePositiveRate float64
}
const (
headerVersion1 = 1
)
// bitmapFileHeader 存贮文件头
type bitmapFileHeader struct {
Size uint64 // Header Protobuf size
}
// NewWithEstimates 创建一个BloomFilter并期望有n个元素<fp的误匹配率
func NewWithEstimates(e uint64, fr float64) *BloomFilter {
m, k := EstimateParameters(e, fr)
return newBloomFilter(m, k, e, fr)
}
// new 创建一个新的BloomFilter具有 _m_ bits 和 _k_ hashing 函数
func newBloomFilter(m uint64, k uint64, e uint64, fr float64) *BloomFilter {
b := &BloomFilter{
m: max(1, m),
k: max(1, k),
elementsMax: e,
falsePositiveRate: fr,
rb: roaring64.New(),
chOne: make(chan []uint64, 1024*1024), // 索引缓冲区chan
chInsert: make(chan []uint64, 2), // 插入队列
chSortJobQuota: make(chan int, 8), // 排序队列
}
for i := 0; i < cap(b.chSortJobQuota); i++ {
b.chSortJobQuota <- 0
}
//log.Printf("Init quota len(%v), cap(%v)\n", len(b.chSortJobQuota), cap(b.chSortJobQuota))
go b.consumeOne()
go b.consumeInsert()
return b
}
// 生成 m 和 k
func EstimateParameters(n uint64, p float64) (m uint64, k uint64) {
m = uint64(math.Ceil(-1 * float64(n) * math.Log(p) / math.Pow(math.Log(2), 2)))
k = uint64(math.Ceil(math.Log(2) * float64(m) / float64(n)))
return
}
// location 返回当前位置的hash值
func (b *BloomFilter) location(h [4]uint64, i uint64) uint64 {
return (h[i%2] + i*h[2+(((i+(i%2))%4)/2)]) % b.m
//return 0
}
// baseHashes 生成4个hash值用于生产key
func (b *BloomFilter) baseHashes(data []byte) [4]uint64 {
h := New128()
h.Write(data)
h1, h2 := h.Sum128()
h.Write([]byte{1})
h3, h4 := h.Sum128()
return [4]uint64{
h1, h2, h3, h4,
}
}
// 消费一个计算好的bloom bits对象并填入缓冲。当缓冲半满时发送给排序队列处理
func (b *BloomFilter) consumeOne() {
batchSize := cap(b.chOne) * int(b.k) / 2 //一半buffer满了就开始处理即一半个数的uint64。或是遇到Flush标志bits长度0则刷缓冲
for bits := range b.chOne {
if len(bits) != 0 {
b.elementsAdded++
}
b.buf = append(b.buf, bits...)
if len(b.buf) >= batchSize || len(bits) == 0 {
buf := b.buf[:]
b.buf = []uint64{}
b.wgJobs.Add(1)
// 如果接收到了Flush标志则在处理最后buffer后减一次waitgroup
if len(bits) == 0 {
b.wgJobs.Done()
}
//等待有可用排序配额,如成功则消耗一个配额
<-b.chSortJobQuota
go func() {
sort.Slice(buf, func(i, j int) bool { return buf[i] < buf[j] })
//提交至插入任务
b.chInsert <- buf
//恢复1个配额
b.chSortJobQuota <- 1
}()
}
}
}
// 将批量bits写到bitmap
func (b *BloomFilter) consumeInsert() {
for bitsBatch := range b.chInsert {
b.rb.AddMany(bitsBatch)
b.wgJobs.Done()
}
}
// Add 添加数据的Hash位图
func (b *BloomFilter) Add(data []byte) *BloomFilter {
h := b.baseHashes(data)
bits := make([]uint64, b.k)
for i := uint64(0); i < b.k; i++ {
bits[i] = b.location(h, i)
}
b.chOne <- bits //将一个计算好的bloom bits发送到待处理队列
return b
}
// AddString 添加字符串的Hash位图
func (b *BloomFilter) AddString(data string) *BloomFilter {
return b.Add([]byte(data))
}
// Test 如果命中Hash位图则返回真 (有误匹配率)
func (b *BloomFilter) Test(data []byte) bool {
h := b.baseHashes(data)
for i := uint64(0); i < b.k; i++ {
if !b.rb.Contains(b.location(h, i)) {
return false
}
}
return true
}
// TestString 如果命中字符串Hash位图则返回真 (有误匹配率)
func (b *BloomFilter) TestString(data string) bool {
return b.Test([]byte(data))
}
// Flush 将缓冲中的待处理Bit写入Bitmap
func (b *BloomFilter) Flush() {
b.wgJobs.Add(1)
//发出Flush指令
b.chOne <- []uint64{}
b.wgJobs.Wait()
}
// free 将缓冲中的待处理Bit写入Bitmap
func (b *BloomFilter) free() {
close(b.chOne)
close(b.chInsert)
b.rb.Clear()
}
// Iterator 位图遍历器
func (b *BloomFilter) Iterator() roaring64.IntPeekable64 {
return b.rb.Iterator()
}
// GetSizeInBytes 返回位图大小
func (b *BloomFilter) GetSizeInBytes() uint64 {
return b.rb.GetSerializedSizeInBytes()
}
// 获得统计信息,主要用于运行期间获取状态
func (b *BloomFilter) GetStat() BloomFilterStat {
return BloomFilterStat{
M: b.m,
K: b.k,
ElementsMax: b.elementsMax,
ElementsAdded: b.elementsAdded,
FalsePositiveRate: b.falsePositiveRate,
}
}
// SaveToFile 写入到文件
func (b *BloomFilter) SaveToFile(filename string) (err error) {
b.Flush()
headerPB := &Header{
Version: headerVersion1,
M: b.m,
K: b.k,
ElementsMax: b.elementsMax,
ElementsAdded: b.elementsAdded,
FalsePositiveRate: b.falsePositiveRate,
}
headerData, err := proto.Marshal(headerPB)
if err != nil {
return err
}
fi, fe := os.Create(filename)
if fe != nil {
return fe
}
defer fi.Close()
fh := bitmapFileHeader{
Size: uint64(len(headerData)),
}
//写入文件头(字节数)
fe = binary.Write(fi, binary.BigEndian, fh)
if fe != nil {
return fe
}
//写入文件头PB详细信息
fe = binary.Write(fi, binary.BigEndian, headerData)
if fe != nil {
return fe
}
b.rb.RunOptimize()
zw, err := zstd.NewWriter(fi)
if err != nil {
return err
}
defer zw.Close()
_, fe = b.rb.WriteTo(zw)
b.free()
return fe
}
// LoadFromFile 从文件中读取
func LoadFromFile(filename string, headerOnly bool) (bft *BloomFilter, err error) {
fi, fe := os.Open(filename)
if fe != nil {
return nil, fe
}
defer fi.Close()
fh := bitmapFileHeader{}
fe = binary.Read(fi, binary.BigEndian, &fh)
if fe != nil {
return nil, fe
}
headerData := make([]byte, fh.Size)
n, err := fi.Read(headerData)
if err != nil {
return nil, err
}
if n != len(headerData) {
return nil, errors.New("unknown file format")
}
headerPB := &Header{}
err = proto.Unmarshal(headerData, headerPB)
if err != nil {
return nil, err
}
if headerPB.Version != headerVersion1 {
return nil, errors.New("unsupported version")
}
bft = &BloomFilter{}
bft.m = headerPB.GetM()
bft.k = headerPB.GetK()
bft.elementsMax = headerPB.GetElementsMax()
bft.elementsAdded = headerPB.GetElementsAdded()
bft.falsePositiveRate = headerPB.GetFalsePositiveRate()
if headerOnly {
return bft, nil
}
bft.rb = roaring64.New()
zr, err := zstd.NewReader(fi)
if err != nil {
return nil, err
}
defer zr.Close()
_, fe = bft.rb.ReadFrom(zr)
if fe != nil {
return nil, fe
}
return bft, nil
}