ETCD BoltDB

2018-04-23 Monday     program , golang , linux

ETCD 后端存储采用的是 BBolt 存储引擎,其前身是 BoltDB ,这是一款 golang 实现的嵌入式 KV 存储引擎,参考的是 LMDB,支持事务、ACID、MVCC、ZeroCopy、BTree等特性。

使用

直接通过 go get github.com/boltdb/bolt/... 安装源码。

示例

package main
 
import (
	"bytes"
	"fmt"
	"github.com/boltdb/bolt"
	"log"
)
 
func main() {
	arrBucket := []string{"a", "b"}
	mgr, _ := NewBoltManager("my.db", arrBucket)
	mgr.Add("a", []byte("11"))
	mgr.Add("a", []byte("22"))
	id33, _ := mgr.Add("a", []byte("33"))
	mgr.Add("a", []byte("22"))
 
	mgr.Add("b", []byte("11"))
	mgr.Add("b", []byte("22"))
 
	log.Println("Select a=>>>>>>>>>>>>>>>>>>>")
	mgr.Select("a")
	log.Println("Select b=>>>>>>>>>>>>>>>>>>>")
	mgr.Select("b")
 
	log.Println("RemoveID a=>>>>>>>>>>>>>>>>>>>")
	mgr.RemoveID("a", []byte(fmt.Sprintf("%d", id33)))
 
	log.Println("RemoveValTransaction a=>>>>>>>>>>>>>>>>>>>")
	mgr.RemoveValTransaction("a", []byte("22"))
	log.Println("Select a=>>>>>>>>>>>>>>>>>>>")
	mgr.Select("a")
 
	//清理Bucket
	for _, v := range arrBucket {
		mgr.RemoveBucket(v)
	}
	mgr.Close()
 
}
 
//BlotDB的管理类
type BoltManager struct {
	db *bolt.DB "数据库类"
}
 
//创建库管理,并生成Bucket
func NewBoltManager(dbPath string, bucket []string) (*BoltManager, error) {
	//  bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second,ReadOnly: true})
	db, err := bolt.Open(dbPath, 0644, nil)
	if err != nil {
		return nil, err
	}
	err = db.Update(func(tx *bolt.Tx) error {
		for _, v := range bucket {
			_, err := tx.CreateBucketIfNotExists([]byte(v))
			if err != nil {
				return err
			}
		}
		return nil
	})
 
	if err != nil {
		return nil, err
	}
	return &BoltManager{db}, nil
}
 
//关库数据库
func (m *BoltManager) Close() error {
	return m.db.Close()
}
 
//移除Bucket
func (m *BoltManager) RemoveBucket(bucketName string) (err error) {
	err = m.db.Update(func(tx *bolt.Tx) error {
		return tx.DeleteBucket([]byte(bucketName))
	})
	return err
}
 
//组Bucket增加值
func (m *BoltManager) Add(bucketName string, val []byte) (id uint64, err error) {
 
	err = m.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(bucketName))
		id, _ = b.NextSequence() //sequence uint64
		bBuf := fmt.Sprintf("%d", id)
		return b.Put([]byte(bBuf), val)
	})
	return
}
 
//遍历
func (m *BoltManager) Select(bucketName string) (err error) {
	m.db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(bucketName))
		b.ForEach(func(k, v []byte) error {
			log.Printf("key=%s, value=%s\n", string(k), v)
			return nil
		})
		return nil
	})
	return nil
}
 
//移除指定Bucket中指定ID
func (m *BoltManager) RemoveID(bucketName string, id []byte) error {
	err := m.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(bucketName))
		return b.Delete(id)
	})
	return err
}
 
//移除指定Bucket中指定Val
func (m *BoltManager) RemoveVal(bucketName string, val []byte) (err error) {
	var arrID []string
	arrID = make([]string, 1)
	err = m.db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(bucketName))
		c := b.Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			log.Printf("key=%s, value=%s\n", k, string(v))
			if bytes.Compare(v, val) == 0 {
				arrID = append(arrID, string(k))
			}
		}
		return nil
	})
 
	err = m.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(bucketName))
		for _, v := range arrID {
			b.Delete([]byte(v))
			log.Println("Del k:", v)
		}
		return nil
	})
 
	return err
}
 
//查找指定值
func (m *BoltManager) SelectVal(bucketName string, val []byte) (
	arr []string,
	err error,
) {
	arr = make([]string, 0, 1)
	err = m.db.View(func(tx *bolt.Tx) error {
		c := tx.Bucket([]byte(bucketName)).Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			if bytes.Compare(v, val) == 0 {
				arr = append(arr, string(k))
			}
		}
		return nil
	})
	return arr, err
}
 
