sync.Map 源码阅读

前言

Sync.Map 是Golang 官方提供的线程安全的 map类库,因为Golang 本身自带的map并不是线程安全的,因为会有sync.Map这个类库的存在

实现

基础元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
type Map struct {
mu Mutex

// read contains the portion of the map's contents that are safe for
// concurrent access (with or without mu held).
//
// The read field itself is always safe to load, but must only be stored with
// mu held.
//
// Entries stored in read may be updated concurrently without mu, but updating
// a previously-expunged entry requires that the entry be copied to the dirty
// map and unexpunged with mu held.
read atomic.Value // readOnly

// dirty contains the portion of the map's contents that require mu to be
// held. To ensure that the dirty map can be promoted to the read map quickly,
// it also includes all of the non-expunged entries in the read map.
//
// Expunged entries are not stored in the dirty map. An expunged entry in the
// clean map must be unexpunged and added to the dirty map before a new value
// can be stored to it.
//
// If the dirty map is nil, the next write to the map will initialize it by
// making a shallow copy of the clean map, omitting stale entries.
dirty map[interface{}]*entry

// misses counts the number of loads since the read map was last updated that
// needed to lock mu to determine whether the key was present.
//
// Once enough misses have occurred to cover the cost of copying the dirty
// map, the dirty map will be promoted to the read map (in the unamended
// state) and the next store to the map will make a new dirty copy.
misses int
}

// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
m map[interface{}]*entry
amended bool // true if the dirty map contains some key not in m.
}
type entry struct {
// p points to the interface{} value stored for the entry.
//
// If p == nil, the entry has been deleted and m.dirty == nil.
//
// If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
// is missing from m.dirty.
//
// Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
// != nil, in m.dirty[key].
//
// An entry can be deleted by atomic replacement with nil: when m.dirty is
// next created, it will atomically replace nil with expunged and leave
// m.dirty[key] unset.
//
// An entry's associated value can be updated by atomic replacement, provided
// p != expunged. If p == expunged, an entry's associated value can be updated
// only after first setting m.dirty[key] = e so that lookups using the dirty
// map find the entry.
p unsafe.Pointer // *interface{}
}

sync.Map 存储数据的分为两个部分:
1.read元素,使用的是atomic.Value 把map 当成interface进行存储;
2. dirty 一个golang中的一个Map,用于临时存储部分新的数据(具体场景会在后面进行描述)

写流程

1
2
3
4
5
6
7
1.  判断Key是否再Read部分,如果在read部分,则直接采用cas来更换,更换成功则直接返回
2. 否则就要加锁然后修改到dirty中,
有三种情况:
1. 在Read中,但是已经被修改,cas无法修改成功(会先把这个key对应的值与mark无效的指针做对比,如果是,则需要把这个更换了的key对应的放到dirty里面)
2. 在dirty 中,直接进行修改
3. 是一个新Key的情况下,如果dirty没有key不在read中,直接修改dirty,并且把需要读修正的Flag(read.amend = true置入)
3. 解锁

因此 如果当key在Read中,是可以保证一个比较快的实现,因为用的是cas的比较方法,而不是直接加锁去防止竞争锁带来的性能损失。

读流程

  1. 读取Read中是否存在此Key,若存在则直接返回
  2. 当发现key不存在于read,并且没有读修正的时候,直接返回
  3. 发现需要读修正的情况下,会顺便把dirty中的数据置入到read中,并且返回值

LoadORStore 读写一体流程

用于阅读sync.Map的触发代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"sync"
)

func main() {
// for testing sync.map work

var ma sync.Map
k, _ := ma.Load("a")
fmt.Println(k)

ma.Store("a", 1)
_, _ = ma.Load("a")
ma.Store("b", 2)
ma.Store("a", 2)
ma.Store("a", 3)
p, _ := ma.Load("a")
fmt.Println(p)
}

竞锁的情况

1

gomodule

在线一对一迁移GoModule

为什么需要迁移到GoModule

主要是因为三个原因

  • 从 Golang 1.13 以后, GoModule 已经从各种可用的方法由官方建议统一成 GoModule(并且事实上已经在比较成熟的 Golang 的开源项目中都已经开始进行采用),也是为了便于升级 Golang Runtime 的版本来获取一些底层方面的性能提升
  • 可以使用 GoProxy 的中国实现解决实际生产上 Go 下载依赖包困难的问题(Golang 默认对于公开的开源库是会选择回源进行下载,但由于不可抗力,实际生产中下载依赖有可能触发一下404的域名,在不使用代理的情况下,无法下载依赖)
  • 目的是可以抛开 GOPATH 来更加自由的组织自己的项目

GoModule 原理相关

GoModule管理版本方式

GoModule采用语义化版本管理方式来管理库名的版本。

1
2
3
4
5
6
此处在比较正式的软件工程的定义中,关于版本的定义如下:
V $Major.$Minor.$Patch (eg: v2.10.1)
其中定义:
- Major 为大版本,即默认期待为可能会有破坏性的变动,如Api的整体大变更
- Minor 为小版本,为引入新功能或者特性修改
- Patch 为补丁,一般来说是修复对应Minor版本中的部分问题

Golang 也引用了这个定义来进行 GoModule的构建。
即如果包发生破坏性的变化,ModuleName 需要同时加一,以一个我们使用到的excel解析库为例子, 即 Major Version 升级也应该放入 ModuleName 中。

1
2
3
4
// 原模块名
github.com/360EntSecGroup-Skylar/excelize
// 发生变化后
github.com/360EntSecGroup-Skylar/excelize/v2

但是上面例子为比较符合规范的类库,但实际上开发者不一定完全根据这个执行(大部分开发者会跟着规范去执行),没有强制去进行规范化和提供规范化的工具。因此当使用或者升级第三方类库的情况下,最好还是自己去检查一下升级版本前后是否会有影响。

GONOPROXY/GONOSUMDB/GOPRIVATE 概念解析(私用仓库管理相关)

这三个环境变量都是用在当前项目依赖了私有模块,也就是依赖了由 GOPROXY 指定的 Go module proxy 或由 GOSUMDB 指定 Go checksum database 无法访问到的模块时的场景,他们具有如下特性:

1
2
它们三个的值都是一个以英文逗号 “,” 分割的模块路径前缀,匹配规则同 path.Match。其中 GOPRIVATE 较为特殊,它的值将作为 GONOPROXY 和 GONOSUMDB 的默认值,所以建议的最佳姿势是只是用 GOPRIVATE。
在使用上来讲,比如 GOPRIVATE=*.corp.example.com 表示所有模块路径以 corp.example.com 的下一级域名 (如 team1.corp.example.com) 为前缀的模块版本都将不经过 Go module proxy 和 Go checksum database,需要注意的是不包括 corp.example.com 本身。

GOPROXY

这个环境变量主要是用于设置 Go 模块代理,主要如它的值是一个以英文逗号 “,” 分割的 Go module proxy 列表(稍后讲解)

1
2
3
4
作用:用于使 Go 在后续拉取模块版本时能够脱离传统的 VCS 方式从镜像站点快速拉取。它拥有一个默认值,但很可惜 proxy.golang.org 在中国无法访问,故而建议使用 goproxy.cn 作为替代。
设置为 “off” :禁止 Go 在后续操作中使用任 何 Go module proxy。
刚刚在上面,我们可以发现值列表中有 “direct” ,它又有什么作用呢?
其实值列表中的 “direct” 为特殊指示符,用于指示 Go 回源到模块版本的源地址去抓取 (比如 GitHub 等),当值列表中上一个 Go module proxy 返回 404 或 410 错误时,Go 自动尝试列表中的下一个,遇见 “direct” 时回源,遇见 EOF 时终止并抛出类似 “invalid version: unknown revision...” 的错误。

在安装go1.13之后,go会在系统默认使用goproxy.io来进行代理获取。但是由于不可抗力,我们在国内是无法使用这个goproxy的地址的,因此我们只能选用中国区的goproxy的实现。
目前主要有两个实现

但是基于七牛云上面的 goproxy.cn 是目前国内较多GO开发者使用的 GOPROXY ,并且七牛在Go语言方面的布道和投入明显多于阿里的实现。目前我们使用七牛云的实现。(后续文章的配置均以goproxy.cn为GOPROXY变量的值)

1
2
3
4
5
6
7
8
9
10
11
12
13
// go.mop go.mod 文件示例
module gitlab.xinghuolive.com/birds-backend/phoenix

go 1.13

require (
github.com/360EntSecGroup-Skylar/excelize v1.4.1
gitlab.xinghuolive.com/birds-backend/migrations v0.0.0-20191202065617-b123ba4a7cdc
gopkg.in/go-playground/validator.v8 v8.18.2
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
mellium.im/sasl v0.2.1 // indirect
...
)

对于上面的模块每个字段的代表的是

  1. module 表示的是本项目的项目名称
  2. go 1.13 代表使用的是go 1.13进行运行(这里只是用于展示,非强制性,这个是跟实际控制go mod 的go运行版本相关)
  3. require 代表这个项目所依赖的第三方类库(包括公共库和私有库)require 的单个条目的组成为项目名称 version(-commit )
1
2
3
//示例:
github.com/robfig/cron v1.2.0
gitlab.xinghuolive.com/birds-backend/swan v0.0.0-20191112105628-9b584ccec9ef

为什么会有上面的两种样式,原因是因为依赖的库的方式不同。
使用Go module 后依然可以用go get 来新加库(会自动把新依赖添加到go.mod文件中)。
但是可以支持以下方法来更新

1
2
3
4
go get $project@latest 根据tag来进行拉取
go get $project@master 根据branch来进行拉取
go get $project@V1.0 根据版本号来获取
go get $project@d4a36278507 根据commit号来获取

因此会有着两种形式同时存在。

  • 如何管理私有库
    即使上面配置好了GOPROXY,但是实际上私有库也是不能直接获取到的,所以我们需要配置GOPRIVATE变量来告诉go module去构建的时候,这个直接走本地的gitlab去git clone,而且不是去外边去go get 外面的库。
    主要原因:
    • GOPROXY 是无权访问到任何人的私有模块的,所以你放心,安全性没问题。
    • GOPROXY 除了设置模块代理的地址以外,还需要增加 “direct” 特殊标识才可以成功拉取私有库。
  • 同一类库的不同版本的处理
    在GoModule中的预设,假设两个包的大版本如 v1,v2 。它理解为这里是发生了BreakingChange,它会默认把这种情况的两个版本当时两个包来引入。但是对于同一个版本上面的如v1.2和v1.3,它默认是没有破坏性的改动的,因此它会默认筛选更新的版本来进行使用。(可以通过Replace来指定本项目用到的这个包的版本)

Replace的功能介绍

Replace 是 GoModule 中 允许用户进行依赖模块替换的功能
有大概如下三种用法:

  • 把已经消失的类库或者 Fork 出来的类库,使用这个来替换掉原来计算的代码
  • 把不同层级引用的依赖进行归一化直接重定向到统一个目录下的一个库版本
  • 把频繁改动的依赖库隔离其他人影响

GoModule的依赖保留方式

  1. 在目前使用了 GoModule 后,实际上所有的项目依赖都会保存到GOPATH[0]/pkg目录下【go1.13是这样的保留方式,1.14会放到 $GOMODCACHE 的变量路径中】,它会保留多个版本。
    以penguin代码为例子,它会保留多个commit在本地来继续全局的统一保存。

迁移过程

1.梳理私有项目间的依赖

梳理私有项目间的依赖,以最底的依赖来开始迁移至 GoModule,然后逐层上升来构建GoModule。

我们的处理逻辑是从Turkey和Swan开始入手改造项目变成使用 GoModule进行管理,再到Penguin,如此类推,知道所有应用层代码都变成了使用 GoModule进行管理。

2.解决依赖的问题

