Go语言并发控制


Go语言并发控制

参考:
Go语言底层原理剖析 - 第17章 并发控制
Go语言设计与实现 - 6.2 同步原语与锁


在高并发场景下,必定涉及到协程之间的交流与并发控制。除了通道外,go语言还提供了context以及各种锁机制,下面逐一进行介绍。



1. context

Go1.7中引入context包,使得父子协程之间可以进行交流,可以优雅地控制协程的退出。

1.1 context使用方式

context是使用频率非常高的包,不仅Go源码中经常使用,很多Go编写的第三方包也有所使用。context一般作为接口的第一个参数传递超时信息,在Go源码中,net/http、net、sql包的使用方法如下:

// net/http
func (r *Request) WithContext(ctx context.Context) *Request

// sql
func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error)

// net
func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error)

1.2 context接口详解

1.2.1 context接口结构

context.Context本质是一个接口,提供了4种方法:

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}
context-interface

1.2.2 context接口实现

context只是一个接口,意味着需要有具体的实现。Go标准库对此进行了简单的实现,用户也可以按照接口定义的方法自己实现。
context包中定义了一个空的context类emptyCtx,本质是int别名,并对上述4种函数进行了实现。

$GOPATH/src/context/context.go

// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}

func (*emptyCtx) Done() <-chan struct{} {
    return nil
}

func (*emptyCtx) Err() error {
    return nil
}

func (*emptyCtx) Value(key any) any {
    return nil
}

func (e *emptyCtx) String() string {
    switch e {
    case background:
        return "context.Background"
    case todo:
        return "context.TODO"
    }
    return "unknown empty Context"
}

之后声明了两个emptyCtx的对象:background&todo,并提供相应的获取方式,妥妥的单例模式-懒汉版本

var (
    background = new(emptyCtx)
    todo       = new(emptyCtx)
)

// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
    return background
}

// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
func TODO() Context {
    return todo
}

调用context.Background函数或context.TODO函数会返回最简单的context实现。context.Background函数一般作为根对象存在,其不可以退出,也不能携带值。要具体地使用context的功能,需要派生出新的context,配套的使用函数如下,其中前三个函数用于处理退出。

派生context配套函数

1.3 context原理

context在很大程度上利用了通道在close时会通知所有监听它的协程这一特性来实现。每个派生出的子协程都会创建一个新的退出通道,组织好context之间的关系即可实现继承链上退出的传递,图17-2所示的三个协程中,关闭通道A会连带关闭调用链上的通道B、通道C。

context退出

前面也已经提到,context包中定义了结构体emptyCtx,emptyCtx什么内容都没有,其不可以被退出,也不能携带值。
backgroundtodo是emptyCtx的实例化对象,可以通过context.Background函数或context.TODO函数获取到,一般作为最初始的根对象。

1.3.1 WithCancel

// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    c := newCancelCtx(parent)
    propagateCancel(parent, &c)
    return &c, func() { c.cancel(true, Canceled) }
}

当调用WithCancel函数时,会产生一个子context结构cancelCtx,并保留了父context的信息。

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
    return cancelCtx{Context: parent}
}

// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
    Context

    mu       sync.Mutex            // protects following fields
    done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}

children字段保存当前context之后派生的子context的信息,每个context都会有一个新的done通道,这保证了子context的退出不会影响父context。

1.3.2 WithTimeout

// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete:
//
//     func slowOperationWithTimeout(ctx context.Context) (Result, error) {
//         ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
//         defer cancel()  // releases resources if slowOperation completes before timeout elapses
//         return slowOperation(ctx)
//     }
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    return WithDeadline(parent, time.Now().Add(timeout))
}

WithTimeout函数只是对时间的一个封装,最终会调用WithDeadline函数。

1.3.3 WithDeadline

// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    if cur, ok := parent.Deadline(); ok && cur.Before(d) {
        // The current deadline is already sooner than the new one.
        return WithCancel(parent)
    }
    c := &timerCtx{
        cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
    propagateCancel(parent, c)
    dur := time.Until(d)
    if dur <= 0 {
        c.cancel(true, DeadlineExceeded) // deadline has already passed
        return c, func() { c.cancel(false, Canceled) }
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.err == nil {
        c.timer = time.AfterFunc(dur, func() {
            c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}

WithDeadline函数先判断父context是否比当前设置的超时参数d先退出,如果是,那么子协程会随着父context的退出而退出,没有必要再设置定时器。然后创建一个新的context,初始化通道。

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
    done := parent.Done()
    if done == nil {
        return // parent is never canceled
    }

    select {
    case <-done:
        // parent is already canceled
        child.cancel(false, parent.Err())
        return
    default:
    }

    if p, ok := parentCancelCtx(parent); ok {
        p.mu.Lock()
        if p.err != nil {
            // parent has already been canceled
            child.cancel(false, p.err)
        } else {
            if p.children == nil {
                p.children = make(map[canceler]struct{})
            }
            p.children[child] = struct{}{}
        }
        p.mu.Unlock()
    } else {
        atomic.AddInt32(&goroutines, +1)
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}

propagateCancel函数会将子context加入父协程的children哈希表中,并开启一个定时器。当定时器到期时,会调用cancel方法关闭通道。

1.3.4 WithValue

// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
//
// The provided key must be comparable and should not be of type
// string or any other built-in type to avoid collisions between
// packages using context. Users of WithValue should define their own
// types for keys. To avoid allocating when assigning to an
// interface{}, context keys often have concrete type
// struct{}. Alternatively, exported context key variables' static
// type should be a pointer or interface.
func WithValue(parent Context, key, val any) Context {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    if key == nil {
        panic("nil key")
    }
    if !reflectlite.TypeOf(key).Comparable() {
        panic("key is not comparable")
    }
    return &valueCtx{parent, key, val}
}

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
    Context
    key, val any
}

WithValue返回一个valueCtx结构体实例。




2. 数据争用 data race

2.1 两个例子

数据争用(data race)在Go语言中指两个协程同时访问相同的内存空间,并且至少有一个写操作的情况。这种情况常常是并发错误的根源,也是最难调试的并发错误之一。

package main

import (
    "fmt"
    "time"
)

var count = 0

func add() {
    count += 1
    time.Sleep(10 * time.Millisecond)
}

func main() {
    go add()
    go add()
    go add()
    time.Sleep(1 * time.Second)
    fmt.Println(count)
}

三个协程共同访问了全局变量count,乍看之下可能没有问题,但是该程序其实是有数据争用的,count的结果也是不明确的。count+=1操作看起来是一条指令,但是对CPU来说,需要先读取count的值,执行+1操作,再将count的值写回内存。

再举一例,Go语言中的经典数据争用错误。如下伪代码所示,在hash表中,存储了我们希望存储到Redis数据库中的数据(data)。但是在Go语言使用range时,变量k是一个堆上地址不变的对象,该地址存储的值会随着range遍历而发生变化。如果此时我们将变量k的地址放入协程save,以提供并发存储而不堵塞程序,那么最后的结果可能是后面的数据覆盖前面的数据,同时导致一些数据不被存储,并且每一次执行完存储的数据也是不明确的。

数据争用

2.2 检测工具race

Go 1.1后提供了强大的检查工具race来排查数据争用问题。race可以使用在多个Go指令中,当检测器在程序中找到数据争用时,将打印报告。对于第一个例子:

go run -race main.go

output:
==================
WARNING: DATA RACE
Read at 0x000100799880 by goroutine 8:
  main.add()
      /Users/xuxiangfei/Code/Demos/test_demos/main.go:11 +0x2c

Previous write at 0x000100799880 by goroutine 7:
  main.add()
      /Users/xuxiangfei/Code/Demos/test_demos/main.go:11 +0x40

Goroutine 8 (running) created at:
  main.main()
      /Users/xuxiangfei/Code/Demos/test_demos/main.go:17 +0x38

Goroutine 7 (running) created at:
  main.main()
      /Users/xuxiangfei/Code/Demos/test_demos/main.go:16 +0x2c
==================
3
Found 1 data race(s)
exit status 66

报错后输出的栈帧信息中能看出具体发生冲突的位置。Read at表明读取发生在main.go文件的第11行,而Previous write表明前一个写入也发生在main.go文件的第11行,从而非常快速地发现并定位数据争用问题。
竞争检测的成本因程序而异,对于典型的程序,内存使用量可能增加5~10倍,执行时间会增加2~20倍。




3. 锁机制

为了解决数据争用以及其他并发问题,Go语言提供了一系列的锁机制。

3.1 原子锁

所谓原子操作,就是“不可中断的一个或一系列操作”。对于2.1中的例子count+=1实际上并不是一个原子操作,读内存和写内存是两步原子操作。
需要有一种机制解决并发访问时数据冲突及内存操作乱序的问题,即提供一种原子性的操作。这通常依赖硬件的支持,例如X86指令集中的LOCK指令,对应Go语言中的**sync/atomic**包。下例使用了atomic.AddInt64函数将变量加1,这种原子操作不会发生并发时的数据争用问题。

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

var count int64 = 0

func add() {
    atomic.AddInt64(&count, 1)
    time.Sleep(10 * time.Millisecond)
}

func main() {
    go add()
    go add()
    go add()
    time.Sleep(1 * time.Second)
    fmt.Println(count)
}

原子操作是底层最基础的同步保证,通过原子操作可以构建起许多同步原语,例如自旋锁、信号量、互斥锁等。


3.2 自旋锁

在sync/atomic包中还有一个重要的操作—CompareAndSwap,与元素值进行对比并替换。使用该操作我们能够构建起自旋锁,只有获取该锁,才能执行区域中的代码。如下所示,使用一个for循环不断轮询原子操作,直到原子操作成功才获取该锁。
自旋锁会空转浪费资源,对系统对性能有很大的影响。

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

var flag int64 = 0
var count int64 = 0

func add() {
    for {
        if atomic.CompareAndSwapInt64(&flag, 0, 1) {
            count++
            atomic.StoreInt64(&flag, 0)
            return
        }
    }
    time.Sleep(10 * time.Millisecond)
}

func main() {
    go add()
    go add()
    go add()
    time.Sleep(1 * time.Second)
    fmt.Println(count)
}

3.3 互斥锁

通过原子操作构建起的互斥锁,虽然高效而且简单,但是其并不是万能的。例如,当某一个协程长时间霸占锁,其他协程抢占锁时将无意义地消耗CPU资源。同时当有许多正在获取锁的协程时,可能有协程一直抢占不到锁。

3.3.1 思想介绍

为了解决这种问题,操作系统的锁接口提供了终止与唤醒的机制,例如Linux中的pthread mutex,避免了频繁自旋造成的浪费。
在操作系统内部会构建起锁的等待队列,以便之后依次被唤醒。调用操作系统级别的锁会锁住整个线程使之无法运行,另外锁的抢占还会涉及线程之间的上下文切换。

3.3.2 使用说明

Go语言拥有比线程更加轻量级的协程,在协程的基础上实现了一种比传统操作系统级别的锁更加轻量级的互斥锁,其使用方式如下所示。

var count int64 = 0
var m sync.Mutex

func add() {
    m.Lock()
    count += 1
    m.Unlock()
}

sync.Mutex构建起了互斥锁,在同一时刻,只会有一个获取锁的协程继续执行,而其他的协程将陷入等待状态,这和自旋锁的功能是类似的,但是其提供了更加复杂的机制避免自旋锁的争用问题。

3.3.3 实现原理

3.3.3.1 数据结构

互斥锁是一种混合锁,其实现方式包含了自旋锁,同时参考了操作系统锁的实现。

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
    state int32
    sema  uint32
}

sync.Mutex结构比较简单,其包含了表示当前锁状态的state及信号量sema。

  • state:通过位图的形式存储了当前锁的状态,如图所示,其中包含锁是否为锁定状态、正在等待被锁唤醒的协程数量、两个和饥饿模式有关的标志。
Mutex
  • sema:互质锁中实现的信号量。

为了解决某一个协程可能长时间无法获取锁的问题,Go 1.9之后使用了饥饿模式。在饥饿模式下,unlock会唤醒最先申请加速的协程,从而保证公平。

3.3.3.1 正常模式 & 饥饿模式

Mutex有两种模式:正常模式 与 饥饿模式

正常模式:锁的等待者会按照先进先出的顺序获取锁。刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被“饿死”。

饥饿模式:互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

3.3.3.2 获取锁
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}
获取互斥锁

3.4 读写锁

3.4.1 思想介绍

读写锁通过两种锁来实现,一种为读锁,另一种为写锁。当进行读取操作时,需要加读锁,而进行写入操作时需要加写锁。

读锁 写锁
读锁 不互斥 互斥
写锁 互斥 互斥
  1. 多个协程可以同时获得读锁并执行
  2. 如果此时有协程申请了写锁,那么该写锁会等待所有的读锁都释放后才能获取写锁继续执行
  3. 如果当前的协程申请读锁时已经存在写锁,那么读锁会等待写锁释放后再获取锁继续执行

举个例子,哈希表并不是并发安全的,它只能够并发读取,并发写入时会出现冲突。一种简单的规避方式如下所示,可以在获取map中的数据时加入RLock读锁,在写入数据时使用Lock写锁。

3.4.2 实现原理

读写锁复用了互斥锁及信号量这两种机制。

// There is a modified copy of this file in runtime/rwmutex.go.
// If you make any changes here, see if you should make them there.

// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
    w           Mutex  // held if there are pending writers
    writerSem   uint32 // semaphore for writers to wait for completing readers
    readerSem   uint32 // semaphore for readers to wait for completing writers
    readerCount int32  // number of pending readers
    readerWait  int32  // number of departing readers
}

