Skip to content

Fix: Ensure SIGKILL is sent to unresponsive lazy job processes #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 50 additions & 21 deletions pkg/proc/basejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,25 @@ func (job *baseJob) SignalAll(sig syscall.Signal) {
}
}

if job.cmd == nil || job.cmd.Process == nil {
if !job.HasStarted() {
errFunc(
fmt.Errorf("job is not running"),
)
return
}

log.WithField("job.name", job.Config.Name).Infof("sending signal %d to process group", sig)
errFunc(syscall.Kill(-job.cmd.Process.Pid, sig))
errFunc(job.signalAll(sig))
}

func (job *baseJob) signalAll(sig syscall.Signal) error {
if !job.HasStarted() {
return nil
}
if job.cmd.Process.Pid <= 0 { // Ensure PID is positive before negating
return syscall.Errno(syscall.ESRCH) // No such process or invalid PID
}
return syscall.Kill(-job.cmd.Process.Pid, sig)
}

func (job *baseJob) Signal(sig os.Signal) {
Expand All @@ -49,7 +59,7 @@ func (job *baseJob) Signal(sig os.Signal) {
}
}

if job.cmd == nil || job.cmd.Process == nil {
if !job.HasStarted() {
errFunc(
fmt.Errorf("job is not running"),
)
Expand All @@ -58,10 +68,21 @@ func (job *baseJob) Signal(sig os.Signal) {

log.WithField("job.name", job.Config.Name).Infof("sending signal %d to process", sig)
errFunc(
job.cmd.Process.Signal(sig),
job.signal(sig),
)
}

func (job *baseJob) signal(sig os.Signal) error {
if !job.HasStarted() {
return nil
}
process, err := os.FindProcess(job.cmd.Process.Pid)
if err != nil {
return err
}
return process.Signal(sig)
}

func (job *baseJob) Reset() {
job.phase = JobPhase{}
}
Expand All @@ -74,6 +95,10 @@ func (job *baseJob) IsControllable() bool {
return job.Config.Controllable
}

func (job *baseJob) HasStarted() bool {
return job.cmd != nil && job.cmd.Process != nil
}

func (job *baseJob) GetPhase() *JobPhase {
return &job.phase
}
Expand Down Expand Up @@ -168,11 +193,11 @@ func (job *baseJob) startOnce(ctx context.Context, process chan<- *os.Process) e
select {
// job errChan or failed
case err := <-errChan:
if err := syscall.Kill(-job.cmd.Process.Pid, syscall.SIGTERM); err != nil {
if e, ok := err.(syscall.Errno); ok && e == 3 {
// this is fine; error 3 means that the process group does not exist anymore
if termErr := job.signalAll(syscall.SIGTERM); termErr != nil {
if e, ok := termErr.(syscall.Errno); ok && e == syscall.ESRCH {
// ESRCH (Error No Such Process) is fine, means process group already gone
} else {
l.WithError(err).Error("failed to send SIGTERM to job's process group")
l.WithError(termErr).Error("failed to send SIGTERM to job's process group")
}
}

Expand All @@ -192,21 +217,25 @@ func (job *baseJob) startOnce(ctx context.Context, process chan<- *os.Process) e
}
return err
case <-ctx.Done():
// ctx canceled, try to terminate job
_ = syscall.Kill(-job.cmd.Process.Pid, syscall.SIGTERM)
l.WithField("job.name", job.Config.Name).Info("sent SIGTERM to job's process group")
if job.HasStarted() {
// ctx canceled, try to terminate job
_ = job.signalAll(syscall.SIGTERM)
l.WithField("job.name", job.Config.Name).Info("sent SIGTERM to job's process group on ctx.Done")

select {
case <-time.After(time.Second * ShutdownWaitingTimeSeconds):
// process seems to hang, kill process
_ = syscall.Kill(-job.cmd.Process.Pid, syscall.SIGKILL)
l.WithField("job.name", job.Config.Name).Error("forcefully killed job")
return nil

case err := <-errChan:
// all good
return err
select {
case <-time.After(time.Second * ShutdownWaitingTimeSeconds):
// process seems to hang, kill process
_ = job.signalAll(syscall.SIGKILL)
l.WithField("job.name", job.Config.Name).Error("forcefully killed job")
return nil

case err := <-errChan:
// all good
return err
}
}
l.WithField("job.name", job.Config.Name).Info("context done, but process was not running or already cleaned up.")
return ctx.Err()
}
}

Expand Down
23 changes: 11 additions & 12 deletions pkg/proc/job_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,39 +177,38 @@ func (job *CommonJob) Watch() {
}

func (job *CommonJob) IsRunning() bool {
if job.cmd == nil {
if !job.HasStarted() || job.cmd.Process.Pid <= 0 {
return false
}
if job.cmd.Process == nil {
return false
}
if job.cmd.Process.Pid > 0 {
return syscall.Kill(job.cmd.Process.Pid, syscall.Signal(0)) == nil
}
return true
err := job.signal(syscall.Signal(0))
return err == nil
}

func (job *CommonJob) Restart() {
job.restart = true
job.SignalAll(syscall.SIGTERM)
job.interrupt()
if job.interrupt != nil {
job.interrupt()
}
}

func (job *CommonJob) Stop() {
job.stop = true
job.SignalAll(syscall.SIGTERM)
job.interrupt()
if job.interrupt != nil {
job.interrupt()
}
}

func (job *CommonJob) Status() *CommonJobStatus {
running := job.IsRunning()
running := job.HasStarted() && job.IsRunning()
var pid int
if running {
pid = job.cmd.Process.Pid
}
return &CommonJobStatus{
Pid: pid,
Running: job.IsRunning(),
Running: running,
Phase: job.phase,
Config: job.Config,
}
Expand Down
101 changes: 84 additions & 17 deletions pkg/proc/job_lazy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
log "github.com/sirupsen/logrus"
)

const lazyJobReapGracePeriod = 10 * time.Second

func (job *LazyJob) AssertStarted(ctx context.Context) error {
l := log.WithField("job.name", job.Config.Name)

Expand Down Expand Up @@ -83,32 +85,97 @@ func (job *LazyJob) Run(ctx context.Context, errors chan<- error) error {
return nil
}

func (job *LazyJob) startProcessGracePeriod(ctx context.Context, pid int) {
l := log.WithField("job.name", job.Config.Name)
graceTimer := time.NewTimer(lazyJobReapGracePeriod)
defer graceTimer.Stop()

select {
case <-ctx.Done():
l.Infof("context done during SIGTERM grace period for PID %d", pid)
return
case <-graceTimer.C:
job.lazyStartLock.Lock()
defer job.lazyStartLock.Unlock()

// Check if the process we sent SIGTERM to is still running
if job.process != nil && job.HasStarted() && job.cmd.Process.Pid == pid {
l.Warnf("process PID %d did not exit after SIGTERM and grace period; sending SIGKILL", pid)
if err := job.signalAll(syscall.SIGKILL); err != nil {
l.WithError(err).Errorf("failed to send1 SIGKILL to PID %d", pid)
}
} else if job.process != nil {
// Process is not nil, but it's not the one we targeted.
// This could happen if the job was quickly restarted.
currentPid := -1
if job.HasStarted() {
currentPid = job.cmd.Process.Pid
}
l.Warnf("original process PID %d seems to have exited or changed; current PID is %d. Skipping SIGKILL.", pid, currentPid)
} else {
// job.process is nil, so it was cleaned up.
l.Infof("process PID %d exited gracefully after SIGTERM", pid)
}
}
}

func (job *LazyJob) reapProcess() int {
l := log.WithField("job.name", job.Config.Name)
if job.activeConnections > 0 {
return 0
}
if diff := time.Since(job.lastConnectionClosed); diff < job.coolDownTimeout {
return 0
}
if job.process == nil {
return 0
}

job.lazyStartLock.Lock()
defer job.lazyStartLock.Unlock()

// Verify all conditions again inside the lock
if job.process == nil || job.activeConnections != 0 || job.lastConnectionClosed.IsZero() || time.Since(job.lastConnectionClosed) < job.coolDownTimeout {
return 0
}
if !job.HasStarted() {
l.Warn("job.process is not nil, but job.cmd or job.cmd.Process is nil; skipping reap cycle")
return 0
}

pidToReap := job.cmd.Process.Pid
l.Infof("sending SIGTERM to idle process PID %d", pidToReap)
if err := job.signal(syscall.SIGTERM); err != nil {
l.WithError(err).Warnf("failed to send SIGTERM to PID %d", pidToReap)
return 0
}

return pidToReap
}

func (job *LazyJob) startProcessReaper(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
reaperInterval := job.coolDownTimeout / 2
if reaperInterval < 1*time.Second {
reaperInterval = 1 * time.Second
}
ticker := time.NewTicker(reaperInterval)
go func() {
l := log.WithField("job.name", job.Config.Name)
l.Info("starting lazy job process reaper")
defer ticker.Stop()
defer l.Info("stopping lazy job process reaper")

for {
select {
case <-ctx.Done():
l.Info("context done, stopping lazy job process reaper")
return
case <-ticker.C:
if job.activeConnections > 0 {
pidToReap := job.reapProcess()
if pidToReap == 0 {
continue
}

diff := time.Since(job.lastConnectionClosed)
if diff < job.coolDownTimeout {
continue
}

if job.process == nil {
continue
}

job.lazyStartLock.Lock()

job.Signal(syscall.SIGTERM)

job.lazyStartLock.Unlock()
job.startProcessGracePeriod(ctx, pidToReap)
}
}
}()
Expand Down
Loading