//在事务中,移除指定Bucket中指定Val
func (m *BoltManager) RemoveValTransaction(bucketName string, val []byte) (
	count int,
	err error,
) {
	arrID, err1 := m.SelectVal(bucketName, val)
	if err1 != nil {
		return 0, err1
	}
	count = len(arrID)
	if count == 0 {
		return count, nil
	}
 
	tx, err1 := m.db.Begin(true)
	if err1 != nil {
		return count, err1
	}
	b := tx.Bucket([]byte(bucketName))
	for _, v := range arrID {
		if err = b.Delete([]byte(v)); err != nil {
			log.Printf("删除ID(%s)失败! 执行回滚. err:%s \r\n", v, err)
			tx.Rollback()
			return
		}
		log.Println("删除ID(", v, ")成功!")
	}
	err = tx.Commit()
	return
}

打开数据库

package main

import (
	"log"
	"time"
	"github.com/boltdb/bolt"
)

func main() {
	// 打开数据库
	db, err := bolt.Open("my.db", 0600, &bolt.Options{Timeout: 1 * time.Second})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}

第一参数指定路径,不存在则创建, 第二个为文件操作,第三个参数是可选参数,内部可以配置只读和超时时间等。

BoltDB 只能单点写入和读取,如果多个同时操作的话后者会被挂起直到前者关闭操作为止,一次只允许一个读写事务,但一次允许多个只读事务。

读写事务

可以使用 DB.Update() 来完成。

err := db.Update(func(tx *bolt.Tx) error {
	return nil
})

在闭包中,结束时返回 nil 来提交事务,返回一个错误在任何点回滚事务。

批量读写事物

每一次新的事物都需要等待上一次事物的结束,此时可以通过 DB.Batch() 批处理来完成。

err := db.Batch(func(tx *bolt.Tx) error {
	return nil
})

在批处理过程中如果某个事务失败了,批处理会多次调用这个函数函数返回成功则成功,如果中途失败了,则整个事务会回滚。

只读事务

只读事务可以使用 DB.View() 来完成。

err := db.View(func(tx *bolt.Tx) error {
	return nil
})

不改变数据的操作都可以通过只读事务来完成, 您只能检索桶、检索值,或在只读事务中复制数据库。

启动事务

DB.Begin() 启动函数包含在 DB.Update()DB.Batch() 中,该函数启动事务开始执行事务并返回结果关闭事务,有时可能需要手动启动事物你可以使用 Tx.Begin() 来开始,切记不要忘记关闭事务。

// Start a writable transaction.
tx, err := db.Begin(true)
if err != nil {
	return err
}
defer tx.Rollback()

// Use the transaction...
_, err := tx.CreateBucket([]byte("MyBucket"))
if err != nil {
	return err
}

// Commit the transaction and check for error.
if err := tx.Commit(); err != nil {
	return err
}

使用桶

桶是数据库中键/值对的集合,桶中的所有键必须是唯一的,可以使用 DB.CreateBucket() 创建一个桶。

db.Update(func(tx *bolt.Tx) error {
	b, err := tx.CreateBucket([]byte("MyBucket"))
	if err != nil {
		return fmt.Errorf("create bucket: %s", err)
	}
	return nil
})

也可以使用 Tx.CreateBucketIfNotExists() 来创建桶,该函数会先判断是否已经存在该桶不存在即创建,删除桶可以使用 Tx.DeleteBucket() 来完成。

使用 KV 对

存储键值对到桶里可以使用 Bucket.Put() 来完成。

db.Update(func(tx *bolt.Tx) error {
	b := tx.Bucket([]byte("MyFriendsBucket"))
	err := b.Put([]byte("one"), []byte("zhangsan"))
	return err
})

获取键值 Bucket.Get()

db.View(func(tx *bolt.Tx) error {
	b := tx.Bucket([]byte("MyFriendsBucket"))
	v := b.Get([]byte("one"))
	fmt.Printf("The answer is: %s\n", v)
	return nil
})

get() 函数不返回一个错误,除非有某种系统故障,如果键存在,那么它将返回它的值;如果它不存在,那么它将返回 nil



如果喜欢这里的文章,而且又不差钱的话,欢迎打赏个早餐 ^_^


About This Blog

Recent Posts

Categories

Related Links

  • RTEMS
    RTEMS
  • GNU
  • Linux Kernel
  • Arduino

Search


This Site was built by Jin Yang, generated with Jekyll, and hosted on GitHub Pages
©2013-2019 – Jin Yang