// Sender loop: forwards batched target-group updates to the receiver at most
// once per ticker cycle, so discoverers that fire very frequently are throttled.
for {
	select {
	case <-m.ctx.Done():
		return
	case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
		select {
		case <-m.triggerSend:
			sentUpdates.WithLabelValues(m.name).Inc()
			// Non-blocking send: if the receiver can't keep up, don't stall here.
			select {
			case m.syncCh <- m.allGroups():
			default:
				delayedUpdates.WithLabelValues(m.name).Inc()
				level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle")
				// Re-arm the trigger (non-blocking; a pending trigger already
				// guarantees a retry) so the update is attempted next cycle.
				select {
				case m.triggerSend <- struct{}{}:
				default:
				}
			}
		default:
			// No pending update this cycle.
		}
	}
}
}
// Discoverer is implemented by every service-discovery provider. It streams
// updated target groups to the consumer over the channel handed to Run.
type Discoverer interface {
	// Run hands a channel to the discovery provider (Consul, DNS etc) through which it can send
	// updated target groups.
	// Must return if the context gets canceled. It should not close the update
	// channel on returning.
	Run(ctx context.Context, up chan<- []*targetgroup.Group)
}
// A filesystem event arrived for one of the watched SD files.
case event := <-d.watcher.Events:
	// fsnotify sometimes sends a bunch of events without name or operation.
	// It's unclear what they are and why they are sent - filter them out.
	if len(event.Name) == 0 {
		break
	}
	// Everything but a chmod requires rereading.
	// (Op ^ Chmod == 0 holds exactly when the event is a pure chmod.)
	if event.Op^fsnotify.Chmod == 0 {
		break
	}
	// Changes to a file can spawn various sequences of events with
	// different combinations of operations. For all practical purposes
	// this is inaccurate.
	// The most reliable solution is to reload everything if anything happens.
	d.refresh(ctx, ch)
// Periodic full refresh, independent of filesystem events.
case <-ticker.C:
	// Setting a new watch after an update might fail. Make sure we don't lose
	// those files forever.
	d.refresh(ctx, ch)
// Surface watcher errors in the log; the periodic refresh keeps us correct.
case err := <-d.watcher.Errors:
	if err != nil {
		level.Error(d.logger).Log("msg", "Error watching file", "err", err)
	}
}
}
}
level.Error(d.logger).Log("msg", "Error reading file", "path", p, "err", err)
// Prevent deletion down below.
ref[p] = d.lastRefresh[p]
continue
}
select {
// Forward the freshly read target groups to the update channel.
case ch <- tgroups:
case <-ctx.Done():
	return
}
// Remember how many groups this file produced so the next refresh can
// detect shrinkage or removal.
ref[p] = len(tgroups)
}
// Send empty updates for sources that disappeared.
for f, n := range d.lastRefresh {
	m, ok := ref[f]
	// File gone entirely, or it now yields fewer groups than before.
	if !ok || n > m {
		level.Debug(d.logger).Log("msg", "file_sd refresh found file that should be removed", "file", f)
		d.deleteTimestamp(f)
		// Emit an empty group per vanished source index so consumers drop
		// the stale targets.
		for i := m; i < n; i++ {
			select {
			case ch <- []*targetgroup.Group{{Source: fileSource(f, i)}}:
			case <-ctx.Done():
				return
			}
		}
	}
}
d.lastRefresh = ref
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { defer func() { for _, tc := range d.treeCaches { tc.Stop() } for _, pathUpdate := range d.pathUpdates { // Drain event channel in case the treecache leaks goroutines otherwise. for range pathUpdate { } } d.conn.Close() }()
for _, pathUpdate := range d.pathUpdates { go func(update chan treecache.ZookeeperTreeCacheEvent) { for event := range update { select { case d.updates <- event: case <-ctx.Done(): return } } }(pathUpdate) }
for { select { case <-ctx.Done(): return case event := <-d.updates: tg := &targetgroup.Group{ Source: event.Path, } if event.Data != nil { labelSet, err := d.parse(*event.Data, event.Path) if err == nil { tg.Targets = []model.LabelSet{labelSet} d.sources[event.Path] = tg } else { delete(d.sources, event.Path) } } else { delete(d.sources, event.Path) } select { case <-ctx.Done(): return case ch <- []*targetgroup.Group{tg}: } } } }