因此可能出现下面两种问题:

  1. 依赖消失的情况[由于 Golang 在 GoModule之前是没有一套官方定义的统一方法去管理第三方的依赖,只会把代码托管在第三方的平台上(eg:github),并且在历史上没有一个类似于其他语言社区有一个中心化的库管理的工具(如Python中的Pypi, Java中的Maven]
  • 解决方案
    对上面的这种情况,像项目中之前依赖的
    1
    2
    github.com/xiao100/redis-pattern
    github.com/xiao100/compoent
    两个类库的情况下,因为类库不算复杂并且比较小,我们目前的做法是直接把Vendor中的旧代码挪出来进入项目中自己进行管理.
  1. 本项目的依赖和依赖项目的引用了同一个包的不同版本的问题
  • 解决方案
    本项目的依赖版本和依赖的依赖的项目版本不一致的情况
    在上图提及 Magpie 的项目中用到 xiao100/cast 版本与 penguin 中用到的版本不一样,但是因为只是Cast的new方法是比较小的差异,新版本多返回了一个 error类 型,所以只要手动处理。(其实也可以理解为倒逼一些引用了的包比较旧的版本升级,只要测试充分,其实对系统的伤害不大,而且依赖库解决了一些潜在的 Bug)

3.获取私有依赖的新版本的问题

  1. 配置好相关的环境变量后, 在所在目录上面直接执行
1
go get gitlab.xinghuovip.com/birds-backend/$project@($branch/$gitTag/$commitNumber)

执行成功后,go.mod 文件会自动修改相关的依赖。

4. Replace 在开发中的使用

因为我们在项目中采用的是 GitFlow 的功能分支开发模式进行代码管理,因此每个人切换或者合并分支的频率是比较高的,因此,如果每次都要手动去 Go get 下层依赖的改动其实会特别繁琐,因此我们在开发中,如果会修改到下层依赖的代码时候,如 turkey 的代码,我们默认会把 go.mod 中 turkey 的引用修改成相对位置的引用,这样就能减少开发切换的成本,降低使用心智负担。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// go.mod
module gitlab.xinghuolive.com/birds-backend/phoenix

go 1.13

require (
github.com/360EntSecGroup-Skylar/excelize/v2 v2.1.0
github.com/Shopify/sarama v1.27.2
github.com/aliyun/aliyun-oss-go-sdk v2.1.4+incompatible
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b // indirect
github.com/bxcodec/faker v2.0.1+incompatible
github.com/bxcodec/faker/v3 v3.5.0
github.com/cep21/circuit v3.0.0+incompatible
github.com/dgrijalva/jwt-go v3.2.0+
...
)
replace gitlab.xinghuolive.com/birds-backend/turkey => ../turkey

5. Jenkins构建脚本的修改

  1. 修改了 build 函数里面的拉取的函数直接通过Go Get 去进行获取对应库的分支,以前采用的方式是使用 git clone 下来对应版本的私有代码到GOPATH来进行构建。
  2. 为了方便开发时功能分支的切换,而且降低开发人员对GoMod 文件管理的心智负担,我们在构建的时候会把 go mod 中的 replace 相关的内容在构建中通过shell命令给替换掉,保证了上线代码的版本必定是指定构建依赖的分支

迁移GoModule后遇到的坑

  • Jenkins构建上面的问题
    遇到一个比较大的坑是,实现的第一版的时候,想把pull本项目的代码与Go get 依赖放到同一个 Stage 中,build单独放到另外一个 Stage 中。但是发现在 Jenkin s中如果把 go get 放到 build 之外的 Stage 中,实际上并没有go get 依赖库新版本成功。因此最终是把go get 自己私有依赖和 build 放到了同一个 stage 就可以解决问题
  • 依赖库版本管理的问题
    有一次在开发版本国中发生比较大的代码合入之后,直接使用 Go Mod tidy 整理依赖后,go.mod 文件重新计算依赖后,直接把我们的excel解析库的版本进行了升级,然后api发生了变动,经检查后,我们直接通过下面的go get 的命令来直接锁死了该库的版本
    1
    go get  github.com/360EntSecGroup-Skylar/excelize/v2@<=v2.1.0

本地切换 GoModule 使用方法

Windows桌面开发环境

  1. 安装Go1.13

    1. 在Goland中点击 File-> Setting -> GO -> GOROOT
    2. 点击加号 -> Download
    3. 点击对应版本和选择安装这个Go版本的路径即可
    4. 然后使用go1.13.4 的sdk 即可
  2. 配置Go相关的变量

    1. 把项目路径从GOPATH中删除(与添加GOPATH是相反的操作,此处不再展示)
    2. 执行下面的命令组,配置 GOPROXY 和 GOPRIVATE 环境变量(我的操作是在git shell 中,理论上在 cmd 也可以执行相同的命令)
      1
      2
      3
      go env -w GOPROXY=https://goproxy.cn,direct
      go env -w GOPRIVATE=gitlab.xinghuolive.com

  3. 配置git相关的配置

    1. 配置默认使用ssl 代替https仓库。在git shell中执行下面命令

      1
      2
      3
      4
      5
      6
      vim ~/.gitconfig
      输入下面的内容
      [url "ssh://git@gitlab.xinghuolive.com/"]
      insteadOf = https://gitlab.xinghuolive.com/
      [url "ssh://git@github.com/"]
      insteadOf = https://github.com/
    2. 配置github的用户(方便拉取github源的情况免输入用户密码,如果本机有在github做公钥的话可以忽略这个)

      1
      2
      3
      4
      // 在bash中执行
      vim ~/.netrc
      输入下面内容
      machine github.com login username $github_username password $github_password

      此处 $github_password 可以直接输入github账号的密码或者使用github生成的token

  4. 在 IDE 项目中打开 GoModule 选项

    1. 在 Goland 中点击 File-> Setting -> GO -> Go Module
      ![image.png](https://cdn.nlark.com/yuque/0/2019/png/554199/1575364828721-a5e60d60-bf10-4d8d-9aa3-965a1c8b4f77.png#align=left&display=inline&height=458&margin=%5Bobject%20Object%5D&name=image.png&originHeight=916&originWidth=1348&size=89362&status=done&style=none&width=674)

    Enable 勾选 -> Proxy选项选择direct -> VendoringMode 选择关闭即可

  5. 然后进行愉快的Build即可,跟以前项目的使用没有任何差异。

Linux环境配置

  1. 如果是使用Goland IDE 按上面的操作即可,上面Windows的操作在git shell中执行的,可以直接使用shell执行即可

FAQ

开发中依赖变动导致build 失败情况

  1. 直接更新 go.mod 依赖版本
    在目前开发中如果依赖被改动导致build失败的情况下(如phoenix中的引用swan,swan被改动的情况下)目前的的处理方法是切到swan,然后拉取代码。
    但是切到GoModule后,对于上面的情况我们可以直接在phoenix项目中,使用
    1
    go get gitlab.xinghuolive.com/birds-backend/swan@develop

即可以解决上面的依赖问题。不用切换到swan去进行代码拉取。

  1. 使用 Replace 来进行
    1
    go mod edit -replace github.com/pselle/bar=/Users/pselle/Projects/bar

Reference

  1. GOMODULE-Wiki
  2. GIT-HTTPS
  3. GoModule提案
  4. GoModule小结
  5. 七牛云GOPROXY的实现
  6. 阿里云GOPROXY的实现

git-rebase

git commit 整理小提示

问题

发现目前代码提交的时候,有同事其实可以把debug用的commit可以与实际的改动合成一个[虽然更加建议的是用单元测试和TDD来做开发],但是并没有这样做,导致每次的合入多合入了不少没用的Commit信息。

引导作用

希望可以通过这篇小分享可以达到把Git commit log与历史操作的意图能映射起来,能够快速找出可能当时修改的需求是什么,或者方便找出大概是哪些commit修改过哪部分的模块,达到快速定位,而不用全部靠经验来去问,减少沟通成本的开销

使用工具

git rebase -i $branch_name

branch_name是需要rebase到的分支,可以理解为提交MR的Target分支即可

操作例子

下面会以简单的场景作为例子来展示

上图中有三个commit,本质上是对同一个功能进行修改。但是实际上只有最后一个Commit是有意义的,因此我们的目的是通过改写commit把三个commit的commitlog合成一个,并且保存三个commit的改动的代码。

执行命令后会弹出下图界面

1
git rebase -i develop

根据我们的目的,应该把前置的东西修改成如下状态,然后用Vim的方法进行保存。

(之后会对下面的状态会进行补充说明。
此处描述一下操作的目的,由于git的限制,第一个commit必须不能为Fixup的状态,所以我们实际上的操作要变成把第一个commit的commit log 改写,然后把后面两个的commit 保留代码修改,丢弃后两个的commit log )

然后就会rebase中的状态

查看git status

我们应该使用git commit –amend来继续commit的修改

对其进行修改,我们把红色圈圈的部分修改为下图(实际上即把第三个commit log 的msg作为修改值),然后保存

然后继续

查看git log 

我们成功把三个commit合成了一个commit,并且把commit log改写成了我们想要的样子

Git rebase 参数讲解

pick 代表的是直接复用commit
edit  使用commit,但是需要commiter手动修改commit msg 
reword  使用commit,并且使用在此处修改后的commit msg ( 可以理解为edit + git commit –amend的连击操作)
squash  使用commit,并且把commit msg 直接追加到上一个commit ,如果上图中第二个改写为s,出现的最终commitlog应该为(注意生成结果是一个commit)

1
2
add: translation in output
debug: add log run success

fixup  使用commit,并且丢弃其commit msg ,保留代码的变动
exec  执行其他的shell命令
break 可以理解为打断点,就如果对某个commit的改动不太确定可以通过break停在某个commit上,使用–continue即可继续rebase的流程
drop  丢弃commit
label  对此commit打标签
reset  把head重置到此,这里的reset 是软reset,即此commit后的修改都会保留到git 工作区中,不会消失
merge 直接merge其他的分支或者commit

操作流程整理

  1. 拉取最新的TargetBranch
  2. 切换到开发分支
  3. 执行rebase -i操作 ,根据实际情况来改写commit

PS

  1. 对于是从最新的TargetBranch通过git flow 创建出来的分支也是可以使用rebase的命令来整理commit的

Prometheus(1)- 数据抓取源码阅读

目录

  1. 使用的目的
  2. 代码实现

代码版本

基于 prometheus项目的master branch的5a554df0855bf707a8c333c0dd830067d03422cf commit

使用目的

Prometheus 是一个基于Pull模型所进行数据采集的系统,因此,需要在主体项目中有一个抓取数据的模块,而Scrape就是这样的模块。因此这个也是Prometheus的一个主要部分。

1
2
3
4
5
6
7
8
入口代码部分
main.go
// line 356
scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
// line 427
scrapeManager.ApplyConfig
// line 555
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())

代码实现

整体的流程图

目录结构

1
2
3
4
5
6
7
8
-rw-r--r-- 1 ray 197121  2239 9月  30 16:09 helpers_test.go
-rw-r--r-- 1 ray 197121 7543 9月 30 16:09 manager.go //主要的控制的模块Manager
-rw-r--r-- 1 ray 197121 10727 9月 30 16:09 manager_test.go
-rw-r--r-- 1 ray 197121 35749 9月 30 16:09 scrape.go // 主要进行采集的模块
-rw-r--r-- 1 ray 197121 40682 9月 30 16:09 scrape_test.go
-rw-r--r-- 1 ray 197121 11743 9月 30 16:09 target.go // 抓取的公用部分的逻辑
-rw-r--r-- 1 ray 197121 9542 9月 30 16:09 target_test.go

重要数据结构描述

ScrapeManager 是管理所有抓取的一个抽象

1
2
3
4
5
6
7
8
9
10
11
12
13
type Manager struct {
logger log.Logger
append Appendable
graceShut chan struct{}

jitterSeed uint64 // Global jitterSeed seed is used to spread scrape workload across HA setup.
mtxScrape sync.Mutex // Guards the fields below.
scrapeConfigs map[string]*config.ScrapeConfig
scrapePools map[string]*scrapePool
targetSets map[string][]*targetgroup.Group

triggerReload chan struct{}
}

ScrapePools 是单个的Job的抓取目标的工作单位

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
appendable Appendable
logger log.Logger

mtx sync.RWMutex
config *config.ScrapeConfig
client *http.Client
// Targets and loops must always be synchronized to have the same
// set of hashes.
activeTargets map[uint64]*Target
droppedTargets []*Target
loops map[uint64]loop
cancel context.CancelFunc

// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(scrapeLoopOptions) loop

loop是单个Target的执行单位,是一个接口。在这里主要使用的是ScrapeLoop的实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type loop interface {
run(interval, timeout time.Duration, errc chan<- error)
stop()
}

type scrapeLoop struct {
scraper scraper
l log.Logger
cache *scrapeCache
lastScrapeSize int
buffers *pool.Pool
jitterSeed uint64
honorTimestamps bool

appender func() storage.Appender
sampleMutator labelsMutator
reportSampleMutator labelsMutator

parentCtx context.Context
ctx context.Context
cancel func()
stopped chan struct{}
}

scraper接口时具体的执行单位,scrapeLoop也是调用scraper的方法来进行数据的抓取, Prometheus默认使用targetScraper去抓取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type scraper interface {
scrape(ctx context.Context, w io.Writer) (string, error) // 抓取数据的方法
report(start time.Time, dur time.Duration, err error) // 上报数据的方法
offset(interval time.Duration, jitterSeed uint64) time.Duration // 记录数据偏移的方法
}

type targetScraper struct {
*Target //包含了report和offset方法

client *http.Client //因为Prometheus的exporter是以http接口进行数据的暴露的,所以会有httpclient的结构包含在里面
req *http.Request
timeout time.Duration

gzipr *gzip.Reader
buf *bufio.Reader
}


主协程逻辑

跟着调用的部分,我们先从初始化后的manager的ApplyConfig方法开始看起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
func (m *Manager) ApplyConfig(cfg *config.Config) error {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()

c := make(map[string]*config.ScrapeConfig)

for _, scfg := range cfg.ScrapeConfigs {
c[scfg.JobName] = scfg
}
m.scrapeConfigs = c
// 使用全局配置来生成一个集群内不重复的seed
if err := m.setJitterSeed(cfg.GlobalConfig.ExternalLabels); err != nil {
return err
}

// Cleanup and reload pool if the configuration has changed.
var failed bool
// 根据解析出来的配置生成对应的ScrapePool, 如果有并且数据没有改变的话,那就不进行操作,否则
for name, sp := range m.scrapePools {
if cfg, ok := m.scrapeConfigs[name]; !ok {
sp.stop()
delete(m.scrapePools, name)
} else if !reflect.DeepEqual(sp.config, cfg) {
err := sp.reload(cfg)
if err != nil {
level.Error(m.logger).Log("msg", "error reloading scrape pool", "err", err, "scrape_pool", name)
failed = true
}
}
}

if failed {
return errors.New("failed to apply the new configuration")
}
return nil
}

func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
targetScrapePoolReloads.Inc()
start := time.Now()

sp.mtx.Lock()
defer sp.mtx.Unlock()

client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false)
if err != nil {
targetScrapePoolReloadsFailed.Inc()
return errors.Wrap(err, "error creating HTTP client")
}
sp.config = cfg
oldClient := sp.client
sp.client = client

