Go Concurrency Patterns: Context


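Introduction to Context

In Go servers, each incoming request is handled in its own goroutine. Request handlers often start additional goroutines to access backend resources such as databases and RPC services. Because these goroutines all work on behalf of the same request, they typically need access to shared, request-specific values such as the user's identity, authorization tokens, and the request's deadline. When a request times out or is canceled, all of the goroutines working on it should exit promptly and release the resources they hold. Go provides the context package to solve exactly this problem, and this article introduces its basic usage.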

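The Interface

Readers familiar with the PostgreSQL source code will know that PG also has a notion of Context (MemoryContext), used to manage memory and related resources. Go's Context plays a loosely similar role, except that it scopes the lifetime of the work done for a request rather than memory. Here is the definition of Go's Context: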

type Context interface {
        // Deadline returns the time when work done on behalf of this context
        // should be canceled. Deadline returns ok==false when no deadline is
        // set. Successive calls to Deadline return the same results.
        Deadline() (deadline time.Time, ok bool)

        // Done returns a channel that's closed when work done on behalf of this
        // context should be canceled. Done may return nil if this context can
        // never be canceled. Successive calls to Done return the same value.
        //
        // WithCancel arranges for Done to be closed when cancel is called;
        // WithDeadline arranges for Done to be closed when the deadline
        // expires; WithTimeout arranges for Done to be closed when the timeout
        // elapses.
        //
        // Done is provided for use in select statements:
        //
        //  // Stream generates values with DoSomething and sends them to out
        //  // until DoSomething returns an error or ctx.Done is closed.
        //  func Stream(ctx context.Context, out chan<- Value) error {
        //      for {
        //          v, err := DoSomething(ctx)
        //          if err != nil {
        //              return err
        //          }
        //          select {
        //          case <-ctx.Done():
        //              return ctx.Err()
        //          case out <- v:
        //          }
        //      }
        //  }
        //
        // See https://blog.golang.org/pipelines for more examples of how to use
        // a Done channel for cancelation.
        Done() <-chan struct{}

        // Err returns a non-nil error value after Done is closed. Err returns
        // Canceled if the context was canceled or DeadlineExceeded if the
        // context's deadline passed. No other values for Err are defined.
        // After Done is closed, successive calls to Err return the same value.
        Err() error

        // Value returns the value associated with this context for key, or nil
        // if no value is associated with key. Successive calls to Value with
        // the same key returns the same result.
        //
        // Use context values only for request-scoped data that transits
        // processes and API boundaries, not for passing optional parameters to
        // functions.
        //
        // A key identifies a specific value in a Context. Functions that wish
        // to store values in Context typically allocate a key in a global
        // variable then use that key as the argument to context.WithValue and
        // Context.Value. A key can be any type that supports equality;
        // packages should define keys as an unexported type to avoid
        // collisions.
        //
        // Packages that define a Context key should provide type-safe accessors
        // for the values stored using that key:
        //
        //     // Package user defines a User type that's stored in Contexts.
        //     package user
        //
        //     import "context"
        //
        //     // User is the type of value stored in the Contexts.
        //     type User struct {...}
        //
        //     // key is an unexported type for keys defined in this package.
        //     // This prevents collisions with keys defined in other packages.
        //     type key int
        //
        //     // userKey is the key for user.User values in Contexts. It is
        //     // unexported; clients use user.NewContext and user.FromContext
        //     // instead of using this key directly.
        //     var userKey key = 0
        //
        //     // NewContext returns a new Context that carries value u.
        //     func NewContext(ctx context.Context, u *User) context.Context {
        //         return context.WithValue(ctx, userKey, u)
        //     }
        //
        //     // FromContext returns the User value stored in ctx, if any.
        //     func FromContext(ctx context.Context) (*User, bool) {
        //         u, ok := ctx.Value(userKey).(*User)
        //         return u, ok
        //     }
        Value(key interface{}) interface{}
}

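The Context interface defines only four methods, and the comments above describe each of them in detail. Here is a brief summary.

  1. Deadline. Returns the time at which work done on behalf of this Context should be canceled. If no deadline is set, ok is false. Successive calls return the same results.

  2. Done. Returns a receive-only channel that serves as the cancellation signal for functions running on behalf of this Context. When the channel is closed, those functions should immediately stop their work and return. Once the channel is closed, Err returns a non-nil error.

  3. Err. After the Done channel is closed, Err returns a non-nil error. Only two values are currently defined, Canceled and DeadlineExceeded:

    // Canceled is the error returned by Context.Err when the context is canceled.
    var Canceled = errors.New("context canceled")

    // DeadlineExceeded is the error returned by Context.Err when the context's deadline passes.
    var DeadlineExceeded error = deadlineExceededError{}

  4. Value. Returns the value associated with key in this Context, or nil if no value is associated with key. Successive calls with the same key return the same result. Its main purpose is to carry request-scoped data through the Context. Because a Context is shared across goroutines, the stored data must be safe for concurrent use.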

Context is designed for concurrent programs, so the methods of a Context are safe to call from multiple goroutines simultaneously.
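As a minimal sketch of how Done and Err work together, the program below shares one Context among several goroutines and cancels it once. The names worker and runWorkers are hypothetical and exist only for this illustration; the sketch also assumes the standard library context package (Go 1.7 or later).

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// worker (a hypothetical name) blocks until ctx is canceled or five
// seconds pass, then reports what it observed on the result channel.
func worker(ctx context.Context, result chan<- error) {
    select {
    case <-ctx.Done():
        result <- ctx.Err() // Err is non-nil once Done is closed.
    case <-time.After(5 * time.Second):
        result <- nil // Finished without being canceled.
    }
}

// runWorkers starts n goroutines that share a single Context, cancels
// it, and collects the error each goroutine saw.
func runWorkers(n int) []error {
    ctx, cancel := context.WithCancel(context.Background())
    result := make(chan error, n)
    for i := 0; i < n; i++ {
        go worker(ctx, result)
    }
    cancel() // Closes ctx.Done() for every goroutine at once.

    errs := make([]error, 0, n)
    for i := 0; i < n; i++ {
        errs = append(errs, <-result)
    }
    return errs
}

func main() {
    for _, err := range runWorkers(3) {
        fmt.Println(err) // prints "context canceled"
    }
}
```

Every goroutine reads the same Done channel, so a single call to cancel is enough to notify all of them.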

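Derived Contexts

The context package provides a derivation mechanism: from one context we can derive child contexts, and together they form a tree. When a context is canceled, every context derived from it is canceled as well.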
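The following sketch illustrates the tree behavior under simple assumptions. It builds a three-level chain with WithCancel (one of the derivation functions described later in this section), cancels only the middle context, and reports the state of each level. The function name treeStates is hypothetical.

```go
package main

import (
    "context"
    "fmt"
)

// treeStates (a hypothetical helper) builds parent -> child -> grandchild,
// cancels only the child, and returns the Err of each context.
func treeStates() (parentErr, childErr, grandchildErr error) {
    parent, cancelParent := context.WithCancel(context.Background())
    defer cancelParent()
    child, cancelChild := context.WithCancel(parent)
    grandchild, cancelGrandchild := context.WithCancel(child)
    defer cancelGrandchild()

    cancelChild()
    <-grandchild.Done() // Blocks until the cancellation has reached the grandchild.

    // Cancellation flows down the tree, never up: the parent is untouched.
    return parent.Err(), child.Err(), grandchild.Err()
}

func main() {
    fmt.Println(treeStates()) // prints "<nil> context canceled context canceled"
}
```

Canceling a context affects its own subtree only, which is why the parent's Err is still nil here.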

The context package provides the Background function, which returns a root context that can never be canceled. It is typically used in the main function:

// Background returns a non-nil, empty Context. It is never canceled, has no values, and has no deadline. It is typically used by the main function, initialization, and tests, and as the top-level Context for incoming requests.
func Background() Context
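A small sketch of what "never canceled, no values, no deadline" means in practice is shown below. The helper name backgroundProperties is hypothetical, and the sketch assumes the standard library context package.

```go
package main

import (
    "context"
    "fmt"
)

// backgroundProperties (a hypothetical helper) inspects the root context.
func backgroundProperties() (doneIsNil, hasDeadline bool, err error) {
    ctx := context.Background()
    _, hasDeadline = ctx.Deadline()
    // Done may return nil for a context that can never be canceled;
    // receiving from a nil channel blocks forever.
    return ctx.Done() == nil, hasDeadline, ctx.Err()
}

func main() {
    fmt.Println(backgroundProperties()) // prints "true false <nil>"
}
```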

The package also provides three functions for deriving child contexts: WithCancel, WithDeadline, and WithTimeout. Choose whichever fits your needs:

// WithCancel returns a copy of parent with a new Done channel. The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed, whichever happens first.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// WithDeadline returns a copy of the parent context with the deadline adjusted to be no later than d. If the parent's deadline is already earlier than d, WithDeadline(parent, d) is semantically equivalent to parent. The returned context's Done channel is closed when the deadline expires, when the returned cancel function is called, or when the parent context's Done channel is closed, whichever happens first.
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)

// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

// A CancelFunc tells an operation to abandon its work. A CancelFunc does not wait for the work to stop. After the first call, subsequent calls to a CancelFunc do nothing.
type CancelFunc func()

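Canceling a context releases the resources associated with it, so we should cancel a request's context as soon as the request has been handled. Note that, as the CancelFunc comment says, cancel only signals the work to stop; it does not wait for the work to finish.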
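The usual pattern is to defer the cancel function right after deriving the context. The sketch below assumes a hypothetical slowOperation that stands in for a backend call; the durations are arbitrary values picked for the illustration.

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// slowOperation (hypothetical) simulates a backend call that takes d
// to complete unless ctx is canceled first.
func slowOperation(ctx context.Context, d time.Duration) error {
    select {
    case <-time.After(d):
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// callWithTimeout runs slowOperation under a timeout. Both arguments
// are in milliseconds.
func callWithTimeout(timeoutMs, workMs int) error {
    timeout := time.Duration(timeoutMs) * time.Millisecond
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel() // Releases the timer even when the work finishes early.
    return slowOperation(ctx, time.Duration(workMs)*time.Millisecond)
}

func main() {
    fmt.Println(callWithTimeout(50, 1000)) // prints "context deadline exceeded"
    fmt.Println(callWithTimeout(1000, 10)) // prints "<nil>"
}
```

Calling cancel after the deadline has already fired is harmless, since subsequent calls to a CancelFunc do nothing.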

There is also a WithValue function, which attaches request-scoped values to a Context. Although key has type interface{}, there is one restriction: the key must be comparable.

// WithValue returns a copy of parent in which the value associated with key is val.
// Use context Values only for request-scoped data that transits processes and APIs, not for passing optional parameters to functions.
// The provided key must be comparable.
func WithValue(parent Context, key, val interface{}) Context
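The sketch below shows the unexported-key convention recommended in the interface comments. All names in it (ctxKey, requestIDKey, withRequestID, requestIDFrom, and the sample ID "req-42") are hypothetical and chosen only for this example.

```go
package main

import (
    "context"
    "fmt"
)

// ctxKey is an unexported key type, so keys defined in other packages
// can never be equal to keys defined here.
type ctxKey int

// requestIDKey is the key under which the request ID is stored.
const requestIDKey ctxKey = 0

// withRequestID returns a copy of ctx that carries id.
func withRequestID(ctx context.Context, id string) context.Context {
    return context.WithValue(ctx, requestIDKey, id)
}

// requestIDFrom is the type-safe accessor for the stored request ID.
func requestIDFrom(ctx context.Context) (string, bool) {
    id, ok := ctx.Value(requestIDKey).(string)
    return id, ok
}

// lookup optionally stores an ID and then reads it back.
func lookup(store bool) (string, bool) {
    ctx := context.Background()
    if store {
        ctx = withRequestID(ctx, "req-42")
    }
    return requestIDFrom(ctx)
}

// plainIntKeyMatches reports whether an untyped 0 finds the value.
// It does not: the key's dynamic type is int, not ctxKey.
func plainIntKeyMatches() bool {
    ctx := withRequestID(context.Background(), "req-42")
    return ctx.Value(0) != nil
}

func main() {
    fmt.Println(lookup(true))         // prints "req-42 true"
    fmt.Println(plainIntKeyMatches()) // prints "false"
}
```

Keys are compared by both type and value, which is why a dedicated unexported type prevents collisions between packages.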

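Example

This example comes from the official Go blog. It searches through the Google Web Search API and returns the results. The Google Web Search API has since been shut down, so the program can no longer fetch any results, but the code structure and its use of Context are still worth studying. The example consists of three packages:

  • server: provides the main function and the handler for /search

  • userip: extracts the user's IP address from a request and associates it with a context

  • google: provides the Search function, which sends the query to Google

The complete program follows: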

server.go

// The server program issues Google search requests and demonstrates the use of
// the go.net Context API. It serves on port 8080.
//
// The /search endpoint accepts these query params:
//   q=the Google search query
//   timeout=a timeout for the request, in time.Duration format
//
// For example, http://localhost:8080/search?q=golang&timeout=1s serves the
// first few Google search results for "golang" or a "deadline exceeded" error
// if the timeout expires.
package main

import (
    "context" // In the standard library since Go 1.7; replaces golang.org/x/net/context.
    "html/template"
    "log"
    "net/http"
    "time"

    "golang.org/x/blog/content/context/google"
    "golang.org/x/blog/content/context/userip"
)

func main() {
    http.HandleFunc("/search", handleSearch)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

// handleSearch handles URLs like /search?q=golang&timeout=1s by forwarding the
// query to google.Search. If the query param includes timeout, the search is
// canceled after that duration elapses.
func handleSearch(w http.ResponseWriter, req *http.Request) {
    // ctx is the Context for this handler. Calling cancel closes the
    // ctx.Done channel, which is the cancellation signal for requests
    // started by this handler.
    var (
        ctx    context.Context
        cancel context.CancelFunc
    )
    timeout, err := time.ParseDuration(req.FormValue("timeout"))
    if err == nil {
        // The request has a timeout, so create a context that is
        // canceled automatically when the timeout expires.
        ctx, cancel = context.WithTimeout(context.Background(), timeout)
    } else {
        ctx, cancel = context.WithCancel(context.Background())
    }
    defer cancel() // Cancel ctx as soon as handleSearch returns.

    // Check the search query.
    query := req.FormValue("q")
    if query == "" {
        http.Error(w, "no query", http.StatusBadRequest)
        return
    }

    // Store the user IP in ctx for use by code in other packages.
    userIP, err := userip.FromRequest(req)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    ctx = userip.NewContext(ctx, userIP)

    // Run the Google search and print the results.
    start := time.Now()
    results, err := google.Search(ctx, query)
    elapsed := time.Since(start)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    if err := resultsTemplate.Execute(w, struct {
        Results          google.Results
        Timeout, Elapsed time.Duration
    }{
        Results: results,
        Timeout: timeout,
        Elapsed: elapsed,
    }); err != nil {
        log.Print(err)
        return
    }
}

var resultsTemplate = template.Must(template.New("results").Parse(`
<html>
<head/>
<body>
  <ol>
  {{range .Results}}
    <li>{{.Title}} - <a href="{{.URL}}">{{.URL}}</a></li>
  {{end}}
  </ol>
  <p>{{len .Results}} results in {{.Elapsed}}; timeout {{.Timeout}}</p>
</body>
</html>
`))

userip.go

// Package userip provides functions for extracting a user IP address from a
// request and associating it with a Context.
//
// This package is an example to accompany https://blog.golang.org/context.
// It is not intended for use by others.
package userip

import (
    "context" // In the standard library since Go 1.7; replaces golang.org/x/net/context.
    "fmt"
    "net"
    "net/http"
)

// FromRequest extracts the user IP address from req, if present.
func FromRequest(req *http.Request) (net.IP, error) {
    ip, _, err := net.SplitHostPort(req.RemoteAddr)
    if err != nil {
        return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
    }

    userIP := net.ParseIP(ip)
    if userIP == nil {
        return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
    }
    return userIP, nil
}

// The key type is unexported to prevent collisions with context keys defined in
// other packages.
type key int

// userIPKey is the context key for the user IP address.  Its value of zero is
// arbitrary.  If this package defined other context keys, they would have
// different integer values.
const userIPKey key = 0

// NewContext returns a new Context carrying userIP.
func NewContext(ctx context.Context, userIP net.IP) context.Context {
    return context.WithValue(ctx, userIPKey, userIP)
}

// FromContext extracts the user IP address from ctx, if present.
func FromContext(ctx context.Context) (net.IP, bool) {
    // ctx.Value returns nil if ctx has no value for the key;
    // the net.IP type assertion returns ok=false for nil.
    userIP, ok := ctx.Value(userIPKey).(net.IP)
    return userIP, ok
}

google.go

// Package google provides a function to do Google searches using the Google Web
// Search API. See https://developers.google.com/web-search/docs/
//
// This package is an example to accompany https://blog.golang.org/context.
// It is not intended for use by others.
//
// Google has since disabled its search API,
// and so this package is no longer useful.
package google

import (
    "context" // In the standard library since Go 1.7; replaces golang.org/x/net/context.
    "encoding/json"
    "net/http"

    "golang.org/x/blog/content/context/userip"
)

// Results is an ordered list of search results.
type Results []Result

// A Result contains the title and URL of a search result.
type Result struct {
    Title, URL string
}

// Search sends query to Google search and returns the results.
func Search(ctx context.Context, query string) (Results, error) {
    // Prepare the Google Search API request.
    req, err := http.NewRequest("GET", "https://ajax.googleapis.com/ajax/services/search/web?v=1.0", nil)
    if err != nil {
        return nil, err
    }
    q := req.URL.Query()
    q.Set("q", query)

    // If ctx is carrying the user IP address, forward it to the server.
    // Google APIs use the user IP to distinguish server-initiated requests
    // from end-user requests.
    if userIP, ok := userip.FromContext(ctx); ok {
        q.Set("userip", userIP.String())
    }
    req.URL.RawQuery = q.Encode()

    // Issue the HTTP request and handle the response. The httpDo function
    // cancels the request if ctx.Done is closed.
    var results Results
    err = httpDo(ctx, req, func(resp *http.Response, err error) error {
        if err != nil {
            return err
        }
        defer resp.Body.Close()

        // Parse the JSON search result.
        // https://developers.google.com/web-search/docs/#fonje
        var data struct {
            ResponseData struct {
                Results []struct {
                    TitleNoFormatting string
                    URL               string
                }
            }
        }
        if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
            return err
        }
        for _, res := range data.ResponseData.Results {
            results = append(results, Result{Title: res.TitleNoFormatting, URL: res.URL})
        }
        return nil
    })
    // httpDo waits for the closure we provided to return, so it's safe to
    // read results here.
    return results, err
}

// httpDo issues the HTTP request and calls f with the response. If ctx.Done is
// closed while the request or f is running, httpDo cancels the request, waits
// for f to exit, and returns ctx.Err. Otherwise, httpDo returns f's error.
func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
    // Run the HTTP request in a goroutine and pass the response to f.
    tr := &http.Transport{}
    client := &http.Client{Transport: tr}
    c := make(chan error, 1)
    go func() { c <- f(client.Do(req)) }()
    select {
    case <-ctx.Done():
        tr.CancelRequest(req)
        <-c // Wait for f to return.
        return ctx.Err()
    case err := <-c:
        return err
    }
}

The code is thoroughly commented, so we will not go over it again here.

This article is largely translated from https://blog.golang.org/context
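One note on httpDo: Transport.CancelRequest is deprecated in newer Go releases in favor of attaching a Context to the request itself. The sketch below shows that alternative under stated assumptions: it uses http.NewRequestWithContext (Go 1.13 or later) and a local httptest server in place of the defunct Google endpoint, and the names fetchWithTimeout and timedOutAgainstSlowServer are hypothetical. It is not the code from the original blog post.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "net/http"
    "net/http/httptest"
    "time"
)

// fetchWithTimeout issues a GET to url with a request bound to a
// context that expires after timeoutMs milliseconds.
func fetchWithTimeout(url string, timeoutMs int) error {
    timeout := time.Duration(timeoutMs) * time.Millisecond
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return err
    }
    // The client aborts the request when ctx is done.
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    return resp.Body.Close()
}

// timedOutAgainstSlowServer starts a local server whose handler takes
// about one second, calls it with a 50 ms timeout, and reports whether
// the resulting error wraps context.DeadlineExceeded.
func timedOutAgainstSlowServer() bool {
    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        select {
        case <-time.After(time.Second):
        case <-r.Context().Done(): // The client gave up.
        }
    }))
    defer srv.Close()

    err := fetchWithTimeout(srv.URL, 50)
    return errors.Is(err, context.DeadlineExceeded)
}

func main() {
    fmt.Println(timedOutAgainstSlowServer())
}
```

With this approach there is no need for a separate goroutine and select around the HTTP call, because the client itself watches the request's context.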

