docker容器stop流程

从API route开始看StopContainer接口的调用过程。

// NewRouter builds the container router: it stores the given backend and
// decoder on a fresh containerRouter, registers the router's routes and
// returns it as a router.Router.
func NewRouter(b Backend, decoder httputils.ContainerDecoder) router.Router {
	cr := new(containerRouter)
	cr.backend = b
	cr.decoder = decoder
	cr.initRoutes()
	return cr
}
...
// initRoutes initializes the routes in container router by assigning the
// route table to r.routes. Only the stop route is shown in this excerpt; the
// remaining entries are elided ("...").
func (r *containerRouter) initRoutes() {
   
    r.routes = []router.Route{
   
        ...
        // POST /containers/{name}/stop is dispatched to postContainersStop.
        router.NewPostRoute("/containers/{name:.*}/stop", r.postContainersStop),
        ...
    }
}
// postContainersStop is the HTTP handler registered for the container stop
// route. It asks the backend to stop the container named by vars["name"] and
// replies with 204 No Content on success; an error from the backend is
// returned to the caller unchanged and no status is written here.
//
// The code that produces `seconds` is elided ("...") in this excerpt.
func (s *containerRouter) postContainersStop(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
   
    ...
	if err := s.backend.ContainerStop(vars["name"], seconds); err != nil {
   
		return err
	}
	// Stop succeeded: there is no response body, only the status code.
	w.WriteHeader(http.StatusNoContent)
	return nil
}
// start is shown only as an excerpt: the visible part constructs the daemon
// with daemon.NewDaemon from the CLI configuration and the plugin store.
// NOTE(review): presumably this Daemon is what ends up serving as the
// router's Backend (and therefore provides ContainerStop) — confirm against
// the elided code.
func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
   
    ...
    d, err := daemon.NewDaemon(ctx, cli.Config, pluginStore)
    ...
}
// ContainerStop stops the container identified by name.
//
// The container is first asked to stop gracefully; if it has not stopped
// within the time given by timeout (in seconds) it is forcefully terminated
// (killed).
//
// A nil timeout selects the container's StopTimeout value, if set, otherwise
// the engine default. A negative timeout means "no timeout", i.e. the
// container is never forcefully terminated.
//
// A containerNotModifiedError is returned when the container is not running,
// and a failure to stop it is wrapped as a system error.
func (daemon *Daemon) ContainerStop(name string, timeout *int) error {
	ctr, err := daemon.GetContainer(name)
	if err != nil {
		return err
	}
	if !ctr.IsRunning() {
		return containerNotModifiedError{running: false}
	}

	// Resolve the effective timeout: an explicit value wins, otherwise fall
	// back to what the container reports.
	var seconds int
	if timeout != nil {
		seconds = *timeout
	} else {
		seconds = ctr.StopTimeout()
	}

	if stopErr := daemon.containerStop(ctr, seconds); stopErr != nil {
		return errdefs.System(errors.Wrapf(stopErr, "cannot stop container: %s", name))
	}
	return nil
}
// containerStop stops a running container in three steps: send the
// container's stop signal, wait up to `seconds` for it to exit, and finally
// fall back to daemon.Kill. A negative `seconds` waits without a deadline.
//
// It returns nil when the container is already not running. The only error
// it returns is a failure of the force-kill issued after the stop signal
// could not be sent; a failure of the final Kill is only logged.
func (daemon *Daemon) containerStop(container *containerpkg.Container, seconds int) error {
	if !container.IsRunning() {
		return nil
	}

	sig := container.StopSignal()

	// Step 1: deliver the stop signal.
	if sigErr := daemon.killPossiblyDeadProcess(container, sig); sigErr != nil {
		// The error is deliberately not returned: the most likely cause is
		// that the container stopped between the IsRunning() check above
		// and now, and the error itself is environment specific so it cannot
		// be told apart from a real failure. Give the container up to two
		// more seconds; if it is still running after that, the error was
		// probably genuine and the container is force killed.
		graceCtx, graceCancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer graceCancel()

		grace := <-container.Wait(graceCtx, containerpkg.WaitConditionNotRunning)
		if grace.Err() != nil {
			logrus.Infof("Container failed to stop after sending signal %d to the process, force killing", sig)
			if killErr := daemon.killPossiblyDeadProcess(container, 9); killErr != nil {
				return killErr
			}
		}
	}

	// Step 2: let the process exit on its own, bounded by `seconds` unless
	// the caller asked for no timeout.
	waitCtx := context.Background()
	if seconds >= 0 {
		var waitCancel context.CancelFunc
		waitCtx, waitCancel = context.WithTimeout(waitCtx, time.Duration(seconds)*time.Second)
		defer waitCancel()
	}

	exited := <-container.Wait(waitCtx, containerpkg.WaitConditionNotRunning)
	if exited.Err() == nil {
		daemon.LogContainerEvent(container, "stop")
		return nil
	}

	// Step 3: the container did not exit in time, so force it.
	logrus.Infof("Container %v failed to exit within %d seconds of signal %d - using the force", container.ID, seconds, sig)
	if err := daemon.Kill(container); err != nil {
		// Wait without a timeout and ignore the result.
		<-container.Wait(context.Background(), containerpkg.WaitConditionNotRunning)
		// The error is not returned: all that matters is that the container
		// is stopped, not which function stopped it.
		logrus.Warn(err)
	}

	daemon.LogContainerEvent(container, "stop")
	return nil
}