var (
wg sync.WaitGroup
interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout)
limit = int(sp.config.SampleLimit)
honorLabels = sp.config.HonorLabels
honorTimestamps = sp.config.HonorTimestamps
mrc = sp.config.MetricRelabelConfigs
)

for fp, oldLoop := range sp.loops {
var (
t = sp.activeTargets[fp]
s = &targetScraper{Target: t, client: sp.client, timeout: timeout}
newLoop = sp.newLoop(scrapeLoopOptions{
target: t,
scraper: s,
limit: limit,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
mrc: mrc,
})
)
wg.Add(1)

go func(oldLoop, newLoop loop) {
oldLoop.stop()
wg.Done()

go newLoop.run(interval, timeout, nil)
}(oldLoop, newLoop)

sp.loops[fp] = newLoop
}

wg.Wait()
oldClient.CloseIdleConnections()
targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
time.Since(start).Seconds(),
)
return nil
}

  1. 把解析好的配置,遍历,变为一个jobName为key,配置值为Value的map
  2. 对比自己的配置,如果之前已经存在但是配置发生变动的,则去reload scraper pool的配置。
  3. (Reload) 如果需要reload配置的情况下,会重新生成scrapePool后,派生多一个线程去执行scraperPool.Sync(),知道Manager的targetSet被遍历完为止。Sync方法的内容会后面进行详细讲解。

Run()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
go m.reloader()
for {
select {
case ts := <-tsets:
m.updateTsets(ts)

select {
case m.triggerReload <- struct{}{}:
default:
}

case <-m.graceShut:
return nil
}
}
}


func (m *Manager) reloader() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-m.graceShut:
return
case <-ticker.C:
select {
case <-m.triggerReload:
m.reload()
case <-m.graceShut:
return
}
}
}
}

功能:

  1. 等待从Main.go中传入的discoveryManager的SyncCh是否有变动,如果有变动,更新Targetset。
  2. 派生出了一个Reloader协程,Reloader协程会定时检查是否有关闭的信号或者Reload信号(triggerReload channel,就是外部给与主协程的刺激产生的二级信号),如果有,则执行reload操作。

子协程逻辑

ScraperPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
start := time.Now()

var all []*Target
sp.mtx.Lock()
sp.droppedTargets = []*Target{}
for _, tg := range tgs {
targets, err := targetsFromGroup(tg, sp.config)
if err != nil {
level.Error(sp.logger).Log("msg", "creating targets failed", "err", err)
continue
}
for _, t := range targets {
if t.Labels().Len() > 0 {
all = append(all, t)
} else if t.DiscoveredLabels().Len() > 0 {
sp.droppedTargets = append(sp.droppedTargets, t)
}
}
}
sp.mtx.Unlock()
sp.sync(all)

targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
time.Since(start).Seconds(),
)
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

Sync函数是一个对外暴露函数的接口:

  1. 把配置解析出来的target结构化。
  2. 调用内部方法sync()来进行数据抓取的执行
  3. 一些计数器添加计数

值得注意的是Append方法,是一个封装了的方法,是同是进行对变量的修改,并且包含了采集到的数据持久化的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
sp.mtx.Lock()
defer sp.mtx.Unlock()

var (
uniqueTargets = map[uint64]struct{}{}
interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout)
limit = int(sp.config.SampleLimit)
honorLabels = sp.config.HonorLabels
honorTimestamps = sp.config.HonorTimestamps
mrc = sp.config.MetricRelabelConfigs
)

for _, t := range targets {
t := t
hash := t.hash()
uniqueTargets[hash] = struct{}{}

if _, ok := sp.activeTargets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
l := sp.newLoop(scrapeLoopOptions{
target: t,
scraper: s,
limit: limit,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
mrc: mrc,
})

sp.activeTargets[hash] = t
sp.loops[hash] = l

go l.run(interval, timeout, nil)
} else {
// Need to keep the most updated labels information
// for displaying it in the Service Discovery web page.
sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
}
}

var wg sync.WaitGroup

// Stop and remove old targets and scraper loops.
for hash := range sp.activeTargets {
if _, ok := uniqueTargets[hash]; !ok {
wg.Add(1)
go func(l loop) {

l.stop()

wg.Done()
}(sp.loops[hash])

delete(sp.loops, hash)
delete(sp.activeTargets, hash)
}
}

// Wait for all potentially stopped scrapers to terminate.
// This covers the case of flapping targets. If the server is under high load, a new scraper
// may be active and tries to insert. The old scraper that didn't terminate yet could still
// be inserting a previous sample set.
wg.Wait()
}

主要逻辑:

  1. 把传入的Target列表进行遍历
    1.1 如果target不在active的map中, 生成targetScraper,然后把targetScraper放入Loop里面,调用Loop.run()在协程中进行逻辑
    1.2 否则, 会先删除旧的协程,然后重新生成协程。

ScraperLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
select {
case <-time.After(sl.scraper.offset(interval, sl.jitterSeed)):
// Continue after a scraping offset.
case <-sl.ctx.Done():
close(sl.stopped)
return
}

var last time.Time

ticker := time.NewTicker(interval)
defer ticker.Stop()

mainLoop:
for {
select {
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
break mainLoop
default:
}

var (
start = time.Now()
scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
)

// Only record after the first scrape.
if !last.IsZero() {
targetIntervalLength.WithLabelValues(interval.String()).Observe(
time.Since(last).Seconds(),
)
}

b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
buf := bytes.NewBuffer(b)

contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
cancel()

if scrapeErr == nil {
b = buf.Bytes()
// NOTE: There were issues with misbehaving clients in the past
// that occasionally returned empty results. We don't want those
// to falsely reset our buffer size.
if len(b) > 0 {
sl.lastScrapeSize = len(b)
}
} else {
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
if errc != nil {
errc <- scrapeErr
}
}

// A failed scrape is the same as an empty scrape,
// we still call sl.append to trigger stale markers.
total, added, seriesAdded, appErr := sl.append(b, contentType, start)
if appErr != nil {
level.Warn(sl.l).Log("msg", "append failed", "err", appErr)
// The append failed, probably due to a parse error or sample limit.
// Call sl.append again with an empty scrape to trigger stale markers.
if _, _, _, err := sl.append([]byte{}, "", start); err != nil {
level.Warn(sl.l).Log("msg", "append failed", "err", err)
}
}

sl.buffers.Put(b)

if scrapeErr == nil {
scrapeErr = appErr
}

if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
level.Warn(sl.l).Log("msg", "appending scrape report failed", "err", err)
}
last = start

select {
case <-sl.parentCtx.Done():
close(sl.stopped)
return
case <-sl.ctx.Done():
break mainLoop
case <-ticker.C:
}
}

close(sl.stopped)

sl.endOfRunStaleness(last, ticker, interval)
}

ScraperLoop是单个Target进行获取的执行单位,协程使用死循环进行占用,然后调用scraper接口的Scrape方法去抓取数据,并且调用Stroage模块的Appender的接口金属数据的持久化,然后继续定时休眠的过程。我们需要更加具体的看一下实例Scraper的Scrape方法。

ScraperLoop把Scraper抽象出来的三个接口都进行了调用:

  1. 开始部分的Select代码段中的Offset是用于控制第一次执行的时候等待的间隔
  2. Scrape方法就是直接进行数据的抓取,下面有详细解析
  3. report方法,修改Scraper中Target自己保存的状态。

TargerScraper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
if s.req == nil {
req, err := http.NewRequest("GET", s.URL().String(), nil)
if err != nil {
return "", err
}
req.Header.Add("Accept", acceptHeader)
req.Header.Add("Accept-Encoding", "gzip")
req.Header.Set("User-Agent", userAgentHeader)
req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))

s.req = req
}

resp, err := s.client.Do(s.req.WithContext(ctx))
if err != nil {
return "", err
}
defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
return "", errors.Errorf("server returned HTTP status %s", resp.Status)
}

if resp.Header.Get("Content-Encoding") != "gzip" {
_, err = io.Copy(w, resp.Body)
if err != nil {
return "", err
}
return resp.Header.Get("Content-Type"), nil
}

if s.gzipr == nil {
s.buf = bufio.NewReader(resp.Body)
s.gzipr, err = gzip.NewReader(s.buf)
if err != nil {
return "", err
}
} else {
s.buf.Reset(resp.Body)
if err = s.gzipr.Reset(s.buf); err != nil {
return "", err
}
}

_, err = io.Copy(w, s.gzipr)
s.gzipr.Close()
if err != nil {
return "", err
}
return resp.Header.Get("Content-Type"), nil
}

Scrape方法是使用HttpClient进行对target url 的数据抓取,抓取的内容在context中进行传递,得到返回后,继续解析,返回给ScraperLoop的Run方法使用。

Prometheus服务发现源码阅读

目录

  1. 使用的目的
  2. 代码实现
    2.1 主协程逻辑
    2.2 子协程逻辑

代码版本

基于 prometheus项目的master branch的5a554df0855bf707a8c333c0dd830067d03422cf commit

使用目的

服务发现是Prometheus中最重要的功能之一,因为它是支撑Prometheus可以在容器的环境下的最重要的功能。因为应用的容器部署的弹性和有效的时长远与传统的基于服务器(无论是实体机还是虚拟机[OpenStack]这一类的机器)部署, 都会变动的更加快。因此为了适应这种弹性大、变化快的环境,它需要基于不同平台来支持服务发现这个功能。

Prometheus的服务发现模块是在prometheus/discovery的目录下面,它在Prometheus中的体系支撑了采集器的发现和AlertManager的发现。
可以看下面的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//  此代码在cmd/main.go中的 350-352行
ctxScrape, cancelScrape = context.WithCancel(context.Background())
discoveryManagerScrape = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape"))

ctxNotify, cancelNotify = context.WithCancel(context.Background())
discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))

// 具体注入到两个功能的使用
/*
cmd/main.go line 555, 把Manager的输出的信道传递给scrapeManager,然后后面就会去restore scrape的方式。
后面会有单独文章写Scrape(抓取数据)的部分
*/
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())

/*
cmd/main.go line 726, 把Manager的输出的信道传递给notifierManager,然后后面就会去获取更新alertManager组件的方式。
后面会有单独文章写Notifer(抓取数据)的部分
*/
notifierManager.Run(discoveryManagerNotify.SyncCh())

// 在main.go line 735, run Group 会执行上面注册在g 里面的这两个discoveryManager的Run方法。

所以整体的代码执行的入口流程

  1. 在main.go实例化两个discoveryManager来进行服务发现的操作
  2. 把注入的discovery config进行解析,并且生成对应的Provider协程
  3. 主协程执行上面两个discoveryManager的Run方法

代码实现

模块的运行逻辑图

主协程的逻辑

  1. 先去读取服务发现相关的配置,调用ApplyConfig()读取存在的service discovery的方式,并且加载到NewManager的结构体中。可以同同时支持多个service Discovery的方式。使用add函数把配置变为Providers结构体。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    add := func(cfg interface{}, newDiscoverer func() (Discoverer, error)) {
    t := reflect.TypeOf(cfg).String()
    for _, p := range m.providers {
    if reflect.DeepEqual(cfg, p.config) {
    p.subs = append(p.subs, setName)
    added = true
    return
    }
    }

    d, err := newDiscoverer()
    if err != nil {
    level.Error(m.logger).Log("msg", "Cannot create service discovery", "err", err, "type", t)
    failedCount++
    return
    }

    provider := provider{
    name: fmt.Sprintf("%s/%d", t, len(m.providers)),
    d: d,
    config: cfg,
    subs: []string{setName},
    }
    m.providers = append(m.providers, &provider)
    added = true
    }
    // 支持多种方式的配置 具体可以看代码的discovery/manager.go 356-417行
  2. 然后为每个Provider生成两个协程去进行服务发现的功能。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // discovery/manager.go line 223-226
    func (m *Manager) startProvider(ctx context.Context, p *provider) {
    level.Debug(m.logger).Log("msg", "Starting provider", "provider", p.name, "subs", fmt.Sprintf("%v", p.subs))
    ctx, cancel := context.WithCancel(ctx)
    // 此处Update是执行协程与主协程的交流通道,传输Watch的变动
    updates := make(chan []*targetgroup.Group)
    m.discoverCancel = append(m.discoverCancel, cancel)

    // 执行服务发现的Watch
    go p.d.Run(ctx, updates)
    // 把每个协程生成的updates放入到Manager中,使得子协程中发现到服务的变动的时候可以通知到主协程
    go m.updater(ctx, p, updates)
    }

    // discovery/manager.go line 205-207
    for _, prov := range m.providers {
    m.startProvider(m.ctx, prov)
    }
    ApplyConfig完成后,Run方法如下
    1
    2
    3
    4
    5
    6
    7
    8
    func (m *Manager) Run() error {
    go m.sender()
    for range m.ctx.Done() {
    m.cancelDiscoverers()
    return m.ctx.Err()
    }
    return nil
    }
    Manager会派生多一个协程去定时检查配置的变动,本质上是检查上面派生的updater协程时候有传入信号到triggerchan中,主协程就是为了防止泄露一直去遍历context的是否done,保持主协程阻塞,维持这个模块的运行。我们主要看一下Manager.Updater()和Manager.Sender()这两个函数。

此处 Manager.trigglechan is 长度为1的buffered channel。updates := make(chan []*targetgroup.Group), 是unbuffered channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
for {
select {
case <-ctx.Done():
return
case tgs, ok := <-updates:
receivedUpdates.WithLabelValues(m.name).Inc()
if !ok {
level.Debug(m.logger).Log("msg", "discoverer channel closed", "provider", p.name)
return
}

for _, s := range p.subs {
m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
}

select {
case m.triggerSend <- struct{}{}:
default:
}
}
}
}