const rwmutexMaxReaders = 1 << 30
  1. 读锁加锁
    先通过原子操作将readerCount加1,如果readerCount≥0就直接返回,所以如果只有获取读取锁的操作,那么其成本只有一个原子操作。当readerCount<0时,说明当前有写锁,当前协程将借助信号量陷入等待状态,如果获取到信号量则立即退出,没有获取到信号量时的逻辑与互斥锁的逻辑相似。
  2. 读锁解锁
    如果当前没有写锁,则其成本只有一个原子操作并直接退出。
    如果当前有写锁正在等待,则调用rUnlockSlow判断当前是否为最后一个被释放的读锁,如果是则需要增加信号量并唤醒写锁。
  3. 写锁加锁
    调用Lock方法,必须先获取互斥锁,因为它复用了互斥锁的功能。接着readerCount减去rwmutexMaxReaders阻止后续的读操作。
    但获取互斥锁并不一定能直接获取写锁,如果当前已经有其他Goroutine持有互斥锁的读锁,那么当前协程会加入全局等待队列并进入休眠状态,当最后一个读锁被释放时,会唤醒该协程。
  4. 写锁解锁
    调用Unlock方法。将readerCount加上rwmutexMaxReaders,表示不会堵塞后续的读锁,依次唤醒所有等待中的读锁。当所有的读锁唤醒完毕后会释放互斥锁。

