// read contains the portion of the map's contents that are safe for
// concurrent access (with or without mu held).
//
// The read field itself is always safe to load, but must only be stored with
// mu held.
//
// Entries stored in read may be updated concurrently without mu, but updating
// a previously-expunged entry requires that the entry be copied to the dirty
// map and unexpunged with mu held.
read atomic.Value // readOnly
// dirty contains the portion of the map's contents that require mu to be
// held. To ensure that the dirty map can be promoted to the read map quickly,
// it also includes all of the non-expunged entries in the read map.
//
// Expunged entries are not stored in the dirty map. An expunged entry in the
// clean map must be unexpunged and added to the dirty map before a new value
// can be stored to it.
//
// If the dirty map is nil, the next write to the map will initialize it by
// making a shallow copy of the clean map, omitting stale entries.
dirty map[interface{}]*entry
// misses counts the number of loads since the read map was last updated that
// needed to lock mu to determine whether the key was present.
//
// Once enough misses have occurred to cover the cost of copying the dirty
// map, the dirty map will be promoted to the read map (in the unamended
// state) and the next store to the map will make a new dirty copy.
misses int
}
// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
    m       map[interface{}]*entry
    amended bool // true if the dirty map contains some key not in m.
}

type entry struct {
    // p points to the interface{} value stored for the entry.
    //
    // If p == nil, the entry has been deleted and m.dirty == nil.
    //
    // If p == expunged, the entry has been deleted, m.dirty != nil, and the
    // entry is missing from m.dirty.
    //
    // Otherwise, the entry is valid and recorded in m.read.m[key] and, if
    // m.dirty != nil, in m.dirty[key].
    //
    // An entry can be deleted by atomic replacement with nil: when m.dirty is
    // next created, it will atomically replace nil with expunged and leave
    // m.dirty[key] unset.
    //
    // An entry's associated value can be updated by atomic replacement,
    // provided p != expunged. If p == expunged, an entry's associated value
    // can be updated only after first setting m.dirty[key] = e so that
    // lookups using the dirty map find the entry.
    p unsafe.Pointer // *interface{}
}
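For context, here is a minimal usage sketch of the exported sync.Map API whose internals the comments above describe:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var m sync.Map
    m.Store("answer", 42) // writes go through the read or dirty map as described above
    if v, ok := m.Load("answer"); ok {
        fmt.Println(v) // 42; lock-free when the key lives in the read map
    }
    m.Delete("answer")
}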
A Chinese GOPROXY implementation can solve a real production problem: downloading Go dependencies. By default, Go fetches public open-source libraries from their origin, but due to network restrictions, dependency downloads in production may hit unreachable (404) domains, so without a proxy the dependencies cannot be downloaded at all.
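A minimal setup sketch (goproxy.cn is one public Chinese proxy; go env -w requires Go 1.13+):

go env -w GOPROXY=https://goproxy.cn,direct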
Another goal is to be able to organize projects freely, independent of GOPATH.
Go Module internals
How Go Modules manage versions
Go Modules use semantic versioning to version libraries.
In formal software engineering terms, a version is defined as v$Major.$Minor.$Patch (e.g. v2.10.1), where:

- Major is the major version; a bump signals potentially breaking changes, such as a large API overhaul
- Minor is the minor version, introducing new features or feature changes
- Patch is the patch level, generally fixes for issues in the corresponding Minor release
Golang adopts this definition for Go Modules: if a package makes a breaking change, the major version in the module name must be incremented as well. Taking the excel parsing library we use as an example, a Major version bump also goes into the ModuleName.
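As a sketch, the v2 line of such a library declares the major version in its module path:

module github.com/360EntSecGroup-Skylar/excelize/v2

Consumers then import the /v2 path explicitly (import "github.com/360EntSecGroup-Skylar/excelize/v2"), which is why v1 and v2 of the same library can even coexist in one build.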
We changed the fetch step inside our build function to obtain the required library branch directly via go get; previously we would git clone the pinned version of the private code into GOPATH and build from there.
To make switching feature branches during development easier, and to lower the mental overhead of managing the go.mod file, the build strips the replace directives from go.mod with shell commands, guaranteeing that released code is always built against the dependency branch specified for that build.
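The standard toolchain can also do this without hand-rolled shell; a sketch with an illustrative module path:

go mod edit -dropreplace=example.com/private/lib
go build ./...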
Pitfalls after migrating to Go Modules
Jenkins build issues: a major pitfall in our first implementation was putting the pull of this project's code and the go get of dependencies in one Stage, with the build in a separate Stage. It turned out that in Jenkins, if go get runs in a Stage other than the build's, the new versions of the dependencies are not actually fetched. Moving the go get of our private dependencies into the same stage as the build solved the problem.
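For reference, a minimal declarative-pipeline sketch of the layout that worked (stage name, module path, and branch are illustrative, not our actual Jenkinsfile):

pipeline {
    agent any
    stages {
        stage('Build') {
            steps {
                // go get of the private dependency and the build run in the same stage
                sh 'go get example.com/private/lib@dev-branch'
                sh 'go build ./...'
            }
        }
    }
}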
Dependency version management: once, after a large merge during development, running go mod tidy recomputed the dependencies in go.mod and upgraded our excel parsing library, whose API had changed. After investigating, we pinned the library's version with the go get command below:
go get 'github.com/360EntSecGroup-Skylar/excelize/v2@<=v2.1.0'

(The quotes keep the shell from treating <= as a redirection; the module query selects the highest version not above v2.1.0.)
jitterSeed    uint64     // Global jitterSeed seed is used to spread scrape workload across HA setup.
mtxScrape     sync.Mutex // Guards the fields below.
scrapeConfigs map[string]*config.ScrapeConfig
scrapePools   map[string]*scrapePool
targetSets    map[string][]*targetgroup.Group
triggerReload chan struct{}
}
A scrapePool is the unit of work that scrapes the targets of a single job:
appendable Appendable
logger     log.Logger
mtx    sync.RWMutex
config *config.ScrapeConfig
client *http.Client
// Targets and loops must always be synchronized to have the same
// set of hashes.
activeTargets  map[uint64]*Target
droppedTargets []*Target
loops          map[uint64]loop
cancel         context.CancelFunc
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(scrapeLoopOptions) loop
// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all
// scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
    start := time.Now()
    var all []*Target
    sp.mtx.Lock()
    sp.droppedTargets = []*Target{}
    for _, tg := range tgs {
        targets, err := targetsFromGroup(tg, sp.config)
        if err != nil {
            level.Error(sp.logger).Log("msg", "creating targets failed", "err", err)
            continue
        }
        for _, t := range targets {
            if t.Labels().Len() > 0 {
                all = append(all, t)
            } else if t.DiscoveredLabels().Len() > 0 {
                sp.droppedTargets = append(sp.droppedTargets, t)
            }
        }
    }
    sp.mtx.Unlock()
    sp.sync(all)
    // ...
}
// sync takes a list of potentially duplicated targets, deduplicates them,
// starts scrape loops for new targets, and stops scrape loops for
// disappeared targets. It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
    sp.mtx.Lock()
    defer sp.mtx.Unlock()

    var (
        uniqueTargets   = map[uint64]struct{}{}
        interval        = time.Duration(sp.config.ScrapeInterval)
        timeout         = time.Duration(sp.config.ScrapeTimeout)
        limit           = int(sp.config.SampleLimit)
        honorLabels     = sp.config.HonorLabels
        honorTimestamps = sp.config.HonorTimestamps
        mrc             = sp.config.MetricRelabelConfigs
    )
    for _, t := range targets {
        t := t
        hash := t.hash()
        uniqueTargets[hash] = struct{}{}
        if _, ok := sp.activeTargets[hash]; !ok {
            s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
            l := sp.newLoop(scrapeLoopOptions{
                target:          t,
                scraper:         s,
                limit:           limit,
                honorLabels:     honorLabels,
                honorTimestamps: honorTimestamps,
                mrc:             mrc,
            })
            sp.activeTargets[hash] = t
            sp.loops[hash] = l
            go l.run(interval, timeout, nil)
        } else {
            // Need to keep the most updated labels information
            // for displaying it in the Service Discovery web page.
            sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
        }
    }
var wg sync.WaitGroup
    // Stop and remove old targets and scraper loops.
    for hash := range sp.activeTargets {
        if _, ok := uniqueTargets[hash]; !ok {
            wg.Add(1)
            go func(l loop) {
                // ...
    // Wait for all potentially stopped scrapers to terminate.
    // This covers the case of flapping targets. If the server is under high
    // load, a new scraper may be active and tries to insert. The old scraper
    // that didn't terminate yet could still be inserting a previous sample set.
    wg.Wait()
}
    // Only record after the first scrape.
    if !last.IsZero() {
        targetIntervalLength.WithLabelValues(interval.String()).Observe(
            time.Since(last).Seconds(),
        )
    }
    b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
    buf := bytes.NewBuffer(b)
    if scrapeErr == nil {
        b = buf.Bytes()
        // NOTE: There were issues with misbehaving clients in the past
        // that occasionally returned empty results. We don't want those
        // to falsely reset our buffer size.
        if len(b) > 0 {
            sl.lastScrapeSize = len(b)
        }
    } else {
        level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
        if errc != nil {
            errc <- scrapeErr
        }
    }
    // A failed scrape is the same as an empty scrape,
    // we still call sl.append to trigger stale markers.
    total, added, seriesAdded, appErr := sl.append(b, contentType, start)
    if appErr != nil {
        level.Warn(sl.l).Log("msg", "append failed", "err", appErr)
        // The append failed, probably due to a parse error or sample limit.
        // Call sl.append again with an empty scrape to trigger stale markers.
        if _, _, _, err := sl.append([]byte{}, "", start); err != nil {
            level.Warn(sl.l).Log("msg", "append failed", "err", err)
        }
    }
for {
    select {
    case <-m.ctx.Done():
        return
    case <-ticker.C:
        // Some discoverers send updates too often so we throttle these with the ticker.
        select {
        case <-m.triggerSend:
            sentUpdates.WithLabelValues(m.name).Inc()
            select {
            case m.syncCh <- m.allGroups():
            default:
                delayedUpdates.WithLabelValues(m.name).Inc()
                level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle")
                select {
                case m.triggerSend <- struct{}{}:
                default:
                }
            }
        default:
        }
    }
}
type Discoverer interface {
    // Run hands a channel to the discovery provider (Consul, DNS, etc.)
    // through which it can send updated target groups.
    // Must return when the context gets canceled. It should not close the
    // update channel on returning.
    Run(ctx context.Context, up chan<- []*targetgroup.Group)
}
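To make the contract concrete, here is a minimal hypothetical Discoverer that publishes one static group and then waits for cancellation (a sketch, not a real Prometheus provider):

import (
    "context"

    "github.com/prometheus/prometheus/discovery/targetgroup"
)

// staticDiscoverer sends one fixed group, then blocks until canceled.
type staticDiscoverer struct {
    group *targetgroup.Group
}

func (d *staticDiscoverer) Run(ctx context.Context, up chan<- []*targetgroup.Group) {
    // Deliver the initial (and only) update, honoring cancellation.
    select {
    case up <- []*targetgroup.Group{d.group}:
    case <-ctx.Done():
        return
    }
    // Return only when the context is canceled; never close up.
    <-ctx.Done()
}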
case event := <-d.watcher.Events:
    // fsnotify sometimes sends a bunch of events without name or operation.
    // It's unclear what they are and why they are sent - filter them out.
    if len(event.Name) == 0 {
        break
    }
    // Everything but a chmod requires rereading.
    if event.Op^fsnotify.Chmod == 0 {
        break
    }
    // Changes to a file can spawn various sequences of events with
    // different combinations of operations. For all practical purposes
    // this is inaccurate.
    // The most reliable solution is to reload everything if anything happens.
    d.refresh(ctx, ch)
case <-ticker.C:
    // Setting a new watch after an update might fail. Make sure we don't lose
    // those files forever.
    d.refresh(ctx, ch)
case err := <-d.watcher.Errors:
    if err != nil {
        level.Error(d.logger).Log("msg", "Error watching file", "err", err)
    }
        }
    }
}
        level.Error(d.logger).Log("msg", "Error reading file", "path", p, "err", err)
        // Prevent deletion down below.
        ref[p] = d.lastRefresh[p]
        continue
    }
    select {
    // Send the newly read target groups on the update channel.
    case ch <- tgroups:
    case <-ctx.Done():
        return
    }
    ref[p] = len(tgroups)
}
// Send empty updates for sources that disappeared.
for f, n := range d.lastRefresh {
    m, ok := ref[f]
    if !ok || n > m {
        level.Debug(d.logger).Log("msg", "file_sd refresh found file that should be removed", "file", f)
        d.deleteTimestamp(f)
        for i := m; i < n; i++ {
            select {
            case ch <- []*targetgroup.Group{{Source: fileSource(f, i)}}:
            case <-ctx.Done():
                return
            }
        }
    }
}
d.lastRefresh = ref
func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
    defer func() {
        for _, tc := range d.treeCaches {
            tc.Stop()
        }
        for _, pathUpdate := range d.pathUpdates {
            // Drain event channel in case the treecache leaks goroutines otherwise.
            for range pathUpdate {
            }
        }
        d.conn.Close()
    }()
    for _, pathUpdate := range d.pathUpdates {
        go func(update chan treecache.ZookeeperTreeCacheEvent) {
            for event := range update {
                select {
                case d.updates <- event:
                case <-ctx.Done():
                    return
                }
            }
        }(pathUpdate)
    }
    for {
        select {
        case <-ctx.Done():
            return
        case event := <-d.updates:
            tg := &targetgroup.Group{
                Source: event.Path,
            }
            if event.Data != nil {
                labelSet, err := d.parse(*event.Data, event.Path)
                if err == nil {
                    tg.Targets = []model.LabelSet{labelSet}
                    d.sources[event.Path] = tg
                } else {
                    delete(d.sources, event.Path)
                }
            } else {
                delete(d.sources, event.Path)
            }
            select {
            case <-ctx.Done():
                return
            case ch <- []*targetgroup.Group{tg}:
            }
        }
    }
}
func (slice ByteSlice) Append(data []byte) []byte {
    // Body exactly the same as the Append function defined above.
    l := len(slice)
    if l+len(data) > cap(slice) { // reallocate
        // Allocate double what's needed, for future growth.
        newSlice := make([]byte, (l+len(data))*2)
        // The copy function is predeclared and works for any slice type.
        copy(newSlice, slice)
        slice = newSlice
    }
    slice = slice[0 : l+len(data)]
    copy(slice[l:], data)
    return slice
}
func (p *ByteSlice) Append2(data []byte) {
    slice := *p
    // Body as above, without the return.
    l := len(slice)
    if l+len(data) > cap(slice) { // reallocate
        // Allocate double what's needed, for future growth.
        newSlice := make([]byte, (l+len(data))*2)
        copy(newSlice, slice)
        slice = newSlice
    }
    slice = slice[0 : l+len(data)]
    copy(slice[l:], data)
    *p = slice
}
func (p *ByteSlice) Write(data []byte) (n int, err error) {
    slice := *p
    // Body as above, without the return.
    l := len(slice)
    if l+len(data) > cap(slice) { // reallocate
        // Allocate double what's needed, for future growth.
        newSlice := make([]byte, (l+len(data))*2)
        copy(newSlice, slice)
        slice = newSlice
    }
    slice = slice[0 : l+len(data)]
    copy(slice[l:], data)
    *p = slice
    return len(data), nil
}
func main() {
    var b ByteSlice
    b = b.Append([]byte{1, 2, 3})
    fmt.Printf("byteSlice 1 is %v", b)
    b.Write([]byte{7, 8, 9})
    fmt.Printf("byteSlice 2 is %v", b)
    /* func Fprintf(w io.Writer, format string, a ...interface{}) (n int, err error) */
    fmt.Fprintf(&b, "This hour has %d days\n", 7)
    /* the code below would not compile:
       fmt.Fprintf(b, "This hour has %d days\n", 7) */
    b.Append2([]byte{4, 5, 6})
    fmt.Printf("byteSlice 3 is %v", b)
}
cannot use b (type ByteSlice) as type io.Writer in argument to fmt.Fprintf:
    ByteSlice does not implement io.Writer (Write method has pointer receiver)
The official documentation describes the rule as follows:
The rule about pointers vs. values for receivers is that value methods can be invoked on pointers and values, but pointer methods can only be invoked on pointers.
This rule arises because pointer methods can modify the receiver; invoking them on a value would cause the method to receive a copy of the value, so any modifications would be discarded. The language therefore disallows this mistake. There is a handy exception, though. When the value is addressable, the language takes care of the common case of invoking a pointer method on a value by inserting the address operator automatically. In our example, the variable b is addressable, so we can call its Write method with just b.Write. The compiler will rewrite that to (&b).Write for us.
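A condensed sketch of both rules using the ByteSlice type above:

var b ByteSlice
b.Write([]byte("x"))    // OK: b is addressable, so the compiler rewrites this to (&b).Write
var w io.Writer = &b    // OK: *ByteSlice has the Write method
// var w2 io.Writer = b // compile error: Write has a pointer receiver
_ = w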
For example, the documentation for bytes.Buffer states that "the zero value for Buffer is an empty buffer ready to use." Similarly, sync.Mutex does not have an explicit constructor or Init method. Instead, the zero value for a sync.Mutex is defined to be an unlocked mutex.
The zero-value-is-useful property works transitively. Consider this type declaration.
type SyncedBuffer struct {
    lock   sync.Mutex
    buffer bytes.Buffer
}
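Because both fields' zero values are usable, a SyncedBuffer works immediately upon declaration (a sketch; the unexported fields mean this code must live in the same package):

var sb SyncedBuffer // ready to use without further arrangement
sb.lock.Lock()
sb.buffer.WriteString("hello")
sb.lock.Unlock()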
Sometimes, though, the zero value isn't sufficient and a constructor is necessary, as in this example:
func NewFile(fd int, name string) *File {
    if fd < 0 {
        return nil
    }
    f := new(File)
    f.fd = fd
    f.name = name
    f.dirinfo = nil
    f.nepipe = 0
    return f
}
Because the code above sets a fair number of fields one by one, we can simplify it with a composite literal (the fields may also be labeled explicitly, e.g. File{fd: fd, name: name}):
func NewFile(fd int, name string) *File {
    if fd < 0 {
        return nil
    }
    f := File{fd, name, nil, 0}
    return &f
}
func main() {
    // allocates slice structure; *p == nil; rarely useful
    var p *[]int = new([]int)
    // the slice v now refers to a new array of 5 ints
    var v []int = make([]int, 5)
    fmt.Printf("p values is %v,%v\n", p, *p == nil)
    fmt.Printf("v values is %v, %v\n", v, v == nil)
}
------------------
p values is &[],true
v values is [0 0 0 0 0], false
// Methods required by sort.Interface.
func (s Sequence) Len() int {
    return len(s)
}
func (s Sequence) Less(i, j int) bool {
    return s[i] < s[j]
}
func (s Sequence) Swap(i, j int) {
    s[i], s[j] = s[j], s[i]
}
// Copy returns a copy of the Sequence.
func (s Sequence) Copy() Sequence {
    copy := make(Sequence, 0, len(s))
    return append(copy, s...)
}
// Method for printing - sorts the elements before printing.
func (s Sequence) String() string {
    s = s.Copy() // Make a copy; don't overwrite argument.
    sort.Sort(s)
    str := "["
    for i, elem := range s { // Loop is O(N²); will fix that in next example.
        if i > 0 {
            str += " "
        }
        str += fmt.Sprint(elem)
    }
    return str + "]"
}
// Before:
func (s Sequence) String() string {
    s = s.Copy()
    sort.Sort(s)
    return fmt.Sprint([]int(s))
}
// After:
type Sequence []int
// Method for printing - sorts the elements before printing.
func (s Sequence) String() string {
    s = s.Copy()
    sort.IntSlice(s).Sort()
    return fmt.Sprint([]int(s))
}
// PathError records an error and the operation and
// file path that caused it.
type PathError struct {
    Op   string // "open", "unlink", etc.
    Path string // The associated file.
    Err  error  // Returned by the system call.
}
func CubeRoot(x float64) float64 {
    z := x / 3 // Arbitrary initial value
    for i := 0; i < 1e6; i++ {
        prevz := z
        z -= (z*z*z - x) / (3 * z * z)
        if veryClose(z, prevz) {
            return z
        }
    }
    // A million iterations has not converged; something is wrong.
    panic(fmt.Sprintf("CubeRoot(%g) did not converge", x))
}
// Error is the type of a parse error; it satisfies the error interface.
type Error string

func (e Error) Error() string {
    return string(e)
}
// error is a method of *Regexp that reports parsing errors by
// panicking with an Error.
func (regexp *Regexp) error(err string) {
    panic(Error(err))
}
// Compile returns a parsed representation of the regular expression.
func Compile(str string) (regexp *Regexp, err error) {
    regexp = new(Regexp)
    // doParse will panic if there is a parse error.
    defer func() {
        if e := recover(); e != nil {
            regexp = nil    // Clear return value.
            err = e.(Error) // Will re-panic if not a parse error.
        }
    }()
    return regexp.doParse(str), nil
}
func Contents(filename string) (string, error) {
    f, err := os.Open(filename)
    if err != nil {
        return "", err
    }
    defer f.Close() // f.Close will run when we're finished.
    var result []byte
    buf := make([]byte, 100)
    for {
        n, err := f.Read(buf[0:])
        result = append(result, buf[0:n]...) // append is discussed later.
        if err != nil {
            if err == io.EOF {
                break
            }
            return "", err // f will be closed if we return here.
        }
    }
    return string(result), nil // f will be closed if we return here.
}
-- OUTER JOIN
SELECT A.PK AS A_PK, A.Value AS A_Value,
       B.Value AS B_Value, B.PK AS B_PK
FROM Table_A A
FULL OUTER JOIN Table_B B
ON A.PK = B.PK
A_PK A_Value    B_Value    B_PK
---- ---------- ---------- ----
1    FOX        TROT       1
2    COP        CAR        2
3    TAXI       CAB        3
6    WASHINGTON MONUMENT   6
7    DELL       PC         7
NULL NULL       MICROSOFT  8
NULL NULL       APPLE      9
NULL NULL       SCOTCH     11
5    ARIZONA    NULL       NULL
4    LINCOLN    NULL       NULL
10   LUCENT     NULL       NULL
(11 row(s) affected)
Advanced join types
LEFT JOIN EXCLUDING INNER JOIN
Select the rows of A that have no matching row in B.
-- LEFT EXCLUDING JOIN
SELECT A.PK AS A_PK, A.Value AS A_Value,
       B.Value AS B_Value, B.PK AS B_PK
FROM Table_A A
LEFT JOIN Table_B B
ON A.PK = B.PK
WHERE B.PK IS NULL
RIGHT JOIN EXCLUDING INNER JOIN

Select the rows of B that have no matching row in A.

-- RIGHT EXCLUDING JOIN
SELECT A.PK AS A_PK, A.Value AS A_Value,
       B.Value AS B_Value, B.PK AS B_PK
FROM Table_A A
RIGHT JOIN Table_B B
ON A.PK = B.PK
WHERE A.PK IS NULL
A_PK A_Value    B_Value    B_PK
---- ---------- ---------- ----
NULL NULL       MICROSOFT  8
NULL NULL       APPLE      9
NULL NULL       SCOTCH     11
(3 row(s) affected)
OUTER JOIN EXCLUDING INNER JOIN
Select the rows of A with no match in B together with the rows of B with no match in A.
-- OUTER EXCLUDING JOIN
SELECT A.PK AS A_PK, A.Value AS A_Value,
       B.Value AS B_Value, B.PK AS B_PK
FROM Table_A A
FULL OUTER JOIN Table_B B
ON A.PK = B.PK
WHERE A.PK IS NULL OR B.PK IS NULL
select e.empno, e.ename, e.job, e.sal, e.deptno
from emp e,
     (select ename, job, sal from emp where job = 'CLERK') V
where V.ename = e.ename
  and V.job = e.job
  and V.sal = e.sal
The same query written as an explicit join
select e.empno, e.ename, e.job, e.sal, e.deptno
from emp e
join (select ename, job, sal from emp where job = 'CLERK') V
  on (V.ename = e.ename
  and V.job = e.job
  and V.sal = e.sal)
Finding rows that exist in only one table
Generally, not in is enough. But if the data contains NULLs, this approach breaks down. Why does NULL cause problems? Consider the likely implementation: in MySQL, in and not in are essentially OR-based relational operations. Because NULL behaves differently under OR's three-valued logic, in and not in can produce different results.
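A minimal demonstration of the trap, assuming the usual emp/dept example tables:

-- deptno not in (10, 20, null) expands to
-- deptno <> 10 and deptno <> 20 and deptno <> null;
-- "deptno <> null" evaluates to NULL, so the predicate is never TRUE
-- and no rows are returned.
select * from dept where deptno not in (10, 20, null);

-- not exists with a correlated subquery is NULL-safe:
select * from dept d
where not exists (select null from emp e where e.deptno = d.deptno);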
-- Rows present in EMP but missing from V
select * from (
    select e.empno, e.ename, e.job, e.mgr, e.hiredate, e.sal, e.comm,
           e.deptno, count(*) as cnt
    from emp e
    group by empno, ename, job, mgr, hiredate, sal, comm, deptno
) e
where not exists (
    select null from (
        select v.empno, v.ename, v.job, v.mgr, v.hiredate, v.sal, v.comm,
               v.deptno, count(*) as cnt
        from v
        group by empno, ename, job, mgr, hiredate, sal, comm, deptno
    ) v
    where v.empno = e.empno
      and v.ename = e.ename
      and v.job = e.job
      and v.mgr = e.mgr
      and v.hiredate = e.hiredate
      and v.sal = e.sal
      and v.deptno = e.deptno
      and v.cnt = e.cnt
      and coalesce(v.comm, 0) = coalesce(e.comm, 0)
)
-- Rows present in V but missing from EMP
select * from (
    select v.empno, v.ename, v.job, v.mgr, v.hiredate, v.sal, v.comm,
           v.deptno, count(*) as cnt
    from v
    group by empno, ename, job, mgr, hiredate, sal, comm, deptno
) v
where not exists (
    select null from (
        select e.empno, e.ename, e.job, e.mgr, e.hiredate, e.sal, e.comm,
               e.deptno, count(*) as cnt
        from emp e
        group by empno, ename, job, mgr, hiredate, sal, comm, deptno
    ) e
    where v.empno = e.empno
      and v.ename = e.ename
      and v.job = e.job
      and v.mgr = e.mgr
      and v.hiredate = e.hiredate
      and v.sal = e.sal
      and v.deptno = e.deptno
      and v.cnt = e.cnt
      and coalesce(v.comm, 0) = coalesce(e.comm, 0)
)
-- Combined: rows in EMP but not in V, plus rows in V but not in EMP
select * from (
    select e.empno, e.ename, e.job, e.mgr, e.hiredate, e.sal, e.comm,
           e.deptno, count(*) as cnt
    from emp e
    group by empno, ename, job, mgr, hiredate, sal, comm, deptno
) e
where not exists (
    select null from (
        select v.empno, v.ename, v.job, v.mgr, v.hiredate, v.sal, v.comm,
               v.deptno, count(*) as cnt
        from v
        group by empno, ename, job, mgr, hiredate, sal, comm, deptno
    ) v
    where v.empno = e.empno
      and v.ename = e.ename
      and v.job = e.job
      and v.mgr = e.mgr
      and v.hiredate = e.hiredate
      and v.sal = e.sal
      and v.deptno = e.deptno
      and v.cnt = e.cnt
      and coalesce(v.comm, 0) = coalesce(e.comm, 0)
)
union all
select * from (
    select v.empno, v.ename, v.job, v.mgr, v.hiredate, v.sal, v.comm,
           v.deptno, count(*) as cnt
    from v
    group by empno, ename, job, mgr, hiredate, sal, comm, deptno
) v
where not exists (
    select null from (
        select e.empno, e.ename, e.job, e.mgr, e.hiredate, e.sal, e.comm,
               e.deptno, count(*) as cnt
        from emp e
        group by empno, ename, job, mgr, hiredate, sal, comm, deptno
    ) e
    where v.empno = e.empno
      and v.ename = e.ename
      and v.job = e.job
      and v.mgr = e.mgr
      and v.hiredate = e.hiredate
      and v.sal = e.sal
      and v.deptno = e.deptno
      and v.cnt = e.cnt
      and coalesce(v.comm, 0) = coalesce(e.comm, 0)
)
select x.ename
from (
    select a.ename,
           (select count(*) from emp b where b.ename <= a.ename) as rn
    from emp a
) x
where mod(rn, 2) = 1
Using OR logic with an outer join
Join the tables first, then evaluate the OR conditions:
select e.ename, d.deptno, d.dname, d.loc
from dept d
left join emp e
  on (d.deptno = e.deptno and (e.deptno = 10 or e.deptno = 20))
order by 2
Or build a derived (intermediate) table first, then join against it:
select e.ename, d.deptno, d.dname, d.loc
from dept d
left join (select * from emp where deptno = 10 or deptno = 20) e
  on d.deptno = e.deptno
order by 2
Comparing a table against itself
Case 1: finding pairs of mutually inverse records (this example).
Case 2: finding records where one column differs by 1 and another column differs by 5.
The general idea: take the Cartesian product of the table with itself (or with a subset of itself), then filter on the desired conditions.
select distinct v1.*
from V v1, V v2
where v1.test1 = v2.test2
  and v1.test2 = v2.test1
  and v1.test1 <= v1.test2
Selecting the top N records
Here a scalar subquery computes a rank column (rnk) in a derived table:
select ename, sal
from (
    select (select count(distinct b.sal)
            from emp b
            where a.sal <= b.sal) as rnk,
           a.sal,
           a.ename
    from emp a
) x
where rnk <= 5
OrderBy
Basic queries
-- ascending
select * from emp order by col2 asc;
-- descending
select * from emp order by col2 desc;
Sorting on multiple columns
select empno, deptno, sal, ename, job
from emp
order by deptno asc, sal desc; -- asc is the default and may be omitted
Dynamic sorting
select ename, sal, job, comm
from emp
order by case when job = 'salesman' then comm else sal end;
update
Basic syntax
update table_name set col_name = xxx where $cond
delete
Basic syntax
delete from table_name where $cond
Deleting duplicate records
delete from table_name where id not in (select min(id) from table_name group by name)

(Note: MySQL rejects selecting from the table being deleted from in a subquery; wrap the subquery in another derived table if you hit that error.)
Having & GroupBy
The original Wikipedia text:

A HAVING clause in SQL specifies that an SQL SELECT statement should only return rows where aggregate values meet the specified conditions. It was added to the SQL language because the WHERE keyword could not be used with aggregate functions. The HAVING clause filters the data on the group row but not on the individual row. To view the present condition formed by the GROUP BY clause, the HAVING clause is used.
A HAVING clause can only appear after GROUP BY. The difference from WHERE is that WHERE cannot contain aggregate functions (SUM(), COUNT(), AVG(), and the like) — you cannot write where sum(column_a) > ... — whereas HAVING filters the grouped rows produced by GROUP BY and may use aggregates.
So the usual form is:
select * from table_a A group by column_a having count(A.column_a) > 200
A note on aggregates
Aggregates such as SUM, MAX, MIN, and AVG can also be used directly in the SELECT list, including in comparisons against subqueries:
select max(Salary) as SecondHighestSalary
from employee
where salary < (select max(distinct salary) from employee)
SQL's ternary function IF(expr1, expr2, expr3) (MySQL) behaves like the ternary operator in ordinary programming languages, only with different syntax: if expr1 is true it returns expr2, otherwise expr3.
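A small sketch against the emp table used earlier (MySQL syntax; the threshold is illustrative):

-- if(cond, then, else): label each employee by salary band
select ename,
       if(sal >= 3000, 'high', 'low') as sal_band
from emp;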
type ShardMaster struct {
    mu      sync.Mutex
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg

    // Your data here.
    // (In Lab 3, the corresponding state was the database, a map[string]string.)
    configs    []Config // indexed by config num
    dup        map[int64]Result
    chanResult map[int]chan Result
}
case Cfg:
    if kv.CfgDupCheck(cmd.ClientId, cmd.Seq) {
        kv.SwitchConfig(cmd)
        if kv.CheckMigrateDone() {
            // If migration is done, switch to the new config; if not, do
            // nothing, so we don't reply on behalf of the old group.
            kv.myconfig[0] = kv.myconfig[1]
        }
        kv.cfgdup[cmd.ClientId] = cmd.Seq
        if kv.maxraftstate != -1 {
            kv.SaveSnapshot(index)
        }
    }
func (kv *ShardKV) SwitchConfig(newcfg Cfg) {
    if newcfg.NewConfig.Num == kv.myconfig[0].Num+1 {
        if kv.myconfig[0].Num != 0 {
            kv.GenShardChangeList(newcfg)
        } else if kv.myconfig[0].Num == 0 {
            for i := 0; i < shardmaster.NShards; i++ {
                if newcfg.NewConfig.Shards[i] == kv.gid {
                    kv.myshards[i] = 1
                }
            }
        }
        newc := kv.makeEmptyConfig()
        kv.CopyConfig(&newcfg.NewConfig, &newc)
        kv.myconfig[1] = newc
    }
}
// GenShardChangeList computes which shards this group needs to send away
// and which it needs to receive under the new config.
func (kv *ShardKV) GenShardChangeList(newcfg Cfg) {
    for i := 0; i < shardmaster.NShards; i++ {
        if kv.myconfig[0].Shards[i] == kv.gid && newcfg.NewConfig.Shards[i] != kv.gid {
            // need to send
            kv.needsend[i] = newcfg.NewConfig.Shards[i]
        }
        if kv.myconfig[0].Shards[i] != kv.gid && newcfg.NewConfig.Shards[i] == kv.gid {
            // need to receive
            _, ok := kv.needrecv[kv.myconfig[0].Shards[i]]
            if !ok {
                kv.needrecv[kv.myconfig[0].Shards[i]] = make([]int, 0)
            }
            kv.needrecv[kv.myconfig[0].Shards[i]] = append(kv.needrecv[kv.myconfig[0].Shards[i]], i)
        }
    }
    DPrintf("!!! group %d-%d, new config need to send is %v, need to receive is %v", kv.gid, kv.me, kv.needsend, kv.needrecv)
}
func (kv *ShardKV) MigrationRoutine() {
    for {
        if _, isLeader := kv.rf.GetState(); isLeader {
            kv.mu.Lock()
            for k, v := range kv.needrecv {
                needshard := make([]int, 0)
                for i := 0; i < len(v); i++ {
                    needshard = append(needshard, v[i])
                }
                args := PullArgs{Shard: needshard, ClientId: int64(kv.gid), Seq: kv.myconfig[0].Num}
                go func(mgid int, arg *PullArgs) {
                    servers := kv.myconfig[0].Groups[mgid]
                    DPrintf("Migrate: group %d-%d get Gid %d Servers is %v", kv.gid, kv.me, mgid, servers)
                    for {
                        for _, si := range servers {
                            reply := PullReply{}
                            srv := kv.make_end(si)
                            DPrintf("group %d-%d start call to gid %d", kv.gid, kv.me, mgid)
                            // The pulling group issues the Pull RPC to the source group.
                            ok := srv.Call("ShardKV.Pull", arg, &reply)
                            if !ok {
                                DPrintf("Migrate Failed: group %d-%d calling for server %v rpc pull, result is %t", kv.gid, kv.me, si, ok)
                            }
                            if ok && reply.WrongLeader == false {
                                if reply.Err == ErrNeedWait {
                                    DPrintf("Migrate: waiting server %v to pull new config from SM", si)
                                    return
                                }
                                if _, isleader := kv.rf.GetState(); isleader {
                                    newmapkv := make(map[string]string)
                                    for k, v := range reply.MapKV {
                                        newmapkv[k] = v
                                    }
                                    var newdup [shardmaster.NShards]map[int64]int
                                    for i := 0; i < shardmaster.NShards; i++ {
                                        newdup[i] = make(map[int64]int)
                                        for k, v := range reply.ShardDup[i] {
                                            newdup[i][k] = v
                                        }
                                    }
                                    mig := Migrate{newmapkv, newdup, arg.Seq, mgid}
                                    kv.mu.Lock()
                                    // Starting the Migrate command through Raft is how the
                                    // pulled partition data gets replicated within the group.
                                    kv.rf.Start(mig)
                                    DPrintf("Migrate: group %d-%d start migrate the data pulled from %d", kv.gid, kv.me, mgid)
                                    kv.mu.Unlock()
                                    return
                                }
                            } else {
                                DPrintf("Migrate Failed: group %d-%d call %d-%v meet wrong leader", kv.gid, kv.me, mgid, si)
                                DPrintf("!!!server is %v", servers)
                            }
                            time.Sleep(20 * time.Millisecond)
                        }
                    }
                }(k, &args)
            }
            kv.mu.Unlock()
        }
        time.Sleep(100 * time.Millisecond)
    }
}
case Migrate:
    if kv.MigrateDupCheck(cmd.Gid, cmd.Num) {
        for k, v := range cmd.MapKV {
            kv.database[k] = v
        }
        for i := 0; i < shardmaster.NShards; i++ {
            for k, v := range cmd.ShardDup[i] {
                kv.dup[i][k] = v
            }
        }
        for i := 0; i < len(kv.needrecv[cmd.Gid]); i++ {
            kv.myshards[kv.needrecv[cmd.Gid][i]] = 1
        }
        delete(kv.needrecv, cmd.Gid)
        if kv.CheckMigrateDone() {
            kv.myconfig[0] = kv.myconfig[1]
            DPrintf("Migrate: group %d-%d successful switch to config %d", kv.gid, kv.me, kv.myconfig[0].Num)
        }
        kv.migratedup[cmd.Gid] = cmd.Num
        if kv.maxraftstate != -1 {
            kv.SaveSnapshot(index)
        }
    }
type KVServer struct {
    mu sync.Mutex
    me int
    // the underlying Raft instance
    rf *raft.Raft
    // applyCh receives the entries Raft has reached consensus on
    applyCh chan raft.ApplyMsg
    maxraftstate int // snapshot if log grows this big

    database   map[string]string
    dup        map[int64]int
    chanResult map[int]chan Op
}
// Op is the command type submitted to Raft.
type Op struct {
    // Key is used by all of Get(), Put() and Append().
    Key string
    // Value is used by Put() and Append(); empty for Get.
    Value string
    // Name holds the operation name.
    Name string
    // ClientId identifies the requesting client.
    ClientId int64
    // Seq is the client's request sequence number.
    Seq int
}
func (kv *KVServer) ApplyOPRoutine() {
    // This goroutine asynchronously consumes the results Raft delivers on
    // applyCh and signals the RPC handlers waiting to reply to clients.
    DPrintf("Apply goroutine running")
    for {
        msg := <-kv.applyCh
        if msg.CommandValid {
            index := msg.CommandIndex
            if cmd, ok := msg.Command.(Op); ok {
                kv.mu.Lock()
                // Compare the per-client sequence number to filter out
                // duplicate replays of old operations.
                if kv.dupcheck(cmd.ClientId, cmd.Seq) {
                    if cmd.Name == PUT {
                        kv.database[cmd.Key] = cmd.Value
                    } else if cmd.Name == APPEND {
                        if _, ok := kv.database[cmd.Key]; ok {
                            kv.database[cmd.Key] += cmd.Value
                        } else {
                            kv.database[cmd.Key] = cmd.Value
                        }
                    }
                    kv.dup[cmd.ClientId] = cmd.Seq
                }
                res := Op{cmd.Key, kv.database[cmd.Key], cmd.Name, cmd.ClientId, cmd.Seq}
                ch, ok := kv.chanResult[index]
                if ok {
                    select {
                    case <-ch:
                    default:
                    }
                    // The command is committed; push the result back to the
                    // waiting RPC handler.
                    ch <- res
                }
                if kv.maxraftstate != -1 && kv.rf.GetStateSize() >= kv.maxraftstate && index == kv.rf.GetCommitedIndex() {
                    DPrintf("Do snapshot for over the maxraftstate")
                    kv.DoSnapShot(index)
                }
                kv.mu.Unlock()
            }
        } else {
            kv.LoadSnapShot(msg.Snapshot)
        }
    }
}
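The dupcheck helper used above isn't shown in the excerpt; here is a minimal sketch under the assumption that each client's sequence numbers increase monotonically:

// dupcheck reports whether this (clientId, seq) pair is new, i.e. the
// operation has not been applied before. The caller must hold kv.mu.
func (kv *KVServer) dupcheck(clientId int64, seq int) bool {
    lastSeq, ok := kv.dup[clientId]
    return !ok || seq > lastSeq
}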
// StartCommand submits an Op to the Raft consensus layer.
// The caller holds kv.mu on entry; it is released before blocking.
func (kv *KVServer) StartCommand(cmd Op) (Err, string) {
    index, _, isLeader := kv.rf.Start(cmd)
    if !isLeader {
        kv.mu.Unlock()
        return ERRWrongLeader, ""
    }
    ch := make(chan Op, 1)
    kv.chanResult[index] = ch
    kv.mu.Unlock()
    defer func() {
        // After finishing the request, drop the per-index channel.
        kv.mu.Lock()
        delete(kv.chanResult, index)
        kv.mu.Unlock()
    }()

    select {
    case c := <-ch:
        // The channel delivers the result produced by ApplyOPRoutine.
        if kv.CheckSame(c, cmd) {
            resvalue := ""
            if cmd.Name == GET {
                resvalue = c.Value
            }
            return OK, resvalue
        } else {
            DPrintf("Leader has change, index %d op %s error", index, cmd.Name)
            return ERRWrongLeader, ""
        }
    case <-time.After(time.Duration(200) * time.Millisecond):
        DPrintf("log get agree timeout, index is %d", index)
        return ERRTimeout, ""
    }
}