func (m *Manager) sender() {
ticker := time.NewTicker(m.updatert)
defer ticker.Stop()

for {
select {
case <-m.ctx.Done():
return
case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
select {
case <-m.triggerSend:
sentUpdates.WithLabelValues(m.name).Inc()
select {
case m.syncCh <- m.allGroups():
default:
delayedUpdates.WithLabelValues(m.name).Inc()
level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle")
select {
case m.triggerSend <- struct{}{}:
default:
}
}
default:
}
}
}
}

步骤

  1. Provider 传入变化了的update到update channel中
  2. Manager.Updater协程收集到了变化的内容后,修改Manager里面Group的内容。塞入信号到triggerSend channel中
  3. Manager.Sender协程定时去进行获取通知,检查到triggerSend Channel中可以获取之后,就会尝试塞到syncCh中,使得订阅者可以收到这个消息。(即Main里面的两个放入syncCh方法的ScrapeManager和NotiferManager可以收到)

然后整个的流程就是这样

派生协程的逻辑

Provider协程的逻辑

1
2
3
4
5
6
7
type Discoverer interface {
// Run hands a channel to the discovery provider (Consul, DNS etc) through which it can send
// updated target groups.
// Must returns if the context gets canceled. It should not close the update
// channel on returning.
Run(ctx context.Context, up chan<- []*targetgroup.Group)
}

Provider协程主要时根据具体注入的配置来生成出与对应的系统进行服务发现的能力。每个Provider中的Discoverer都实现了自己对应的Run方法即上面p.d.Run()的方法。(如何生成上面已经描述,此处不再复述)

下面我们简单的看一下其中两个Provider,基于配置文件的Provider和基于zookeeper的Provider来看看具体的流程是怎样处理的。
对于Provider,我们可以理解为一个watch&notify的模型,但是是基于不同平台给予的Api继续watch&notify的操作。

FileProvider执行

FileProvider支持解析json和yaml的格式内容

1
2
3
4
5
6
7
8
9
// discovery/file/file.go line 39-46
var (
patFileSDName = regexp.MustCompile(`^[^*]*(\*[^/]*)?\.(json|yml|yaml|JSON|YML|YAML)$`)

// DefaultSDConfig is the default file SD configuration.
DefaultSDConfig = SDConfig{
RefreshInterval: model.Duration(5 * time.Minute),
}
)

下面我们看看主要实现Discoverer的Run方法的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
level.Error(d.logger).Log("msg", "Error adding file watcher", "err", err)
return
}
d.watcher = watcher
defer d.stop()

// 协程内第一次执行把conf添加到discoverer中
d.refresh(ctx, ch)

ticker := time.NewTicker(d.interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return

case event := <-d.watcher.Events:
// fsnotify sometimes sends a bunch of events without name or operation.
// It's unclear what they are and why they are sent - filter them out.
if len(event.Name) == 0 {
break
}
// Everything but a chmod requires rereading.
if event.Op^fsnotify.Chmod == 0 {
break
}
// Changes to a file can spawn various sequences of events with
// different combinations of operations. For all practical purposes
// this is inaccurate.
// The most reliable solution is to reload everything if anything happens.
d.refresh(ctx, ch)

case <-ticker.C:
// Setting a new watch after an update might fail. Make sure we don't lose
// those files forever.
d.refresh(ctx, ch)

case err := <-d.watcher.Errors:
if err != nil {
level.Error(d.logger).Log("msg", "Error watching file", "err", err)
}
}
}
}

上面的死循环可以看出,对于File Discoverer, 它支持两种的Watch的方式,一个是定时监控(ticker.C),一个是事件触发监控(event := <-d.watcher.Events)。

所以重点是在于Refresh函数,本质上就是一个重新解析并且检查文件内容是否有变动的函数,解析完文件之后就可以传出配置到对应的update channel中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (d *Discovery) refresh(ctx context.Context, ch chan<- []*targetgroup.Group) {
t0 := time.Now()
defer func() {
fileSDScanDuration.Observe(time.Since(t0).Seconds())
}()
ref := map[string]int{}
// 把初始化传入的配置的路径进行检查
for _, p := range d.listFiles() {
// 把单个文件进行解析,获取出里面的targets
tgroups, err := d.readFile(p)
if err != nil {
fileSDReadErrorsCount.Inc()

level.Error(d.logger).Log("msg", "Error reading file", "path", p, "err", err)
// Prevent deletion down below.
ref[p] = d.lastRefresh[p]
continue
}
select {
// 把新传入的传入到Update中
case ch <- tgroups:
case <-ctx.Done():
return
}

ref[p] = len(tgroups)
}
// Send empty updates for sources that disappeared.
for f, n := range d.lastRefresh {
m, ok := ref[f]
if !ok || n > m {
level.Debug(d.logger).Log("msg", "file_sd refresh found file that should be removed", "file", f)
d.deleteTimestamp(f)
for i := m; i < n; i++ {
select {
case ch <- []*targetgroup.Group{{Source: fileSource(f, i)}}:
case <-ctx.Done():
return
}
}
}
}
d.lastRefresh = ref

d.watchFiles()
}

zookeeperProvider执行

Zookeeper的Provider与上面的模式类似,差别在于:

  1. 初始化的时候需要创建zookeeper的链接(配置中需要传入配置好zookeeper相关的配置)
  2. 分为了ServerSetPoint和NerveEndPoint两个类型的Discovery,因此抽象了一个Discovery的结构体
  3. 没有了定时检查(因为不存在文件系统问题的)这种检查的方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
type Discovery struct {
conn *zk.Conn

sources map[string]*targetgroup.Group

updates chan treecache.ZookeeperTreeCacheEvent
pathUpdates []chan treecache.ZookeeperTreeCacheEvent
treeCaches []*treecache.ZookeeperTreeCache

parse func(data []byte, path string) (model.LabelSet, error)
logger log.Logger
}

func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
defer func() {
for _, tc := range d.treeCaches {
tc.Stop()
}
for _, pathUpdate := range d.pathUpdates {
// Drain event channel in case the treecache leaks goroutines otherwise.
for range pathUpdate {
}
}
d.conn.Close()
}()

for _, pathUpdate := range d.pathUpdates {
go func(update chan treecache.ZookeeperTreeCacheEvent) {
for event := range update {
select {
case d.updates <- event:
case <-ctx.Done():
return
}
}
}(pathUpdate)
}

for {
select {
case <-ctx.Done():
return
case event := <-d.updates:
tg := &targetgroup.Group{
Source: event.Path,
}
if event.Data != nil {
labelSet, err := d.parse(*event.Data, event.Path)
if err == nil {
tg.Targets = []model.LabelSet{labelSet}
d.sources[event.Path] = tg
} else {
delete(d.sources, event.Path)
}
} else {
delete(d.sources, event.Path)
}
select {
case <-ctx.Done():
return
case ch <- []*targetgroup.Group{tg}:
}
}
}
}

Run的流程

  1. 给每一个需要检查的路径派生一个协程
  2. 死循环获取是否有更新
  3. 如果有,则使用传入的parse函数去进行解析,然后把结果发送到update的channel中,即ch变量中

Effective Go Reading

本文章的目的是为了详细阅读和理解Effective GO所提及到的内容。

目录

  1. Method
    1.1 Pointers vs Values
  2. Data
    2.1 New vs Make
    2.2 Array
    2.3 Slice
    2.4 Map
    2.5 Append
  3. Interface
  4. Error
  5. PackageInit
  6. Defer
  7. ShareNote

Method

Pointers vs Values

主要的区别,如果方法是放在类型值上面而不是指针上面的,可以通过指针和普通类型来进行使用。但是对于方法绑定的是指针的类型,只能通过指针来进行使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package main

import "fmt"

type ByteSlice []byte

func (slice ByteSlice) Append(data []byte) []byte {
// Body exactly the same as the Append function defined above.
l := len(slice)
if l+len(data) > cap(slice) { // reallocate
// Allocate double what's needed, for future growth.
newSlice := make([]byte, (l+len(data))*2)
// The copy function is predeclared and works for any slice type.
copy(newSlice, slice)
slice = newSlice
}
slice = slice[0 : l+len(data)]
copy(slice[l:], data)
return slice
}

func (p *ByteSlice) Append2(data []byte) {
slice := *p
// Body as above, without the return.
l := len(slice)
if l+len(data) > cap(slice) { // reallocate
// Allocate double what's needed, for future growth.
newSlice := make([]byte, (l+len(data))*2)
// The copy function is predeclared and works for any slice type.
copy(newSlice, slice)
slice = newSlice
}
slice = slice[0 : l+len(data)]
copy(slice[l:], data)
*p = slice
}

func (p *ByteSlice) Write(data []byte) (n int, err error) {
slice := *p
// Body as above, without the return.
l := len(slice)
if l+len(data) > cap(slice) { // reallocate
// Allocate double what's needed, for future growth.
newSlice := make([]byte, (l+len(data))*2)
// The copy function is predeclared and works for any slice type.
copy(newSlice, slice)
slice = newSlice
}
slice = slice[0 : l+len(data)]
copy(slice[l:], data)
*p = slice
*p = slice
return len(data), nil
}

func main() {
var b ByteSlice
b = b.Append([]byte{1, 2, 3})
fmt.Printf("byteSlice 1 is %v", b)
b.Write([]byte{7, 8, 9})
fmt.Printf("byteSlice 2 is %v", b)
/*
func Fprintf(w io.Writer, format string, a ...interface{}) (n int, err error)
*/
fmt.Fprintf(&b, "This hour has %d days\n", 7)
/*
if use below code , will throw error
fmt.Fprintf(b, "This hour has %d days\n", 7)
*/
b.Append2([]byte{4, 5, 6})
fmt.Printf("byteSlice 3 is %v", b)
}

上面的例子表达的是,假如直接像注释的代码那样,把B传入一个满足io.Writer的指针的方法中(含有Write方法),但是因为我们的自定义类型上面的Write是指针方法,而不是类型方法,所以会出现类型报错的问题。报错如下

1
2
as type io.Writer in argument to fmt.Fprintf:
ByteSlice does not implement io.Writer (Write method has pointer receiver)

根据官方的描述,原文如下:

1
2
3
The rule about pointers vs. values for receivers is that value methods can be invoked on pointers and values, but pointer methods can only be invoked on pointers.

This rule arises because pointer methods can modify the receiver; invoking them on a value would cause the method to receive a copy of the value, so any modifications would be discarded. The language therefore disallows this mistake. There is a handy exception, though. When the value is addressable, the language takes care of the common case of invoking a pointer method on a value by inserting the address operator automatically. In our example, the variable b is addressable, so we can call its Write method with just b.Write. The compiler will rewrite that to (&b).Write for us.

翻译一下:
对于接收者类型是指针还是值的规则,值接收者可以 被值或者指针进行调用。而指针方法只能被指针进行调用。

这个规则的产生的原因是因为指针方法可以修改接受者的值。 但是以值的方式继续调用的情况下,go是会自动把值复制一份,然后继续方法的调用,所有对于里面变量得修改都会被丢弃(因为是值传递,除非使用Return +调用的地方有返回值接收)。 因此语言不允许有这种的错误出现。当值是可以获得地址的情况下,语言会自动把值获取指针传入指针调用的方法里面。

Data

New vs Make

New

New 是 Go里面的分配内存的方法,但是它只会创建一个Zero值(即创建一个0的空间给对应的内存,并且返回这个变量所占有的内存地址)。

由于由new出来的内存占用为0。这样对于设计你自己的数据结构很有帮助,原因是因为可以默认初始化了0的值,而不用之后再去进行二次的初始化。

官网上的例子:

1
2
3
4
5
6
7
8
9
For example, the documentation for bytes.Buffer states that "the zero value for Buffer is an empty buffer ready to use." Similarly, sync.Mutex does not have an explicit constructor or Init method. Instead, the zero value for a sync.Mutex is defined to be an unlocked mutex.

The zero-value-is-useful property works transitively. Consider this type declaration.

type SyncedBuffer struct {
lock sync.Mutex
buffer bytes.Buffer
}

但是有些时候直接初始化0值不要足够,需要一个构建者。像这个例子一样

1
2
3
4
5
6
7
8
9
10
11
func NewFile(fd int, name string) *File {
if fd < 0 {
return nil
}
f := new(File)
f.fd = fd
f.name = name
f.dirinfo = nil
f.nepipe = 0
return f
}

因为上面这段代码有比较多的参数,因此我们可以用一个命名的字段来继续初始化

1
2
3
4
5
6
7
func NewFile(fd int, name string) *File {
if fd < 0 {
return nil
}
f := File{fd, name, nil, 0}
return &f
}

事实上,直接获取复合文字初始化的结构体的地址,实际上会创建一个全新的实例并且赋值,因此我们可以把上面的最后两行代码合成一行代码

1
2
3
4
5
f := File{fd, name, nil, 0}
return &f
===

return &File{fd, name, nil, 0}

用于和范围: 适用于创建数组、切片、映射(with the field labels being indices or map keys as appropriate)。

返回值:
一个对应类型的指针

Make

Make 可以用于创建并且返回一个非nil的值。
适用范围:
切片、映射、channel

1
make([]int,10,100)  // 这样是创建了一个容量为100,但是填入了10个0的切片。

官网上面对make的描述

1
For slices, maps, and channels, make initializes the internal data structure and prepares the value for use.

返回值:
一个对应类型的数据