container.StopSignal() 优先用容器指定的信号,如果没有则默认是SIGTERM。如果停止信号发送失败,先最多等待2s,容器仍在运行就直接发送SIGKILL;之后再按上层(如kubelet)指定的超时时间等待容器退出。
如果容器始终未退出,daemon.Kill(container) 给容器发送SIGKILL信号,强制容器退出。

这里涉及容器的两种启动方式:

  • shell格式

PID1进程为 /bin/sh -c,
因为/bin/sh不会转发信号至任何子进程。所以我们的应用将永远不会收到SIGTERM信号。显然要解决这个问题,就需要将我们的进程作为PID1进程运行。

  • exec格式

PID1进程为应用程序的可执行文件(脚本或二进制),我们的程序可以直接捕获docker stop命令发送的SIGTERM信号。

优先看下强制删除的过程

// Kill forcefully terminates a container.
//
// It sends SIGKILL through the daemon, then falls back to signalling the
// process directly, and finally blocks with no timeout until the container
// is no longer running. It returns an error when the container is not
// running to begin with, or when neither kill attempt succeeded; a
// "no such process" result from either attempt is treated as success.
func (daemon *Daemon) Kill(container *containerpkg.Container) error {
	if !container.IsRunning() {
		return errNotRunning(container.ID)
	}

	// 1. Send SIGKILL
	killErr := daemon.killPossiblyDeadProcess(container, int(syscall.SIGKILL))
	if killErr != nil {
		if isErrNoSuchProcess(killErr) {
			return nil
		}

		// The error is not returned straight away: the container may simply
		// have stopped between the IsRunning() check above and now, and the
		// error is environment specific so that case cannot be recognised
		// reliably. Allow up to two more seconds; if the container is still
		// running by then, the error is probably valid and goes back to the
		// caller.
		graceCtx, graceCancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer graceCancel()

		grace := <-container.Wait(graceCtx, containerpkg.WaitConditionNotRunning)
		if grace.Err() != nil {
			return killErr
		}
	}

	// 2. Wait for the process to die; as a last resort, kill it directly.
	switch directErr := killProcessDirectly(container); {
	case directErr == nil:
		// Fall through to the final wait below.
	case isErrNoSuchProcess(directErr):
		return nil
	default:
		return directErr
	}

	// Wait for exit with no timeout; the returned status is ignored.
	<-container.Wait(context.Background(), containerpkg.WaitConditionNotRunning)

	return nil
}

killWithSignal() 先从容器层面尝试停止容器,如果容器是 Restarting 状态,就放弃这次的Kill操作。
如果容器是 Paused 状态,先执行Resume,在容器恢复后,立刻发送SIGKILL。

等待2s,容器状态没有转成 NotRunning, 就直接给容器中的进程发送SIGKILL。到这里再等上10s,如果容器还不退,就查询容器的1号进程,发送SIGKILL。

<-container.Wait 发送完SIGKILL后,开始阻塞等, 这次没有设置超时,就是死等, 这时当前goroutine 握着一把容器级别的锁(state.Lock()) 。

TODO: daemon.containerd.Resume()

