Prometheus(1)- 数据抓取源码阅读
目录
- 使用的目的
- 代码实现
代码版本
基于 prometheus项目的master branch的5a554df0855bf707a8c333c0dd830067d03422cf commit
使用目的
Prometheus 是一个基于Pull模型所进行数据采集的系统,因此,需要在主体项目中有一个抓取数据的模块,而Scrape就是这样的模块。因此这个也是Prometheus的一个主要部分。
1 | 入口代码部分 |
代码实现
整体的流程图
目录结构
1 | -rw-r--r-- 1 ray 197121 2239 9月 30 16:09 helpers_test.go |
重要数据结构描述
ScrapeManager 是管理所有抓取的一个抽象
1 | type Manager struct { |
ScrapePools 是单个的Job的抓取目标的工作单位
1 | appendable Appendable |
loop是单个Target的执行单位,是一个接口。在这里主要使用的是ScrapeLoop的实例
1 | type loop interface { |
scraper接口时具体的执行单位,scrapeLoop也是调用scraper的方法来进行数据的抓取, Prometheus默认使用targetScraper去抓取数据
1 | type scraper interface { |
主协程逻辑
跟着调用的部分,我们先从初始化后的manager的ApplyConfig方法开始看起。
1 | func (m *Manager) ApplyConfig(cfg *config.Config) error { |
- 把解析好的配置,遍历,变为一个jobName为key,配置值为Value的map
- 对比自己的配置,如果之前已经存在但是配置发生变动的,则去reload scraper pool的配置。
- (Reload) 如果需要reload配置的情况下,会重新生成scrapePool后,派生多一个线程去执行scraperPool.Sync(),知道Manager的targetSet被遍历完为止。Sync方法的内容会后面进行详细讲解。
Run()方法
1 | func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error { |
功能:
- 等待从Main.go中传入的discoveryManager的SyncCh是否有变动,如果有变动,更新Targetset。
- 派生出了一个Reloader协程,Reloader协程会定时检查是否有关闭的信号或者Reload信号(triggerReload channel,就是外部给与主协程的刺激产生的二级信号),如果有,则执行reload操作。
子协程逻辑
ScraperPool
1 | // Sync converts target groups into actual scrape targets and synchronizes |
Sync函数是一个对外暴露函数的接口:
- 把配置解析出来的target结构化。
- 调用内部方法sync()来进行数据抓取的执行
- 一些计数器添加计数
值得注意的是Append方法,是一个封装了的方法,是同是进行对变量的修改,并且包含了采集到的数据持久化的操作。
1 | // sync takes a list of potentially duplicated targets, deduplicates them, starts |
主要逻辑:
- 把传入的Target列表进行遍历
1.1 如果target不在active的map中, 生成targetScraper,然后把targetScraper放入Loop里面,调用Loop.run()在协程中进行逻辑
1.2 否则, 会先删除旧的协程,然后重新生成协程。
ScraperLoop
1 | func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { |
ScraperLoop是单个Target进行获取的执行单位,协程使用死循环进行占用,然后调用scraper接口的Scrape方法去抓取数据,并且调用Stroage模块的Appender的接口金属数据的持久化,然后继续定时休眠的过程。我们需要更加具体的看一下实例Scraper的Scrape方法。
ScraperLoop把Scraper抽象出来的三个接口都进行了调用:
- 开始部分的Select代码段中的Offset是用于控制第一次执行的时候等待的间隔
- Scrape方法就是直接进行数据的抓取,下面有详细解析
- report方法,修改Scraper中Target自己保存的状态。
TargerScraper
1 | func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) { |
Scrape方法是使用HttpClient进行对target url 的数据抓取,抓取的内容在context中进行传递,得到返回后,继续解析,返回给ScraperLoop的Run方法使用。