原文链接:Writing worker queues, in Go

1.work.go

[root@wangjq queue]# cat work.go
package main

import "time"

// WorkRequest describes one unit of work travelling through the queue: the
// name to greet and how long the worker should wait before greeting it.
type WorkRequest struct {
	// Name is the name printed in the worker's greeting.
	Name string
	// Delay is how long the worker sleeps before printing the greeting.
	Delay time.Duration
}

2.collector.go

[root@wangjq queue]# cat collector.go
package main import (
"fmt"
"net/http"
"time"
) // A buffered channel that we can send work requests on.
var WorkQueue = make(chan WorkRequest, ) func Collector(w http.ResponseWriter, r *http.Request) {
// Make sure we can only be called with an HTTP POST request.
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
w.WriteHeader(http.StatusMethodNotAllowed)
return
} // Parse the delay.
delay, err := time.ParseDuration(r.FormValue("delay"))
if err != nil {
http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
return
} // Check to make sure the delay is anywhere from 1 to 10 seconds.
if delay.Seconds() < || delay.Seconds() > {
http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
return
} // Now, we retrieve the person's name from the request.
name := r.FormValue("name") // Just do a quick bit of sanity checking to make sure the client actually provided us with a name.
if name == "" {
http.Error(w, "You must specify a name.", http.StatusBadRequest)
return
} // Now, we take the delay, and the person's name, and make a WorkRequest out of them.
work := WorkRequest{Name: name, Delay: delay} // Push the work onto the queue.
WorkQueue <- work
fmt.Println("Work request queued") // And let the user know their work request was created.
w.WriteHeader(http.StatusCreated)
return
}

3.worker.go

[root@wangjq queue]# cat worker.go
package main import (
"fmt"
"time"
) // NewWorker creates, and returns a new Worker object. Its only argument
// is a channel that the worker can add itself to whenever it is done its
// work.
func NewWorker(id int, workerQueue chan chan WorkRequest) Worker {
// Create, and return the worker.
worker := Worker{
ID: id,
Work: make(chan WorkRequest),
WorkerQueue: workerQueue,
QuitChan: make(chan bool)} return worker
} type Worker struct {
ID int
Work chan WorkRequest
WorkerQueue chan chan WorkRequest
QuitChan chan bool
} // This function "starts" the worker by starting a goroutine, that is
// an infinite "for-select" loop.
func (w *Worker) Start() {
go func() {
for {
// Add ourselves into the worker queue.
w.WorkerQueue <- w.Work select {
case work := <-w.Work:
// Receive a work request.
fmt.Printf("worker%d: Received work request, delaying for %f seconds\n", w.ID, work.Delay.Seconds()) time.Sleep(work.Delay)
fmt.Printf("worker%d: Hello, %s!\n", w.ID, work.Name) case <-w.QuitChan:
// We have been asked to stop.
fmt.Printf("worker%d stopping\n", w.ID)
return
}
}
}()
} // Stop tells the worker to stop listening for work requests.
//
// Note that the worker will only stop *after* it has finished its work.
func (w *Worker) Stop() {
go func() {
w.QuitChan <- true
}()
}

4.dispatcher.go

[root@wangjq queue]# cat dispatcher.go
package main import "fmt" var WorkerQueue chan chan WorkRequest func StartDispatcher(nworkers int) {
// First, initialize the channel we are going to but the workers' work channels into.
WorkerQueue = make(chan chan WorkRequest, nworkers) // Now, create all of our workers.
for i := ; i < nworkers; i++ {
fmt.Println("Starting worker", i+)
worker := NewWorker(i+, WorkerQueue)
worker.Start()
} go func() {
for {
select {
case work := <-WorkQueue:
fmt.Println("Received work requeust")
go func() {
worker := <-WorkerQueue fmt.Println("Dispatching work request")
worker <- work
}()
}
}
}()
}

5.main.go

[root@wangjq queue]# cat main.go
package main import (
"flag"
"fmt"
"net/http"
) var (
NWorkers = flag.Int("n", , "The number of workers to start")
HTTPAddr = flag.String("http", "127.0.0.1:8000", "Address to listen for HTTP requests on")
) func main() {
// Parse the command-line flags.
flag.Parse() // Start the dispatcher.
fmt.Println("Starting the dispatcher")
StartDispatcher(*NWorkers) // Register our collector as an HTTP handler function.
fmt.Println("Registering the collector")
http.HandleFunc("/work", Collector) // Start the HTTP server!
fmt.Println("HTTP server listening on", *HTTPAddr)
if err := http.ListenAndServe(*HTTPAddr, nil); err != nil {
fmt.Println(err.Error())
}
}

6.编译

[root@wangjq queue]# go build -o queued *.go

7.运行

[root@wangjq queue]# ./queued -n 5
Starting the dispatcher
Starting worker 1
Starting worker 2
Starting worker 3
Starting worker 4
Starting worker 5
Registering the collector
HTTP server listening on 127.0.0.1:8000

8.测试

[root@wangjq ~]# for i in {1..3}; do curl localhost:8000/work -d name=$USER -d delay=$(expr $i % 11)s; done

9.效果

[root@wangjq queue]# ./queued -n 5
Starting the dispatcher
Starting worker 1
Starting worker 2
Starting worker 3
Starting worker 4
Starting worker 5
Registering the collector
HTTP server listening on 127.0.0.1:8000
Work request queued
Received work requeust
Dispatching work request
worker1: Received work request, delaying for 1.000000 seconds
Work request queued
Received work requeust
Dispatching work request
worker2: Received work request, delaying for 2.000000 seconds
Work request queued
Received work requeust
Dispatching work request
worker4: Received work request, delaying for 3.000000 seconds
worker1: Hello, root!
worker2: Hello, root!
worker4: Hello, root!

最新文章

  1. Poj2676
  2. MySQL中find_in_set()和in的区别
  3. qt qml ajax 获取 json 天气数据示例
  4. MFC编程 | 非模态对话框的定义
  5. 主成分分析(PCA)特征选择算法详解
  6. 解决mac升级后,出现的 xcrun: error: invalid active developer path, missing xcrun 错误
  7. ubantu安装sogou输入法
  8. hbase shell 常用命令
  9. MongoDB概述&语法
  10. PHP文件操作常用函数总结
  11. CodeSmith exclude global 文件和文件夹问题 与 输入中文显示乱码问题
  12. postman: 用于网页调试和发送Http请求的chrome插件
  13. Oracle数据库应用
  14. UML第二次作业:类在类图中的表示
  15. rabbit基本原理 转
  16. BUG描述规范管理
  17. shell编程学习笔记(十二):Shell中的break/continue跳出循环
  18. 让Java线程池实现任务阻塞执行的一种可行方案
  19. zabbix_server 报警
  20. 滥用基于资源约束委派来攻击Active Directory

热门文章

  1. webserver 返回json 如何去掉 <string xmlns="http://tempuri.org/">
  2. Shell分析服务器日志,解锁各种新姿势
  3. HTML5其他标签应用
  4. linux下快速列出局域网中所有主机名(计算机名)的脚本
  5. 在excel中如何给一列数据批量加上双引号
  6. Checkbutton基本写法
  7. pandas | DataFrame基础运算以及空值填充
  8. 如何验证 names(名称), e-mails(邮件), 和 URLs
  9. 使用pdf.js实现前端页面预览pdf文档,解决了跨域请求
  10. SpringClould进行Devtools热部署