// killWithSignal sends the container the given signal. This wrapper for the
// host specific kill command prepares the container before attempting
// to send the signal. An error is returned if the container is paused
// or not running, or if there is a problem returned from the
// underlying kill command.
//
// The container lock is held for the whole call. On success a "kill" event
// carrying the signal number is logged for the container.
//
// NOTE(review): in the code visible here a paused container is resumed
// rather than rejected; the branch for a custom stop signal is elided
// ("..."), so the "paused" error mentioned above may originate there —
// confirm.
func (daemon *Daemon) killWithSignal(container *containerpkg.Container, sig int) error {
   
	logrus.Debugf("Sending kill signal %d to container %s", sig, container.ID)
	container.Lock()
	defer container.Unlock()

	if !container.Running {
   
		return errNotRunning(container.ID)
	}

	var unpause bool
	if container.Config.StopSignal != "" && syscall.Signal(sig) != syscall.SIGKILL {
   
		...
	} else {
   
		// SIGKILL, or no custom stop signal configured: call ExitOnNext and
		// remember whether the container is paused so it can be resumed
		// after the signal has been queued.
		container.ExitOnNext()
		unpause = container.Paused
	}

	if !daemon.IsShuttingDown() {
   
		// Record that the stop was requested explicitly and persist it.
		// NOTE(review): the error returned by CheckpointTo is ignored here.
		container.HasBeenManuallyStopped = true
		container.CheckpointTo(daemon.containersReplica)
	}

	// if the container is currently restarting we do not need to send the signal
	// to the process. Telling the monitor that it should exit on its next event
	// loop is enough
	if container.Restarting {
   
		return nil
	}

	if err := daemon.kill(container, sig); err != nil {
   
		if errdefs.IsNotFound(err) {
   
			// Nothing left to signal, so there is nothing to resume either;
			// this case is logged at debug level and not treated as an error.
			unpause = false
			logrus.WithError(err).WithField("container", container.ID).WithField("action", "kill").Debug("container kill failed because of 'container not found' or 'no such process'")
		} else {
   
			return errors.Wrapf(err, "Cannot kill container %s", container.ID)
		}
	}

	if unpause {
   
		// above kill signal will be sent once resume is finished
		// A failed resume is only logged; it does not fail the kill.
		if err := daemon.containerd.Resume(context.Background(), container.ID); err != nil {
   
			logrus.Warnf("Cannot unpause container %s: %s", container.ID, err)
		}
	}

	attributes := map[string]string{
   
		"signal": fmt.Sprintf("%d", sig),
	}
	daemon.LogContainerEventWithAttributes(container, "kill", attributes)
	return nil
}

// killProcessDirectly waits up to 10 seconds for cntr to leave the running
// state and, if it has not, sends signal 9 straight to the PID reported by
// the container.
//
// It returns nil when the container stopped in time, when the reported PID
// is 0, or when the signal was delivered. When the process no longer exists
// (ESRCH) an errNoSuchProcess is returned; any other error from the kill
// call is returned as-is.
func killProcessDirectly(cntr *container.Container) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Block until the container stops or the timeout expires.
	if status := <-cntr.Wait(ctx, container.WaitConditionNotRunning); status.Err() == nil {
		return nil
	}

	// Ensure that we don't kill ourselves: PID 0 is never signalled.
	pid := cntr.GetPID()
	if pid == 0 {
		return nil
	}

	logrus.Infof("Container %s failed to exit within 10 seconds of kill - trying direct SIGKILL", stringid.TruncateID(cntr.ID))

	switch killErr := unix.Kill(pid, 9); killErr {
	case nil:
		return nil
	case unix.ESRCH:
		// The process is already gone; report that with the dedicated type.
		gone := errNoSuchProcess{pid, 9}
		logrus.Debug(gone)
		return gone
	default:
		return killErr
	}
}
// Wait waits until the container is in a certain state indicated by the given
// condition. A context must be used for cancelling the request, controlling
// timeouts, and avoiding goroutine leaks. Wait must be called without holding
// the state lock. Returns a channel from which the caller will receive the
// result. If the container exited on its own, the result's Err() method will
// be nil and its ExitCode() method will return the container's exit code,
// otherwise, the results Err() method will return an error indicating why the
// wait operation failed.
//
// Exactly one StateStatus is ever sent on the returned channel. The channel
// is buffered so that the send always completes, even if the caller never
// receives from it.
func (s *State) Wait(ctx context.Context, condition WaitCondition) <-chan StateStatus {
	s.Lock()
	defer s.Unlock()

	// Capacity 1: the single result can be stored without a receiver. With
	// an unbuffered channel the goroutine started below would block forever
	// on its send (and leak) whenever the caller stops listening.
	resultC := make(chan StateStatus, 1)

	if condition == WaitConditionNotRunning && !s.Running {
		// Already not running: send the current status right away.
		resultC <- StateStatus{
			exitCode: s.ExitCode(),
			err:      s.Err(),
		}

		return resultC
	}

	// If we are waiting only for removal, the waitStop channel should
	// remain nil and block forever.
	var waitStop chan struct{}
	if condition < WaitConditionRemoved {
		waitStop = s.waitStop
	}

	// Always wait for removal, just in case the container gets removed
	// while it is still in a "created" state, in which case it is never
	// actually stopped.
	waitRemove := s.waitRemove

	go func() {
		select {
		case <-ctx.Done():
			// Context timeout or cancellation.
			resultC <- StateStatus{
				exitCode: -1,
				err:      ctx.Err(),
			}
			return
		case <-waitStop:
		case <-waitRemove:
		}

		// The state changed: read the final status under the lock, but
		// release the lock before sending.
		s.Lock()
		result := StateStatus{
			exitCode: s.ExitCode(),
			err:      s.Err(),
		}
		s.Unlock()

		resultC <- result
	}()

	return resultC
}

