当前位置: > > > > Goroutine-停止正在运行的进程
Goroutine-停止正在运行的进程
来源:stackoverflow
2024-04-21 15:12:32
0浏览
收藏
Golang不知道大家是否熟悉?今天我将给大家介绍《Goroutine-停止正在运行的进程》,这篇文章主要会讲到等等知识点,如果你在看完本篇文章后,有更好的建议或者发现哪里有问题,希望大家都能积极评论指出,谢谢!希望我们能一起加油进步!
问题内容
我使用以下代码,在大多数情况下都可以正常工作,以防我们使用一些长时间运行的进程,该进程不会在程序内停止不会结束(这里我将时间限制为 60 秒)示例)
我希望每个作业都会在5秒后终止(即使它没有完成工作也终止进程), 如何不更改函数 mylongrunningfunc 来做到这一点。
我知道在 go 中解决这个问题并不简单,有什么我可以使用的技巧吗?
这是一些最小的可重现示例
https://play.golang.org/p/a0rwy4bywmt
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/gammazero/workerpool"
)
func main() {
// here define a timeout for 5 sec,
// the task should be terminate after 5 sec
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
runner := newRunner(ctx, 10)
runner.do(job{
Name: "a",
Task: func() jobResult {
select {
case <-ctx.Done():
return jobResult{Error: errors.New("Timedout, exiting")}
default:
myLongRunningFunc("A job")
}
return jobResult{Data: "from a"}
},
})
runner.do(job{
Name: "b",
Task: func() jobResult {
select {
case <-ctx.Done():
return jobResult{Error: errors.New("Timeouts, exiting")}
default:
myLongRunningFunc("B job")
}
return jobResult{Data: "from b"}
},
})
results := runner.getjobResults()
fmt.Println(results)
time.Sleep(time.Second * 60)
}
func myLongRunningFunc(name string) {
for i := 0; i < 100000; i++ {
time.Sleep(time.Second * 1)
msg := "job" + name + " running..\n"
fmt.Println(msg)
}
}
type runner struct {
*workerpool.WorkerPool
ctx context.Context
kill chan struct{}
result chan jobResult
results []jobResult
}
func (r *runner) processResults() {
for {
select {
case res, ok := <-r.result:
if !ok {
goto Done
}
r.results = append(r.results, res)
}
}
Done:
<-r.kill
}
func newRunner(ctx context.Context, numRunners int) *runner {
r := &runner{
WorkerPool: workerpool.New(numRunners),
ctx: ctx,
kill: make(chan struct{}),
result: make(chan jobResult),
}
go r.processResults()
return r
}
func (r *runner) do(j job) {
r.Submit(r.wrap(&j))
}
func (r *runner) getjobResults() []jobResult {
r.StopWait()
close(r.result)
r.kill <- struct{}{}
return r.results
}
func (r *runner) wrap(job *job) func() {
return func() {
job.result = make(chan jobResult)
go job.Run()
select {
case res := <-job.result:
r.result <- res
case <-r.ctx.Done():
fmt.Printf("Job '%s' should stop here\n", job.Name)
r.result <- jobResult{name: job.Name, Error: r.ctx.Err()}
}
}
}
type job struct {
Name string
Task func() jobResult
Context context.Context
result chan jobResult
stopped chan struct{}
done context.CancelFunc
}
func (j *job) Run() {
result := j.Task()
result.name = j.Name
j.result <- result
}
type jobResult struct {
name string
Error error
Data interface{}
}
由于我使用chancel频道,因此编辑不相关
解决方案
我希望每个作业都被终止(杀死进程,即使它是 没有完成工作)5秒后,我怎样才能在不改变的情况下完成它 函数 mylongrunningfunc。
然后你只需添加一个5秒的服务员然后退出。
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/gammazero/workerpool"
)
func main() {
go func() {
// here define a timeout for 5 sec,
// the task should be terminate after 5 sec
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
runner := newRunner(ctx, 10)
runner.do(job{
Name: "a",
Task: func() jobResult {
select {
case <-ctx.Done():
return jobResult{Error: errors.New("Timedout, exiting")}
default:
myLongRunningFunc("A job")
}
return jobResult{Data: "from a"}
},
})
runner.do(job{
Name: "b",
Task: func() jobResult {
select {
case <-ctx.Done():
return jobResult{Error: errors.New("Timeouts, exiting")}
default:
myLongRunningFunc("B job")
}
return jobResult{Data: "from b"}
},
})
results := runner.getjobResults()
fmt.Println(results)
time.Sleep(time.Second * 60)
}()
<-time.After(time.Second * 5)
}
func myLongRunningFunc(name string) {
for i := 0; i < 100000; i++ {
time.Sleep(time.Second * 1)
msg := "job" + name + " running..\n"
fmt.Println(msg)
}
}
type runner struct {
*workerpool.WorkerPool
ctx context.Context
kill chan struct{}
result chan jobResult
results []jobResult
}
func (r *runner) processResults() {
for {
select {
case res, ok := <-r.result:
if !ok {
goto Done
}
r.results = append(r.results, res)
}
}
Done:
<-r.kill
}
func newRunner(ctx context.Context, numRunners int) *runner {
r := &runner{
WorkerPool: workerpool.New(numRunners),
ctx: ctx,
kill: make(chan struct{}),
result: make(chan jobResult),
}
go r.processResults()
return r
}
func (r *runner) do(j job) {
r.Submit(r.wrap(&j))
}
func (r *runner) getjobResults() []jobResult {
r.StopWait()
close(r.result)
r.kill <- struct{}{}
return r.results
}
func (r *runner) wrap(job *job) func() {
return func() {
job.result = make(chan jobResult)
go job.Run()
select {
case res := <-job.result:
r.result <- res
case <-r.ctx.Done():
fmt.Printf("Job '%s' should stop here\n", job.Name)
r.result <- jobResult{name: job.Name, Error: r.ctx.Err()}
}
}
}
type job struct {
Name string
Task func() jobResult
Context context.Context
result chan jobResult
stopped chan struct{}
done context.CancelFunc
}
func (j *job) Run() {
result := j.Task()
result.name = j.Name
j.result <- result
}
type jobResult struct {
name string
Error error
Data interface{}
}
我认为不可能从外部 goroutine 停止 goroutine。您可以检查它是否超时,但是您无法阻止它。
您可以做的是通过通道向 goroutine 发送一条消息,该消息可以被监视并在这种情况下停止。
今天关于《Goroutine-停止正在运行的进程》的内容介绍就到此结束,如果有什么疑问或者建议,可以在米云公众号下多多回复交流;文中若有不正之处,也希望回复留言以告知!