Diff

区别上面的Make和New的目的是为了,对于切片、映射、channel这三种类型,在底层的实现原理中都必须要先创建一个底层的实现才能被引用到,才能在暴露给语言的使用者上面不会抛出错误。
所以本质上,new创建出来的可以理解为一个nil的对象,而make创建出来的是一个带有底层数据结构,并且有非nil的对象。
而且返回值有所不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import "fmt"

func main(){
// allocates slice structure; *p == nil; rarely useful
var p *[]int = new([]int)
// the slice v now refers to a new array of 100 ints
var v []int = make([]int, 5)
fmt.Printf("p values is %v,%v\n", p, *p==nil)
fmt.Printf("v values is %v, %v\n", v, v==nil)

}

------------------
p values is &[],true
v values is [0 0 0 0 0], false

Array

数组在Go的三个特点:

  1. 如果直接把数据传入到一个函数中,则是把这个数组的值拷贝一份,然后把拷贝的副本传到函数中进行使用。
  2. 数组的长度也是它的类型属性之一,[10]int和[20]int不是等价的。
  3. Array都是值。

而且有一个使用的小技巧,如果想要减少传递的数据的量,可以直接传入指针,这样可以免于拷贝多一份中间的数据。
对于Go来说,因为数组类型支持的方法比较少,而且不能够通过动态去进行特定长度数组创建。因此更加建议的是用Slice来代替数组。

Slice

可以看我的博客另外一篇的文章Go Slice上面有提及高级的用法,此处不再重复。

二维数组和二维切片

对于二维数组的声明,可以使用这样的方法

1
2
3
4
5
6
7
8
type Transform [3][3]float64  // A 3x3 array, really an array of arrays.
type LinesOfText [][]byte // A slice of byte slices.

text := LinesOfText{
[]byte("Now is the time"),
[]byte("for all good gophers"),
[]byte("to bring some fun to the party."),
}

如果使用Make来继续初始化的情况。需要考虑两个不同的使用场景导致的初始化方式的不同。
第一种:如果内部的一位数组可能发生扩展或者收缩,那么就要单独去分配这个一位数组。
第二种: 如果内部的数组长度不会发生size变化而只会发生值得变化,则可以进行统一得初始化。(这样性能会比较好,因为可以一次性的调用Allocate的系统函数,减少分配内存的开销)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// First Method
picture := make([][]uint8, YSize)
for i:= range picture{
picture[i] = make([]uint8, XSize)
}

// Second Method
picture := make([][]uint8, YSize)
pixels := make([]uint8, XSize * YSize)

for i := range picture{
picture[i], pixels = pixels[:XSize], pixels[XSize:]
}

Map

主要注意当一个Key不存在与一个Map中的情况下,需要使用这种方法来进行判断

1
2
3
4
5
var tz map[string]int
var ds string = "abc"
if val, ok := tz[ds]; ok{
return val
}

同理在上面的例子上面,如果反过来进行使用,可以用于判断Key是否存在于Map中。

1
2
3
4
5
6
var tz map[string]int
var ds string = "abc"
_, exist := tz[ds]
if !exist {
return "is not exist"
}

如果需要删除一个值的情况下,使用delete的函数进行处理

1
2
3
4
var tz map[string]int
var ds string = "abc"
// delete(map, key)
delete(tz, ds)

Append

Append 可以接受多个参数

1
2
3
4
5
6
func append(slice []T, elements ...T) []T

// use case
x := []int{1,2,3}
x = append(x, 4, 5, 6)
fmt.Println(x)

对于如果要把两个数组直接接起来的情况下

1
2
3
4
x := []int{1,2,3}
y := []int{4,5,6}
x = append(x, y...)
fmt.Println(x)

InterFace

Interface

一个结构体可以实现多个接口,只要它实现了那些接口所定义的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type Sequence []int

// Methods required by sort.Interface.
func (s Sequence) Len() int {
return len(s)
}
func (s Sequence) Less(i, j int) bool {
return s[i] < s[j]
}
func (s Sequence) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

// Copy returns a copy of the Sequence.
func (s Sequence) Copy() Sequence {
copy := make(Sequence, 0, len(s))
return append(copy, s...)
}

// Method for printing - sorts the elements before printing.
func (s Sequence) String() string {
s = s.Copy() // Make a copy; don't overwrite argument.
sort.Sort(s)
str := "["
for i, elem := range s { // Loop is O(N²); will fix that in next example.
if i > 0 {
str += " "
}
str += fmt.Sprint(elem)
}
return str + "]"
}

Coversions

上面所引用的到方法其实是重新实现了fmt包里面的Sprint方法。我们可以用一些方法来为这个提速,在调用Sprint之前把数据转换成[]int类型,因为Sequence的本质上就
是[]int。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 修改前
func (s Sequence) String() string {
s = s.Copy()
sort.Sort(s)
return fmt.Sprint([]int(s))
}

// 修改后
type Sequence []int

// Method for printing - sorts the elements before printing
func (s Sequence) String() string {
s = s.Copy()
sort.IntSlice(s).Sort()
return fmt.Sprint([]int(s))
}

type assertions

使用Type switch 实际上的操作会把那个变量根据分支的判断的类型来转型。
下面这段代码是一个例子,目的是如果不是string类型, 调用其String()进行输出。而如果是,直接输出。

1
2
3
4
5
6
7
8
9
10
11
type Stringer interface {
String() string
}

var value interface{} // Value provided by caller.
switch str := value.(type) {
case string:
return str
case Stringer:
return str.String()
}

如果对于使用场景是单分支,只需要判断接口是否为那个类型的实现的情况下,可以使用

1
2
// 
str, ok := value.(string)

注意的是继续类型推断的情况下,必须要填入实际的类型,不能再填入Interface。

Generality

如果包里面的一个类型只是为了实现接口并且其他方法不需要进行导出给外部使用的情况下。只需要导出接口就好,这样可以避免对于使用者不感兴趣的方法的实现的复杂度。
如官方的Hash模块,crc32.NewIEEE 和 adler32.New这两个方法都是返回 接口类型Hash.Hash32。

interface & method

只要这个类型实现了这个接口的所有方法,即可以把这个类型传入来当接口使用.可以理解为简单的依赖翻转。

Error

Defination

Error 接口在代码里面的定义是这样的。

1
2
3
type error interface {
Error() string
}

如果需要实现一个自定义的Error(添加部分与业务相关的信息)

1
2
3
4
5
6
7
8
9
10
11
// PathError records an error and the operation and
// file path that caused it.
type PathError struct {
Op string // "open", "unlink", etc.
Path string // The associated file.
Err error // Returned by the system call.
}

func (e *PathError) Error() string {
return e.Op + " " + e.Path + ": " + e.Err.Error()
}

并且建议的可行方法,错误信息应该能够表明他们的来源(即发生错误的模块是哪个模块的哪个函数)。
一般来说,函数调用者如果关心具体的错误信息的话,可以使用一个类型switch来获取具体的错误类型和相信信息。

1
2
3
4
5
6
7
8
9
10
11
for try := 0; try < 2; try++ {
file, err = os.Create(filename)
if err == nil {
return
}
if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
deleteTempFiles() // Recover some space.
continue
}
return
}

Panic

对于预设以内的错误,返回错误的方式应该是返回错误的信息为多一个参数.

1
2
3
4
5
6

func Get() (string, Error){
return "", nil
}

k, err := Get()

但是对于不可恢复的错误,我们不能让程序继续运行。
Panic()的作用是创建一个Runtime Error并且使得程序无法继续运行。
Panic可以接受任意长度的参数,并且打印到日志上,

1
2
3
4
5
6
7
8
9
10
11
12
func CubeRoot(x float64) float64 {
z := x/3 // Arbitrary initial value
for i := 0; i < 1e6; i++ {
prevz := z
z -= (z*z*z-x) / (3*z*z)
if veryClose(z, prevz) {
return z
}
}
// A million iterations has not converged; something is wrong.
panic(fmt.Sprintf("CubeRoot(%g) did not converge", x))
}

官方对于Panic的态度:程序员应该尽可能的去考虑并且解决所有的异常的情况。
对于真实的代码库上面不建议使用这个方法。

Recover

用于恢复发生Panic的Goroutine。但是必须在panic前面的地方添加一个defer 并且把Recover函数放入其中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func server(workChan <-chan *Work) {
for work := range workChan {
go safelyDo(work)
}
}

func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
log.Println("work failed:", err)
}
}()
do(work)
}

官方库中处理复杂错误的例子,Regexp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Error is the type of a parse error; it satisfies the error interface.
type Error string
func (e Error) Error() string {
return string(e)
}

// error is a method of *Regexp that reports parsing errors by
// panicking with an Error.
func (regexp *Regexp) error(err string) {
panic(Error(err))
}

// Compile returns a parsed representation of the regular expression.
func Compile(str string) (regexp *Regexp, err error) {
regexp = new(Regexp)
// doParse will panic if there is a parse error.
defer func() {
if e := recover(); e != nil {
regexp = nil // Clear return value.
err = e.(Error) // Will re-panic if not a parse error.
}
}()
return regexp.doParse(str), nil
}

即使上面把regexp 的类型变成了nil。但是如果在e判断类型的时候如果不是Error的类型。程序仍然会发生错误,并且崩溃推出。

1
2
3
if pos == 0 {
re.error("'*' illegal at start of expression")
}

对于上面的这种re-panic的策略,官方建议是在一个包内进行使用,这样就不会把错误暴露给Client。
虽然re-panic最终程序还是崩溃了,但是这样可以使得程序具体的错误可以过滤一层,并且找到更加直接的错误的原因。

PackageInit

可以直接看译文即可。

也可以直接读介绍的原文

Defer

Defer语法是在函数返回前进行调度的清除函数调用。它能够很好地处理多分支返回情况下的释放资源的问题。(类似于Python的With语法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func Contents(filename string) (string, error) {
f, err := os.Open(filename)
if err != nil {
return "", err
}
defer f.Close() // f.Close will run when we're finished.

var result []byte
buf := make([]byte, 100)
for {
n, err := f.Read(buf[0:])
result = append(result, buf[0:n]...) // append is discussed later.
if err != nil {
if err == io.EOF {
break
}
return "", err // f will be closed if we return here.
}
}
return string(result), nil // f will be closed if we return here.
}
1
2
3
官方对于Defer的好处声明:
1. 位置更加接近,更好的可以清晰的看出操作
2. 防止资源忘了关闭导致的泄露问题

延迟函数(如果函数是方法,则包括接收方)的参数在延迟执行时而不是在调用执行时进行评估。除了避免担心函数执行时变量会更改值外,这还意味着单个延迟的调用站点可以延迟多个函数的执行。这是一个简单的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"fmt"
)

func main() {
for i := 0; i < 5; i++ {
defer fmt.Printf("%d ", i)
}
}

-------
output:
4 3 2 1 0

Defer函数的执行吮吸是LIFO(LastInFirstOut)栈的结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import "fmt"

func trace(s string) string {
fmt.Println("entering:", s)
return s
}

func un(s string) {
fmt.Println("leaving:", s)
}

func a() {
defer un(trace("a"))
fmt.Println("in a")
}

func b() {
defer un(trace("b"))
fmt.Println("in b")
a()
}

func main() {
b()
}
-------------------
output
entering: b
in b
entering: a
in a
leaving: a
leaving: b

从上面的例子可以看出来,defer函数只能包含一层,如果像上面的代码 defer un(trace()), 那么会先执行trace(),然后再把un()函数压入defer的栈中。

更加实际的使用场景相关的例子

ShareNote

  1. EffectiveGo
  2. GoInitFunc
  3. GoInitFunc译文

SQL复习

目录

  1. Join
    1.1 基本Join类型
    1.2 高级Join类型
  2. 视图和子查询
    2.1 视图
    2.2 子查询
  3. 基本语法
    3.1 Select
    3.2 OrderBy
    3.3 Update
    3.4 Delete
  4. Having & GroupBy
  5. 注意事项
  6. ShareNote

目的

因为之前工作上面一直使用Python的ORM,然后很少使用手写SQL,因此需要回顾一些Sql的用法。

Join

此处所提及的表的数据和表类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
TABLE_A
PK Value
---- ----------
1 FOX
2 COP
3 TAXI
6 WASHINGTON
7 DELL
5 ARIZONA
4 LINCOLN
10 LUCENT

TABLE_B
PK Value
---- ----------
1 TROT
2 CAR
3 CAB
6 MONUMENT
7 PC
8 MICROSOFT
9 APPLE
11 SCOTCH

基本Join类型

InnerJoin

求表A与表B数据的交集部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- INNER JOIN
SELECT A.PK AS A_PK, A.Value AS A_Value,
B.Value AS B_Value, B.PK AS B_PK
FROM Table_A A
INNER JOIN Table_B B
ON A.PK = B.PK

A_PK A_Value B_Value B_PK
---- ---------- ---------- ----
1 FOX TROT 1
2 COP CAR 2
3 TAXI CAB 3
6 WASHINGTON MONUMENT 6
7 DELL PC 7

(5 row(s) affected)

LeftJoin

选取表A的所有数据, 在此例子中,如果表B不存在对应查询的ID的时候,则会填入Null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- LEFT JOIN
SELECT A.PK AS A_PK, A.Value AS A_Value,
B.Value AS B_Value, B.PK AS B_PK
FROM Table_A A
LEFT JOIN Table_B B
ON A.PK = B.PK