Kill() 死等的对象,要么容器的waitStop信道醒来,要么waitRemove信道醒来。

// SetStopped marks the container as stopped and records how it exited. It
// takes no lock itself.
//
// FinishedAt is taken from exitStatus.ExitedAt, or set to the current UTC
// time when that value is the zero time. Finally the stop channel is closed
// to release everything waiting on it, and a fresh channel is installed.
func (s *State) SetStopped(exitStatus *ExitStatus) {
	s.Running, s.Paused, s.Restarting = false, false, false
	s.Pid = 0

	s.FinishedAt = exitStatus.ExitedAt
	if s.FinishedAt.IsZero() {
		s.FinishedAt = time.Now().UTC()
	}

	s.ExitCodeValue, s.OOMKilled = exitStatus.ExitCode, exitStatus.OOMKilled

	// Fire waiters for stop, then arm a new channel for later waiters.
	close(s.waitStop)
	s.waitStop = make(chan struct{})
}
...
// SetRestarting marks the container as restarting and records the exit
// status of the run that just ended. It takes no lock itself and resets the
// container PID to 0.
//
// FinishedAt is set to the current UTC time. The stop channel is closed to
// release everything waiting on it, and a fresh channel is installed.
func (s *State) SetRestarting(exitStatus *ExitStatus) {
	// A restarting container is still reported as running, because the
	// rm/stop/etc. checks elsewhere in docker depend on that.
	s.Running, s.Restarting, s.Paused = true, true, false
	s.Pid = 0
	s.FinishedAt = time.Now().UTC()
	s.ExitCodeValue, s.OOMKilled = exitStatus.ExitCode, exitStatus.OOMKilled

	// Fire waiters for stop, then arm a new channel for later waiters.
	close(s.waitStop)
	s.waitStop = make(chan struct{})
}

先看看waitStop,在 SetStopped 和 SetRestarting 时会重置,可以让 Kill()结束等待,释放那把锁。

  1. 在docker服务重启恢复时,会批量处理一波, 从containerd查询容器的状态,如果containerd反馈容器已死,会执行一次SetStopped()

需要注意的是,如果容器活着,但是dockerd未开启 --live-restore, 会执行一次 daemon.kill(), 直接给容器的1号进程发送结束信号。