3.5 WaitGroup

3.5.1 使用方式

sync.WaitGroup 可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:

requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done()
        // res, err := service.call(r)
    }(request)
}
wg.Wait()

类似于操作系统中的 fork-in 模式,WaitGroup.Wait() 会hang住,直到 len(requests) 个子协程全部 Done() 了。

3.5.2 实现原理

3.5.2.1 数据结构
type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}

WaitGroup结构体中只包含两个成员变量:

  • noCopy — 保证 WaitGroup不会被开发者通过再赋值的方式拷贝
  • state1 — 存储着状态和信号量;
golang-waitgroup-state

WaitGroup对外暴露了三个方法:

  1. WaitGroup.Add
  2. WaitGroup.Wait
  3. WaitGroup.Done
3.5.2.2 WaitGroup.Add
func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if v > 0 || w == 0 {
        return
    }
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

WaitGroup.Add 可以更新 WaitGroup 中的计数器 counter。虽然 WaitGroup.Add 方法传入的参数可以为负数,但是计数器只能是非负数,一旦出现负数就会发生程序崩溃。当调用计数器归零,即所有任务都执行完成时,才会通过 sync.runtime_Semrelease 唤醒处于等待状态的 Goroutine。

3.5.2.3 WaitGroup.Wait
func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        if v == 0 {
            return
        }
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(semap)
            if +statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

WaitGroup.Add 会在计数器大于 0 并且不存在等待的 Goroutine 时,调用 runtime.sync_runtime_Semacquire陷入睡眠。

WaitGroup 的计数器归零时,陷入睡眠状态的 Goroutine 会被唤醒,上述方法也会立刻返回。

3.5.2.4 WaitGroup.Done
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

WaitGroup.Done 只是对 WaitGroup.Add 方法的简单封装,我们可以向 sync.WaitGroup.Add 方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的 Goroutine;


3.6 Once

3.6.1 使用方式

func main() {
    o := &sync.Once{}
    for i := 0; i < 10; i++ {
        o.Do(func() {
            fmt.Println("only once")
        })
    }
}

// only once

sync.Once可以保证在 Go 程序运行期间的某段代码只会执行一次。

3.6.2 实现原理

3.6.2.1 数据结构
type Once struct {
    done uint32
    m    Mutex
}
  • done: 标识代码块是否执行过
  • m: 互斥锁
3.6.2.2 Once.Do

Once.DoOnce结构体对外唯一暴露的方法,该方法会接收一个入参为空的函数:

  • 如果传入的函数已经执行过,会直接返回;
  • 如果传入的函数没有执行过,会调用 Once.doSlow执行传入的函数:
func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}
  1. 为当前 Goroutine 获取互斥锁;
  2. 执行传入的无入参函数;
  3. 运行延迟函数调用,将成员变量 done 更新成 1;

3.6 总结

Go语言中的互斥锁算一种混合锁,结合了原子操作、自旋、信号量、全局哈希表、等待队列、操作系统级别锁等多种技术,实现相对复杂。
但是Go语言的锁相对于操作系统级别的锁更快,因为在大部分情况下锁的争用停留在用户态。在有些场景下使用锁更简单,会比通道有更清晰的语义表达,需要结合具体的场景使用。


文章作者: xuxiangfei
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 xuxiangfei !
  目录