A_PK A_Value B_Value B_PK
---- ---------- ---------- ----
1 FOX TROT 1
2 COP CAR 2
3 TAXI CAB 3
4 LINCOLN NULL NULL
5 ARIZONA NULL NULL
6 WASHINGTON MONUMENT 6
7 DELL PC 7
10 LUCENT NULL NULL

(8 row(s) affected)

RightJoin

选取表B的所有数据,在此例子中,如果表A不存在对应查询的ID的时候,则会填入Null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- RIGHT JOIN
SELECT A.PK AS A_PK, A.Value AS A_Value,
B.Value AS B_Value, B.PK AS B_PK
FROM Table_A A
RIGHT JOIN Table_B B
ON A.PK = B.PK

A_PK A_Value B_Value B_PK
---- ---------- ---------- ----
1 FOX TROT 1
2 COP CAR 2
3 TAXI CAB 3
6 WASHINGTON MONUMENT 6
7 DELL PC 7
NULL NULL MICROSOFT 8
NULL NULL APPLE 9
NULL NULL SCOTCH 11

(8 row(s) affected)

OuterJoin

选取表A与表B的数据的全集,当另一张表缺失的情况下,会填补NUll信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
-- OUTER JOIN
SELECT A.PK AS A_PK, A.Value AS A_Value,
B.Value AS B_Value, B.PK AS B_PK
FROM Table_A A
FULL OUTER JOIN Table_B B
ON A.PK = B.PK

A_PK A_Value B_Value B_PK
---- ---------- ---------- ----
1 FOX TROT 1
2 COP CAR 2
3 TAXI CAB 3
6 WASHINGTON MONUMENT 6
7 DELL PC 7
NULL NULL MICROSOFT 8
NULL NULL APPLE 9
NULL NULL SCOTCH 11
5 ARIZONA NULL NULL
4 LINCOLN NULL NULL
10 LUCENT NULL NULL

(11 row(s) affected)

高级Join类型

LEFT JOIN EXCLUDING INNER JOIN

选择A与B中,A没有与B有交集的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- LEFT EXCLUDING JOIN
SELECT A.PK AS A_PK, A.Value AS A_Value,
B.Value AS B_Value, B.PK AS B_PK
FROM Table_A A
LEFT JOIN Table_B B
ON A.PK = B.PK
WHERE B.PK IS NULL

A_PK A_Value B_Value B_PK
---- ---------- ---------- ----
4 LINCOLN NULL NULL
5 ARIZONA NULL NULL
10 LUCENT NULL NULL
(3 row(s) affected)

RIGHT JOIN EXCLUDING INNER JOIN

选择A与B中,B没有与A有交集的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-- RIGHT EXCLUDING JOIN
SELECT A.PK AS A_PK, A.Value AS A_Value,
B.Value AS B_Value, B.PK AS B_PK
FROM Table_A A
RIGHT JOIN Table_B B
ON A.PK = B.PK
WHERE A.PK IS NULL

A_PK A_Value B_Value B_PK
---- ---------- ---------- ----
NULL NULL MICROSOFT 8
NULL NULL APPLE 9
NULL NULL SCOTCH 11

(3 row(s) affected)

OUTER JOIN EXCLUDING INNER JOIN

选择A与B中,A没有与B有交集的部分和B与A没有交集的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- OUTER EXCLUDING JOIN
SELECT A.PK AS A_PK, A.Value AS A_Value,
B.Value AS B_Value, B.PK AS B_PK
FROM Table_A A
FULL OUTER JOIN Table_B B
ON A.PK = B.PK
WHERE A.PK IS NULL
OR B.PK IS NULL

A_PK A_Value B_Value B_PK
---- ---------- ---------- ----
NULL NULL MICROSOFT 8
NULL NULL APPLE 9
NULL NULL SCOTCH 11
5 ARIZONA NULL NULL
4 LINCOLN NULL NULL
10 LUCENT NULL NULL

(6 row(s) affected)

视图与子查询

视图

视图并不是用来保存数据的,而是通过保存读取数据的SELECT 语句的方法来为用户提供便利的工具。

究竟视图是什么呢?如果用一句话概述的话,就是“从SQL 的角度来看视图就是一张表”。实际上,在SQL 语句中并不需要区分哪些是表,哪些是视图。
视图和表的差别:区别只有一个,那就是“是否保存了实际的数据”。
但是使用视图时并不会将数据保存到存储设备之中,而且也不会将数据保存到其他任何地方。实际上视图保存的是SELECT 语句(图5-1)。我们从视图中读取数据时,视图会在内部执行该SELECT 语句并创建出一张临时表。

视图是需要通过create view 的语法来创建的,本质也是满足一定条件的数据的集合。

限制

  1. 视图不可以与Group By 同时使用(可能某些DB会不支持)

子查询

基础定义:子查询就是一次性的视图(SELECT语句)。与视图不同,子查询在SELECT语句执行完毕之后就会消失。

1
2
3
4
select col_1 from (select col2_ from j  where $cond 1) p where $cond2

其中
p为子查询,内容为select col2_ from j where $cond 1 满足这个条件的 j 表数据创建出来的新临时表

标量子查询

而标量子查询则有一个特殊的限制,那就是必须而且只能返回1 行1列的结果。也就是返回表中某一行的某一列的值。
由于返回的是单一的值,因此标量子查询的返回值可以用在= 或者<> 这样需要单一值的比较运算符之中。

标量子查询的书写位置

可以再任意位置: SELECT 子句、GROUP BY 子句、HAVING 子句,还是ORDERBY 子句,几乎所有的地方都可以使用。

基本语法

select

基础的选择操作

1
select col1, col2, col..... from table1... where $cond

Select 中 引用别名列

1
select * from (select sal as salary, comm as commission from emp) x where salary < 5000

上面的这种方法可以处理类似的情况:

  1. 聚合函数(Sum()、Min()、Max())
  2. 标量子查询
  3. 窗口函数
  4. 别名

将含有别名列的查询放入内嵌视图,就可以在外层查询中引用别名列。为什么要这么做
呢? WHERE 子句会比SELECT 子句先执行,就最初那个失败的查询例子而言,当WHERE 子句
被执行时,SALARY 和COMMISSION 尚不存在。直到WHERE 子句执行完毕,那些别名列才会生
效。然而,FROM 子句会先于WHERE 子句执行。如果把最初的那个查询放入一个FROM 子句,
其查询结果会在最外层的WHERE 子句开始之前产生,这样一来,最外层的WHERE 子句就能
“看见”别名列了。当表里的某些列没有被恰当命名的时候,这个技巧尤其有用。

使用条件逻辑

1
2
3
4
5
select col1 , col2 ,
case when col2 <= 2000 then "UnderPAID"
when col2 > 4000 then "OVERPAID"
end as status
from emp

输出的列为[col1, status]

限制返回条数

1
select * from emp limit 1

随机返回固定条数

1
select * from emp order by rand() limit 5 

Null的判断

1
2
3
4
//  判断col1 是否为空
select * from emp where col1 is null
// 判断col2 是否不为空
select * from emp where col2 is not null

Union

1
2
3
select deptno from emp 
union
select deptno from dept

如果Union想获取到重复的条目,则应该使用union all

1
2
3
4
5
// 如果使用union all 的等价实现
select distinct deptno from
(select deptno from emp
union all
select deptno from dept )

对于多表Join查询

合并相关的行

目的:对一个共同的列或者具有相同值的列做连接查询,返回多个表中的行。

1
2
select e.name, d.loc from emp e , dept d 
where e.deptno = 10 and e.deptno = d.deptno

这个解决方案是一个关于连接查询的例子。更准确地说,它是内连接中的相等连接。连接
查询是一种把来自两个表的行合并起来的操作。对于相等连接而言,其连接条件依赖于某
个相等条件(例如,一个表的部门编号和另一个表的部门编号相等)。内连接是最早的一
种连接,它返回的每一行都包含了来自参与连接查询的各个表的数据。

理论上,连接操作首先会依据FROM 子句里列出的表生成笛卡儿积(列出所有可能的行组
合),如下所示。

EMP 表里部门编号为10 的全部员工与DEPT 表的所有部门组合都被列出来了。然后,通过
WHERE 子句里的e.deptno 和d.deptno 做连接操作,限定了只有EMP.DEPTNO 和DEPT.DEPTNO
相等的行才会被返回。

1
2
3
4
类似实现,使用显式的Join来实现
select e.name, d.loc from emp e
inner join dept d on e.deptno = d.deptno
where e.deptno = 10

如果把上面的变成小于等于10的情况,用Join的方式合并的话会变成这样

1
2
3
select e.name, d.loc from emp e  
inner join dept d on e.deptno = d.deptno
where (e.deptno = 10) or (e.deptno < 10)

查找两个表相同的行并且连接多列

需求: 获取Clerk的信息,但是需要全部的列

步骤: 1. 创建一个视图把Clerk的信息查出来
2. 然后再去与emp表做Join获取完整的信息

1
2
3
4
5
select e.empno, e.name, e.job, e.sal, e.deptno from emp e, 
(select ename, job, sal from emp where job = "CLERK") V
where V.ename = e.ename and
V.job = e.job and
V.sal = e.sal

Join的处理手法

1
2
3
4
5
select e.empno, e.name, e.job, e.sal, e.deptno from emp e
join (select ename, job, sal from emp where job = "CLERK") V on (
V.ename = e.ename
V.job = e.job
V.sal = e.sal)

查询只存在于一个表中的数据

一般来说,直接使用not in 就可以了。但是对于如果含有Null的数据,就不能直接使用这样的方法处理。
那为什么null的数据就会出现问题呢?这个就要看一下他可能的实现方式
对于Mysql的实现, not in 和 in 本质上是 or的关系运算。 由于null 参与Or的逻辑运算方式不一致,In 和Not in 将产生不同的结果。

此处默认表中有一条Null的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// In
select deptno from dept where deptno in (10, 50, null)
~~~~~~~~~~
Deptno
--------
10

select deptno from dept where (deptno=10 or deptno=50 or deptno=null)
~~~~~~~~~~
Deptno
--------
10

//Not in

select deptno from dept where deptno not in (10, 50, null)
~~~~~~~~~~
Deptno
--------
no rows

select deptno from dept where deptno not (deptno=10 or deptno=50 or deptno=null)
~~~~~~~~~~
Deptno
--------
no rows

如果想要解决上面的null 所导致的问题, 需要结合Not exists 和关联子查询。

1
2
3
4
5
6
select d.deptno from dept d where
not exists (select null from emp e where d.deptno = e.deptno)
~~~~~~~~~
Deptno
--------
40

确定两个表是否有相同的数据

对于上面这种查询,可能会出现红色圈的数据重复的现象。那么我们要怎样才能确定是否有重复数据呢?

1
2
3
4
5
create view v as 
select * from emp where deptno != 10
union all
select * from emp where ename = "ward"

处理手法使用关联子查询和UNION ALL 找出那些存在于视图V 而不存在于EMP 表的数据,以及存在于EMP 表而不存在于视图V 的数据,并将它们合并起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//  处理存在于EMP 不存于v的查询
select * from (
select e.empno, e.ename, e.job, e.mgr, e.hiredate,
e.sal, e.comm, e.deptno, count(*) as cnt from emp e
group by empno, ename, job, mgr, hiredate,
sal, comm, deptno) e
) where not exists (
select null from (
select v.empno, v.ename, v.job, v.mgr, v.hiredate,
v.sal, v.comm, v.deptno, count(*) as cnt from v
group by empno, ename, job, mgr, hiredate,
sal, comm, deptno) v
where v.empno = e.empno and
v.ename = e.ename and
v.job = e.job and
v.mgr = e.mgr and
v.hiredate = e.hiredate and
v.sal = e.sal and
v.deptno = e.deptno and
v.cnt = e.cnt and
coalesce(v.comm, 0) = coalesce(e.comm, 0)
)
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//  处理存在于V 不存于EMP的查询
select * from (
select v.empno, v.ename, v.job, v.mgr, v.hiredate,
v.sal, v.comm, v.deptno, count(*) as cnt from v
group by empno, ename, job, mgr, hiredate,
sal, comm, deptno) v
) where not exists (
select null from (
select e.empno, e.ename, e.job, e.mgr, e.hiredate,
e.sal, e.comm, e.deptno, count(*) as cnt from v
group by empno, ename, job, mgr, hiredate,
sal, comm, deptno) e
where v.empno = e.empno and
v.ename = e.ename and
v.job = e.job and
v.mgr = e.mgr and
v.hiredate = e.hiredate and
v.sal = e.sal and
v.deptno = e.deptno and
v.cnt = e.cnt and
coalesce(v.comm, 0) = coalesce(e.comm, 0)
)
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 总体
select * from (
select e.empno, e.ename, e.job, e.mgr, e.hiredate,
e.sal, e.comm, e.deptno, count(*) as cnt from emp e
group by empno, ename, job, mgr, hiredate,
sal, comm, deptno) e
) where not exists (
select null from (
select v.empno, v.ename, v.job, v.mgr, v.hiredate,
v.sal, v.comm, v.deptno, count(*) as cnt from v
group by empno, ename, job, mgr, hiredate,
sal, comm, deptno) v
where v.empno = e.empno and
v.ename = e.ename and
v.job = e.job and
v.mgr = e.mgr and
v.hiredate = e.hiredate and
v.sal = e.sal and
v.deptno = e.deptno and
v.cnt = e.cnt and
coalesce(v.comm, 0) = coalesce(e.comm, 0)
)
)
Unoin all
select * from (
select v.empno, v.ename, v.job, v.mgr, v.hiredate,
v.sal, v.comm, v.deptno, count(*) as cnt from v
group by empno, ename, job, mgr, hiredate,
sal, comm, deptno) v
) where not exists (
select null from (
select e.empno, e.ename, e.job, e.mgr, e.hiredate,
e.sal, e.comm, e.deptno, count(*) as cnt from v
group by empno, ename, job, mgr, hiredate,
sal, comm, deptno) e
where v.empno = e.empno and
v.ename = e.ename and
v.job = e.job and
v.mgr = e.mgr and
v.hiredate = e.hiredate and
v.sal = e.sal and
v.deptno = e.deptno and
v.cnt = e.cnt and
coalesce(v.comm, 0) = coalesce(e.comm, 0)
)
)