func (daemon *Daemon) restore() error {
   
    ...
    for _, c := range containers {
   
		group.Add(1)
		go func(c *container.Container) {
   
            ...
            alive, _, process, err = daemon.containerd.Restore(context.Background(), c.ID, c.InitializeStdio)
            ...
            if !alive && process != nil {
   
				ec, exitedAt, err = process.Delete(context.Background())
				if err != nil && !errdefs.IsNotFound(err) {
   
					logrus.WithError(err).Errorf("Failed to delete container %s from containerd", c.ID)
					return
				}
			} else if !daemon.configStore.LiveRestoreEnabled {
   
				if err := daemon.kill(c, c.StopSignal()); err != nil && !errdefs.IsNotFound(err) {
   
					logrus.WithError(err).WithField("container", c.ID).Error("error shutting down container")
					return
				}
			}
            ...
            if !alive {
   
                c.Lock()
                c.SetStopped(&container.ExitStatus{
   ExitCode: int(ec), ExitedAt: exitedAt})
                daemon.Cleanup(c)
                if err := c.CheckpointTo(daemon.containersReplica); err != nil {
   
                    logrus.Errorf("Failed to update stopped container %s state: %v", c.ID, err)
                }
                c.Unlock()
            }
            ...
  2. 在docker的事件处理中,有两个地方调用了 SetStopped

当docker收到退出事件后,拿住一把 容器级别的锁 (container.Lock()), 通知containerd删除对应的task,就等2秒钟,然后继续。

如果断定容器不需要重启,会调用一次SetStopped

如果需要重启,但重启失败了,也会调用一次SetStopped,此前已经放掉手里的锁。

// ProcessEvent is called by libcontainerd whenever an event occurs
func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) error {
   
    c, err := daemon.GetContainer(id)
	if err != nil {
   
		return errors.Wrapf(err, "could not find container %s", id)
	}

	switch e {
   
    ...
    case libcontainerdtypes.EventExit:
        if int(ei.Pid) == c.Pid {
   
            c.Lock()
            _, _, err := daemon.containerd.DeleteTask(context.Background(), c.ID)
            ...
            ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
			c.StreamConfig.Wait(ctx)
			cancel()
			c.Reset(false)

			exitStatus := container.ExitStatus{
   
				ExitCode:  int(ei.ExitCode),
				ExitedAt:  ei.ExitedAt,
				OOMKilled: ei.OOMKilled,
			}
			restart, wait, err := c.RestartManager().ShouldRestart(ei.ExitCode, daemon.IsShuttingDown() || c.HasBeenManuallyStopped, time.Since(c.StartedAt))
			if err == nil && restart {
   
				c.RestartCount++
				c.SetRestarting(&exitStatus)
			} else {
   
				if ei.Error != nil {
   
					c.SetError(ei.Error)
				}
				c.SetStopped(&exitStatus)
				defer daemon.autoRemove(c)
			}
			defer c.Unlock() 
            ...
            if err == nil && restart {
   
                go func() {
   
                    err := <-wait
                    if err == nil {
   
                        // daemon.netController is initialized when daemon is restoring containers.
                        // But containerStart will use daemon.netController segment.
                        // So to avoid panic at startup process, here must wait util daemon restore done.
                        daemon.waitForStartupDone()
                        if err = daemon.containerStart(c, "", "", false); err != nil {
   
                            logrus.Debugf("failed to restart container: %+v", err)
                        }
                    }
                    if err != nil {
   
                        c.Lock()
                        c.SetStopped(&exitStatus)
                        daemon.setStateCounter(c)
                        c.CheckpointTo(daemon.containersReplica)
                        c.Unlock()
                        defer daemon.autoRemove(c)
                        if err != restartmanager.ErrRestartCanceled {
   
                            logrus.Errorf("restartmanger wait error: %+v", err)
                        }
                    }
                }()
            }
            ...
// processEventStream subscribes to containerd events for namespace ns whose
// topic matches ^/tasks/ and hands each translated event to c.processEvent.
// Most of the body is elided ("...") in this excerpt; only the translation of
// a TaskExit event into an EventExit/EventInfo pair is shown.
func (c *client) processEventStream(ctx context.Context, ns string) {
   
    ...
    // Filter on both namespace *and* topic. To create an "and" filter,
	// this must be a single, comma-separated string
	eventStream, errC := c.client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/|")
    ...
    for {
   
		var oomKilled bool
		select {
   
        ...
		case ev = <-eventStream:
            ...
            switch t := v.(type) {
   
            ...
			case *apievents.TaskExit:
                // Copy the exit details from the containerd event; t.ID
                // becomes the ProcessID of the resulting EventInfo.
                et = libcontainerdtypes.EventExit
				ei = libcontainerdtypes.EventInfo{
   
					ContainerID: t.ContainerID,
					ProcessID:   t.ID,
					Pid:         t.Pid,
					ExitCode:    t.ExitStatus,
					ExitedAt:    t.ExitedAt,
				}
            ...
            }
            ...
            c.processEvent(ctx, et, ei)
        }
    }
}
//libcontainerd/remote/client.go

// processEvent appends a handler for the event to c.eventQ under the
// container ID. The handler forwards the event to the backend's ProcessEvent
// and, for an exit event whose ProcessID differs from the ContainerID, closes
// the FIFO set built for that process and deletes the process. Parts of the
// body are elided ("...") in this excerpt.
func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) {
   
	c.eventQ.Append(ei.ContainerID, func() {
   
		err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
		...

		// ProcessID != ContainerID: the exited process is not the one that
		// shares the container's ID, so it gets its own cleanup here.
		if et == libcontainerdtypes.EventExit && ei.ProcessID != ei.ContainerID {
   
			p, err := c.getProcess(ctx, ei.ContainerID, ei.ProcessID)
			...

			ctr, err := c.getContainer(ctx, ei.ContainerID)
			if err != nil {
   
				// A failed container lookup is only logged; the process is
				// still deleted below.
				c.logger.WithFields(logrus.Fields{
   
					"container": ei.ContainerID,
					"error":     err,
				}).Error("failed to find container")
			} else {
   
				labels, err := ctr.Labels(ctx)
				if err != nil {
   
					c.logger.WithFields(logrus.Fields{
   
						"container": ei.ContainerID,
						"error":     err,
					}).Error("failed to get container labels")
					return
				}
				// The bundle path is read from the container's labels.
				newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
			}
			_, err = p.Delete(context.Background())
			...
		}
	})
}
// plugin/executor/containerd/containerd.go

// deleteTaskAndContainer removes a plugin's task from containerd and then
// the plugin's container. The task is deleted through p when a process
// handle is supplied, otherwise through cli by id. Both steps are best
// effort: "not found" errors are ignored and any other failure is logged
// rather than returned.
func deleteTaskAndContainer(ctx context.Context, cli libcontainerdtypes.Client, id string, p libcontainerdtypes.Process) {
	var taskErr error
	if p == nil {
		_, _, taskErr = cli.DeleteTask(ctx, id)
	} else {
		_, _, taskErr = p.Delete(ctx)
	}
	if taskErr != nil && !errdefs.IsNotFound(taskErr) {
		logrus.WithError(taskErr).WithField("id", id).Error("failed to delete plugin task from containerd")
	}

	ctrErr := cli.Delete(ctx, id)
	if ctrErr != nil && !errdefs.IsNotFound(ctrErr) {
		logrus.WithError(ctrErr).WithField("id", id).Error("failed to delete plugin container from containerd")
	}
}

...
// ProcessEvent handles events from containerd.
// Every event type except the exit event is ignored. For an exit event the
// plugin's task and container are deleted from containerd and the event is
// then sent off to the stored exit handler, whose result is returned.
func (e *Executor) ProcessEvent(id string, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) error {
	if et != libcontainerdtypes.EventExit {
		return nil
	}

	deleteTaskAndContainer(context.Background(), e.client, id, nil)
	return e.exitHandler.HandleExitEvent(ei.ContainerID)
}

dockerd订阅了containerd服务的 /tasks/exit 事件, 那么交接棒就到了 containerd ?

containerd里发送TaskExit事件的地方:

  • containerd-shim 主动发布退出事件
// cleanupAfterDeadShim publishes a TaskExit event for the task id and then
// removes the task from r.tasks. The event reports exit status
// 128+SIGKILL and uses the current UTC time as the exit time. The code that
// obtains `pid`, and the rest of the body, is elided ("...") in this excerpt.
func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string) error {
   
	...
	// Notify Client
	exitedAt := time.Now().UTC()
	r.events.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{
   
		ContainerID: id,
		ID:          id,
		Pid:         uint32(pid),
		ExitStatus:  128 + uint32(unix.SIGKILL),
		ExitedAt:    exitedAt,
	})

	r.tasks.Delete(ctx, id)
	...
}
  • containerd-shim服务收到SIGCHLD信号后,且为Init进程退出时,发布退出事件
// checkProcesses matches a reaped exit against the processes known to the
// service. For the first process whose PID equals e.Pid it records the exit
// status and queues a TaskExit event on s.events, then stops searching. When
// that process is the init process and shouldKillAllOnExit reports true, the
// init's children are killed first. An exit that matches no known process is
// ignored.
func (s *Service) checkProcesses(e runc.Exit) {
	for _, proc := range s.allProcesses() {
		if proc.Pid() == e.Pid {
			if initProc, isInit := proc.(*process.Init); isInit {
				killAll, err := shouldKillAllOnExit(s.bundle)
				if err != nil {
					log.G(s.context).WithError(err).Error("failed to check shouldKillAll")
				}

				// Ensure all children are killed
				if killAll {
					if err := initProc.KillAll(s.context); err != nil {
						log.G(s.context).WithError(err).WithField("id", initProc.ID()).
							Error("failed to kill init's children")
					}
				}
			}

			// Mark the process as exited before building the event, so
			// ExitedAt() is read after SetExited has run.
			proc.SetExited(e.Status)
			exitEvent := &eventstypes.TaskExit{
				ContainerID: s.id,
				ID:          proc.ID(),
				Pid:         uint32(e.Pid),
				ExitStatus:  uint32(e.Status),
				ExitedAt:    proc.ExitedAt(),
			}
			s.events <- exitEvent
			return
		}
	}
}

调用 cleanupAfterDeadShim() 的地方:

  • 创建Task时,设置exitHandler
// Create a new task.
//
// Create builds the bundle for id in the caller's namespace, obtains a shim
// client (in-process via ShimLocal when NoShim is configured, otherwise a
// remote shim with an exit handler), asks the shim to create the task, and
// publishes a TaskCreate event. If an error is returned after the shim
// client has been created, the shim is killed before returning.
//
// Parts of the body are elided ("...") in this excerpt.
func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) {
	namespace, err := namespaces.NamespaceRequired(ctx)
	...
	ropts, err := r.getRuncOptions(ctx, id)
	if err != nil {
		return nil, err
	}

	bundle, err := newBundle(id,
		filepath.Join(r.state, namespace),
		filepath.Join(r.root, namespace),
		opts.Spec.Value)
	...
	shimopt := ShimLocal(r.config, r.events)
	if !r.config.NoShim {
		...
		// exitHandler is passed to ShimRemote as the shim's exit callback.
		exitHandler := func() {
			log.G(ctx).WithField("id", id).Info("shim reaped")

			if _, err := r.tasks.Get(ctx, id); err != nil {
				// Task was never started or was already successfully deleted
				return
			}

			// Use a handler-local err: this callback runs independently of
			// Create, so it must not write to Create's named return value
			// (which the deferred cleanup below inspects).
			if err := r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id); err != nil {
				log.G(ctx).WithError(err).WithFields(logrus.Fields{
					"id":        id,
					"namespace": namespace,
				}).Warn("failed to clean up after killed shim")
			}
		}
		shimopt = ShimRemote(r.config, r.address, cgroup, exitHandler)
	}

	s, err := bundle.NewShimClient(ctx, namespace, shimopt, ropts)
	if err != nil {
		return nil, err
	}
	defer func() {
		// Only clean up when Create is returning an error.
		if err != nil {
			deferCtx, deferCancel := context.WithTimeout(
				namespaces.WithNamespace(context.TODO(), namespace), cleanupTimeout)
			defer deferCancel()
			if kerr := s.KillShim(deferCtx); kerr != nil {
				// Log the KillShim failure itself, not the error that
				// triggered the cleanup.
				log.G(ctx).WithError(kerr).Error("failed to kill shim")
			}
		}
	}()

	rt := r.config.Runtime
	if ropts != nil && ropts.Runtime != "" {
		rt = ropts.Runtime
	}
	...
	cr, err := s.Create(ctx, sopts)
	...
	t, err := newTask(id, namespace, int(cr.Pid), s, r.events, r.tasks, bundle)
	...
	r.events.Publish(ctx, runtime.TaskCreateEventTopic, &eventstypes.TaskCreate{
		ContainerID: sopts.ID,
		Bundle:      sopts.Bundle,
		Rootfs:      sopts.Rootfs,
		IO: &eventstypes.TaskIO{
			Stdin:    sopts.Stdin,
			Stdout:   sopts.Stdout,
			Stderr:   sopts.Stderr,
			Terminal: sopts.Terminal,
		},
		Checkpoint: sopts.Checkpoint,
		Pid:        uint32(t.pid),
	})

	return t, nil
}
  • containerd恢复时重新加载所有Tasks
