Files
bloomtool/internal/bloom/bloom.go
2025-11-05 16:41:06 +08:00

398 lines
9.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package bloom
import (
	"encoding/binary"
	"errors"
	"io"
	"math"
	"os"
	"sort"
	"sync"

	"github.com/RoaringBitmap/roaring/roaring64"
	"github.com/klauspost/compress/zstd"
	"google.golang.org/protobuf/proto"
)
/*
主体代码来源于https://github.com/bits-and-blooms/bloom
在此文件中将bitmap的管理改用roaring bitmap实现
algotao 2022-08-29
*/
// BloomFilter is a Bloom filter whose bits are stored in a 64-bit roaring
// bitmap. Insertions are not applied directly: Add pushes the computed bit
// positions onto chOne, and background goroutines batch, sort and insert them.
type BloomFilter struct {
m uint64 // size of the bit space; bit positions are taken modulo m
k uint64 // number of hash functions
elementsMax uint64 // maximum (expected) number of elements
elementsAdded uint64 // number of elements added so far
falsePositiveRate float64 // false-positive rate
rb *roaring64.Bitmap // the bitmap holding the set bits
chOne chan []uint64 // receives the hash indexes of each inserted element
chInsert chan []uint64 // receives sorted hash indexes to be added to the bitmap
chSortJobQuota chan int // quota tokens limiting concurrent sort jobs
buf []uint64 // buffer of pending bit positions
wgJobs sync.WaitGroup // outstanding work that Flush waits for
}
// BloomFilterStat is a copy of a BloomFilter's parameters and counters, as
// returned by GetStat.
type BloomFilterStat struct {
M uint64 // size of the bit space
K uint64 // number of hash functions
ElementsMax uint64 // maximum (expected) number of elements
ElementsAdded uint64 // number of elements added so far
FalsePositiveRate float64 // false-positive rate
}
const (
// headerVersion1 is the header version written by SaveToFile and the only
// version accepted by LoadFromFile.
headerVersion1 = 1
)
// bitmapFileHeader is the fixed-size prefix at the start of a saved file. It is
// written and read with encoding/binary in big-endian order.
type bitmapFileHeader struct {
Size uint64 // size in bytes of the protobuf-encoded header that follows
}
// NewWithEstimates creates a BloomFilter sized for e expected elements at a
// target false-positive rate of fr. The bit count and the number of hash
// functions are derived with EstimateParameters.
func NewWithEstimates(e uint64, fr float64) *BloomFilter {
	bitCount, hashCount := EstimateParameters(e, fr)
	filter := newBloomFilter(bitCount, hashCount, e, fr)
	return filter
}
// newBloomFilter creates a BloomFilter with m bits and k hash functions (each
// raised to at least 1), records e and fr as its element capacity and
// false-positive rate, and starts the two background consumer goroutines
// (consumeOne and consumeInsert).
func newBloomFilter(m uint64, k uint64, e uint64, fr float64) *BloomFilter {
	const (
		indexQueueSize  = 1024 * 1024 // buffered per-element index slices
		insertQueueSize = 2           // sorted batches waiting for the bitmap
		sortQuotaSize   = 8           // maximum concurrent sort jobs
	)

	// Fill the quota channel up front so that each sort job can take one token.
	quota := make(chan int, sortQuotaSize)
	for len(quota) < cap(quota) {
		quota <- 0
	}

	filter := &BloomFilter{
		m:                 max(1, m),
		k:                 max(1, k),
		elementsMax:       e,
		falsePositiveRate: fr,
		rb:                roaring64.New(),
		chOne:             make(chan []uint64, indexQueueSize),
		chInsert:          make(chan []uint64, insertQueueSize),
		chSortJobQuota:    quota,
	}

	go filter.consumeOne()
	go filter.consumeInsert()
	return filter
}
// EstimateParameters returns the number of bits m and the number of hash
// functions k for a Bloom filter expected to hold n elements with a
// false-positive probability of p:
//
//	m = ceil(-n * ln(p) / ln(2)^2)
//	k = ceil(ln(2) * m / n)
//
// The formulas are only defined for n > 0 and 0 < p < 1. For any other input
// (n == 0, p <= 0, p >= 1 or p NaN) the intermediate values are NaN, infinite
// or non-positive, and converting those to uint64 is implementation-dependent
// in Go, so the minimal parameters (1, 1) are returned instead. If the
// computed bit count does not fit in a uint64 it is clamped to math.MaxUint64.
func EstimateParameters(n uint64, p float64) (m uint64, k uint64) {
	// Written as a negated conjunction so that a NaN p is rejected as well.
	if n == 0 || !(p > 0 && p < 1) {
		return 1, 1
	}

	ln2 := math.Log(2)
	bits := math.Ceil(-1 * float64(n) * math.Log(p) / math.Pow(ln2, 2))
	if bits >= float64(math.MaxUint64) {
		m = math.MaxUint64
	} else {
		m = uint64(bits)
	}

	// With m <= 2^64 and n >= 1 this quotient always fits in a uint64.
	k = uint64(math.Ceil(ln2 * float64(m) / float64(n)))
	return m, k
}
// location returns the bit position for the i-th hash function, derived from
// the four base hashes in h and reduced modulo the filter's bit count m.
func (b *BloomFilter) location(h [4]uint64, i uint64) uint64 {
	parity := i % 2
	base := h[parity]
	step := h[2+((i+parity)%4)/2]
	return (base + i*step) % b.m
}
// baseHashes returns the four 64-bit base hashes used to derive bit positions.
// The first pair is the 128-bit sum over data; the second pair is the sum
// taken after one additional byte (0x01) has been written to the same hasher.
func (b *BloomFilter) baseHashes(data []byte) [4]uint64 {
	var out [4]uint64

	hasher := New128()
	hasher.Write(data)
	out[0], out[1] = hasher.Sum128()

	hasher.Write([]byte{1})
	out[2], out[3] = hasher.Sum128()

	return out
}
// consumeOne receives the bit positions of one element at a time from chOne
// and appends them to the pending buffer. Once the buffer reaches the batch
// threshold, or when a flush marker (an empty slice) arrives, the buffer is
// handed to a sort goroutine that forwards the sorted batch to chInsert. The
// loop ends when chOne is closed.
func (b *BloomFilter) consumeOne() {
	// Half of the index queue capacity, counted in uint64 positions.
	threshold := cap(b.chOne) * int(b.k) / 2

	for bits := range b.chOne {
		isFlush := len(bits) == 0
		if !isFlush {
			b.elementsAdded++
		}

		b.buf = append(b.buf, bits...)
		if !isFlush && len(b.buf) < threshold {
			continue
		}

		// Take ownership of the pending positions and start a fresh buffer.
		pending := b.buf[:]
		b.buf = []uint64{}

		// One unit of work for this batch; consumeInsert marks it done.
		b.wgJobs.Add(1)
		// The flush request registered its own unit in Flush; release it here,
		// after the final batch has been registered.
		if isFlush {
			b.wgJobs.Done()
		}

		// Wait for a free sort slot and consume it.
		<-b.chSortJobQuota
		go func(batch []uint64) {
			sort.Slice(batch, func(x, y int) bool { return batch[x] < batch[y] })
			// Submit the sorted batch for insertion.
			b.chInsert <- batch
			// Give the sort slot back.
			b.chSortJobQuota <- 1
		}(pending)
	}
}
// consumeInsert adds every batch received on chInsert to the bitmap and marks
// one unit of work as done per batch. It returns when chInsert is closed.
func (b *BloomFilter) consumeInsert() {
	for {
		batch, open := <-b.chInsert
		if !open {
			return
		}
		b.rb.AddMany(batch)
		b.wgJobs.Done()
	}
}
// Add computes the k bit positions for data and queues them on chOne for the
// background pipeline. It returns the receiver so calls can be chained.
func (b *BloomFilter) Add(data []byte) *BloomFilter {
	hashes := b.baseHashes(data)

	positions := make([]uint64, b.k)
	for idx := range positions {
		positions[idx] = b.location(hashes, uint64(idx))
	}

	// Hand the computed positions to the pending queue.
	b.chOne <- positions
	return b
}
// AddString adds the bytes of the string data to the filter; see Add.
func (b *BloomFilter) AddString(data string) *BloomFilter {
	raw := []byte(data)
	return b.Add(raw)
}
// Test reports whether all k bit positions for data are set in the bitmap.
// A true result may be a false positive; a false result is definite.
func (b *BloomFilter) Test(data []byte) bool {
	hashes := b.baseHashes(data)

	var round uint64
	for round < b.k {
		if pos := b.location(hashes, round); !b.rb.Contains(pos) {
			return false
		}
		round++
	}
	return true
}
// TestString reports whether the bytes of the string data hit the filter;
// see Test (false positives are possible).
func (b *BloomFilter) TestString(data string) bool {
	raw := []byte(data)
	return b.Test(raw)
}
// Flush pushes the buffered bit positions into the bitmap and blocks until the
// outstanding work tracked by wgJobs has completed.
func (b *BloomFilter) Flush() {
	// An empty slice is the flush marker recognised by consumeOne.
	flushMarker := []uint64{}

	// Register the flush request before it is sent, so Wait cannot return early.
	b.wgJobs.Add(1)
	b.chOne <- flushMarker
	b.wgJobs.Wait()
}
// free closes the chOne and chInsert channels, which ends the receive loops in
// consumeOne and consumeInsert, and then clears the bitmap.
func (b *BloomFilter) free() {
	for _, ch := range []chan []uint64{b.chOne, b.chInsert} {
		close(ch)
	}
	b.rb.Clear()
}
// Iterator returns an iterator over the bitmap.
func (b *BloomFilter) Iterator() roaring64.IntPeekable64 {
	it := b.rb.Iterator()
	return it
}
// GetSizeInBytes returns the serialized size of the bitmap in bytes.
func (b *BloomFilter) GetSizeInBytes() uint64 {
	size := b.rb.GetSerializedSizeInBytes()
	return size
}
// GetStat returns a copy of the filter's parameters and counters; it is mainly
// intended for inspecting the state while the program is running.
func (b *BloomFilter) GetStat() BloomFilterStat {
	var stat BloomFilterStat
	stat.M = b.m
	stat.K = b.k
	stat.ElementsMax = b.elementsMax
	stat.ElementsAdded = b.elementsAdded
	stat.FalsePositiveRate = b.falsePositiveRate
	return stat
}
// SaveToFile flushes pending additions and writes the filter to filename.
//
// File layout: a big-endian bitmapFileHeader holding the size of the protobuf
// header, the protobuf-encoded Header itself, then the run-optimized bitmap
// compressed with zstd.
//
// Errors from closing the zstd writer and the file are reported to the caller:
// closing the zstd writer flushes the remaining compressed data, so a failure
// there means the file is incomplete.
//
// Once the bitmap write has been attempted the filter is released with free
// (its channels are closed and the bitmap is cleared), so it must not be used
// afterwards. If an earlier step fails, the filter is left untouched.
func (b *BloomFilter) SaveToFile(filename string) (err error) {
	b.Flush()

	headerData, err := proto.Marshal(&Header{
		Version:           headerVersion1,
		M:                 b.m,
		K:                 b.k,
		ElementsMax:       b.elementsMax,
		ElementsAdded:     b.elementsAdded,
		FalsePositiveRate: b.falsePositiveRate,
	})
	if err != nil {
		return err
	}

	fi, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer func() {
		// Keep the first error, but do not lose a failure to close the file.
		if cerr := fi.Close(); err == nil {
			err = cerr
		}
	}()

	// Fixed-size prefix: the byte length of the protobuf header.
	fh := bitmapFileHeader{Size: uint64(len(headerData))}
	if err = binary.Write(fi, binary.BigEndian, fh); err != nil {
		return err
	}
	// The protobuf header itself.
	if _, err = fi.Write(headerData); err != nil {
		return err
	}

	b.rb.RunOptimize()
	zw, err := zstd.NewWriter(fi)
	if err != nil {
		return err
	}

	_, err = b.rb.WriteTo(zw)
	// Close flushes the remaining compressed data; it must run before the file
	// is closed and its error must not be dropped.
	if cerr := zw.Close(); err == nil {
		err = cerr
	}

	b.free()
	return err
}
// LoadFromFile reads a filter previously written by SaveToFile.
//
// When headerOnly is true only the parameters and counters are populated and
// the bitmap is left nil. Otherwise the zstd-compressed bitmap is decoded too.
//
// The size prefix comes from the file and is validated before it is used as an
// allocation size, and the header is read in full: a truncated header yields
// "unknown file format". A full load also rejects a header whose m or k is
// zero, because bit positions are computed modulo m.
//
// NOTE: the returned filter is read-only. Its channels are not initialized and
// no consumer goroutines are started, so Add, Flush and SaveToFile must not be
// called on it; they would block forever sending on a nil channel.
func LoadFromFile(filename string, headerOnly bool) (bft *BloomFilter, err error) {
	// Upper bound for the protobuf header. The header written by SaveToFile
	// holds a handful of scalar fields, so 1 MiB is far more than ever needed.
	const maxHeaderSize = 1 << 20

	fi, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer fi.Close()

	var fh bitmapFileHeader
	if err = binary.Read(fi, binary.BigEndian, &fh); err != nil {
		return nil, err
	}
	if fh.Size > maxHeaderSize {
		return nil, errors.New("unknown file format")
	}

	headerData := make([]byte, fh.Size)
	// A single Read may legally return fewer bytes than requested.
	if _, err = io.ReadFull(fi, headerData); err != nil {
		if errors.Is(err, io.ErrUnexpectedEOF) {
			return nil, errors.New("unknown file format")
		}
		return nil, err
	}

	headerPB := &Header{}
	if err = proto.Unmarshal(headerData, headerPB); err != nil {
		return nil, err
	}
	if headerPB.Version != headerVersion1 {
		return nil, errors.New("unsupported version")
	}

	loaded := &BloomFilter{
		m:                 headerPB.GetM(),
		k:                 headerPB.GetK(),
		elementsMax:       headerPB.GetElementsMax(),
		elementsAdded:     headerPB.GetElementsAdded(),
		falsePositiveRate: headerPB.GetFalsePositiveRate(),
	}
	if headerOnly {
		return loaded, nil
	}

	// A usable filter needs m > 0 (positions are taken modulo m) and k > 0.
	if loaded.m == 0 || loaded.k == 0 {
		return nil, errors.New("invalid header: m and k must be non-zero")
	}

	zr, err := zstd.NewReader(fi)
	if err != nil {
		return nil, err
	}
	defer zr.Close()

	loaded.rb = roaring64.New()
	if _, err = loaded.rb.ReadFrom(zr); err != nil {
		return nil, err
	}
	return loaded, nil
}
// And returns a new BloomFilter whose bitmap is the intersection of the
// bitmaps of b and other.
//
// Both filters must have a bitmap (a header-only load has none) and must agree
// on m, k and falsePositiveRate. m has to match because every bit position is
// computed modulo m, so bitmaps built with different m are not comparable.
//
// Only bits already present in the bitmaps are combined; additions still
// queued in either filter are not included, so callers that have added
// elements should Flush first. The result is created with newBloomFilter and
// therefore has its own background goroutines.
//
// elementsAdded of the result is an estimate only: the true count cannot be
// derived from an intersection, so the smaller of the two counts is used.
func (b *BloomFilter) And(other *BloomFilter) (*BloomFilter, error) {
	if other == nil || b.rb == nil || other.rb == nil {
		return nil, errors.New("bloom filters must be non-nil and have a loaded bitmap for AND operation")
	}
	if b.m != other.m {
		return nil, errors.New("bloom filters must have same m for AND operation")
	}
	if b.k != other.k || b.falsePositiveRate != other.falsePositiveRate {
		return nil, errors.New("bloom filters must have same k and falsePositiveRate for AND operation")
	}

	result := newBloomFilter(b.m, b.k, b.elementsMax, b.falsePositiveRate)
	result.rb = roaring64.And(b.rb, other.rb)

	// Estimate: the smaller of the two element counts.
	result.elementsAdded = other.elementsAdded
	if b.elementsAdded < other.elementsAdded {
		result.elementsAdded = b.elementsAdded
	}
	return result, nil
}
// Or returns a new BloomFilter whose bitmap is the union of the bitmaps of b
// and other.
//
// Both filters must have a bitmap (a header-only load has none) and must agree
// on m, k and falsePositiveRate. m has to match because every bit position is
// computed modulo m, so bitmaps built with different m are not comparable.
//
// Only bits already present in the bitmaps are combined; additions still
// queued in either filter are not included, so callers that have added
// elements should Flush first. The result is created with newBloomFilter and
// therefore has its own background goroutines.
//
// elementsAdded of the result is an estimate only: the larger of the two
// counts is used, capped at the result's elementsMax.
func (b *BloomFilter) Or(other *BloomFilter) (*BloomFilter, error) {
	if other == nil || b.rb == nil || other.rb == nil {
		return nil, errors.New("bloom filters must be non-nil and have a loaded bitmap for OR operation")
	}
	if b.m != other.m {
		return nil, errors.New("bloom filters must have same m for OR operation")
	}
	if b.k != other.k || b.falsePositiveRate != other.falsePositiveRate {
		return nil, errors.New("bloom filters must have same k and falsePositiveRate for OR operation")
	}

	result := newBloomFilter(b.m, b.k, b.elementsMax, b.falsePositiveRate)
	result.rb = roaring64.Or(b.rb, other.rb)

	// Estimate: the larger of the two element counts...
	result.elementsAdded = other.elementsAdded
	if b.elementsAdded > other.elementsAdded {
		result.elementsAdded = b.elementsAdded
	}
	// ...but never more than the declared capacity.
	if result.elementsAdded > result.elementsMax {
		result.elementsAdded = result.elementsMax
	}
	return result, nil
}