高级应用

分页

可以使用Mysql 或者PG 内置的 limit 和 offset 进行处理

1
2
3
select sal from emp order by sal limit 5 offset 0

select sal from emp order by sal limit 5 offset 5

间隔获取记录

对于没有默认编号的数据,我们需要先编号再进行过滤的操作
如果默认是已经有字段是序号的话,则采用直接Mod数据即可

1
2
3
4
5
6
select x.name from (
select a.ename , (
select count(*) from emp b where
b.ename <= a.ename
) as rn from emp a
) x where mod(rn,2) = 1

外查询使用OR逻辑

  1. 先去Join表,然后再去进行Or的逻辑判断
    1
    2
    3
    4
    select e.ename, d.deptno , d.dname, d.loc from dept d 
    left join emp e on (d.deptno = e.deptno
    and (e.deptno=10 or e.deptno=20))
    order by 2
  2. 先创建一个中间表,然后再去进行Join的操作

select e.ename, d.deptno , d.dname, d.loc from dept d
left join (select * from emp e where e.deptno=10 or e.deptno=20)
on d.deptno = e.deptno order by 2

对单表需要做数据运算情况

情况1: 找出互逆的记录(本例)
情况2: 查找表中某列1相差为1,并且某列2差为5的记录

总体的思路,把自己与自己(或者与自己的子集)求笛卡尔积,然后去进行条件的筛选

1
2
3
4
select distinct v1.* from V v1, V v2 where
v1.test1 = v2.test2
and v1.test2 = v2.test1
and v1.test1 <= v1.test2

找出最靠前的N条记录

此处使用了标量子查询来创建了一张临时表的RNK的列

1
2
3
4
5
6
7
8
9
// 
select ename, sal from (
select (
select (count(distinct b.sal) from emp b where
a.sal <= b.sal) as rnk,
a.sal,
a.ename
) from emp a
) where rnk <=5

OrderBy

基础查询

1
2
3
4
// 升序查询
select * from emp order by col2 asc;
// 降序查询
select * from emp order by col2 desc;

多字段排序

1
select empno, deptno, sal, ename, job from emp order by deptno (asc), sal desc;

动态排序

1
2
3
4
select ename, sal, job, comm from emp order by 
case when job = "salesman" then comm
else sal
end;

update

基础语法

1
update table name set col_name = xxx where $cond

delete

基础语法

1
delete from table_name where $cond

删除重复记录

1
delete from table where id not in (select min(id) from table group by name)

Having & GroupBy

1
2
3
4
wiki原文
A HAVING clause in SQL specifies that an SQL SELECT statement should only return rows where aggregate values meet the specified conditions. It was added to the SQL language because the WHERE keyword could not be used with aggregate functions.
The HAVING clause filters the data on the group row but not on the individual row.
To view the present condition formed by the GROUP BY clause, the HAVING clause is used.

Having的语句是必须要在GroupBy后面才能使用。并且与Where的区别是,Where不能直接接入聚合的函数(如Sum()、Count()、Avg()) 这种的聚合函数, 意思是不能 where sum(column_a) 这样的用法), 并且Having可以对按Group区分的Row进行过滤的操作

所以常规语法一般是

1
select * from table_a A group by columa_a having count (A.column_a ) > 200

特殊注意

  1. 类似于Sum, max, min , avg 这些也是可以直接用于select 的条件上面的
    1
    select max(Salary) as SecondHighestSalary from employee where salary<(select max(distinct(salary)) from employee)
  2. sql 三元运算符
    if (expr1, expr2, expr3)
    跟正常编程语言中的三元运算符一致,只是语法有变动。也是满足条件一,则返回expr2,否则返回expr3

ShareNote

  1. Visual-Representation-of-SQL
  2. Having-Sql-Cluse-wiki
  3. Sql经典实例
  4. Sql基础教程

Go语言切片技巧

添加元素到切片中

添加元素到尾部

1
2
3
4
5
var a []int
// 添加元素(后面的元素都需要解包)
append(a, 0)
// 添加切片(后面的元素都需要解包)
append(a, []int{1,2,3}...)

添加元素到头部

1
2
3
4
5
var a []int
// 添加元素(后面的元素都需要解包)
a = append([]int{1}, a...)
// 添加切片(后面的元素都需要解包)
a = append([]int{1,2,3}, a...)

添加指定元素到切片中

性能较低的方法(会产生中间过渡切片)

1
2
3
var a []int
a = append(a[:i], append([]int{1,2}, a[i:]...)...) // 在第i个元素插入切片
a = append(a[:i], append([]int{1}, a[i:]...)...) // 在第i个元素插入元素

性能较高的方法

1
2
3
4
5
6
7
8
9
10
//  插入单个元素
a = append(a, 0)
copy(a[i+1:], a[i:])
a[i] = x

// 添加多个元素(一个切片)
// b= []int{1,2,3}
a = append(a,b..)
copy(a[i+len(b):], a[i:])
copy(a[i:], b)

删除切片元素

删除结尾元素

1
2
3
4
5
a = []int{1,2,3}
// 删除单个元素
a = a[:len(a)-1]
// 删除多个元素
a = a[:len(a)-N]

删除开头元素

1
2
3
4
5
a = []int{1,2,3}
// 删除单个元素
a = a[1:]
// 删除多个元素
a = a[N:]

不移动指针的方法(把后面元素往前移动), 用到了空指针

1
2
3
4
5
6
7
8
9
10
a = []int{1,2,3}
// 删除单个元素
a = append(a[:0], a[1:]...)
// 删除多个元素
a = append(a[:0], a[N:]...)

// 使用Copy来进行处理
a = []int{1,2,3}
a = a[:copy(a, a[1:])]
a = a[:copy(a, a[N:])]

删除中间元素

1
2
3
4
5
6
a = []int{1,2,3,4,5}
a = append(a[:i], a[i+1:]...)
a = append(a[:i], a[i+N:]...)

a = a[:i+copy(a[i:], a[i+1:])]
a = a[:i+copy(a[i:], a[i+N:])]

切片的实现原理及使用的注意事项

切片本质上是对底层数组的一个数据的引用。但是它是一个动态的结构。它的底层结构是这样的

1
2
3
4
5
type SliceHeader struct{
Data uintptr
Len int // 实际已经使用的容量
Cap int // 最大的容量
}

可以看出实际上这两个切片都是指向同一个数组的。但是如果当切片进行扩展后,会变成这样。

可以看出,因为原来的底层数组因为长度已经不足以Slice进行扩展,因此Slice会先去创建一个新的底层数组,并且把原来的元素加入到新创建的数组中,并且再把新插入的元素插入到新数组中。然后把原来指向的底层数组的Reference Count 减一。

使用的技巧

  1. 因为如果append进去切片的时候,len = cap 就会出现内存的申请和数据的复制,这样会使得比较慢。使用时尽量去减少触发内存分配的次数和分配内存的大小。
  2. 对于切片来说,即使是空切片,cap=0, len=0, 它实际上也不会是nil, 因此判断一个切片是否为空,应该使用len($slice_name) == 0 来继续判断。

切片GC的问题

一个比较经常的情况,对底层数组的某一个内存的引用,导致整个数组无法被GC。

变量引用的情况

1
2
3
4
func FindPhoneNumber(filename string) []byte{
b, _ := ioutil.ReadFile(filename)
return regexp.MustCompile("[0-9]+").Find(b)
}

上面的这段代码里面,因为返回的地方是一个切片引用了b的底层数组,导致b的底层数组会一直在内存中。

解决上面的问题,可以使用传值把需要用到的值传到一个新的切片里面,这样就能减少依赖。

1
2
3
4
5
func FindPhoneNumber(filename string) []byte{
b, _ := ioutil.ReadFile(filename)
b = regexp.MustCompile("[0-9]+").Find(b)
return append([]byte{}, b)
}

删除变量的情况

1
2
var a *[]int{1,2,3,4,4,5,6}
a = a[:len(a)-1] // 被删除的最后一个元素仍然被引用

保险的处理方法, 把需要删除的元素设置为nil,然后再去进行切片

1
2
3
var a *[]int{1,2,3,4,4,5,6}
a[len(a)-1] = nil
a = a[:len(a)-1] // 被删除的最后一个元素仍然被引用

MIT6.824 Lab4 实现及解析

目录

  1. 实验目的
  2. 实验实现
    2.1 lab4-1
    2.2 lab4-2

实验目的

lab4需要实现两个模块:

  1. lab4-1 完成一个基于KvRaft的ShardMaster(可以理解为一个分片调度的存放的机器),但是写入的是配置而不是简单的K-v的值(但是与Lab3的实现相当相似)
  2. lab4-2 完成一个Sharding的kvRaft数据库(实现了Multi-raft)

整体的实验架构是这样的

架构图如上:
一个值得注意的点是,各个KV数据库层之间不会直接相互通信,只是会通过Raft层来同步操作。
并且Client如果找到的不是Leader的节点,会直接放弃操作。然后请求集群内的下一个节点。
PS: 这里的KvDatabase 和Raft总体包起来才是一个KVServer的实例,而不是单独脱离的。

流程大概如下:

  1. Client发送请求到ShardMaster,查询Key的具体对应的分片在哪个组里面
  2. ShardMaster收到请求后,返回对应数据Key所在的组信息
  3. Client发送请求到对应分片的Raft Leader中
  4. Raft层同步成功后,会通过ApplyCh返回信号的KvDatabase
  5. 分片的KvDatabase对Client端做出应答(如果成功返回结果为成功,如果下面同步层失败导致超时,返回给客户端的结果为超时)

副本与分片的问题

在上面的架构图中,Multi-Raft已经实现了分片的副本的实现。

分片: 只要是把数据进行划分(从大的数据变为只负责一部分的数据量)即是分片(在定义上面数据库的垂直划分和水平划分和此处的Key按键去Mod划分都是属于分片的操作,但是数据库表的划分和此处的Key的Mod划分不是在同一个层级上面的,理解不一样)
副本: 是指数据的重复的数量。但是一般只有一份的数据我们不会称之为单副本,而是称为0副本。副本一般是>=2才叫。副本的目的是为了冗余的问题。防止因为单点故障而导致数据全部的丢失。

实验实现

lab4-1

对比起与Lab3 的实现,它在这里需要支持的是Leave(), Move(), Join(), Query()的四种方法,因为分别对应节点的加入集群、退出集群、移动集群和集群配置查询的四种操作。
所以基本实现的思路与Lab3是类似的。

1
2
3
4
5
6
7
8
9
10
11
12
type ShardMaster struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
// Your data here.
// Lab3 此处是为database 是一个map[string]string 的结构
configs []Config // indexed by config num
dup map[int64]Result
chanResult map[int]chan Result
}

但是有一个比较重要的需求是,它需要完成一个shard 与 集群的绑定关系的变动,因为本来就是需要支持的目的就是数据随着Group的添加和变动来做到数据的均衡。
并且一个比较不同的是,对于重复的Get操作操作,Lab3采用的是直接返回Kv的值,但是在此处,因为Config的存放机制是一个数组(里面的顺序就是配置有效的顺序),因此需要把重复的读配置,返回一个复制好的配置。

lab4-2

这里的实现是需要首先保证Kv的功能可以使用,然后保证在配置变动并且数据搬移完成之后,才能继续对外提供Kv的服务。并且需要保证节点挂掉之后可以读取会最新的状态下来

总体流程:
在提供KV服务的同时,需要把配置定时进行更新,并且实际应用新配置之前,必须保证数据迁移成功。因此实际上用到了3个单独的协程去分别做这几个工作

  1. 读取配置协程(定时向ShardMaster请求配置)
  2. 数据迁移的协程
  3. 应用数据同步的协程

因为这个实验中的目的有三种,因此我们的消息类型也定义了三种

  1. 数据操作类型,与Lab3原来类似的OP类型(可以包含Get、Pull、Put、Append的操作)
  2. 配置更新类型,把Lab4-1的Config类型封装一层进行使用
  3. 真实的数据迁移类型,原因是:因为数据迁移的时候是两个RaftGroup的Leader相互通信,并且需要把原来数据KV格式同步进去到新的组的所有副本上面。因此单独分配一个数据类型来记录此类数据

向ShardMaster读取配置的协程的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (kv *ShardKV) ConfigUpdateRoutine() {
for {
kv.mu.Lock()
curConfigNum := kv.myconfig[0].Num
kv.mu.Unlock()
// 此处Query带上当前版本加1的原因,防止查询到的配置不正确,导致分片的节点之前一直无法达成配置上共识
config := kv.mck.Query(curConfigNum + 1)
//DPrintf("get newConfig from SM group is %v, shard is %v", config.Groups, config.Shards)
kv.mu.Lock()
if config.Num == kv.myconfig[0].Num+1 {
// update with static NewConfig
newConfig := kv.makeEmptyConfig()
kv.CopyConfig(&config, &newConfig)
if _, isLeader := kv.rf.GetState(); isLeader {
cfg := Cfg{newConfig, int64(kv.gid), kv.myconfig[0].Num}
kv.rf.Start(cfg)
DPrintf("Config: group %d-%d is start config %d into consueum",
kv.gid, kv.me, cfg.NewConfig.Num)
//index, _, isleader := kv.rf.Start(cfg)
//DPrintf("))))) server %d gid %d Start cfg, index is %d, isleader is %t, kv is %v",
// kv.me, kv.gid, index, isleader, kv.database)
}
}
kv.mu.Unlock()
time.Sleep(100 * time.Millisecond)
}
}