func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
   
	dir, err := ioutil.ReadDir(filepath.Join(r.state, ns))
	if err != nil {
   
		return nil, err
	}
	var o []*Task
	for _, path := range dir {
   
		ctx = namespaces.WithNamespace(ctx, ns)
		pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile))
		shimExit := make(chan struct{
   })
		s, err := bundle.NewShimClient(ctx, ns, ShimConnect(r.config, func() {
   
			defer close(shimExit)
			if _, err := r.tasks.Get(ctx, id); err != nil {
   
				// Task was never started or was already successfully deleted
				return
			}

			if err := r.cleanupAfterDeadShim(ctx, bundle, ns, id); err != nil {
   
				...
			}
		}), nil)
		if err != nil {
   
			log.G(ctx).WithError(err).WithFields(logrus.Fields{
   
				"id":        id,
				"namespace": ns,
			}).Error("connecting to shim")
			err := r.cleanupAfterDeadShim(ctx, bundle, ns, id)
			if err != nil {
   
				log.G(ctx).WithError(err).WithField("bundle", bundle.path).
					Error("cleaning up after dead shim")
			}
			continue
		}

// restoreTasks walks the entries of the runtime state directory, one per
// namespace, loads the tasks of each namespace with loadTasks and returns
// them all in one slice. The first loadTasks error aborts the walk and is
// returned. The code that derives `name` and declares `o` is elided ("...")
// in this excerpt.
func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
   
	dir, err := ioutil.ReadDir(r.state)
	...
	for _, namespace := range dir {
   
		...
		log.G(ctx).WithField("namespace", name).Debug("loading tasks in namespace")
		tasks, err := r.loadTasks(ctx, name)
		if err != nil {
   
			return nil, err
		}
		o = append(o, tasks...)
	}
	return o, nil
}
// New returns a configured runtime
func New(ic *plugin.InitContext) (interface{
   }, error) {
   
	...
	tasks, err := r.restoreTasks(ic.Context)
	if err != nil {
   
		return nil, err
	}
	...

containerd-shim收到SIGCHLD信号时,生成一个runc.Exit事件,推送给所有订阅者,这里订阅者基本就是containerd-shim自己了,

在协程processExit里调用checkProcesses, 然后向containerd推送TaskExit事件。

func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *ttrpc.Server, sv *shim.Service) error {
   
	var (
		termOnce sync.Once
		done     = make(chan struct{
   })
	)

	for {
   
		select {
   
		case <-done:
			return nil
		case s := <-signals:
			switch s {
   
			case unix.SIGCHLD:
				if err := reaper.Reap(); err != nil {
   
					logger.WithError(err).Error("reap exit status")
				}
				...
// Reap should be called when the process receives a SIGCHLD. It collects
// every exited child through sys.Reap and notifies the Default monitor's
// subscribers about each one, stamping all exits with the same timestamp
// taken before reaping.
//
// For each exit it waits for the notification to finish, but for no longer
// than one second, before moving on to the next. The error from sys.Reap is
// returned after the exits it did return have been processed.
func Reap() error {
	reapedAt := time.Now()
	exits, err := sys.Reap(false)
	for i := range exits {
		delivered := Default.notify(runc.Exit{
			Timestamp: reapedAt,
			Pid:       exits[i].Pid,
			Status:    exits[i].Status,
		})

		// Bound the wait so one slow notification cannot stall the rest.
		select {
		case <-delivered:
		case <-time.After(1 * time.Second):
		}
	}
	return err
}
...
// notify delivers the exit event e to every current subscriber of the
// monitor from a background goroutine and returns a channel that is closed
// once that goroutine finishes.
//
// Each send attempt to a subscriber is bounded by a 1ms timer. Subscribers
// whose send timed out are retried on the next pass of the outer loop, and
// the goroutine only returns after a pass with no timed-out sends.
// NOTE(review): nothing in this function bounds the number of passes, so a
// subscriber that never receives and is never closed keeps the goroutine
// running.
func (m *Monitor) notify(e runc.Exit) chan struct{
   } {
   
	const timeout = 1 * time.Millisecond
	var (
		done    = make(chan struct{
   }, 1)
		timer   = time.NewTimer(timeout)
		// success records the subscriber channels that have already
		// received e, so a retry pass does not deliver it twice.
		success = make(map[chan runc.Exit]struct{
   })
	)
	// NOTE(review): stop() is defined elsewhere; presumably it stops (and
	// drains) the timer so the Reset below starts from a clean state —
	// confirm.
	stop(timer, true)

	go func() {
   
		defer close(done)

		for {
   
			var (
				failed      int
				// The subscriber list is re-read on every pass.
				subscribers = m.getSubscribers()
			)
			for _, s := range subscribers {
   
				s.do(func() {
   
					// Skip subscribers that are closed or already served.
					if s.closed {
   
						return
					}
					if _, ok := success[s.c]; ok {
   
						return
					}
					timer.Reset(timeout)
					// recv stays true unless the timer's channel was read
					// in the select below.
					recv := true
					select {
   
					case s.c <- e:
						success[s.c] = struct{
   }{
   }
					case <-timer.C:
						recv = false
						failed++
					}
					stop(timer, recv)
				})
			}
			// all subscribers received the message
			if failed == 0 {
   
				return
			}
		}
	}()
	return done
}

相关推荐

  1. docker容器stop流程

    2024-02-06 03:18:01       45 阅读
  2. Docker无法stop或者rm指定容器

    2024-02-06 03:18:01       29 阅读
  3. Docker】常用命令 docker stop

    2024-02-06 03:18:01       44 阅读
  4. 如何使用Docker容器化改善你的开发流程

    2024-02-06 03:18:01       38 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-02-06 03:18:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-02-06 03:18:01       100 阅读
  3. 在Django里面运行非项目文件

    2024-02-06 03:18:01       82 阅读
  4. Python语言-面向对象

    2024-02-06 03:18:01       91 阅读

热门阅读

  1. Linux cp命令(cp指令)解析

    2024-02-06 03:18:01       50 阅读
  2. 每日一题 力扣1696跳跃游戏

    2024-02-06 03:18:01       60 阅读
  3. 【数学1】基础数学问题

    2024-02-06 03:18:01       60 阅读
  4. 【Android】代码混淆简单介绍

    2024-02-06 03:18:01       62 阅读
  5. 异或加密原理及简单应用(C语言版)

    2024-02-06 03:18:01       56 阅读
  6. Docker Compose下载

    2024-02-06 03:18:01       52 阅读
  7. 【lesson12】高并发内存池项目最终完整版代码

    2024-02-06 03:18:01       45 阅读
  8. Simulink仿真中Simulink.ConfigSet用法

    2024-02-06 03:18:01       53 阅读
  9. 流量控制原理

    2024-02-06 03:18:01       60 阅读
  10. Android~集成opencv问题

    2024-02-06 03:18:01       50 阅读
  11. 蓝桥杯刷题day05——2023

    2024-02-06 03:18:01       52 阅读
  12. 【C语言】语句细节理解 超详细 易懂简单

    2024-02-06 03:18:01       54 阅读
  13. Flink-1.18.1环境搭建

    2024-02-06 03:18:01       60 阅读
  14. element-plus 更换主题色

    2024-02-06 03:18:01       56 阅读
  15. C Primer Plus(第六版)15.9 编程练习 第6题

    2024-02-06 03:18:01       49 阅读