修改自己需要发送和接受Shard的配置部分的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
case Cfg:
if kv.CfgDupCheck(cmd.ClientId, cmd.Seq) {
kv.SwitchConfig(cmd)
if kv.CheckMigrateDone() {
// if migrate done, use new config, if not, do nothing to avoid replying the old group replied
kv.myconfig[0] = kv.myconfig[1]
//DPrintf("group %d-%d is applied new config , shard is %v, kv is %v", kv.gid, kv.me, kv.myshards, kv.database)
}
kv.cfgdup[cmd.ClientId] = cmd.Seq
if kv.maxraftstate != -1 {
kv.SaveSnapshot(index)
}
}

func (kv *ShardKV) SwitchConfig(newcfg Cfg) {
if newcfg.NewConfig.Num == kv.myconfig[0].Num+1 {
if kv.myconfig[0].Num != 0 {
kv.GenShardChangeList(newcfg)
} else if kv.myconfig[0].Num == 0 {
for i := 0; i < shardmaster.NShards; i++ {
if newcfg.NewConfig.Shards[i] == kv.gid {
kv.myshards[i] = 1
}
}
}
newc := kv.makeEmptyConfig()
kv.CopyConfig(&newcfg.NewConfig, &newc)
kv.myconfig[1] = newc
}
}

// 此函数是用于生成需要发送和修改那些部分的参数
func (kv *ShardKV) GenShardChangeList(newcfg Cfg) {
for i := 0; i < shardmaster.NShards; i++ {
if kv.myconfig[0].Shards[i] == kv.gid && newcfg.NewConfig.Shards[i] != kv.gid {
//need to send
kv.needsend[i] = newcfg.NewConfig.Shards[i]
}
if kv.myconfig[0].Shards[i] != kv.gid && newcfg.NewConfig.Shards[i] == kv.gid {
//need to recv
_, ok := kv.needrecv[kv.myconfig[0].Shards[i]]
if !ok {
kv.needrecv[kv.myconfig[0].Shards[i]] = make([]int, 0)
}
kv.needrecv[kv.myconfig[0].Shards[i]] = append(kv.needrecv[kv.myconfig[0].Shards[i]], i)
}
}
DPrintf("!!! group %d-%d, new config need to send is %v, need to receive is %v", kv.gid, kv.me, kv.needsend, kv.needrecv)
}

获取迁移的数据部分
数据迁移的副本是只要检测到相关属性的变化之后(感知到数据的变化)后,新的数据所归属的Leader就会与旧Leader继续RPC的Pull调用, 去获取它的Database和DUP的部分
当拉取到配置了之后,就会把数据变成日志应用到状态中,就可以实现分片数据的副本的性质。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func (kv *ShardKV) MigrationRoutine() {
for {
if _, isLeader := kv.rf.GetState(); isLeader {
kv.mu.Lock()
for k, v := range kv.needrecv {
//DPrintf("group %d-%d needrecv ")
needshard := make([]int, 0)
for i := 0; i < len(v); i++ {
needshard = append(needshard, v[i])
}

args := PullArgs{Shard: needshard, ClientId: int64(kv.gid), Seq: kv.myconfig[0].Num}
go func(mgid int, arg *PullArgs) {
servers := kv.myconfig[0].Groups[mgid]
DPrintf("Migrate: group %d-%d get Gid %d Servers is %v",
kv.gid, kv.me, mgid, servers)
for {
for _, si := range servers {
reply := PullReply{}
srv := kv.make_end(si)
DPrintf("group %d-%d start call to gid %d", kv.gid, kv.me, mgid)
// 由GroupA
ok := srv.Call("ShardKV.Pull", arg, &reply)
//DPrintf("Migrate: group %d-%d calling for server %v rpc pull, result is %t",
// kv.gid, kv.me, si, ok)
if !ok {
DPrintf("Migrate Failed: group %d-%d calling for server %v rpc pull, result is %t",
kv.gid, kv.me, si, ok)
}
if ok && reply.WrongLeader == false {
if reply.Err == ErrNeedWait {
DPrintf("Migrate: waiting server %v to pull new config from SM", si)
return
}
if _, isleader := kv.rf.GetState(); isleader {
newmapkv := make(map[string]string)
for k, v := range reply.MapKV {
newmapkv[k] = v
}
var newdup [shardmaster.NShards]map[int64]int
for i := 0; i < shardmaster.NShards; i++ {
newdup[i] = make(map[int64]int)
for k, v := range reply.ShardDup[i] {
newdup[i][k] = v
}
}
mig := Migrate{newmapkv, newdup, arg.Seq, mgid}
kv.mu.Lock()
// this is how partition data can be repliacated
kv.rf.Start(mig)
DPrintf("Migrate: group %d-%d start migrate the data pulled from %d", kv.gid, kv.me, mgid)
kv.mu.Unlock()
return
}
} else {
DPrintf("Migrate Failed: group %d-%d call %d-%v meet wrong leader",
kv.gid, kv.me, mgid, si)
DPrintf("!!!server is %v", servers)
}
time.Sleep(20 * time.Millisecond)
}
}
}(k, &args)
}
kv.mu.Unlock()
}
time.Sleep(100 * time.Millisecond)
}
}

非Leader节点同步KV数据的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
case Migrate:
if kv.MigrateDupCheck(cmd.Gid, cmd.Num) {
//DPrintf("group %d-%d apply the migrate data from %d and config num %d", kv.gid, kv.me, cmd.Gid, cmd.Num)
//DPrintf("group %d-%d database before migrate is %v", kv.gid, kv.me, kv.database)
for k, v := range cmd.MapKV {
kv.database[k] = v
}
//DPrintf("group %d-%d database after migrate is %v", kv.gid, kv.me, kv.database)
for i := 0; i < shardmaster.NShards; i++ {
for k, v := range cmd.ShardDup[i] {
kv.dup[i][k] = v
}
}
for i := 0; i < len(kv.needrecv[cmd.Gid]); i++ {
kv.myshards[kv.needrecv[cmd.Gid][i]] = 1
}
delete(kv.needrecv, cmd.Gid)
if kv.CheckMigrateDone() {
kv.myconfig[0] = kv.myconfig[1]
DPrintf("Migrate: group %d-%d successful switch to config %d", kv.gid, kv.me, kv.myconfig[0].Num)
//DPrintf("group %d-%d is applied new config , shard is %v", kv.gid, kv.me, kv.myshards)
}
kv.migratedup[cmd.Gid] = cmd.Num
if kv.maxraftstate != -1 {
kv.SaveSnapshot(index)
}
}

MIT6.824 Lab3 实现及解析

目录

  1. 实验目的
  2. 实验实现

实验目的

lab3需要实现一个建议的带客户端的 分布式KV的数据库,需要支持对外的Get(), Put(), Append()三个操作

整体的实验架构是这样的

架构图如上:
一个值得注意的点是,各个KV数据库层之间不会直接相互通信,只是会通过Raft层来同步操作。
并且Client如果找到的不是Leader的节点,会直接放弃操作。然后请求集群内的下一个节点。
PS: 这里的KvDatabase 和Raft总体包起来才是一个KVServer的实例,而不是单独脱离的。

流程大概如下:

  1. Client发送请求到KvDatabase
  2. KvDatabase收到请求后,会把收到的命令重新封装,通过Raft提供的Start API来在Raft集群中进行同步的操作
  3. Raft同步到其他的非Leader节点中
  4. Raft层同步成功后,会通过ApplyCh返回信号的KvDatabase
  5. KvDatabase对Client端做出应答(如果成功返回结果为成功,如果下面同步层失败导致超时,返回给客户端的结果为超时)

实验实现

此部分实现分为两个部分:

  1. 基础的键值对的实现
  2. 日志压缩与快照的部分

基础键值对的实现

因为上面提及到这里的KvDatabase 和Raft总体包起来才是一个KVServer的实例,而不是单独脱离的。
所以此处展示一下KvRaft所包含的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type KVServer struct {
mu sync.Mutex
me int
// 包含了Raft的实例
rf *raft.Raft
// Applych 是 用与接受Raft成形成共识后的返回
applyCh chan raft.ApplyMsg

maxraftstate int // snapshot if log grows this big
database map[string]string
dup map[int64]int
chanResult map[int]chan Op
}

// Type OP
type Op struct {
// Key 是 Get()、Put()、Append()三个都会用到的值
Key string
// Value是Put()、Append()用到的字段,Get此处默认为空
Value string
// 存放的是操作的名称
Name string
// 用于表示客户端的来源
ClientId int64
// 给予序号
Seq int
}

上面所描述到的Op结构体的操作为什么要把Get加入,并且需要区分ClientId和Seq的原因,本质上都是需要在全序关系广播中实现线性化所需要的。如果不知道什么是全序关系广播,可以查看之前我的文章分布式系统一致性与共识

此处还需要注意,因为我们暴露给客户端的操作是一个同步的操作,但是我们这层与Raft层是一个异步的操作,因此,需要我们这边等待Raft层异步返回成功,并且我们此层把数据保留下来后,才能继续返回

所以需要需要在另外一个协程中去读取ApplyCH的数据,然后继续对比之后再去创建返回给客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
func (kv *KVServer) ApplyOPRoutine() {
//this gorouine is to asyncly get the result of raft applych reply to
// and to produce signal to reply client Rpc Request
DPrintf("Apply gorountine runing ")
for {
msg := <-kv.applyCh
//DPrintf("get apply msg from raftServer")
if msg.CommandValid {
index := msg.CommandIndex
if cmd, ok := msg.Command.(Op); ok {
kv.mu.Lock()
// 对比单个客户端的序号,来减少重复的旧操作的更新操作
if kv.dupcheck(cmd.ClientId, cmd.Seq) {
if cmd.Name == PUT {
kv.database[cmd.Key] = cmd.Value
} else if cmd.Name == APPEND {
if _, ok := kv.database[cmd.Key]; ok {
kv.database[cmd.Key] += cmd.Value
} else {
kv.database[cmd.Key] = cmd.Value
}
}
kv.dup[cmd.ClientId] = cmd.Seq
}
res := Op{cmd.Key, kv.database[cmd.Key], cmd.Name,
cmd.ClientId, cmd.Seq}
ch, ok := kv.chanResult[index]
if ok {
select {
case <-ch:
default:
}
ch <- res
//DPrintf("the cmd has been commited , push request return to chan")
}
if kv.maxraftstate != -1 && kv.rf.GetStateSize() >= kv.maxraftstate && index == kv.rf.GetCommitedIndex() {
DPrintf("Do snapshot for over the maxraftstate")
kv.DoSnapShot(index)
}
kv.mu.Unlock()
}
} else {
kv.LoadSnapShot(msg.Snapshot)

}
}
}

// 把OP传入到Raft共识层的函数
func(rf *Raft) StartCommand(cmd Op) (Err, string){
index, _, isLeader := kv.rf.Start(cmd)
//DPrintf("start command %s , client id is %d, key is %s, value is %s",
// cmd.Name, cmd.ClientId, cmd.Key, cmd.Value)
if !isLeader {
kv.mu.Unlock()
//DPrintf("not leader ")
return ERRWrongLeader, ""
}
ch := make(chan Op, 1)
kv.chanResult[index] = ch
kv.mu.Unlock()

defer func() {
// After finish the task
kv.mu.Lock()
delete(kv.chanResult, index)
kv.mu.Unlock()
}()
select {
case c := <-ch:
// this channel return is get data from ApplyRoutine
if kv.CheckSame(c, cmd) {
resvalue := ""
if cmd.Name == GET {
resvalue = c.Value
}
return OK, resvalue
} else {
DPrintf("Leader has change, index %d op %s error", index, cmd.Name)
return ERRWrongLeader, ""
}
case <-time.After(time.Duration(200) * time.Millisecond):
DPrintf("log get agree timeout, index is %d", index)
return ERRTimeout, ""
}
}

日志压缩与快照

此处的快照与Raft本身的快照多了两个KVDatabase 特有的属性, database(保存的数据) 和 dup (客户端操作序号的记录)这两个属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (kv *KVServer) DoSnapShot(index int) {
kv.rf.SaveSnapShot(index, kv.database, kv.dup)
}


// 读取快照的函数
func (kv *KVServer) LoadSnapShot(snapshot []byte) {
if snapshot == nil || len(snapshot) < 1 {
kv.mu.Lock()
kv.database = make(map[string]string)
kv.mu.Unlock()
return
}
s := bytes.NewBuffer(snapshot)
decoder := labgob.NewDecoder(s)
var kvdb map[string]string
var kvdup map[int64]int
if decoder.Decode(&kvdb) != nil || decoder.Decode(&kvdup) != nil {
DPrintf("server %d, Decode Snapshot error", kv.me)
} else {
kv.mu.Lock()
defer kv.mu.Unlock()
kv.database = kvdb
kv.dup = kvdup
DPrintf("msg snapshot db is %v, dup is %v", kvdb, kvdup)
DPrintf("server %d , load Snapshot success", kv.me)
}
}

在本实验中,会触发日志保留的情况只是因为保存的Log> maxraftstate。